Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2013-07-03 00:28:54 -0700 (Wed, 03 Jul 2013) $'
10
 * '$Revision: 7842 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.HashSet;
32
import java.util.Iterator;
33
import java.util.List;
34
import java.util.Set;
35
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.Executors;
37
import java.util.concurrent.locks.Lock;
38

    
39
import org.apache.log4j.Logger;
40
import org.dataone.service.exceptions.InvalidSystemMetadata;
41
import org.dataone.service.types.v1.Identifier;
42
import org.dataone.service.types.v1.SystemMetadata;
43

    
44
import com.hazelcast.config.Config;
45
import com.hazelcast.config.FileSystemXmlConfig;
46
import com.hazelcast.core.EntryEvent;
47
import com.hazelcast.core.EntryListener;
48
import com.hazelcast.core.Hazelcast;
49
import com.hazelcast.core.HazelcastInstance;
50
import com.hazelcast.core.ILock;
51
import com.hazelcast.core.IMap;
52
import com.hazelcast.core.ISet;
53
import com.hazelcast.core.ItemEvent;
54
import com.hazelcast.core.ItemListener;
55
import com.hazelcast.core.LifecycleEvent;
56
import com.hazelcast.core.LifecycleListener;
57
import com.hazelcast.core.Member;
58
import com.hazelcast.core.MembershipEvent;
59
import com.hazelcast.core.MembershipListener;
60
import com.hazelcast.partition.Partition;
61
import com.hazelcast.partition.PartitionService;
62

    
63
import edu.ucsb.nceas.metacat.IdentifierManager;
64
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
65
import edu.ucsb.nceas.metacat.common.index.event.IndexEvent;
66
import edu.ucsb.nceas.metacat.properties.PropertyService;
67
import edu.ucsb.nceas.metacat.shared.BaseService;
68
import edu.ucsb.nceas.metacat.shared.ServiceException;
69
import edu.ucsb.nceas.metacat.util.DocumentUtil;
70
import edu.ucsb.nceas.utilities.FileUtil;
71
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
72
/**
73
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
74
 */
