Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2012-05-23 16:41:39 -0700 (Wed, 23 May 2012) $'
10
 * '$Revision: 7188 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.ArrayList;
32
import java.util.Collection;
33
import java.util.Date;
34
import java.util.HashSet;
35
import java.util.List;
36
import java.util.Set;
37
import java.util.concurrent.locks.Lock;
38

    
39
import org.apache.log4j.Logger;
40
import org.dataone.service.exceptions.InvalidSystemMetadata;
41
import org.dataone.service.types.v1.Identifier;
42
import org.dataone.service.types.v1.Node;
43
import org.dataone.service.types.v1.NodeReference;
44
import org.dataone.service.types.v1.ObjectInfo;
45
import org.dataone.service.types.v1.ObjectList;
46
import org.dataone.service.types.v1.SystemMetadata;
47

    
48
import com.hazelcast.config.Config;
49
import com.hazelcast.config.FileSystemXmlConfig;
50
import com.hazelcast.core.EntryEvent;
51
import com.hazelcast.core.EntryListener;
52
import com.hazelcast.core.Hazelcast;
53
import com.hazelcast.core.HazelcastInstance;
54
import com.hazelcast.core.IMap;
55
import com.hazelcast.core.ISet;
56
import com.hazelcast.core.LifecycleEvent;
57
import com.hazelcast.core.LifecycleListener;
58
import com.hazelcast.core.Member;
59
import com.hazelcast.core.MembershipEvent;
60
import com.hazelcast.core.MembershipListener;
61
import com.hazelcast.partition.Partition;
62
import com.hazelcast.partition.PartitionService;
63
import com.hazelcast.query.EntryObject;
64
import com.hazelcast.query.Predicate;
65
import com.hazelcast.query.PredicateBuilder;
66

    
67
import edu.ucsb.nceas.metacat.IdentifierManager;
68
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
69
import edu.ucsb.nceas.metacat.properties.PropertyService;
70
import edu.ucsb.nceas.metacat.shared.BaseService;
71
import edu.ucsb.nceas.metacat.shared.ServiceException;
72
import edu.ucsb.nceas.metacat.util.DocumentUtil;
73
import edu.ucsb.nceas.utilities.FileUtil;
74
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
75
/**
76
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
77
 */
