Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2013-07-03 00:03:10 -0700 (Wed, 03 Jul 2013) $'
10
 * '$Revision: 7841 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.HashSet;
32
import java.util.Iterator;
33
import java.util.List;
34
import java.util.Set;
35
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.Executors;
37
import java.util.concurrent.locks.Lock;
38

    
39
import org.apache.log4j.Logger;
40
import org.dataone.service.exceptions.InvalidSystemMetadata;
41
import org.dataone.service.types.v1.Identifier;
42
import org.dataone.service.types.v1.SystemMetadata;
43

    
44
import com.hazelcast.config.Config;
45
import com.hazelcast.config.FileSystemXmlConfig;
46
import com.hazelcast.core.EntryEvent;
47
import com.hazelcast.core.EntryListener;
48
import com.hazelcast.core.Hazelcast;
49
import com.hazelcast.core.HazelcastInstance;
50
import com.hazelcast.core.ILock;
51
import com.hazelcast.core.IMap;
52
import com.hazelcast.core.ISet;
53
import com.hazelcast.core.ItemEvent;
54
import com.hazelcast.core.ItemListener;
55
import com.hazelcast.core.LifecycleEvent;
56
import com.hazelcast.core.LifecycleListener;
57
import com.hazelcast.core.Member;
58
import com.hazelcast.core.MembershipEvent;
59
import com.hazelcast.core.MembershipListener;
60
import com.hazelcast.partition.Partition;
61
import com.hazelcast.partition.PartitionService;
62

    
63
import edu.ucsb.nceas.metacat.IdentifierManager;
64
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
65
import edu.ucsb.nceas.metacat.common.index.event.IndexEvent;
66
import edu.ucsb.nceas.metacat.index.IndexEventEntryListener;
67
import edu.ucsb.nceas.metacat.properties.PropertyService;
68
import edu.ucsb.nceas.metacat.shared.BaseService;
69
import edu.ucsb.nceas.metacat.shared.ServiceException;
70
import edu.ucsb.nceas.metacat.util.DocumentUtil;
71
import edu.ucsb.nceas.utilities.FileUtil;
72
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
73
/**
74
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
75
 */