75
public class HazelcastService extends BaseService
76
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener, ItemListener<Identifier> {
77
  
78
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
79

    
80
  private static final String MISSING_PID_PREFIX = "missing-";
81

    
82
/* The instance of the logging class */
83
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
84
  
85
  /* The singleton instance of the hazelcast service */
86
  private static HazelcastService hzService = null;
87
  
88
  /* The Hazelcast configuration */
89
  private Config hzConfig;
90
  
91
  /* The name of the system metadata map */
92
  private String systemMetadataMap;
93
  
94
  /* The Hazelcast distributed system metadata map */
95
  private IMap<Identifier, SystemMetadata> systemMetadata;
96
  
97
  /* The name of the identifiers set */
98
  private String identifiersSet;
99
  
100
  /* The Hazelcast distributed identifiers set */
101
  private ISet<Identifier> identifiers;
102
  
103
  /* The Hazelcast distributed missing identifiers set */
104
  private ISet<Identifier> missingIdentifiers;
105
  
106
  /* The Hazelcast distributed index queue */
107
  private String hzIndexQueue;
108
  private ISet<SystemMetadata> indexQueue;
109
  
110
  /* The Hazelcast distributed index event map */
111
  private String hzIndexEventMap;
112
  private IMap<Identifier, IndexEvent> indexEventMap;
113

    
114
  private HazelcastInstance hzInstance;
115
      
116
  /*
117
   * Constructor: Creates an instance of the hazelcast service. Since
118
   * this uses a singleton pattern, use getInstance() to gain the instance.
119
   */
120
  private HazelcastService() {
121
    
122
    super();
123
    _serviceName="HazelcastService";
124
    
125
    try {
126
      init();
127
      
128
    } catch (ServiceException se) {
129
      logMetacat.error("There was a problem creating the HazelcastService. " +
130
                       "The error message was: " + se.getMessage());
131
      
132
    }
133
    
134
  }
135
  
136
  /**
137
   *  Get the instance of the HazelcastService that has been instantiated,
138
   *  or instantiate one if it has not been already.
139
   *
140
   * @return hazelcastService - The instance of the hazelcast service
141
   */
142
  public static HazelcastService getInstance(){
143
    
144
    if ( hzService == null ) {
145
      
146
      hzService = new HazelcastService();
147
      
148
    }
149
    return hzService;
150
  }
151
  
152
  /**
153
   * Initializes the Hazelcast service
154
   */
155
  public void init() throws ServiceException {
156
    
157
    logMetacat.debug("HazelcastService.init() called.");
158
    
159
	String configFileName = null;
160
	try {
161
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
162
		hzConfig = new FileSystemXmlConfig(configFileName);
163
	} catch (Exception e) {
164
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
165
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
166
		// make sure we have the config
167
		try {
168
			hzConfig = new FileSystemXmlConfig(configFileName);
169
		} catch (FileNotFoundException e1) {
170
			String msg = e.getMessage();
171
			logMetacat.error(msg);
172
			throw new ServiceException(msg);
173
		}
174
	}
175

    
176
	this.hzInstance = Hazelcast.newHazelcastInstance(hzConfig);
177
  
178
  	logMetacat.debug("Initialized hzInstance");
179

    
180
    // Get configuration properties on instantiation
181
    try {
182
      systemMetadataMap = 
183
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
184
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
185

    
186
      // Get a reference to the shared system metadata map as a cluster member
187
      // NOTE: this loads the map from the backing store and can take a long time for large collections
188
      systemMetadata = this.hzInstance.getMap(systemMetadataMap);
189
      
190
      logMetacat.debug("Initialized systemMetadata");
191

    
192
      // Get a reference to the shared identifiers set as a cluster member
193
      // NOTE: this takes a long time to complete
194
      logMetacat.warn("Retrieving hzIdentifiers from Hazelcast");
195
      identifiers = this.hzInstance.getSet(identifiersSet);
196
      logMetacat.warn("Retrieved hzIdentifiers from Hazelcast");
197
      
198
      // for publishing the "PIDs Wanted" list
199
      missingIdentifiers = this.hzInstance.getSet("hzMissingIdentifiersSet");
200
      
201
      missingIdentifiers.addItemListener(this, true);
202

    
203
      // for index tasks
204
      hzIndexQueue = PropertyService.getProperty("index.hazelcast.indexqueue");
205
      indexQueue = this.hzInstance.getSet(hzIndexQueue);
206

    
207
      // for index events (failures)
208
      hzIndexEventMap = PropertyService.getProperty("index.hazelcast.indexeventmap");
209
      indexEventMap = this.hzInstance.getMap(hzIndexEventMap);
210
      
211
      // Listen for changes to the system metadata map
212
      systemMetadata.addEntryListener(this, true);
213
      
214
      // Listen for members added/removed
215
      hzInstance.getCluster().addMembershipListener(this);
216
      
217
      // Listen for lifecycle state changes
218
      hzInstance.getLifecycleService().addLifecycleListener(this);
219
      
220
    } catch (PropertyNotFoundException e) {
221

    
222
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
223
        "The error message was: " + e.getMessage();
224
      logMetacat.error(msg);
225
      
226
    }
227
    
228
    // make sure we have all metadata locally
229
    try {
230
    	// synch on restart
231
        resynchInThread();
232
	} catch (Exception e) {
233
		String msg = "Problem resynchronizing system metadata. " + e.getMessage();
234
		logMetacat.error(msg, e);
235
	}
236
        
237
  }
238
  
239
  /**
240
   * Get the system metadata map
241
   * 
242
   * @return systemMetadata - the hazelcast map of system metadata
243
   * @param identifier - the identifier of the object as a string
244
   */
245
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
246
	  return systemMetadata;
247
  }
248
  
249
  /**
250
   * Get the identifiers set
251
   * @return identifiers - the set of unique DataONE identifiers in the cluster
252
   */
253
  public ISet<Identifier> getIdentifiers() {
254
      return identifiers;
255
      
256
  }
257

    
258
  /**
259
   * Get the index queue
260
   * @return the set of SystemMetadata to be indexed
261
   */
262
  public ISet<SystemMetadata> getIndexQueue() {
263
      return indexQueue;
264
  }
265
  
266
  /**
267
   * Get the index event map
268
   * @return indexEventMap - the hazelcast map of index events
269
   */
270
  public IMap<Identifier, IndexEvent> getIndexEventMap() {
271
	  return indexEventMap;
272
  }
273
  
274
  /**
275
   * When Metacat changes the underlying store, we need to refresh the
276
   * in-memory representation of it.
277
   * @param guid
278
   */
279
  public void refreshSystemMetadataEntry(String guid) {
280
	Identifier identifier = new Identifier();
281
	identifier.setValue(guid);
282
	// force hazelcast to update system metadata in memory from the store
283
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
284
  }
