Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: jones $'
9
 *     '$Date: 2011-09-20 14:08:06 -0700 (Tue, 20 Sep 2011) $'
10
 * '$Revision: 6471 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.util.Collection;
30
import java.util.Date;
31

    
32
import org.apache.log4j.Logger;
33
import org.dataone.service.types.v1.Identifier;
34
import org.dataone.service.types.v1.Node;
35
import org.dataone.service.types.v1.NodeReference;
36
import org.dataone.service.types.v1.SystemMetadata;
37

    
38
import com.hazelcast.client.HazelcastClient;
39
import com.hazelcast.config.Config;
40
import com.hazelcast.config.FileSystemXmlConfig;
41
import com.hazelcast.core.EntryEvent;
42
import com.hazelcast.core.EntryListener;
43
import com.hazelcast.core.Hazelcast;
44
import com.hazelcast.core.IMap;
45
import com.hazelcast.core.InstanceEvent;
46
import com.hazelcast.core.InstanceListener;
47
import com.hazelcast.core.Member;
48
import com.hazelcast.partition.Partition;
49
import com.hazelcast.partition.PartitionService;
50
import com.hazelcast.query.EntryObject;
51
import com.hazelcast.query.Predicate;
52
import com.hazelcast.query.PredicateBuilder;
53

    
54
import edu.ucsb.nceas.metacat.IdentifierManager;
55
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
56
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
57
import edu.ucsb.nceas.metacat.properties.PropertyService;
58
import edu.ucsb.nceas.metacat.shared.BaseService;
59
import edu.ucsb.nceas.metacat.shared.ServiceException;
60
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
61
/**
62
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
63
 */
64
public class HazelcastService extends BaseService
65
  implements InstanceListener, EntryListener<Identifier, SystemMetadata> {
66
  
67
  private static final String SINCE_PROPERTY = "dateSysMetaModified";
68

    
69
/* The instance of the logging class */
70
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
71
  
72
  /* The singleton instance of the hazelcast service */
73
  private static HazelcastService hzService = null;
74
  
75
  /* The Hazelcast configuration */
76
  private Config hzConfig;
77
  
78
  /* The instance of the Hazelcast client */
79
  private HazelcastClient hzClient;
80

    
81
  /* The name of the DataONE Hazelcast cluster group */
82
  private String groupName;
83

    
84
  /* The name of the DataONE Hazelcast cluster password */
85
  private String groupPassword;
86
  
87
  /* The name of the DataONE Hazelcast cluster IP addresses */
88
  private String addressList;
89
  
90
  /* The name of the node map */
91
  private String nodeMap;
92

    
93
  /* The name of the system metadata map */
94
  private String systemMetadataMap;
95
  
96
  /* The Hazelcast distributed task id generator namespace */
97
  private String taskIds;
98
  
99
  /* The Hazelcast distributed node map */
100
  private IMap<NodeReference, Node> nodes;
101

    
102
  /* The Hazelcast distributed system metadata map */
103
  private IMap<Identifier, SystemMetadata> systemMetadata;
104
    
105
  /*
106
   * Constructor: Creates an instance of the hazelcast service. Since
107
   * this uses a singleton pattern, use getInstance() to gain the instance.
108
   */
109
  private HazelcastService() {
110
    
111
    super();
112
    _serviceName="HazelcastService";
113
    
114
    try {
115
      init();
116
      
117
    } catch (ServiceException se) {
118
      logMetacat.error("There was a problem creating the HazelcastService. " +
119
                       "The error message was: " + se.getMessage());
120
      
121
    }
122
    
123
  }
124
  
125
  /**
126
   *  Get the instance of the HazelcastService that has been instantiated,
127
   *  or instantiate one if it has not been already.
128
   *
129
   * @return hazelcastService - The instance of the hazelcast service
130
   */
131
  public static HazelcastService getInstance(){
132
    
133
    if ( hzService == null ) {
134
      
135
      hzService = new HazelcastService();
136
      
137
    }
138
    return hzService;
139
  }
140
  
141
  /**
142
   * Initializes the Hazelcast service
143
   */
144
  public void init() throws ServiceException {
145
    
146
    logMetacat.debug("HazelcastService.doRefresh() called.");
147
    
148
    try {
149
    	String configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
150
//    	System.setProperty("hazelcast.config", configFileName);
151
		Config config = new FileSystemXmlConfig(configFileName);
152
		Hazelcast.init(config);
153
    } catch (Exception e) {
154
      String msg = e.getMessage();
155
      logMetacat.error(msg);
156
      throw new ServiceException(msg);
157
    }
158
    
159
    // Get configuration properties on instantiation
160
    try {
161
      groupName = 
162
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
163
      groupPassword = 
164
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
165
      addressList = 
166
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
167
      nodeMap = 
168
        PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
169
      systemMetadataMap = 
170
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
171

    
172
      // Become a DataONE-process cluster client
173
      String[] addresses = addressList.split(",");
174
      hzClient = 
175
        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
176
      nodes = hzClient.getMap(nodeMap);
177
      
178
      // Get a reference to the shared system metadata map as a cluster member
179
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
180
      
181
      // Listen for changes to the system metadata map
182
      systemMetadata.addEntryListener(this, true);
183
      
184
    } catch (PropertyNotFoundException e) {
185

    
186
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
187
        "The error message was: " + e.getMessage();
188
      logMetacat.error(msg);
189
      
190
    }
191
    
192
    // make sure we have all metadata locally
193
    try {
194
	    // add index for resynch() method
195
	    systemMetadata.addIndex(SINCE_PROPERTY, true);
196
		resynch();
197
	} catch (Exception e) {
198
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
199
		logMetacat.error(msg, e);
200
	}
201
        
202
  }
203
  
204
  /**
205
   * Get the system metadata map
206
   * 
207
   * @return systemMetadata - the hazelcast map of system metadata
208
   */
209
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
210
	  return systemMetadata;
211
  }
