Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2014-07-23 16:19:48 -0700 (Wed, 23 Jul 2014) $'
10
 * '$Revision: 8810 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.HashSet;
32
import java.util.Iterator;
33
import java.util.List;
34
import java.util.Set;
35
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.Executors;
37
import java.util.concurrent.locks.Lock;
38

    
39
import org.apache.log4j.Logger;
40
import org.dataone.service.exceptions.InvalidSystemMetadata;
41
import org.dataone.service.types.v1.Identifier;
42
import org.dataone.service.types.v2.SystemMetadata;
43

    
44
import com.hazelcast.config.Config;
45
import com.hazelcast.config.FileSystemXmlConfig;
46
import com.hazelcast.core.EntryEvent;
47
import com.hazelcast.core.EntryListener;
48
import com.hazelcast.core.Hazelcast;
49
import com.hazelcast.core.HazelcastInstance;
50
import com.hazelcast.core.ILock;
51
import com.hazelcast.core.IMap;
52
import com.hazelcast.core.ISet;
53
import com.hazelcast.core.ItemEvent;
54
import com.hazelcast.core.ItemListener;
55
import com.hazelcast.core.LifecycleEvent;
56
import com.hazelcast.core.LifecycleListener;
57
import com.hazelcast.core.Member;
58
import com.hazelcast.core.MembershipEvent;
59
import com.hazelcast.core.MembershipListener;
60
import com.hazelcast.partition.Partition;
61
import com.hazelcast.partition.PartitionService;
62

    
63
import edu.ucsb.nceas.metacat.IdentifierManager;
64
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
65
import edu.ucsb.nceas.metacat.common.index.IndexTask;
66
import edu.ucsb.nceas.metacat.common.index.event.IndexEvent;
67
import edu.ucsb.nceas.metacat.properties.PropertyService;
68
import edu.ucsb.nceas.metacat.shared.BaseService;
69
import edu.ucsb.nceas.metacat.shared.ServiceException;
70
import edu.ucsb.nceas.metacat.util.DocumentUtil;
71
import edu.ucsb.nceas.utilities.FileUtil;
72
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
73
/**
74
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
75
 */
