Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2013-06-20 16:49:33 -0700 (Thu, 20 Jun 2013) $'
10
 * '$Revision: 7812 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.HashSet;
32
import java.util.Iterator;
33
import java.util.List;
34
import java.util.Set;
35
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.Executors;
37
import java.util.concurrent.locks.Lock;
38

    
39
import org.apache.log4j.Logger;
40
import org.dataone.service.exceptions.InvalidSystemMetadata;
41
import org.dataone.service.types.v1.Identifier;
42
import org.dataone.service.types.v1.SystemMetadata;
43

    
44
import com.hazelcast.config.Config;
45
import com.hazelcast.config.FileSystemXmlConfig;
46
import com.hazelcast.core.EntryEvent;
47
import com.hazelcast.core.EntryListener;
48
import com.hazelcast.core.Hazelcast;
49
import com.hazelcast.core.HazelcastInstance;
50
import com.hazelcast.core.ILock;
51
import com.hazelcast.core.IMap;
52
import com.hazelcast.core.ISet;
53
import com.hazelcast.core.ItemEvent;
54
import com.hazelcast.core.ItemListener;
55
import com.hazelcast.core.LifecycleEvent;
56
import com.hazelcast.core.LifecycleListener;
57
import com.hazelcast.core.Member;
58
import com.hazelcast.core.MembershipEvent;
59
import com.hazelcast.core.MembershipListener;
60
import com.hazelcast.partition.Partition;
61
import com.hazelcast.partition.PartitionService;
62

    
63
import edu.ucsb.nceas.metacat.IdentifierManager;
64
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
65
import edu.ucsb.nceas.metacat.properties.PropertyService;
66
import edu.ucsb.nceas.metacat.shared.BaseService;
67
import edu.ucsb.nceas.metacat.shared.ServiceException;
68
import edu.ucsb.nceas.metacat.util.DocumentUtil;
69
import edu.ucsb.nceas.utilities.FileUtil;
70
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
71
/**
72
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
73
 */
