Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2012-03-30 17:24:55 -0700 (Fri, 30 Mar 2012) $'
10
 * '$Revision: 7108 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.Collection;
32
import java.util.Date;
33
import java.util.List;
34
import java.util.concurrent.locks.Lock;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.service.exceptions.InvalidSystemMetadata;
38
import org.dataone.service.types.v1.Identifier;
39
import org.dataone.service.types.v1.Node;
40
import org.dataone.service.types.v1.NodeReference;
41
import org.dataone.service.types.v1.SystemMetadata;
42

    
43
import com.hazelcast.config.Config;
44
import com.hazelcast.config.FileSystemXmlConfig;
45
import com.hazelcast.core.EntryEvent;
46
import com.hazelcast.core.EntryListener;
47
import com.hazelcast.core.Hazelcast;
48
import com.hazelcast.core.HazelcastInstance;
49
import com.hazelcast.core.IMap;
50
import com.hazelcast.core.ISet;
51
import com.hazelcast.core.LifecycleEvent;
52
import com.hazelcast.core.LifecycleListener;
53
import com.hazelcast.core.Member;
54
import com.hazelcast.core.MembershipEvent;
55
import com.hazelcast.core.MembershipListener;
56
import com.hazelcast.partition.Partition;
57
import com.hazelcast.partition.PartitionService;
58
import com.hazelcast.query.EntryObject;
59
import com.hazelcast.query.Predicate;
60
import com.hazelcast.query.PredicateBuilder;
61

    
62
import edu.ucsb.nceas.metacat.IdentifierManager;
63
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
64
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
65
import edu.ucsb.nceas.metacat.properties.PropertyService;
66
import edu.ucsb.nceas.metacat.shared.BaseService;
67
import edu.ucsb.nceas.metacat.shared.ServiceException;
68
import edu.ucsb.nceas.metacat.util.DocumentUtil;
69
import edu.ucsb.nceas.utilities.FileUtil;
70
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
71
/**
72
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
73
 */