76
public class HazelcastService extends BaseService
77
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener, ItemListener<Identifier> {
78
  
79
  private static final String MISSING_PID_PREFIX = "missing-";
80

    
81
/* The instance of the logging class */
82
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
83
  
84
  /* The singleton instance of the hazelcast service */
85
  private static HazelcastService hzService = null;
86
  
87
  /* The Hazelcast configuration */
88
  private Config hzConfig;
89
  
90
  /* The name of the system metadata map */
91
  private String systemMetadataMap;
92
  
93
  /* The Hazelcast distributed system metadata map */
94
  private IMap<Identifier, SystemMetadata> systemMetadata;
95
  
96
  /* The name of the identifiers set */
97
  private String identifiersSet;
98
  
99
  /* The Hazelcast distributed identifiers set */
100
  private ISet<Identifier> identifiers;
101
  
102
  /* The Hazelcast distributed missing identifiers set */
103
  private ISet<Identifier> missingIdentifiers;
104
  
105
  /* The Hazelcast distributed index queue */
106
  private String hzIndexQueue;
107
  private IMap<Identifier, IndexTask> indexQueue;
108
  
109
  /* The Hazelcast distributed index event map */
110
  private String hzIndexEventMap;
111
  private IMap<Identifier, IndexEvent> indexEventMap;
112

    
113
  private HazelcastInstance hzInstance;
114
      
115
  /*
116
   * Constructor: Creates an instance of the hazelcast service. Since
117
   * this uses a singleton pattern, use getInstance() to gain the instance.
118
   */
119
  private HazelcastService() {
120
    
121
    super();
122
    _serviceName="HazelcastService";
123
    
124
    try {
125
      init();
126
      
127
    } catch (ServiceException se) {
128
      logMetacat.error("There was a problem creating the HazelcastService. " +
129
                       "The error message was: " + se.getMessage());
130
      
131
    }
132
    
133
  }
134
  
135
  /**
136
   *  Get the instance of the HazelcastService that has been instantiated,
137
   *  or instantiate one if it has not been already.
138
   *
139
   * @return hazelcastService - The instance of the hazelcast service
140
   */
141
  public static HazelcastService getInstance(){
142
    
143
    if ( hzService == null ) {
144
      
145
      hzService = new HazelcastService();
146
      
147
    }
148
    return hzService;
149
  }
150
  
151
  /**
152
   * Initializes the Hazelcast service
153
   */
154
  public void init() throws ServiceException {
155
    
156
    logMetacat.debug("HazelcastService.init() called.");
157
    
158
	String configFileName = null;
159
	try {
160
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
161
		hzConfig = new FileSystemXmlConfig(configFileName);
162
	} catch (Exception e) {
163
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
164
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
165
		// make sure we have the config
166
		try {
167
			hzConfig = new FileSystemXmlConfig(configFileName);
168
		} catch (FileNotFoundException e1) {
169
			String msg = e.getMessage();
170
			logMetacat.error(msg);
171
			throw new ServiceException(msg);
172
		}
173
	}
174

    
175
	this.hzInstance = Hazelcast.newHazelcastInstance(hzConfig);
176
  
177
  	logMetacat.debug("Initialized hzInstance");
178

    
179
    // Get configuration properties on instantiation
180
    try {
181
      systemMetadataMap = 
182
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
183
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
184

    
185
      // Get a reference to the shared system metadata map as a cluster member
186
      // NOTE: this loads the map from the backing store and can take a long time for large collections
187
      systemMetadata = this.hzInstance.getMap(systemMetadataMap);
188
      
189
      logMetacat.debug("Initialized systemMetadata");
190

    
191
      // Get a reference to the shared identifiers set as a cluster member
192
      // NOTE: this takes a long time to complete
193
      logMetacat.warn("Retrieving hzIdentifiers from Hazelcast");
194
      identifiers = this.hzInstance.getSet(identifiersSet);
195
      logMetacat.warn("Retrieved hzIdentifiers from Hazelcast");
196
      
197
      // for publishing the "PIDs Wanted" list
198
      missingIdentifiers = this.hzInstance.getSet("hzMissingIdentifiersSet");
199
      
200
      missingIdentifiers.addItemListener(this, true);
201

    
202
      // for index tasks
203
      hzIndexQueue = PropertyService.getProperty("index.hazelcast.indexqueue");
204
      indexQueue = this.hzInstance.getMap(hzIndexQueue);
205

    
206
      // for index events (failures)
207
      hzIndexEventMap = PropertyService.getProperty("index.hazelcast.indexeventmap");
208
      indexEventMap = this.hzInstance.getMap(hzIndexEventMap);
209
      
210
      // Listen for changes to the system metadata map
211
      systemMetadata.addEntryListener(this, true);
212
      
213
      // Listen for members added/removed
214
      hzInstance.getCluster().addMembershipListener(this);
215
      
216
      // Listen for lifecycle state changes
217
      hzInstance.getLifecycleService().addLifecycleListener(this);
218
      
219
    } catch (PropertyNotFoundException e) {
220

    
221
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
222
        "The error message was: " + e.getMessage();
223
      logMetacat.error(msg);
224
      
225
    }
226
    
227
    // make sure we have all metadata locally
228
    try {
229
    	// synch on restart
230
        resynchInThread();
231
	} catch (Exception e) {
232
		String msg = "Problem resynchronizing system metadata. " + e.getMessage();
233
		logMetacat.error(msg, e);
234
	}
235
        
236
  }
237
  
238
  /**
239
   * Get the system metadata map
240
   * 
241
   * @return systemMetadata - the hazelcast map of system metadata
242
   * @param identifier - the identifier of the object as a string
243
   */
244
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
245
	  return systemMetadata;
246
  }
247
  
248
  /**
249
   * Get the identifiers set
250
   * @return identifiers - the set of unique DataONE identifiers in the cluster
251
   */
252
  public ISet<Identifier> getIdentifiers() {
253
      return identifiers;
254
      
255
  }
256

    
257
  /**
258
   * Get the index queue
259
   * @return the set of SystemMetadata to be indexed
260
   */
261
  public IMap<Identifier, IndexTask> getIndexQueue() {
262
      return indexQueue;
263
  }
264
  
265
  /**
266
   * Get the index event map
267
   * @return indexEventMap - the hazelcast map of index events
268
   */
269
  public IMap<Identifier, IndexEvent> getIndexEventMap() {
270
	  return indexEventMap;
271
  }
272
  
273
  /**
274
   * When Metacat changes the underlying store, we need to refresh the
275
   * in-memory representation of it.
276
   * @param guid
277
   */
278
  public void refreshSystemMetadataEntry(String guid) {
279
	Identifier identifier = new Identifier();
280
	identifier.setValue(guid);
281
	// force hazelcast to update system metadata in memory from the store
282
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
283
  }