76
public class HazelcastService extends BaseService
77
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener, ItemListener<Identifier> {
78
  
79
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
80

    
81
  private static final String MISSING_PID_PREFIX = "missing-";
82

    
83
/* The instance of the logging class */
84
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
85
  
86
  /* The singleton instance of the hazelcast service */
87
  private static HazelcastService hzService = null;
88
  
89
  /* The Hazelcast configuration */
90
  private Config hzConfig;
91
  
92
  /* The name of the system metadata map */
93
  private String systemMetadataMap;
94
  
95
  /* The Hazelcast distributed system metadata map */
96
  private IMap<Identifier, SystemMetadata> systemMetadata;
97
  
98
  /* The name of the identifiers set */
99
  private String identifiersSet;
100
  
101
  /* The Hazelcast distributed identifiers set */
102
  private ISet<Identifier> identifiers;
103
  
104
  /* The Hazelcast distributed missing identifiers set */
105
  private ISet<Identifier> missingIdentifiers;
106
  
107
  /* The Hazelcast distributed index queue */
108
  private String hzIndexQueue;
109
  private ISet<SystemMetadata> indexQueue;
110
  
111
  /* The Hazelcast distributed index event map */
112
  private String hzIndexEventMap;
113
  private IMap<Identifier, IndexEvent> indexEventMap;
114

    
115
  private HazelcastInstance hzInstance;
116
      
117
  /*
118
   * Constructor: Creates an instance of the hazelcast service. Since
119
   * this uses a singleton pattern, use getInstance() to gain the instance.
120
   */
121
  private HazelcastService() {
122
    
123
    super();
124
    _serviceName="HazelcastService";
125
    
126
    try {
127
      init();
128
      
129
    } catch (ServiceException se) {
130
      logMetacat.error("There was a problem creating the HazelcastService. " +
131
                       "The error message was: " + se.getMessage());
132
      
133
    }
134
    
135
  }
136
  
137
  /**
138
   *  Get the instance of the HazelcastService that has been instantiated,
139
   *  or instantiate one if it has not been already.
140
   *
141
   * @return hazelcastService - The instance of the hazelcast service
142
   */
143
  public static HazelcastService getInstance(){
144
    
145
    if ( hzService == null ) {
146
      
147
      hzService = new HazelcastService();
148
      
149
    }
150
    return hzService;
151
  }
152
  
153
  /**
154
   * Initializes the Hazelcast service
155
   */
156
  public void init() throws ServiceException {
157
    
158
    logMetacat.debug("HazelcastService.init() called.");
159
    
160
	String configFileName = null;
161
	try {
162
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
163
		hzConfig = new FileSystemXmlConfig(configFileName);
164
	} catch (Exception e) {
165
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
166
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
167
		// make sure we have the config
168
		try {
169
			hzConfig = new FileSystemXmlConfig(configFileName);
170
		} catch (FileNotFoundException e1) {
171
			String msg = e.getMessage();
172
			logMetacat.error(msg);
173
			throw new ServiceException(msg);
174
		}
175
	}
176

    
177
	this.hzInstance = Hazelcast.newHazelcastInstance(hzConfig);
178
  
179
  	logMetacat.debug("Initialized hzInstance");
180

    
181
    // Get configuration properties on instantiation
182
    try {
183
      systemMetadataMap = 
184
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
185
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
186

    
187
      // Get a reference to the shared system metadata map as a cluster member
188
      // NOTE: this loads the map from the backing store and can take a long time for large collections
189
      systemMetadata = this.hzInstance.getMap(systemMetadataMap);
190
      
191
      logMetacat.debug("Initialized systemMetadata");
192

    
193
      // Get a reference to the shared identifiers set as a cluster member
194
      // NOTE: this takes a long time to complete
195
      logMetacat.warn("Retrieving hzIdentifiers from Hazelcast");
196
      identifiers = this.hzInstance.getSet(identifiersSet);
197
      logMetacat.warn("Retrieved hzIdentifiers from Hazelcast");
198
      
199
      // for publishing the "PIDs Wanted" list
200
      missingIdentifiers = this.hzInstance.getSet("hzMissingIdentifiersSet");
201
      
202
      missingIdentifiers.addItemListener(this, true);
203

    
204
      // for index tasks
205
      hzIndexQueue = PropertyService.getProperty("index.hazelcast.indexqueue");
206
      indexQueue = this.hzInstance.getSet(hzIndexQueue);
207

    
208
      // for index events (failures)
209
      hzIndexEventMap = PropertyService.getProperty("index.hazelcast.indexeventmap");
210
      indexEventMap = this.hzInstance.getMap(hzIndexEventMap);
211
      
212
      // Listen for changes to the system metadata map
213
      systemMetadata.addEntryListener(this, true);
214
      
215
      // Listen for members added/removed
216
      hzInstance.getCluster().addMembershipListener(this);
217
      
218
      // Listen for lifecycle state changes
219
      hzInstance.getLifecycleService().addLifecycleListener(this);
220
      
221
    } catch (PropertyNotFoundException e) {
222

    
223
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
224
        "The error message was: " + e.getMessage();
225
      logMetacat.error(msg);
226
      
227
    }
228
    
229
    // make sure we have all metadata locally
230
    try {
231
    	// synch on restart
232
        resynchInThread();
233
	} catch (Exception e) {
234
		String msg = "Problem resynchronizing system metadata. " + e.getMessage();
235
		logMetacat.error(msg, e);
236
	}
237
        
238
  }
239
  
240
  /**
241
   * Get the system metadata map
242
   * 
243
   * @return systemMetadata - the hazelcast map of system metadata
244
   * @param identifier - the identifier of the object as a string
245
   */
246
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
247
	  return systemMetadata;
248
  }
249
  
250
  /**
251
   * Get the identifiers set
252
   * @return identifiers - the set of unique DataONE identifiers in the cluster
253
   */
254
  public ISet<Identifier> getIdentifiers() {
255
      return identifiers;
256
      
257
  }
258

    
259
  /**
260
   * Get the index queue
261
   * @return the set of SystemMetadata to be indexed
262
   */
263
  public ISet<SystemMetadata> getIndexQueue() {
264
      return indexQueue;
265
  }
266
  
267
  /**
268
   * Get the index event map
269
   * @return indexEventMap - the hazelcast map of index events
270
   */
271
  public IMap<Identifier, IndexEvent> getIndexEventMap() {
272
	  return indexEventMap;
273
  }
274
  
275
  /**
276
   * When Metacat changes the underlying store, we need to refresh the
277
   * in-memory representation of it.
278
   * @param guid
279
   */
280
  public void refreshSystemMetadataEntry(String guid) {
281
	Identifier identifier = new Identifier();
282
	identifier.setValue(guid);
283
	// force hazelcast to update system metadata in memory from the store
284
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
285
  }