74
public class HazelcastService extends BaseService
75
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener {
76
  
77
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
78

    
79
/* The instance of the logging class */
80
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
81
  
82
  /* The singleton instance of the hazelcast service */
83
  private static HazelcastService hzService = null;
84
  
85
  /* The Hazelcast configuration */
86
  private Config hzConfig;
87
  
88
  /* The instance of the Hazelcast client */
89
//  private HazelcastClient hzClient;
90

    
91
  /* The name of the DataONE Hazelcast cluster group */
92
  private String groupName;
93

    
94
  /* The name of the DataONE Hazelcast cluster password */
95
  private String groupPassword;
96
  
97
  /* The name of the DataONE Hazelcast cluster IP addresses */
98
  private String addressList;
99
  
100
  /* The name of the node map */
101
  private String nodeMap;
102

    
103
  /* The name of the system metadata map */
104
  private String systemMetadataMap;
105
  
106
  /* The Hazelcast distributed task id generator namespace */
107
  private String taskIds;
108
  
109
  /* The Hazelcast distributed node map */
110
  private IMap<NodeReference, Node> nodes;
111

    
112
  /* The Hazelcast distributed system metadata map */
113
  private IMap<Identifier, SystemMetadata> systemMetadata;
114
  
115
  /* The name of the identifiers set */
116
  private String identifiersSet;
117
  
118
  /* The Hazelcast distributed identifiers set */
119
  private ISet<Identifier> identifiers;
120

    
121
  private HazelcastInstance hzInstance;
122
      
123
  /*
124
   * Constructor: Creates an instance of the hazelcast service. Since
125
   * this uses a singleton pattern, use getInstance() to gain the instance.
126
   */
127
  private HazelcastService() {
128
    
129
    super();
130
    _serviceName="HazelcastService";
131
    
132
    try {
133
      init();
134
      
135
    } catch (ServiceException se) {
136
      logMetacat.error("There was a problem creating the HazelcastService. " +
137
                       "The error message was: " + se.getMessage());
138
      
139
    }
140
    
141
  }
142
  
143
  /**
144
   *  Get the instance of the HazelcastService that has been instantiated,
145
   *  or instantiate one if it has not been already.
146
   *
147
   * @return hazelcastService - The instance of the hazelcast service
148
   */
149
  public static HazelcastService getInstance(){
150
    
151
    if ( hzService == null ) {
152
      
153
      hzService = new HazelcastService();
154
      
155
    }
156
    return hzService;
157
  }
158
  
159
  /**
160
   * Initializes the Hazelcast service
161
   */
162
  public void init() throws ServiceException {
163
    
164
    logMetacat.debug("HazelcastService.init() called.");
165
    
166
	String configFileName = null;
167
	Config config = null;
168
	try {
169
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
170
		config = new FileSystemXmlConfig(configFileName);
171
	} catch (Exception e) {
172
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
173
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
174
		// make sure we have the config
175
		try {
176
			config = new FileSystemXmlConfig(configFileName);
177
		} catch (FileNotFoundException e1) {
178
			String msg = e.getMessage();
179
			logMetacat.error(msg);
180
			throw new ServiceException(msg);
181
		}
182
	}
183

    
184
	Hazelcast.init(config);
185
  this.hzInstance = Hazelcast.getDefaultInstance();
186
  
187
    // Get configuration properties on instantiation
188
    try {
189
      groupName = 
190
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
191
      groupPassword = 
192
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
193
      addressList = 
194
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
195
      systemMetadataMap = 
196
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
197
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
198
//    nodeMap = 
199
//    PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
200
      // Become a DataONE-process cluster client
201
//      String[] addresses = addressList.split(",");
202
//      hzClient = 
203
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
204
//      nodes = hzClient.getMap(nodeMap);
205
      
206
      // Get a reference to the shared system metadata map as a cluster member
207
      // NOTE: this loads the map from the backing store and can take a long time for large collections
208
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
209
      
210
      // Get a reference to the shared identifiers set as a cluster member
211
      identifiers = Hazelcast.getSet(identifiersSet);
212
      
213
      // Listen for changes to the system metadata map
214
      systemMetadata.addEntryListener(this, true);
215
      
216
      // Listen for members added/removed
217
      hzInstance.getCluster().addMembershipListener(this);
218
      
219
      // Listen for lifecycle state changes
220
      hzInstance.getLifecycleService().addLifecycleListener(this);
221
      
222
    } catch (PropertyNotFoundException e) {
223

    
224
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
225
        "The error message was: " + e.getMessage();
226
      logMetacat.error(msg);
227
      
228
    }
229
    
230
    // make sure we have all metadata locally
231
    try {
232
	    // add index for resynch() method
233
    	// can only be added once, TODO: figure out how this works
234
	    //systemMetadata.addIndex(SINCE_PROPERTY, true);
235
		//resynch();
236
	} catch (Exception e) {
237
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
238
		logMetacat.error(msg, e);
239
	}
240
        
241
  }
242
  
243
  /**
244
   * Get the system metadata map
245
   * 
246
   * @return systemMetadata - the hazelcast map of system metadata
247
   * @param identifier - the identifier of the object as a string
248
   */
249
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
250
	  return systemMetadata;
251
  }
252
  
253
  /**
254
   * Get the identifiers set
255
   * @return identifiers - the set of unique DataONE identifiers in the cluster
256
   */
257
  public ISet<Identifier> getIdentifiers() {
258
      return identifiers;
259
      
260
  }
261

    
262
  /**
263
   * When Metacat changes the underlying store, we need to refresh the
264
   * in-memory representation of it.
265
   * @param guid
266
   */
267
  public void refreshSystemMetadataEntry(String guid) {
268
	Identifier identifier = new Identifier();
269
	identifier.setValue(guid);
270
	// force hazelcast to update system metadata in memory from the store
271
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
272
	
273
  }