284

    
285
  public Lock getLock(String identifier) {
286
    
287
    Lock lock = null;
288
    
289
    try {
290
        lock = getInstance().getHazelcastInstance().getLock(identifier);
291
        
292
    } catch (RuntimeException e) {
293
        logMetacat.info("Couldn't get a lock for identifier " + 
294
            identifier + " !!");
295
    }
296
    return lock;
297
      
298
  }
299
  
300
  /**
301
   * Get the DataONE hazelcast node map
302
   * @return nodes - the hazelcast map of nodes
303
   */
304
//  public IMap<NodeReference, Node> getNodesMap() {
305
//	  return nodes;
306
//  }
307
  
308
  /**
309
   * Indicate whether or not this service is refreshable.
310
   *
311
   * @return refreshable - the boolean refreshable status
312
   */
313
  public boolean refreshable() {
314
    // TODO: Determine the consequences of restarting the Hazelcast instance
315
    // Set this to true if it's okay to drop from the cluster, lose the maps,
316
    // and start back up again
317
    return false;
318
    
319
  }
320
  
321
  /**
322
   * Stop the HazelcastService. When stopped, the service will no longer
323
   * respond to requests.
324
   */
325
  public void stop() throws ServiceException {
326
    
327
	  this.hzInstance.getLifecycleService().shutdown();
328
    
329
  }
330

    
331
  public HazelcastInstance getHazelcastInstance() {
332
      return this.hzInstance;
333
      
334
  }
335
  
336
  /**
337
   * Refresh the Hazelcast service by restarting it
338
   */
339
  @Override
340
  protected void doRefresh() throws ServiceException {
341

    
342
    // TODO: verify that the correct config file is still used
343
	  this.hzInstance.getLifecycleService().restart();
344
    
345
  }
346
  
347
  /**
348
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
349
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
350
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
351
	 * 
352
	 * @param event - The EntryEvent that occurred
353
	 */
354
	@Override
355
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
356
	  
357
	  logMetacat.info("SystemMetadata entry added event on identifier " + 
358
	      event.getKey().getValue());
359
		// handle as update - that method will create if necessary
360
		entryUpdated(event);
361

    
362
	}
363

    
364
	/**
365
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
366
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
367
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
368
	 * 
369
	 * @param event - The EntryEvent that occurred
370
	 */
371
	@Override
372
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
373

    
374
      logMetacat.info("SystemMetadata entry evicted event on identifier " + 
375
          event.getKey().getValue());
376
      
377
	    // ensure identifiers are listed in the hzIdentifiers set
378
      if ( !identifiers.contains(event.getKey()) ) {
379
          identifiers.add(event.getKey());
380
      }
381
	  
382
	}
383
	
384
	/**
385
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
386
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
387
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
388
	 * 
389
	 * @param event - The EntryEvent that occurred
390
	 */
391
	@Override
392
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
393
		
394
    logMetacat.info("SystemMetadata entry removed event on identifier " + 
395
        event.getKey().getValue());
396

    
397
	  // we typically don't remove objects in Metacat, but can remove System Metadata
398
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
399

    
400
    // keep the hzIdentifiers set in sync with the systemmetadata table
401
    if ( identifiers.contains(event.getKey()) ) {
402
        identifiers.remove(event.getKey());
403
        
404
    }
405

    
406
	}
407
	
408
	/**
409
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
410
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
411
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
412
	 * 
413
	 * @param event - The EntryEvent that occurred
414
	 */
415
	@Override
416
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
417

    
418
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
419
		PartitionService partitionService = this.hzInstance.getPartitionService();
420
		Partition partition = partitionService.getPartition(event.getKey());
421
		Member ownerMember = partition.getOwner();
422
		SystemMetadata sysmeta = event.getValue();
423
		if (!ownerMember.localMember()) {
424
			if (sysmeta == null) {
425
				logMetacat.warn("No SystemMetadata provided in the event, getting from shared map: " + event.getKey().getValue());
426
				sysmeta = getSystemMetadataMap().get(event.getKey());
427
				if (sysmeta == null) {
428
					// this is a problem
429
					logMetacat.error("Could not find SystemMetadata in shared map for: " + event.getKey().getValue());
430
					// TODO: should probably return at this point since the save will fail
431
				}
432
			}
433
			// need to pull the entry into the local store
434
			saveLocally(event.getValue());
435
		}
436

    
437
		// ensure identifiers are listed in the hzIdentifiers set
438
		if (!identifiers.contains(event.getKey())) {
439
			identifiers.add(event.getKey());
440
		}
441

    
442
	}
443
	
444
	/**
445
	 * Save SystemMetadata to local store if needed
446
	 * @param sm
447
	 */