74
public class HazelcastService extends BaseService
75
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener, ItemListener<Identifier> {
76
  
77
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
78

    
79
  private static final String MISSING_PID_PREFIX = "missing-";
80

    
81
/* The instance of the logging class */
82
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
83
  
84
  /* The singleton instance of the hazelcast service */
85
  private static HazelcastService hzService = null;
86
  
87
  /* The Hazelcast configuration */
88
  private Config hzConfig;
89
  
90
  /* The name of the system metadata map */
91
  private String systemMetadataMap;
92
  
93
  /* The Hazelcast distributed system metadata map */
94
  private IMap<Identifier, SystemMetadata> systemMetadata;
95
  
96
  /* The name of the identifiers set */
97
  private String identifiersSet;
98
  
99
  /* The Hazelcast distributed identifiers set */
100
  private ISet<Identifier> identifiers;
101
  
102
  /* The Hazelcast distributed missing identifiers set */
103
  private ISet<Identifier> missingIdentifiers;
104
  
105
  /* The Hazelcast distributed index queue */
106
  private String hzIndexQueue;
107
  private ISet<SystemMetadata> indexQueue;
108

    
109
  private HazelcastInstance hzInstance;
110
      
111
  /*
112
   * Constructor: Creates an instance of the hazelcast service. Since
113
   * this uses a singleton pattern, use getInstance() to gain the instance.
114
   */
115
  private HazelcastService() {
116
    
117
    super();
118
    _serviceName="HazelcastService";
119
    
120
    try {
121
      init();
122
      
123
    } catch (ServiceException se) {
124
      logMetacat.error("There was a problem creating the HazelcastService. " +
125
                       "The error message was: " + se.getMessage());
126
      
127
    }
128
    
129
  }
130
  
131
  /**
132
   *  Get the instance of the HazelcastService that has been instantiated,
133
   *  or instantiate one if it has not been already.
134
   *
135
   * @return hazelcastService - The instance of the hazelcast service
136
   */
137
  public static HazelcastService getInstance(){
138
    
139
    if ( hzService == null ) {
140
      
141
      hzService = new HazelcastService();
142
      
143
    }
144
    return hzService;
145
  }
146
  
147
  /**
148
   * Initializes the Hazelcast service
149
   */
150
  public void init() throws ServiceException {
151
    
152
    logMetacat.debug("HazelcastService.init() called.");
153
    
154
	String configFileName = null;
155
	try {
156
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
157
		hzConfig = new FileSystemXmlConfig(configFileName);
158
	} catch (Exception e) {
159
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
160
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
161
		// make sure we have the config
162
		try {
163
			hzConfig = new FileSystemXmlConfig(configFileName);
164
		} catch (FileNotFoundException e1) {
165
			String msg = e.getMessage();
166
			logMetacat.error(msg);
167
			throw new ServiceException(msg);
168
		}
169
	}
170

    
171
	this.hzInstance = Hazelcast.newHazelcastInstance(hzConfig);
172
  
173
  	logMetacat.debug("Initialized hzInstance");
174

    
175
    // Get configuration properties on instantiation
176
    try {
177
      systemMetadataMap = 
178
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
179
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
180

    
181
      // Get a reference to the shared system metadata map as a cluster member
182
      // NOTE: this loads the map from the backing store and can take a long time for large collections
183
      systemMetadata = this.hzInstance.getMap(systemMetadataMap);
184
      
185
      logMetacat.debug("Initialized systemMetadata");
186

    
187
      // Get a reference to the shared identifiers set as a cluster member
188
      // NOTE: this takes a long time to complete
189
      logMetacat.warn("Retrieving hzIdentifiers from Hazelcast");
190
      identifiers = this.hzInstance.getSet(identifiersSet);
191
      logMetacat.warn("Retrieved hzIdentifiers from Hazelcast");
192
      
193
      // for publishing the "PIDs Wanted" list
194
      missingIdentifiers = this.hzInstance.getSet("hzMissingIdentifiersSet");
195
      
196
      missingIdentifiers.addItemListener(this, true);
197
      
198
      hzIndexQueue = PropertyService.getProperty("index.hazelcast.indexqueue");
199
      indexQueue = this.hzInstance.getSet(hzIndexQueue);
200

    
201
      // Listen for changes to the system metadata map
202
      systemMetadata.addEntryListener(this, true);
203
      
204
      // Listen for members added/removed
205
      hzInstance.getCluster().addMembershipListener(this);
206
      
207
      // Listen for lifecycle state changes
208
      hzInstance.getLifecycleService().addLifecycleListener(this);
209
      
210
    } catch (PropertyNotFoundException e) {
211

    
212
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
213
        "The error message was: " + e.getMessage();
214
      logMetacat.error(msg);
215
      
216
    }
217
    
218
    // make sure we have all metadata locally
219
    try {
220
    	// synch on restart
221
        resynchInThread();
222
	} catch (Exception e) {
223
		String msg = "Problem resynchronizing system metadata. " + e.getMessage();
224
		logMetacat.error(msg, e);
225
	}
226
        
227
  }
228
  
229
  /**
230
   * Get the system metadata map
231
   * 
232
   * @return systemMetadata - the hazelcast map of system metadata
233
   * @param identifier - the identifier of the object as a string
234
   */
235
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
236
	  return systemMetadata;
237
  }
238
  
239
  /**
240
   * Get the identifiers set
241
   * @return identifiers - the set of unique DataONE identifiers in the cluster
242
   */
243
  public ISet<Identifier> getIdentifiers() {
244
      return identifiers;
245
      
246
  }
247

    
248
  /**
249
   * Get the index queue
250
   * @return the set of SystemMetadata to be indexed
251
   */
252
  public ISet<SystemMetadata> getIndexQueue() {
253
      return indexQueue;
254
      
255
  }
256
  
257
  /**
258
   * When Metacat changes the underlying store, we need to refresh the
259
   * in-memory representation of it.
260
   * @param guid
261
   */
262
  public void refreshSystemMetadataEntry(String guid) {
263
	Identifier identifier = new Identifier();
264
	identifier.setValue(guid);
265
	// force hazelcast to update system metadata in memory from the store
266
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
267
	HazelcastService.getInstance().getSystemMetadataMap().get(identifier);
268
  }
269

    
270
  public Lock getLock(String identifier) {
271
    
272
    Lock lock = null;
273
    
274
    try {
275
        lock = getInstance().getHazelcastInstance().getLock(identifier);
276
        
277
    } catch (RuntimeException e) {
278
        logMetacat.info("Couldn't get a lock for identifier " + 
279
            identifier + " !!");
280
    }
281
    return lock;
282
      
283
  }
284
  
285
  /**
286
   * Get the DataONE hazelcast node map
287
   * @return nodes - the hazelcast map of nodes
288
   */