212
  
213
  /**
214
   * Get the DataONE hazelcast node map
215
   * @return nodes - the hazelcast map of nodes
216
   */
217
  public IMap<NodeReference, Node> getNodesMap() {
218
	  return nodes;
219
  }
220
  
221
  /**
222
   * Indicate whether or not this service is refreshable.
223
   *
224
   * @return refreshable - the boolean refreshable status
225
   */
226
  public boolean refreshable() {
227
    // TODO: Determine the consequences of restarting the Hazelcast instance
228
    // Set this to true if it's okay to drop from the cluster, lose the maps,
229
    // and start back up again
230
    return false;
231
    
232
  }
233
  
234
  /**
235
   * Stop the HazelcastService. When stopped, the service will no longer
236
   * respond to requests.
237
   */
238
  public void stop() throws ServiceException {
239
    
240
    Hazelcast.getLifecycleService().shutdown();
241
    
242
  }
243

    
244
  /**
245
   * Listen for new Hazelcast member events
246
   */
247
  @Override
248
  public void instanceCreated(InstanceEvent event) {
249
    logMetacat.info("New Hazelcast instance created: " +
250
      event.getInstance().getId() + ", " +
251
      event.getInstance().getInstanceType());
252
    
253
  }
254

    
255
  @Override
256
  public void instanceDestroyed(InstanceEvent event) {
257
    logMetacat.info("Hazelcast instance removed: " +
258
        event.getInstance().getId() + ", " +
259
        event.getInstance().getInstanceType());
260
    
261
  }
262
  
263
  /**
264
   * Refresh the Hazelcast service by restarting it
265
   */
266
  @Override
267
  protected void doRefresh() throws ServiceException {
268

    
269
    // TODO: verify that the correct config file is still used
270
    Hazelcast.getLifecycleService().restart();
271
    
272
  }
273
  
274
  /**
275
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
276
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
277
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
278
	 * 
279
	 * @param event - The EntryEvent that occurred
280
	 */
281
	@Override
282
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
283
		// handle as update - that method will create if necessary
284
		entryUpdated(event);
285
	}
286

    
287
	/**
288
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
289
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
290
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
291
	 * 
292
	 * @param event - The EntryEvent that occurred
293
	 */
294
	@Override
295
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
296
	  // nothing to do, entries are still in the backing store
297
	  
298
	}
299
	
300
	/**
301
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
302
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
303
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
304
	 * 
305
	 * @param event - The EntryEvent that occurred
306
	 */
307
	@Override
308
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
309
	  // we don't remove objects
310
	  
311
	}
312
	
313
	/**
314
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
315
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
316
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
317
	 * 
318
	 * @param event - The EntryEvent that occurred
319
	 */
320
	@Override
321
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
322
	
323
			logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
324
			PartitionService partitionService = Hazelcast.getPartitionService();
325
			Partition partition = partitionService.getPartition(event.getKey());
326
			Member ownerMember = partition.getOwner();
327
			if (!ownerMember.localMember()) {
328
				// need to pull the entry into the local store
329
				saveLocally(event.getValue());
330
			}
331
	
332
			// TODO evaluate the type of system metadata change, decide if it
333
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
334
			// iteratively lock the PID, create and submit the tasks, and expect a
335
			// result back. Deal with exceptions.
336
			boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
337
			// TODO: do we need to do anything explicit here?
338
	  
339
	}
340
	
341
	/**
342
	 * Save SystemMetadata to local store if needed
343
	 * @param sm
344
	 */
345
	private void saveLocally(SystemMetadata sm) {
346
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
347
		try {
348
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
349
				IdentifierManager.getInstance().createSystemMetadata(sm);
350
			} else {
351
				IdentifierManager.getInstance().updateSystemMetadata(sm);
352
			}
353
		} catch (McdbDocNotFoundException e) {
354
			logMetacat.error(
355
					"Could not save System Metadata to local store.", e);
356
		}
357
	}
358
	
359
	public void resynch() throws Exception {
360
		
361
		// get the CN that is online
362
		// TODO: do we even need to use a specific CN?
363
		// All the System Metadata records should be available via the shared map
364
//		NodeList allNodes = CNodeService.getInstance().listNodes();
365
//		Node onlineCN = null;
366
//		for (Node node: allNodes.getNodeList()) {
367
//			if (node.getType().equals(NodeType.CN)) {
368
//				if (node.getState().equals(NodeState.UP)) {
369
//					onlineCN = node;
370
//					break;
371
//				}
372
//			}
373
//		}
374
		
375
		// get the list of items that have changed since X
376
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
377
		EntryObject e = new PredicateBuilder().getEntryObject();
378
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
379
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
380
		for (SystemMetadata sm: updatedSystemMetadata) {
381
			saveLocally(sm);
382
		}
383
	}
384

    
385
}
(1-1/3)