286

    
287
  public Lock getLock(String identifier) {
288
    
289
    Lock lock = null;
290
    
291
    try {
292
        lock = getInstance().getHazelcastInstance().getLock(identifier);
293
        
294
    } catch (RuntimeException e) {
295
        logMetacat.info("Couldn't get a lock for identifier " + 
296
            identifier + " !!");
297
    }
298
    return lock;
299
      
300
  }
301
  
302
  /**
303
   * Get the DataONE hazelcast node map
304
   * @return nodes - the hazelcast map of nodes
305
   */
306
//  public IMap<NodeReference, Node> getNodesMap() {
307
//	  return nodes;
308
//  }
309
  
310
  /**
311
   * Indicate whether or not this service is refreshable.
312
   *
313
   * @return refreshable - the boolean refreshable status
314
   */
315
  public boolean refreshable() {
316
    // TODO: Determine the consequences of restarting the Hazelcast instance
317
    // Set this to true if it's okay to drop from the cluster, lose the maps,
318
    // and start back up again
319
    return false;
320
    
321
  }
322
  
323
  /**
324
   * Stop the HazelcastService. When stopped, the service will no longer
325
   * respond to requests.
326
   */
327
  public void stop() throws ServiceException {
328
    
329
	  this.hzInstance.getLifecycleService().shutdown();
330
    
331
  }
332

    
333
  public HazelcastInstance getHazelcastInstance() {
334
      return this.hzInstance;
335
      
336
  }
337
  
338
  /**
339
   * Refresh the Hazelcast service by restarting it
340
   */
341
  @Override
342
  protected void doRefresh() throws ServiceException {
343

    
344
    // TODO: verify that the correct config file is still used
345
	  this.hzInstance.getLifecycleService().restart();
346
    
347
  }
348
  
349
  /**
350
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
351
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
352
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
353
	 * 
354
	 * @param event - The EntryEvent that occurred
355
	 */
356
	@Override
357
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
358
	  
359
	  logMetacat.info("SystemMetadata entry added event on identifier " + 
360
	      event.getKey().getValue());
361
		// handle as update - that method will create if necessary
362
		entryUpdated(event);
363

    
364
	}
365

    
366
	/**
367
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
368
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
369
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
370
	 * 
371
	 * @param event - The EntryEvent that occurred
372
	 */
373
	@Override
374
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
375

    
376
      logMetacat.info("SystemMetadata entry evicted event on identifier " + 
377
          event.getKey().getValue());
378
      
379
	    // ensure identifiers are listed in the hzIdentifiers set
380
      if ( !identifiers.contains(event.getKey()) ) {
381
          identifiers.add(event.getKey());
382
      }
383
	  
384
	}
385
	
386
	/**
387
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
388
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
389
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
390
	 * 
391
	 * @param event - The EntryEvent that occurred
392
	 */
393
	@Override
394
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
395
		
396
    logMetacat.info("SystemMetadata entry removed event on identifier " + 
397
        event.getKey().getValue());
398

    
399
	  // we typically don't remove objects in Metacat, but can remove System Metadata
400
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
401

    
402
    // keep the hzIdentifiers set in sync with the systemmetadata table
403
    if ( identifiers.contains(event.getKey()) ) {
404
        identifiers.remove(event.getKey());
405
        
406
    }
407

    
408
	}
409
	
410
	/**
411
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
412
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
413
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
414
	 * 
415
	 * @param event - The EntryEvent that occurred
416
	 */
417
	@Override
418
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
419

    
420
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
421
		PartitionService partitionService = this.hzInstance.getPartitionService();
422
		Partition partition = partitionService.getPartition(event.getKey());
423
		Member ownerMember = partition.getOwner();
424
		SystemMetadata sysmeta = event.getValue();
425
		if (!ownerMember.localMember()) {
426
			if (sysmeta == null) {
427
				logMetacat.warn("No SystemMetadata provided in the event, getting from shared map: " + event.getKey().getValue());
428
				sysmeta = getSystemMetadataMap().get(event.getKey());
429
				if (sysmeta == null) {
430
					// this is a problem
431
					logMetacat.error("Could not find SystemMetadata in shared map for: " + event.getKey().getValue());
432
					// TODO: should probably return at this point since the save will fail
433
				}
434
			}
435
			// need to pull the entry into the local store
436
			saveLocally(event.getValue());
437
		}
438

    
439
		// ensure identifiers are listed in the hzIdentifiers set
440
		if (!identifiers.contains(event.getKey())) {
441
			identifiers.add(event.getKey());
442
		}
443

    
444
	}
445
	
446
	/**
447
	 * Save SystemMetadata to local store if needed
448
	 * @param sm
449
	 */