78
public class HazelcastService extends BaseService
79
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener {
80
  
81
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
82

    
83
/* The instance of the logging class */
84
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
85
  
86
  /* The singleton instance of the hazelcast service */
87
  private static HazelcastService hzService = null;
88
  
89
  /* The Hazelcast configuration */
90
  private Config hzConfig;
91
  
92
  /* The instance of the Hazelcast client */
93
//  private HazelcastClient hzClient;
94

    
95
  /* The name of the DataONE Hazelcast cluster group */
96
  private String groupName;
97

    
98
  /* The name of the DataONE Hazelcast cluster password */
99
  private String groupPassword;
100
  
101
  /* The name of the DataONE Hazelcast cluster IP addresses */
102
  private String addressList;
103
  
104
  /* The name of the node map */
105
  private String nodeMap;
106

    
107
  /* The name of the system metadata map */
108
  private String systemMetadataMap;
109
  
110
  /* The Hazelcast distributed task id generator namespace */
111
  private String taskIds;
112
  
113
  /* The Hazelcast distributed node map */
114
  private IMap<NodeReference, Node> nodes;
115

    
116
  /* The Hazelcast distributed system metadata map */
117
  private IMap<Identifier, SystemMetadata> systemMetadata;
118
  
119
  /* The name of the identifiers set */
120
  private String identifiersSet;
121
  
122
  /* The Hazelcast distributed identifiers set */
123
  private ISet<Identifier> identifiers;
124

    
125
  private HazelcastInstance hzInstance;
126
      
127
  /**
   * Constructor: Creates an instance of the hazelcast service and
   * initializes it right away. Since this uses a singleton pattern,
   * use getInstance() to gain the instance.
   *
   * A ServiceException raised by init() is logged and not rethrown, so
   * the constructed instance may be only partially initialized.
   */
  private HazelcastService() {

    super();
    _serviceName = "HazelcastService";

    try {
      init();

    } catch (ServiceException se) {
      // hand the exception itself to the logger so the stack trace is kept
      logMetacat.error("There was a problem creating the HazelcastService. " +
                       "The error message was: " + se.getMessage(), se);

    }

  }
146
  
147
  /**
148
   *  Get the instance of the HazelcastService that has been instantiated,
149
   *  or instantiate one if it has not been already.
150
   *
151
   * @return hazelcastService - The instance of the hazelcast service
152
   */
153
  public static HazelcastService getInstance(){
154
    
155
    if ( hzService == null ) {
156
      
157
      hzService = new HazelcastService();
158
      
159
    }
160
    return hzService;
161
  }
162
  
163
  /**
164
   * Initializes the Hazelcast service
165
   */
166
  public void init() throws ServiceException {
167
    
168
    logMetacat.debug("HazelcastService.init() called.");
169
    
170
	String configFileName = null;
171
	Config config = null;
172
	try {
173
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
174
		config = new FileSystemXmlConfig(configFileName);
175
	} catch (Exception e) {
176
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
177
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
178
		// make sure we have the config
179
		try {
180
			config = new FileSystemXmlConfig(configFileName);
181
		} catch (FileNotFoundException e1) {
182
			String msg = e.getMessage();
183
			logMetacat.error(msg);
184
			throw new ServiceException(msg);
185
		}
186
	}
187

    
188
	Hazelcast.init(config);
189
  this.hzInstance = Hazelcast.getDefaultInstance();
190
  
191
    // Get configuration properties on instantiation
192
    try {
193
      groupName = 
194
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
195
      groupPassword = 
196
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
197
      addressList = 
198
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
199
      systemMetadataMap = 
200
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
201
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
202
//    nodeMap = 
203
//    PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
204
      // Become a DataONE-process cluster client
205
//      String[] addresses = addressList.split(",");
206
//      hzClient = 
207
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
208
//      nodes = hzClient.getMap(nodeMap);
209
      
210
      // Get a reference to the shared system metadata map as a cluster member
211
      // NOTE: this loads the map from the backing store and can take a long time for large collections
212
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
213
      
214
      // Get a reference to the shared identifiers set as a cluster member
215
      identifiers = Hazelcast.getSet(identifiersSet);
216
      identifiers.addAll(loadAllKeys());
217
      
218
      // Listen for changes to the system metadata map
219
      systemMetadata.addEntryListener(this, true);
220
      
221
      // Listen for members added/removed
222
      hzInstance.getCluster().addMembershipListener(this);
223
      
224
      // Listen for lifecycle state changes
225
      hzInstance.getLifecycleService().addLifecycleListener(this);
226
      
227
    } catch (PropertyNotFoundException e) {
228

    
229
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
230
        "The error message was: " + e.getMessage();
231
      logMetacat.error(msg);
232
      
233
    }
234
    
235
    // make sure we have all metadata locally
236
    try {
237
    	// synch on restart
238
    	// BRL: this can be problematic, commenting out 20120404
239
        //synchronizeLocalStore();
240
	} catch (Exception e) {
241
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
242
		logMetacat.error(msg, e);
243
	}
244
        
245
  }
246
  
247
  /**
248
   * Get the system metadata map
249
   * 
250
   * @return systemMetadata - the hazelcast map of system metadata
251
   * @param identifier - the identifier of the object as a string
252
   */
253
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
254
	  return systemMetadata;
255
  }
256
  
257
  /**
258
   * Get the identifiers set
259
   * @return identifiers - the set of unique DataONE identifiers in the cluster
260
   */
261
  public ISet<Identifier> getIdentifiers() {
262
      return identifiers;
263
      
264
  }
265

    
266
  /**
267
   * When Metacat changes the underlying store, we need to refresh the
268
   * in-memory representation of it.
269
   * @param guid
270
   */
271
  public void refreshSystemMetadataEntry(String guid) {
272
	Identifier identifier = new Identifier();
273
	identifier.setValue(guid);
274
	// force hazelcast to update system metadata in memory from the store
275
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
276
	
277
  }
278

    
279
  public Lock getLock(String identifier) {
280
    
281
    Lock lock = null;
282
    
283
    try {
284
        lock = getInstance().getHazelcastInstance().getLock(identifier);
285
        
286
    } catch (RuntimeException e) {
287
        logMetacat.info("Couldn't get a lock for identifier " + 
288
            identifier + " !!");
289
    }
290
    return lock;
291
      
292
  }
293
  
294
  /**
295
   * Get the DataONE hazelcast node map
296
   * @return nodes - the hazelcast map of nodes
297
   */
298
//  public IMap<NodeReference, Node> getNodesMap() {
299
//	  return nodes;
300
//  }
301
  