289
//  public IMap<NodeReference, Node> getNodesMap() {
290
//	  return nodes;
291
//  }
292
  
293
  /**
294
   * Indicate whether or not this service is refreshable.
295
   *
296
   * @return refreshable - the boolean refreshable status
297
   */
298
  public boolean refreshable() {
299
    // TODO: Determine the consequences of restarting the Hazelcast instance
300
    // Set this to true if it's okay to drop from the cluster, lose the maps,
301
    // and start back up again
302
    return false;
303
    
304
  }
305
  
306
  /**
307
   * Stop the HazelcastService. When stopped, the service will no longer
308
   * respond to requests.
309
   */
310
  public void stop() throws ServiceException {
311
    
312
	  this.hzInstance.getLifecycleService().shutdown();
313
    
314
  }
315

    
316
  public HazelcastInstance getHazelcastInstance() {
317
      return this.hzInstance;
318
      
319
  }
320
  
321
  /**
322
   * Refresh the Hazelcast service by restarting it
323
   */
324
  @Override
325
  protected void doRefresh() throws ServiceException {
326

    
327
    // TODO: verify that the correct config file is still used
328
	  this.hzInstance.getLifecycleService().restart();
329
    
330
  }
331
  
332
  /**
333
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
334
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
335
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
336
	 * 
337
	 * @param event - The EntryEvent that occurred
338
	 */
339
	@Override
340
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
341
	  
342
	  logMetacat.info("SystemMetadata entry added event on identifier " + 
343
	      event.getKey().getValue());
344
		// handle as update - that method will create if necessary
345
		entryUpdated(event);
346

    
347
	}
348

    
349
	/**
350
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
351
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
352
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
353
	 * 
354
	 * @param event - The EntryEvent that occurred
355
	 */
356
	@Override
357
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
358

    
359
      logMetacat.info("SystemMetadata entry evicted event on identifier " + 
360
          event.getKey().getValue());
361
      
362
	    // ensure identifiers are listed in the hzIdentifiers set
363
      if ( !identifiers.contains(event.getKey()) ) {
364
          identifiers.add(event.getKey());
365
      }
366
	  
367
	}
368
	
369
	/**
370
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
371
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
372
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
373
	 * 
374
	 * @param event - The EntryEvent that occurred
375
	 */
376
	@Override
377
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
378
		
379
    logMetacat.info("SystemMetadata entry removed event on identifier " + 
380
        event.getKey().getValue());
381

    
382
	  // we typically don't remove objects in Metacat, but can remove System Metadata
383
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
384

    
385
    // keep the hzIdentifiers set in sync with the systemmetadata table
386
    if ( identifiers.contains(event.getKey()) ) {
387
        identifiers.remove(event.getKey());
388
        
389
    }
390

    
391
	}
392
	
393
	/**
394
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
395
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
396
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
397
	 * 
398
	 * @param event - The EntryEvent that occurred
399
	 */
400
	@Override
401
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
402

    
403
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
404
		PartitionService partitionService = this.hzInstance.getPartitionService();
405
		Partition partition = partitionService.getPartition(event.getKey());
406
		Member ownerMember = partition.getOwner();
407
		SystemMetadata sysmeta = event.getValue();
408
		if (!ownerMember.localMember()) {
409
			if (sysmeta == null) {
410
				logMetacat.warn("No SystemMetadata provided in the event, getting from shared map: " + event.getKey().getValue());
411
				sysmeta = getSystemMetadataMap().get(event.getKey());
412
				if (sysmeta == null) {
413
					// this is a problem
414
					logMetacat.error("Could not find SystemMetadata in shared map for: " + event.getKey().getValue());
415
					// TODO: should probably return at this point since the save will fail
416
				}
417
			}
418
			// need to pull the entry into the local store
419
			saveLocally(event.getValue());
420
		}
421

    
422
		// ensure identifiers are listed in the hzIdentifiers set
423
		if (!identifiers.contains(event.getKey())) {
424
			identifiers.add(event.getKey());
425
		}
426

    
427
	}
428
	
429
	/**
430
	 * Save SystemMetadata to local store if needed
431
	 * @param sm
432
	 */