450
	private void saveLocally(SystemMetadata sm) {
451
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
452
		try {
453

    
454
			IdentifierManager.getInstance().insertOrUpdateSystemMetadata(sm);
455

    
456
		} catch (McdbDocNotFoundException e) {
457
			logMetacat.error("Could not save System Metadata to local store.", e);
458
			
459
		} catch (SQLException e) {
460
	      logMetacat.error("Could not save System Metadata to local store.", e);
461
	      
462
	    } catch (InvalidSystemMetadata e) {
463
	        logMetacat.error("Could not save System Metadata to local store.", e);
464
	        
465
	    }
466
	}
467
	
468
	/**
469
	 * Checks the local backing store for missing SystemMetadata,
470
	 * retrieves those entries from the shared map if they exist,
471
	 * and saves them locally.
472
	 */
473
	private void synchronizeLocalStore() {
474
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true, -1);
475
		if (localIds != null) {
476
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
477
			for (String localId: localIds) {
478
				logMetacat.debug("Processing system metadata for localId: " + localId);
479
				try {
480
					String docid = DocumentUtil.getSmartDocId(localId);
481
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
482
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
483
					logMetacat.debug("Found mapped guid: " + guid);
484
					Identifier pid = new Identifier();
485
					pid.setValue(guid);
486
					SystemMetadata sm = systemMetadata.get(pid);
487
					logMetacat.debug("Found shared system metadata for guid: " + guid);
488
					saveLocally(sm);
489
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
490
				} catch (Exception e) {
491
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
492
				}
493
			}
494
		}
495
	}
496
	
497
	
498
	/**
499
	 * Make sure we have a copy of every entry in the shared map.
500
	 * We use lazy loading and therefore the CNs may not all be in sync when one
501
	 * comes back online after an extended period of being offline
502
	 * This method loops through the entries that a FULLY UP-TO-DATE CN has
503
	 * and makes sure each one is present on the shared map.
504
	 * It is meant to overcome a HZ weakness wherein ownership of a key results in 
505
	 * null values where the owner does not have a complete backing store.
506
	 * This will be an expensive routine and should be run in a background process so that
507
	 * the server can continue to service other requests during the synch
508
	 * @throws Exception
509
	 */
510
	private void resynchToRemote() {
511
		
512
		// the local identifiers not already present in the shared map
513
		Set<Identifier> localIdKeys = loadAllKeys();
514
		
515
		//  the PIDs missing locally
516
		Set<Identifier> missingIdKeys = new HashSet<Identifier>();
517
				
518
		// only contribute PIDs that are not already shared
519
		Iterator<Identifier> idIter = identifiers.iterator();
520
		int processedCount = 0;
521
		while (idIter.hasNext()) {
522
			Identifier pid = idIter.next();
523
			if (localIdKeys.contains(pid)) {
524
				logMetacat.debug("Shared pid is already in local identifier set: " + pid.getValue());
525
				localIdKeys.remove(pid);
526
			} else {
527
				// we don't have this locally, so we should try to get it
528
				missingIdKeys.add(pid);
529
			}
530
			processedCount++;
531
		}
532
		logMetacat.warn("processedCount (identifiers from iterator): " + processedCount);
533

    
534
		logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size());
535

    
536
		//identifiers.addAll(idKeys);
537
		logMetacat.warn("Loading missing local keys into hzIdentifiers");
538
		for (Identifier key: localIdKeys) {
539
			if (!identifiers.contains(key)) {
540
				logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue());
541
				identifiers.add(key);
542
			}
543
		}
544
		logMetacat.warn("Initialized identifiers with missing local keys");
545
		
546
		logMetacat.warn("Processing missing SystemMetadata for missing pid count: " + missingIdKeys.size());
547
		
548
		// loop through all the missing PIDs to find any null (missing) SM that needs to be resynched
549
		Iterator<Identifier> missingPids = missingIdKeys.iterator();
550
		while (missingPids.hasNext()) {
551
			Identifier pid = missingPids.next();
552
			// publish that we need this SM entry
553
			logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue());
554
			missingIdentifiers.add(pid);
555
		}
556
		
557
	}
558
	
559
	public void resynchInThread() {
560
		logMetacat.debug("launching system metadata resynch in a thread");
561
		ExecutorService executor = Executors.newSingleThreadExecutor();
562
		executor.execute(new Runnable() {
563
			@Override
564
			public void run() {
565
				try {
566
					// this is a push mechanism
567
					resynchToRemote();
568
				} catch (Exception e) {
569
					logMetacat.error("Error in resynchInThread: " + e.getMessage(), e);
570
				}
571
			}
572
		});
573
		executor.shutdown();
574
	}