448
	private void saveLocally(SystemMetadata sm) {
449
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
450
		try {
451

    
452
			IdentifierManager.getInstance().insertOrUpdateSystemMetadata(sm);
453

    
454
		} catch (McdbDocNotFoundException e) {
455
			logMetacat.error("Could not save System Metadata to local store.", e);
456
			
457
		} catch (SQLException e) {
458
	      logMetacat.error("Could not save System Metadata to local store.", e);
459
	      
460
	    } catch (InvalidSystemMetadata e) {
461
	        logMetacat.error("Could not save System Metadata to local store.", e);
462
	        
463
	    }
464
	}
465
	
466
	/**
467
	 * Checks the local backing store for missing SystemMetadata,
468
	 * retrieves those entries from the shared map if they exist,
469
	 * and saves them locally.
470
	 */
471
	private void synchronizeLocalStore() {
472
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true, -1);
473
		if (localIds != null) {
474
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
475
			for (String localId: localIds) {
476
				logMetacat.debug("Processing system metadata for localId: " + localId);
477
				try {
478
					String docid = DocumentUtil.getSmartDocId(localId);
479
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
480
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
481
					logMetacat.debug("Found mapped guid: " + guid);
482
					Identifier pid = new Identifier();
483
					pid.setValue(guid);
484
					SystemMetadata sm = systemMetadata.get(pid);
485
					logMetacat.debug("Found shared system metadata for guid: " + guid);
486
					saveLocally(sm);
487
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
488
				} catch (Exception e) {
489
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
490
				}
491
			}
492
		}
493
	}
494
	
495
	
496
	/**
497
	 * Make sure we have a copy of every entry in the shared map.
498
	 * We use lazy loading and therefore the CNs may not all be in sync when one
499
	 * comes back online after an extended period of being offline
500
	 * This method loops through the entries that a FULLY UP-TO-DATE CN has
501
	 * and makes sure each one is present on the shared map.
502
	 * It is meant to overcome a HZ weakness wherein ownership of a key results in 
503
	 * null values where the owner does not have a complete backing store.
504
	 * This will be an expensive routine and should be run in a background process so that
505
	 * the server can continue to service other requests during the synch
506
	 * @throws Exception
507
	 */
508
	private void resynchToRemote() {
509
		
510
		// the local identifiers not already present in the shared map
511
		Set<Identifier> localIdKeys = loadAllKeys();
512
		
513
		//  the PIDs missing locally
514
		Set<Identifier> missingIdKeys = new HashSet<Identifier>();
515
				
516
		// only contribute PIDs that are not already shared
517
		Iterator<Identifier> idIter = identifiers.iterator();
518
		int processedCount = 0;
519
		while (idIter.hasNext()) {
520
			Identifier pid = idIter.next();
521
			if (localIdKeys.contains(pid)) {
522
				logMetacat.debug("Shared pid is already in local identifier set: " + pid.getValue());
523
				localIdKeys.remove(pid);
524
			} else {
525
				// we don't have this locally, so we should try to get it
526
				missingIdKeys.add(pid);
527
			}
528
			processedCount++;
529
		}
530
		logMetacat.warn("processedCount (identifiers from iterator): " + processedCount);
531

    
532
		logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size());
533

    
534
		//identifiers.addAll(idKeys);
535
		logMetacat.warn("Loading missing local keys into hzIdentifiers");
536
		for (Identifier key: localIdKeys) {
537
			if (!identifiers.contains(key)) {
538
				logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue());
539
				identifiers.add(key);
540
			}
541
		}
542
		logMetacat.warn("Initialized identifiers with missing local keys");
543
		
544
		logMetacat.warn("Processing missing SystemMetadata for missing pid count: " + missingIdKeys.size());
545
		
546
		// loop through all the missing PIDs to find any null (missing) SM that needs to be resynched
547
		Iterator<Identifier> missingPids = missingIdKeys.iterator();
548
		while (missingPids.hasNext()) {
549
			Identifier pid = missingPids.next();
550
			// publish that we need this SM entry
551
			logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue());
552
			missingIdentifiers.add(pid);
553
		}
554
		
555
	}
556
	
557
	public void resynchInThread() {
558
		logMetacat.debug("launching system metadata resynch in a thread");
559
		ExecutorService executor = Executors.newSingleThreadExecutor();
560
		executor.execute(new Runnable() {
561
			@Override
562
			public void run() {
563
				try {
564
					// this is a push mechanism
565
					resynchToRemote();
566
				} catch (Exception e) {
567
					logMetacat.error("Error in resynchInThread: " + e.getMessage(), e);
568
				}
569
			}
570
		});
571
		executor.shutdown();
572
	}