433
	private void saveLocally(SystemMetadata sm) {
434
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
435
		try {
436

    
437
			IdentifierManager.getInstance().insertOrUpdateSystemMetadata(sm);
438

    
439
		} catch (McdbDocNotFoundException e) {
440
			logMetacat.error("Could not save System Metadata to local store.", e);
441
			
442
		} catch (SQLException e) {
443
	      logMetacat.error("Could not save System Metadata to local store.", e);
444
	      
445
	    } catch (InvalidSystemMetadata e) {
446
	        logMetacat.error("Could not save System Metadata to local store.", e);
447
	        
448
	    }
449
	}
450
	
451
	/**
452
	 * Checks the local backing store for missing SystemMetadata,
453
	 * retrieves those entries from the shared map if they exist,
454
	 * and saves them locally.
455
	 */
456
	private void synchronizeLocalStore() {
457
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true, -1);
458
		if (localIds != null) {
459
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
460
			for (String localId: localIds) {
461
				logMetacat.debug("Processing system metadata for localId: " + localId);
462
				try {
463
					String docid = DocumentUtil.getSmartDocId(localId);
464
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
465
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
466
					logMetacat.debug("Found mapped guid: " + guid);
467
					Identifier pid = new Identifier();
468
					pid.setValue(guid);
469
					SystemMetadata sm = systemMetadata.get(pid);
470
					logMetacat.debug("Found shared system metadata for guid: " + guid);
471
					saveLocally(sm);
472
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
473
				} catch (Exception e) {
474
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
475
				}
476
			}
477
		}
478
	}
479
	
480
	
481
	/**
482
	 * Make sure we have a copy of every entry in the shared map.
483
	 * We use lazy loading and therefore the CNs may not all be in sync when one
484
	 * comes back online after an extended period of being offline
485
	 * This method loops through the entries that a FULLY UP-TO-DATE CN has
486
	 * and makes sure each one is present on the shared map.
487
	 * It is meant to overcome a HZ weakness wherein ownership of a key results in 
488
	 * null values where the owner does not have a complete backing store.
489
	 * This will be an expensive routine and should be run in a background process so that
490
	 * the server can continue to service other requests during the synch
491
	 * @throws Exception
492
	 */
493
	private void resynchToRemote() {
494
		
495
		// the local identifiers not already present in the shared map
496
		Set<Identifier> localIdKeys = loadAllKeys();
497
		
498
		//  the PIDs missing locally
499
		Set<Identifier> missingIdKeys = new HashSet<Identifier>();
500
				
501
		// only contribute PIDs that are not already shared
502
		Iterator<Identifier> idIter = identifiers.iterator();
503
		int processedCount = 0;
504
		while (idIter.hasNext()) {
505
			Identifier pid = idIter.next();
506
			if (localIdKeys.contains(pid)) {
507
				logMetacat.debug("Shared pid is already in local identifier set: " + pid.getValue());
508
				localIdKeys.remove(pid);
509
			} else {
510
				// we don't have this locally, so we should try to get it
511
				missingIdKeys.add(pid);
512
			}
513
			processedCount++;
514
		}
515
		logMetacat.warn("processedCount (identifiers from iterator): " + processedCount);
516

    
517
		logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size());
518

    
519
		//identifiers.addAll(idKeys);
520
		logMetacat.warn("Loading missing local keys into hzIdentifiers");
521
		for (Identifier key: localIdKeys) {
522
			if (!identifiers.contains(key)) {
523
				logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue());
524
				identifiers.add(key);
525
			}
526
		}
527
		logMetacat.warn("Initialized identifiers with missing local keys");
528
		
529
		logMetacat.warn("Processing missing SystemMetadata for missing pid count: " + missingIdKeys.size());
530
		
531
		// loop through all the missing PIDs to find any null (missing) SM that needs to be resynched
532
		Iterator<Identifier> missingPids = missingIdKeys.iterator();
533
		while (missingPids.hasNext()) {
534
			Identifier pid = missingPids.next();
535
			// publish that we need this SM entry
536
			logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue());
537
			missingIdentifiers.add(pid);
538
		}
539
		
540
	}
541
	
542
	public void resynchInThread() {
543
		logMetacat.debug("launching system metadata resynch in a thread");
544
		ExecutorService executor = Executors.newSingleThreadExecutor();
545
		executor.execute(new Runnable() {
546
			@Override
547
			public void run() {
548
				try {
549
					// this is a push mechanism
550
					resynchToRemote();
551
				} catch (Exception e) {
552
					logMetacat.error("Error in resynchInThread: " + e.getMessage(), e);
553
				}
554
			}
555
		});
556
		executor.shutdown();
557
	}