575

    
576
	/**
577
	 * When there is missing SystemMetadata on the local member,
578
	 * we retrieve it from the shared map and add it to the local
579
	 * backing store for safe keeping.
580
	 */
581
	@Override
582
	public void memberAdded(MembershipEvent event) {
583
		Member member = event.getMember();
584
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
585
		boolean isLocal = member.localMember();
586
		if (isLocal) {
587
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
588
			synchronizeLocalStore();
589
		}
590
	}
591

    
592
	@Override
593
	public void memberRemoved(MembershipEvent event) {
594
		// TODO Auto-generated method stub
595
		
596
	}
597

    
598
	/**
599
	 * In cases where this cluster is paused, we want to 
600
	 * check that the local store accurately reflects the shared 
601
	 * SystemMetadata map
602
	 * @param event
603
	 */
604
	@Override
605
	public void stateChanged(LifecycleEvent event) {
606
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
607
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
608
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
609
			synchronizeLocalStore();
610
		}
611
	}
612

    
613
	/**
614
	 * Load all System Metadata keys from the backing store
615
	 * @return set of pids
616
	 */
617
	private Set<Identifier> loadAllKeys() {
618

    
619
		Set<Identifier> pids = new HashSet<Identifier>();
620
		
621
		try {
622
			
623
			// ALTERNATIVE 1: this has more overhead than just looking at the GUIDs
624
//			ObjectList ol = IdentifierManager.getInstance().querySystemMetadata(
625
//					null, //startTime, 
626
//					null, //endTime, 
627
//					null, //objectFormatId, 
628
//					false, //replicaStatus, 
629
//					0, //start, 
630
//					-1 //count
631
//					);
632
//			for (ObjectInfo o: ol.getObjectInfoList()) {
633
//				Identifier pid = o.getIdentifier();
634
//				if ( !pids.contains(pid) ) {
635
//					pids.add(pid);
636
//				}				
637
//			}
638
			
639
			// ALTERNATIVE method: look up all the Identifiers from the table
640
			List<String> guids = IdentifierManager.getInstance().getAllSystemMetadataGUIDs();
641
			logMetacat.warn("Local SystemMetadata pid count: " + guids.size());
642
			for (String guid: guids){
643
				Identifier pid = new Identifier();
644
				pid.setValue(guid);
645
				pids.add(pid);
646
			}
647
			
648
		} catch (Exception e) {
649
			throw new RuntimeException(e.getMessage(), e);
650
			
651
		}
652
		
653
		return pids;
654
	}
655

    
656
	/**
657
	 * Respond to itemAdded events on the hzMissingIdentifiers Set.  Uses a
658
	 * distributed ILock to try to prevent multiple put calls on hzSystemMetadata
659
	 * 
660
	 * @param pid   the identifier of the event
661
	 */
662
	@Override
663
	public void itemAdded(ItemEvent<Identifier> event) {
664
		
665
		Identifier pid = (Identifier) event.getItem();
666
		// publish the SM for the pid if we have it locally
667
		logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue());
668
		
669
		// lock this event, only if we have a local copy to contribute
670
		ILock lock = null;
671
		try {
672
			// look up the local copy of the SM
673
			SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue());
674
			if (sm != null) {
675
				lock = hzInstance.getLock(MISSING_PID_PREFIX + pid.getValue());
676
				
677
				if ( lock.tryLock() ) {
678
			        // "publish" the system metadata to the shared map since it showed up on the missing queue
679
			        logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue());
680
			        systemMetadata.put(pid, sm);
681
			        
682
			        // remove the entry since we processed it
683
			        missingIdentifiers.remove(pid);
684
			      
685
				  } else {
686
				      logMetacat.debug(MISSING_PID_PREFIX + pid.getValue() + " was already locked. Skipping.");
687
				  }
688
			} else {
689
				// can't help here
690
				logMetacat.warn("Local system metadata not found for pid: " + pid.getValue());
691
			}
692
		} catch (Exception e) {
693
			logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue());
694
		} finally {
695
			if ( lock != null ) {
696
				lock.unlock();
697
			}
698
        }
699
	}
700

    
701
	/**
702
   * Respond to itemRemoved events on the hzMissingIdentifiers Set
703
   * 
704
   * @param pid   the identifier of the event
705
   */
706
	@Override
707
	public void itemRemoved(ItemEvent<Identifier> event) {
708
		// do nothing since someone probably handled the wanted PID
709
		
710
	}
711

    
712
}
(1-1/3)