302
  /**
303
   * Indicate whether or not this service is refreshable.
304
   *
305
   * @return refreshable - the boolean refreshable status
306
   */
307
  public boolean refreshable() {
308
    // TODO: Determine the consequences of restarting the Hazelcast instance
309
    // Set this to true if it's okay to drop from the cluster, lose the maps,
310
    // and start back up again
311
    return false;
312
    
313
  }
314
  
315
  /**
316
   * Stop the HazelcastService. When stopped, the service will no longer
317
   * respond to requests.
318
   */
319
  public void stop() throws ServiceException {
320
    
321
    Hazelcast.getLifecycleService().shutdown();
322
    
323
  }
324

    
325
  public HazelcastInstance getHazelcastInstance() {
326
      return this.hzInstance;
327
      
328
  }
329
  
330
  /**
331
   * Refresh the Hazelcast service by restarting it
332
   */
333
  @Override
334
  protected void doRefresh() throws ServiceException {
335

    
336
    // TODO: verify that the correct config file is still used
337
    Hazelcast.getLifecycleService().restart();
338
    
339
  }
340
  
341
  /**
342
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
343
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
344
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
345
	 * 
346
	 * @param event - The EntryEvent that occurred
347
	 */
348
	@Override
349
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
350
	  
351
	  logMetacat.info("SystemMetadata entry added event on identifier " + 
352
	      event.getKey().getValue());
353
		// handle as update - that method will create if necessary
354
		entryUpdated(event);
355

    
356
	}
357

    
358
	/**
359
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
360
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
361
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
362
	 * 
363
	 * @param event - The EntryEvent that occurred
364
	 */
365
	@Override
366
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
367

    
368
      logMetacat.info("SystemMetadata entry evicted event on identifier " + 
369
          event.getKey().getValue());
370
      
371
	    // ensure identifiers are listed in the hzIdentifiers set
372
      if ( !identifiers.contains(event.getKey()) ) {
373
          identifiers.add(event.getKey());
374
      }
375
	  
376
	}
377
	
378
	/**
379
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
380
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
381
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
382
	 * 
383
	 * @param event - The EntryEvent that occurred
384
	 */
385
	@Override
386
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
387
		
388
    logMetacat.info("SystemMetadata entry removed event on identifier " + 
389
        event.getKey().getValue());
390

    
391
	  // we typically don't remove objects in Metacat, but can remove System Metadata
392
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
393

    
394
    // keep the hzIdentifiers set in sync with the systemmetadata table
395
    if ( identifiers.contains(event.getKey()) ) {
396
        identifiers.remove(event.getKey());
397
        
398
    }
399

    
400
	}
401
	
402
	/**
403
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
404
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
405
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
406
	 * 
407
	 * @param event - The EntryEvent that occurred
408
	 */
409
	@Override
410
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
411

    
412
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
413
		PartitionService partitionService = Hazelcast.getPartitionService();
414
		Partition partition = partitionService.getPartition(event.getKey());
415
		Member ownerMember = partition.getOwner();
416
		SystemMetadata sysmeta = event.getValue();
417
		if (!ownerMember.localMember()) {
418
			if (sysmeta == null) {
419
				logMetacat.warn("No SystemMetadata provided in the event, getting from shared map: " + event.getKey().getValue());
420
				sysmeta = getSystemMetadataMap().get(event.getKey());
421
				if (sysmeta == null) {
422
					// this is a problem
423
					logMetacat.error("Could not find SystemMetadata in shared map for: " + event.getKey().getValue());
424
					// TODO: should probably return at this point since the save will fail
425
				}
426
			}
427
			// need to pull the entry into the local store
428
			saveLocally(event.getValue());
429
		}
430

    
431
		// ensure identifiers are listed in the hzIdentifiers set
432
		if (!identifiers.contains(event.getKey())) {
433
			identifiers.add(event.getKey());
434
		}
435

    
436
	}
437
	
438
	/**
439
	 * Save SystemMetadata to local store if needed
440
	 * @param sm
441
	 */
442
	private void saveLocally(SystemMetadata sm) {
443
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
444
		try {
445

    
446
			IdentifierManager.getInstance().insertOrUpdateSystemMetadata(sm);
447

    
448
		} catch (McdbDocNotFoundException e) {
449
			logMetacat.error("Could not save System Metadata to local store.", e);
450
			
451
		} catch (SQLException e) {
452
	      logMetacat.error("Could not save System Metadata to local store.", e);
453
	      
454
	    } catch (InvalidSystemMetadata e) {
455
	        logMetacat.error("Could not save System Metadata to local store.", e);
456
	        
457
	    }
458
	}