274

    
275
  public Lock getLock(String identifier) {
276
    
277
    Lock lock = null;
278
    
279
    try {
280
        lock = getInstance().getHazelcastInstance().getLock(identifier);
281
        
282
    } catch (RuntimeException e) {
283
        logMetacat.info("Couldn't get a lock for identifier " + 
284
            identifier + " !!");
285
    }
286
    return lock;
287
      
288
  }
289
  
290
  /**
291
   * Get the DataONE hazelcast node map
292
   * @return nodes - the hazelcast map of nodes
293
   */
294
//  public IMap<NodeReference, Node> getNodesMap() {
295
//	  return nodes;
296
//  }
297
  
298
  /**
299
   * Indicate whether or not this service is refreshable.
300
   *
301
   * @return refreshable - the boolean refreshable status
302
   */
303
  public boolean refreshable() {
304
    // TODO: Determine the consequences of restarting the Hazelcast instance
305
    // Set this to true if it's okay to drop from the cluster, lose the maps,
306
    // and start back up again
307
    return false;
308
    
309
  }
310
  
311
  /**
312
   * Stop the HazelcastService. When stopped, the service will no longer
313
   * respond to requests.
314
   */
315
  public void stop() throws ServiceException {
316
    
317
    Hazelcast.getLifecycleService().shutdown();
318
    
319
  }
320

    
321
  public HazelcastInstance getHazelcastInstance() {
322
      return this.hzInstance;
323
      
324
  }
325
  
326
  /**
327
   * Refresh the Hazelcast service by restarting it
328
   */
329
  @Override
330
  protected void doRefresh() throws ServiceException {
331

    
332
    // TODO: verify that the correct config file is still used
333
    Hazelcast.getLifecycleService().restart();
334
    
335
  }
336
  
337
  /**
338
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
339
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
340
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
341
	 * 
342
	 * @param event - The EntryEvent that occurred
343
	 */
344
	@Override
345
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
346
	  
347
	  logMetacat.info("SystemMetadata entry added event on identifier " + 
348
	      event.getKey().getValue());
349
		// handle as update - that method will create if necessary
350
		entryUpdated(event);
351

    
352
	}
353

    
354
	/**
355
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
356
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
357
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
358
	 * 
359
	 * @param event - The EntryEvent that occurred
360
	 */
361
	@Override
362
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
363

    
364
      logMetacat.info("SystemMetadata entry evicted event on identifier " + 
365
          event.getKey().getValue());
366
      
367
	    // ensure identifiers are listed in the hzIdentifiers set
368
      if ( !identifiers.contains(event.getKey()) ) {
369
          identifiers.add(event.getKey());
370
      }
371
	  
372
	}
373
	
374
	/**
375
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
376
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
377
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
378
	 * 
379
	 * @param event - The EntryEvent that occurred
380
	 */
381
	@Override
382
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
383
		
384
    logMetacat.info("SystemMetadata entry removed event on identifier " + 
385
        event.getKey().getValue());
386

    
387
	  // we typically don't remove objects in Metacat, but can remove System Metadata
388
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
389

    
390
    // keep the hzIdentifiers set in sync with the systemmetadata table
391
    if ( identifiers.contains(event.getKey()) ) {
392
        identifiers.remove(event.getKey());
393
        
394
    }
395

    
396
	}
397
	
398
	/**
399
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
400
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
401
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
402
	 * 
403
	 * @param event - The EntryEvent that occurred
404
	 */
405
	@Override
406
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
407
	
408
			logMetacat.debug("Entry added/updated to System Metadata map: " + 
409
			    event.getKey().getValue());
410
			PartitionService partitionService = Hazelcast.getPartitionService();
411
			Partition partition = partitionService.getPartition(event.getKey());
412
			Member ownerMember = partition.getOwner();
413
			if (!ownerMember.localMember()) {
414
				// need to pull the entry into the local store
415
				saveLocally(event.getValue());
416
			}
417
	
418
			// TODO evaluate the type of system metadata change, decide if it
419
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
420
			// iteratively lock the PID, create and submit the tasks, and expect a
421
			// result back. Deal with exceptions.
422
			SystemMetadata sysmeta = event.getValue();