558

    
559
	/**
560
	 * When there is missing SystemMetadata on the local member,
561
	 * we retrieve it from the shared map and add it to the local
562
	 * backing store for safe keeping.
563
	 */
564
	@Override
565
	public void memberAdded(MembershipEvent event) {
566
		Member member = event.getMember();
567
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
568
		boolean isLocal = member.localMember();
569
		if (isLocal) {
570
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
571
			synchronizeLocalStore();
572
		}
573
	}
574

    
575
	@Override
576
	public void memberRemoved(MembershipEvent event) {
577
		// TODO Auto-generated method stub
578
		
579
	}
580

    
581
	/**
582
	 * In cases where this cluster is paused, we want to 
583
	 * check that the local store accurately reflects the shared 
584
	 * SystemMetadata map
585
	 * @param event
586
	 */
587
	@Override
588
	public void stateChanged(LifecycleEvent event) {
589
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
590
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
591
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
592
			synchronizeLocalStore();
593
		}
594
	}
595

    
596
	/**
597
	 * Load all System Metadata keys from the backing store
598
	 * @return set of pids
599
	 */
600
	private Set<Identifier> loadAllKeys() {
601

    
602
		Set<Identifier> pids = new HashSet<Identifier>();
603
		
604
		try {
605
			
606
			// ALTERNATIVE 1: this has more overhead than just looking at the GUIDs
607
//			ObjectList ol = IdentifierManager.getInstance().querySystemMetadata(
608
//					null, //startTime, 
609
//					null, //endTime, 
610
//					null, //objectFormatId, 
611
//					false, //replicaStatus, 
612
//					0, //start, 
613
//					-1 //count
614
//					);
615
//			for (ObjectInfo o: ol.getObjectInfoList()) {
616
//				Identifier pid = o.getIdentifier();
617
//				if ( !pids.contains(pid) ) {
618
//					pids.add(pid);
619
//				}				
620
//			}
621
			
622
			// ALTERNATIVE method: look up all the Identifiers from the table
623
			List<String> guids = IdentifierManager.getInstance().getAllSystemMetadataGUIDs();
624
			logMetacat.warn("Local SystemMetadata pid count: " + guids.size());
625
			for (String guid: guids){
626
				Identifier pid = new Identifier();
627
				pid.setValue(guid);
628
				pids.add(pid);
629
			}
630
			
631
		} catch (Exception e) {
632
			throw new RuntimeException(e.getMessage(), e);
633
			
634
		}
635
		
636
		return pids;
637
	}
638

    
639
	/**
640
	 * Respond to itemAdded events on the hzMissingIdentifiers Set.  Uses a
641
	 * distributed ILock to try to prevent multiple put calls on hzSystemMetadata
642
	 * 
643
	 * @param pid   the identifier of the event
644
	 */
645
	@Override
646
	public void itemAdded(ItemEvent<Identifier> event) {
647
		
648
		Identifier pid = (Identifier) event.getItem();
649
		// publish the SM for the pid if we have it locally
650
		logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue());
651
		
652
		// lock this event, only if we have a local copy to contribute
653
		ILock lock = null;
654
		try {
655
			// look up the local copy of the SM
656
			SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue());
657
			if (sm != null) {
658
				lock = hzInstance.getLock(MISSING_PID_PREFIX + pid.getValue());
659
				
660
				if ( lock.tryLock() ) {
661
			        // "publish" the system metadata to the shared map since it showed up on the missing queue
662
			        logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue());
663
			        systemMetadata.put(pid, sm);
664
			        
665
			        // remove the entry since we processed it
666
			        missingIdentifiers.remove(pid);
667
			      
668
				  } else {
669
				      logMetacat.debug(MISSING_PID_PREFIX + pid.getValue() + " was already locked. Skipping.");
670
				  }
671
			} else {
672
				// can't help here
673
				logMetacat.warn("Local system metadata not found for pid: " + pid.getValue());
674
			}
675
		} catch (Exception e) {
676
			logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue());
677
		} finally {
678
			if ( lock != null ) {
679
				lock.unlock();
680
			}
681
        }
682
	}
683

    
684
	/**
685
   * Respond to itemRemoved events on the hzMissingIdentifiers Set
686
   * 
687
   * @param pid   the identifier of the event
688
   */
689
	@Override
690
	public void itemRemoved(ItemEvent<Identifier> event) {
691
		// do nothing since someone probably handled the wanted PID
692
		
693
	}
694

    
695
}
(1-1/3)