285

    
286
  public Lock getLock(String identifier) {
287
    
288
    Lock lock = null;
289
    
290
    try {
291
        lock = getInstance().getHazelcastInstance().getLock(identifier);
292
        
293
    } catch (RuntimeException e) {
294
        logMetacat.info("Couldn't get a lock for identifier " + 
295
            identifier + " !!");
296
    }
297
    return lock;
298
      
299
  }
300
  
301
  /**
302
   * Get the DataONE hazelcast node map
303
   * @return nodes - the hazelcast map of nodes
304
   */
305
//  public IMap<NodeReference, Node> getNodesMap() {
306
//	  return nodes;
307
//  }
308
  
309
  /**
310
   * Indicate whether or not this service is refreshable.
311
   *
312
   * @return refreshable - the boolean refreshable status
313
   */
314
  public boolean refreshable() {
315
    // TODO: Determine the consequences of restarting the Hazelcast instance
316
    // Set this to true if it's okay to drop from the cluster, lose the maps,
317
    // and start back up again
318
    return false;
319
    
320
  }
321
  
322
  /**
323
   * Stop the HazelcastService. When stopped, the service will no longer
324
   * respond to requests.
325
   */
326
  public void stop() throws ServiceException {
327
    
328
	  this.hzInstance.getLifecycleService().shutdown();
329
    
330
  }
331

    
332
  public HazelcastInstance getHazelcastInstance() {
333
      return this.hzInstance;
334
      
335
  }
336
  
337
  /**
338
   * Refresh the Hazelcast service by restarting it
339
   */
340
  @Override
341
  protected void doRefresh() throws ServiceException {
342

    
343
    // TODO: verify that the correct config file is still used
344
	  this.hzInstance.getLifecycleService().restart();
345
    
346
  }
347
  
348
  /**
349
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
350
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
351
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
352
	 * 
353
	 * @param event - The EntryEvent that occurred
354
	 */
355
	@Override
356
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
357
	  
358
	  logMetacat.info("SystemMetadata entry added event on identifier " + 
359
	      event.getKey().getValue());
360
		// handle as update - that method will create if necessary
361
		entryUpdated(event);
362

    
363
	}
364

    
365
	/**
366
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
367
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
368
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
369
	 * 
370
	 * @param event - The EntryEvent that occurred
371
	 */
372
	@Override
373
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
374

    
375
      logMetacat.info("SystemMetadata entry evicted event on identifier " + 
376
          event.getKey().getValue());
377
      
378
	    // ensure identifiers are listed in the hzIdentifiers set
379
      if ( !identifiers.contains(event.getKey()) ) {
380
          identifiers.add(event.getKey());
381
      }
382
	  
383
	}
384
	
385
	/**
386
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
387
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
388
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
389
	 * 
390
	 * @param event - The EntryEvent that occurred
391
	 */
392
	@Override
393
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
394
		
395
    logMetacat.info("SystemMetadata entry removed event on identifier " + 
396
        event.getKey().getValue());
397

    
398
	  // we typically don't remove objects in Metacat, but can remove System Metadata
399
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
400

    
401
    // keep the hzIdentifiers set in sync with the systemmetadata table
402
    if ( identifiers.contains(event.getKey()) ) {
403
        identifiers.remove(event.getKey());
404
        
405
    }
406

    
407
	}
408
	
409
	/**
410
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
411
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
412
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
413
	 * 
414
	 * @param event - The EntryEvent that occurred
415
	 */
416
	@Override
417
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
418

    
419
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
420
		PartitionService partitionService = this.hzInstance.getPartitionService();
421
		Partition partition = partitionService.getPartition(event.getKey());
422
		Member ownerMember = partition.getOwner();
423
		SystemMetadata sysmeta = event.getValue();
424
		if (!ownerMember.localMember()) {
425
			if (sysmeta == null) {
426
				logMetacat.warn("No SystemMetadata provided in the event, getting from shared map: " + event.getKey().getValue());
427
				sysmeta = getSystemMetadataMap().get(event.getKey());
428
				if (sysmeta == null) {
429
					// this is a problem
430
					logMetacat.error("Could not find SystemMetadata in shared map for: " + event.getKey().getValue());
431
					// TODO: should probably return at this point since the save will fail
432
				}
433
			}
434
			// need to pull the entry into the local store
435
			saveLocally(event.getValue());
436
		}
437

    
438
		// ensure identifiers are listed in the hzIdentifiers set
439
		if (!identifiers.contains(event.getKey())) {
440
			identifiers.add(event.getKey());
441
		}
442

    
443
	}
444
	
445
	/**
446
	 * Save SystemMetadata to local store if needed
447
	 * @param sm
448
	 */