459
	
460
	/**
461
	 * Checks the local backing store for missing SystemMetadata,
462
	 * retrieves those entries from the shared map if they exist,
463
	 * and saves them locally.
464
	 */
465
	private void synchronizeLocalStore() {
466
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true, -1);
467
		if (localIds != null) {
468
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
469
			for (String localId: localIds) {
470
				logMetacat.debug("Processing system metadata for localId: " + localId);
471
				try {
472
					String docid = DocumentUtil.getSmartDocId(localId);
473
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
474
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
475
					logMetacat.debug("Found mapped guid: " + guid);
476
					Identifier pid = new Identifier();
477
					pid.setValue(guid);
478
					SystemMetadata sm = systemMetadata.get(pid);
479
					logMetacat.debug("Found shared system metadata for guid: " + guid);
480
					saveLocally(sm);
481
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
482
				} catch (Exception e) {
483
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
484
				}
485
			}
486
		}
487
	}
488
	
489
	public void resynch() throws Exception {
490
		
491
		// get the CN that is online
492
		// TODO: do we even need to use a specific CN?
493
		// All the System Metadata records should be available via the shared map
494
//		NodeList allNodes = CNodeService.getInstance().listNodes();
495
//		Node onlineCN = null;
496
//		for (Node node: allNodes.getNodeList()) {
497
//			if (node.getType().equals(NodeType.CN)) {
498
//				if (node.getState().equals(NodeState.UP)) {
499
//					onlineCN = node;
500
//					break;
501
//				}
502
//			}
503
//		}
504
		
505
		// get the list of items that have changed since X
506
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
507
		EntryObject e = new PredicateBuilder().getEntryObject();
508
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
509
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
510
		for (SystemMetadata sm: updatedSystemMetadata) {
511
			saveLocally(sm);
512
		}
513
	}
514

    
515
	/**
516
	 * When there is missing SystemMetadata on the local member,
517
	 * we retrieve it from the shared map and add it to the local
518
	 * backing store for safe keeping.
519
	 */
520
	@Override
521
	public void memberAdded(MembershipEvent event) {
522
		Member member = event.getMember();
523
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
524
		boolean isLocal = member.localMember();
525
		if (isLocal) {
526
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
527
			synchronizeLocalStore();
528
		}
529
	}
530

    
531
	@Override
532
	public void memberRemoved(MembershipEvent event) {
533
		// TODO Auto-generated method stub
534
		
535
	}
536

    
537
	/**
538
	 * In cases where this cluster is paused, we want to 
539
	 * check that the local store accurately reflects the shared 
540
	 * SystemMetadata map
541
	 * @param event
542
	 */
543
	@Override
544
	public void stateChanged(LifecycleEvent event) {
545
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
546
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
547
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
548
			synchronizeLocalStore();
549
		}
550
	}
551

    
552
	/**
553
	 * Load all System Metadata keys from the backing store
554
	 * @return set of pids
555
	 */
556
	private Set<Identifier> loadAllKeys() {
557

    
558
		Set<Identifier> pids = new HashSet<Identifier>();
559
		
560
		try {
561
			
562
			// ALTERNATIVE 1: this has more overhead than just looking at the GUIDs
563
//			ObjectList ol = IdentifierManager.getInstance().querySystemMetadata(
564
//					null, //startTime, 
565
//					null, //endTime, 
566
//					null, //objectFormatId, 
567
//					false, //replicaStatus, 
568
//					0, //start, 
569
//					-1 //count
570
//					);
571
//			for (ObjectInfo o: ol.getObjectInfoList()) {
572
//				Identifier pid = o.getIdentifier();
573
//				if ( !pids.contains(pid) ) {
574
//					pids.add(pid);
575
//				}				
576
//			}
577
			
578
			// ALTERNATIVE method: look up all the Identifiers from the table
579
			List<String> guids = IdentifierManager.getInstance().getAllSystemMetadataGUIDs();
580
			for (String guid: guids){
581
				Identifier pid = new Identifier();
582
				pid.setValue(guid);
583
				pids.add(pid);
584
			}
585
			
586
		} catch (Exception e) {
587
			throw new RuntimeException(e.getMessage(), e);
588
			
589
		}
590
		
591
		return pids;
592
	}
593

    
594
}
(1-1/3)