573

    
574
	/**
575
	 * When there is missing SystemMetadata on the local member,
576
	 * we retrieve it from the shared map and add it to the local
577
	 * backing store for safe keeping.
578
	 */
579
	@Override
580
	public void memberAdded(MembershipEvent event) {
581
		Member member = event.getMember();
582
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
583
		boolean isLocal = member.localMember();
584
		if (isLocal) {
585
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
586
			synchronizeLocalStore();
587
		}
588
	}
589

    
590
	@Override
591
	public void memberRemoved(MembershipEvent event) {
592
		// TODO Auto-generated method stub
593
		
594
	}
595

    
596
	/**
597
	 * In cases where this cluster is paused, we want to 
598
	 * check that the local store accurately reflects the shared 
599
	 * SystemMetadata map
600
	 * @param event
601
	 */
602
	@Override
603
	public void stateChanged(LifecycleEvent event) {
604
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
605
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
606
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
607
			synchronizeLocalStore();
608
		}
609
	}
610

    
611
	/**
612
	 * Load all System Metadata keys from the backing store
613
	 * @return set of pids
614
	 */
615
	private Set<Identifier> loadAllKeys() {
616

    
617
		Set<Identifier> pids = new HashSet<Identifier>();
618
		
619
		try {
620
			
621
			// ALTERNATIVE 1: this has more overhead than just looking at the GUIDs
622
//			ObjectList ol = IdentifierManager.getInstance().querySystemMetadata(
623
//					null, //startTime, 
624
//					null, //endTime, 
625
//					null, //objectFormatId, 
626
//					false, //replicaStatus, 
627
//					0, //start, 
628
//					-1 //count
629
//					);
630
//			for (ObjectInfo o: ol.getObjectInfoList()) {
631
//				Identifier pid = o.getIdentifier();
632
//				if ( !pids.contains(pid) ) {
633
//					pids.add(pid);
634
//				}				
635
//			}
636
			
637
			// ALTERNATIVE method: look up all the Identifiers from the table
638
			List<String> guids = IdentifierManager.getInstance().getAllSystemMetadataGUIDs();
639
			logMetacat.warn("Local SystemMetadata pid count: " + guids.size());
640
			for (String guid: guids){
641
				Identifier pid = new Identifier();
642
				pid.setValue(guid);
643
				pids.add(pid);
644
			}
645
			
646
		} catch (Exception e) {
647
			throw new RuntimeException(e.getMessage(), e);
648
			
649
		}
650
		
651
		return pids;
652
	}
653

    
654
	/**
655
	 * Respond to itemAdded events on the hzMissingIdentifiers Set.  Uses a
656
	 * distributed ILock to try to prevent multiple put calls on hzSystemMetadata
657
	 * 
658
	 * @param pid   the identifier of the event
659
	 */
660
	@Override
661
	public void itemAdded(ItemEvent<Identifier> event) {
662
		
663
		Identifier pid = (Identifier) event.getItem();
664
		// publish the SM for the pid if we have it locally
665
		logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue());
666
		
667
		// lock this event, only if we have a local copy to contribute
668
		ILock lock = null;
669
		try {
670
			// look up the local copy of the SM
671
			SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue());
672
			if (sm != null) {
673
				lock = hzInstance.getLock(MISSING_PID_PREFIX + pid.getValue());
674
				
675
				if ( lock.tryLock() ) {
676
			        // "publish" the system metadata to the shared map since it showed up on the missing queue
677
			        logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue());
678
			        systemMetadata.put(pid, sm);
679
			        
680
			        // remove the entry since we processed it
681
			        missingIdentifiers.remove(pid);
682
			      
683
				  } else {
684
				      logMetacat.debug(MISSING_PID_PREFIX + pid.getValue() + " was already locked. Skipping.");
685
				  }
686
			} else {
687
				// can't help here
688
				logMetacat.warn("Local system metadata not found for pid: " + pid.getValue());
689
			}
690
		} catch (Exception e) {
691
			logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue());
692
		} finally {
693
			if ( lock != null ) {
694
				lock.unlock();
695
			}
696
        }
697
	}
698

    
699
	/**
700
   * Respond to itemRemoved events on the hzMissingIdentifiers Set
701
   * 
702
   * @param pid   the identifier of the event
703
   */
704
	@Override
705
	public void itemRemoved(ItemEvent<Identifier> event) {
706
		// do nothing since someone probably handled the wanted PID
707
		
708
	}
709

    
710
}
(1-1/3)