423
			if (sysmeta != null) {
424
				boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
425
				// TODO: do we need to do anything explicit here?
426
			}
427

    
428
	     // ensure identifiers are listed in the hzIdentifiers set
429
      if ( !identifiers.contains(event.getKey()) ) {
430
          identifiers.add(event.getKey());
431
      }
432

    
433
	}
434
	
435
	/**
436
	 * Save SystemMetadata to local store if needed
437
	 * @param sm
438
	 */
439
	private void saveLocally(SystemMetadata sm) {
440
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
441
		try {
442
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
443
				IdentifierManager.getInstance().insertSystemMetadata(sm);
444
				
445
			} else {
446
				IdentifierManager.getInstance().updateSystemMetadata(sm);
447
				
448
			}
449
		} catch (McdbDocNotFoundException e) {
450
			logMetacat.error("Could not save System Metadata to local store.", e);
451
			
452
		} catch (SQLException e) {
453
	      logMetacat.error("Could not save System Metadata to local store.", e);
454
	      
455
	    } catch (InvalidSystemMetadata e) {
456
	        logMetacat.error("Could not save System Metadata to local store.", e);
457
	        
458
	    }
459
	}
460
	
461
	/**
462
	 * Checks the local backing store for missing SystemMetadata,
463
	 * retrieves those entries from the shared map if they exist,
464
	 * and saves them locally.
465
	 */
466
	private void synchronizeLocalStore() {
467
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true);
468
		if (localIds != null) {
469
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
470
			for (String localId: localIds) {
471
				logMetacat.debug("Processing system metadata for localId: " + localId);
472
				try {
473
					String docid = DocumentUtil.getSmartDocId(localId);
474
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
475
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
476
					logMetacat.debug("Found mapped guid: " + guid);
477
					Identifier pid = new Identifier();
478
					pid.setValue(guid);
479
					SystemMetadata sm = systemMetadata.get(pid);
480
					logMetacat.debug("Found shared system metadata for guid: " + guid);
481
					saveLocally(sm);
482
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
483
				} catch (Exception e) {
484
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
485
				}
486
			}
487
		}
488
	}
489
	
490
	public void resynch() throws Exception {
491
		
492
		// get the CN that is online
493
		// TODO: do we even need to use a specific CN?
494
		// All the System Metadata records should be available via the shared map
495
//		NodeList allNodes = CNodeService.getInstance().listNodes();
496
//		Node onlineCN = null;
497
//		for (Node node: allNodes.getNodeList()) {
498
//			if (node.getType().equals(NodeType.CN)) {
499
//				if (node.getState().equals(NodeState.UP)) {
500
//					onlineCN = node;
501
//					break;
502
//				}
503
//			}
504
//		}
505
		
506
		// get the list of items that have changed since X
507
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
508
		EntryObject e = new PredicateBuilder().getEntryObject();
509
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
510
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
511
		for (SystemMetadata sm: updatedSystemMetadata) {
512
			saveLocally(sm);
513
		}
514
	}
515

    
516
	/**
517
	 * When there is missing SystemMetadata on the local member,
518
	 * we retrieve it from the shared map and add it to the local
519
	 * backing store for safe keeping.
520
	 */
521
	@Override
522
	public void memberAdded(MembershipEvent event) {
523
		Member member = event.getMember();
524
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
525
		boolean isLocal = member.localMember();
526
		if (isLocal) {
527
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
528
			synchronizeLocalStore();
529
		}
530
	}
531

    
532
	@Override
533
	public void memberRemoved(MembershipEvent event) {
534
		// TODO Auto-generated method stub
535
		
536
	}
537

    
538
	/**
539
	 * In cases where this cluster is paused, we want to 
540
	 * check that the local store accurately reflects the shared 
541
	 * SystemMetadata map
542
	 * @param event
543
	 */
544
	@Override
545
	public void stateChanged(LifecycleEvent event) {
546
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
547
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
548
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
549
			synchronizeLocalStore();
550
		}
551
	}
552

    
553
}
(1-1/3)