449
	private void saveLocally(SystemMetadata sm) {
450
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
451
		try {
452

    
453
			IdentifierManager.getInstance().insertOrUpdateSystemMetadata(sm);
454

    
455
		} catch (McdbDocNotFoundException e) {
456
			logMetacat.error("Could not save System Metadata to local store.", e);
457
			
458
		} catch (SQLException e) {
459
	      logMetacat.error("Could not save System Metadata to local store.", e);
460
	      
461
	    } catch (InvalidSystemMetadata e) {
462
	        logMetacat.error("Could not save System Metadata to local store.", e);
463
	        
464
	    }
465
	}
466
	
467
	/**
468
	 * Checks the local backing store for missing SystemMetadata,
469
	 * retrieves those entries from the shared map if they exist,
470
	 * and saves them locally.
471
	 */
472
	private void synchronizeLocalStore() {
473
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true, -1);
474
		if (localIds != null) {
475
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
476
			for (String localId: localIds) {
477
				logMetacat.debug("Processing system metadata for localId: " + localId);
478
				try {
479
					String docid = DocumentUtil.getSmartDocId(localId);
480
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
481
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
482
					logMetacat.debug("Found mapped guid: " + guid);
483
					Identifier pid = new Identifier();
484
					pid.setValue(guid);
485
					SystemMetadata sm = systemMetadata.get(pid);
486
					logMetacat.debug("Found shared system metadata for guid: " + guid);
487
					saveLocally(sm);
488
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
489
				} catch (Exception e) {
490
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
491
				}
492
			}
493
		}
494
	}
495
	
496
	
497
	/**
498
	 * Make sure we have a copy of every entry in the shared map.
499
	 * We use lazy loading and therefore the CNs may not all be in sync when one
500
	 * comes back online after an extended period of being offline
501
	 * This method loops through the entries that a FULLY UP-TO-DATE CN has
502
	 * and makes sure each one is present on the shared map.
503
	 * It is meant to overcome a HZ weakness wherein ownership of a key results in 
504
	 * null values where the owner does not have a complete backing store.
505
	 * This will be an expensive routine and should be run in a background process so that
506
	 * the server can continue to service other requests during the synch
507
	 * @throws Exception
508
	 */
509
	private void resynchToRemote() {
510
		
511
		// the local identifiers not already present in the shared map
512
		Set<Identifier> localIdKeys = loadAllKeys();
513
		
514
		//  the PIDs missing locally
515
		Set<Identifier> missingIdKeys = new HashSet<Identifier>();
516
				
517
		// only contribute PIDs that are not already shared
518
		Iterator<Identifier> idIter = identifiers.iterator();
519
		int processedCount = 0;
520
		while (idIter.hasNext()) {
521
			Identifier pid = idIter.next();
522
			if (localIdKeys.contains(pid)) {
523
				logMetacat.debug("Shared pid is already in local identifier set: " + pid.getValue());
524
				localIdKeys.remove(pid);
525
			} else {
526
				// we don't have this locally, so we should try to get it
527
				missingIdKeys.add(pid);
528
			}
529
			processedCount++;
530
		}
531
		logMetacat.warn("processedCount (identifiers from iterator): " + processedCount);
532

    
533
		logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size());
534

    
535
		//identifiers.addAll(idKeys);
536
		logMetacat.warn("Loading missing local keys into hzIdentifiers");
537
		for (Identifier key: localIdKeys) {
538
			if (!identifiers.contains(key)) {
539
				logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue());
540
				identifiers.add(key);
541
			}
542
		}
543
		logMetacat.warn("Initialized identifiers with missing local keys");
544
		
545
		logMetacat.warn("Processing missing SystemMetadata for missing pid count: " + missingIdKeys.size());
546
		
547
		// loop through all the missing PIDs to find any null (missing) SM that needs to be resynched
548
		Iterator<Identifier> missingPids = missingIdKeys.iterator();
549
		while (missingPids.hasNext()) {
550
			Identifier pid = missingPids.next();
551
			// publish that we need this SM entry
552
			logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue());
553
			missingIdentifiers.add(pid);
554
		}
555
		
556
	}
557
	
558
	public void resynchInThread() {
559
		logMetacat.debug("launching system metadata resynch in a thread");
560
		ExecutorService executor = Executors.newSingleThreadExecutor();
561
		executor.execute(new Runnable() {
562
			@Override
563
			public void run() {
564
				try {
565
					// this is a push mechanism
566
					resynchToRemote();
567
				} catch (Exception e) {
568
					logMetacat.error("Error in resynchInThread: " + e.getMessage(), e);
569
				}
570
			}
571
		});
572
		executor.shutdown();
573
	}
574

    
575
	/**
576
	 * When there is missing SystemMetadata on the local member,
577
	 * we retrieve it from the shared map and add it to the local
578
	 * backing store for safe keeping.
579
	 */
580
	@Override
581
	public void memberAdded(MembershipEvent event) {
582
		Member member = event.getMember();
583
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
584
		boolean isLocal = member.localMember();
585
		if (isLocal) {
586
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
587
			synchronizeLocalStore();
588
		}
589
	}
590

    
591
	@Override
592
	public void memberRemoved(MembershipEvent event) {
593
		// TODO Auto-generated method stub
594
		
595
	}
596

    
597
	/**
598
	 * In cases where this cluster is paused, we want to 
599
	 * check that the local store accurately reflects the shared 
600
	 * SystemMetadata map
601
	 * @param event
602
	 */
603
	@Override
604
	public void stateChanged(LifecycleEvent event) {
605
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
606
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
607
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
608
			synchronizeLocalStore();
609
		}
610
	}
611

    
612
	/**
613
	 * Load all System Metadata keys from the backing store
614
	 * @return set of pids
615
	 */
616
	private Set<Identifier> loadAllKeys() {
617

    
618
		Set<Identifier> pids = new HashSet<Identifier>();
619
		
620
		try {
621
			
622
			// ALTERNATIVE 1: this has more overhead than just looking at the GUIDs
623
//			ObjectList ol = IdentifierManager.getInstance().querySystemMetadata(
624
//					null, //startTime, 
625
//					null, //endTime, 
626
//					null, //objectFormatId, 
627
//					false, //replicaStatus, 
628
//					0, //start, 
629
//					-1 //count
630
//					);
631
//			for (ObjectInfo o: ol.getObjectInfoList()) {
632
//				Identifier pid = o.getIdentifier();
633
//				if ( !pids.contains(pid) ) {
634
//					pids.add(pid);
635
//				}				
636
//			}
637
			
638
			// ALTERNATIVE method: look up all the Identifiers from the table
639
			List<String> guids = IdentifierManager.getInstance().getAllSystemMetadataGUIDs();
640
			logMetacat.warn("Local SystemMetadata pid count: " + guids.size());
641
			for (String guid: guids){
642
				Identifier pid = new Identifier();
643
				pid.setValue(guid);
644
				pids.add(pid);
645
			}
646
			
647
		} catch (Exception e) {
648
			throw new RuntimeException(e.getMessage(), e);
649
			
650
		}
651
		
652
		return pids;
653
	}
654

    
655
	/**
656
	 * Respond to itemAdded events on the hzMissingIdentifiers Set.  Uses a
657
	 * distributed ILock to try to prevent multiple put calls on hzSystemMetadata
658
	 * 
659
	 * @param pid   the identifier of the event
660
	 */
661
	@Override
662
	public void itemAdded(ItemEvent<Identifier> event) {
663
		
664
		Identifier pid = (Identifier) event.getItem();
665
		// publish the SM for the pid if we have it locally
666
		logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue());
667
		
668
		// lock this event, only if we have a local copy to contribute
669
		ILock lock = null;
670
		try {
671
			// look up the local copy of the SM
672
			SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue());
673
			if (sm != null) {
674
				lock = hzInstance.getLock(MISSING_PID_PREFIX + pid.getValue());
675
				
676
				if ( lock.tryLock() ) {
677
			        // "publish" the system metadata to the shared map since it showed up on the missing queue
678
			        logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue());
679
			        systemMetadata.put(pid, sm);
680
			        
681
			        // remove the entry since we processed it
682
			        missingIdentifiers.remove(pid);
683
			      
684
				  } else {
685
				      logMetacat.debug(MISSING_PID_PREFIX + pid.getValue() + " was already locked. Skipping.");
686
				  }
687
			} else {
688
				// can't help here
689
				logMetacat.warn("Local system metadata not found for pid: " + pid.getValue());
690
			}
691
		} catch (Exception e) {
692
			logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue());
693
		} finally {
694
			if ( lock != null ) {
695
				lock.unlock();
696
			}
697
        }
698
	}
699

    
700
	/**
701
   * Respond to itemRemoved events on the hzMissingIdentifiers Set
702
   * 
703
   * @param pid   the identifier of the event
704
   */
705
	@Override
706
	public void itemRemoved(ItemEvent<Identifier> event) {
707
		// do nothing since someone probably handled the wanted PID
708
		
709
	}
710

    
711
}
(1-1/3)