Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2012-08-02 17:50:05 -0700 (Thu, 02 Aug 2012) $'
10
 * '$Revision: 7340 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.HashSet;
32
import java.util.Iterator;
33
import java.util.List;
34
import java.util.Set;
35
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.Executors;
37
import java.util.concurrent.locks.Lock;
38

    
39
import org.apache.log4j.Logger;
40
import org.dataone.service.exceptions.InvalidSystemMetadata;
41
import org.dataone.service.types.v1.Identifier;
42
import org.dataone.service.types.v1.Node;
43
import org.dataone.service.types.v1.NodeReference;
44
import org.dataone.service.types.v1.SystemMetadata;
45

    
46
import com.hazelcast.config.Config;
47
import com.hazelcast.config.FileSystemXmlConfig;
48
import com.hazelcast.core.EntryEvent;
49
import com.hazelcast.core.EntryListener;
50
import com.hazelcast.core.Hazelcast;
51
import com.hazelcast.core.HazelcastInstance;
52
import com.hazelcast.core.IMap;
53
import com.hazelcast.core.ISet;
54
import com.hazelcast.core.ItemListener;
55
import com.hazelcast.core.LifecycleEvent;
56
import com.hazelcast.core.LifecycleListener;
57
import com.hazelcast.core.Member;
58
import com.hazelcast.core.MembershipEvent;
59
import com.hazelcast.core.MembershipListener;
60
import com.hazelcast.partition.Partition;
61
import com.hazelcast.partition.PartitionService;
62

    
63
import edu.ucsb.nceas.metacat.IdentifierManager;
64
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
65
import edu.ucsb.nceas.metacat.properties.PropertyService;
66
import edu.ucsb.nceas.metacat.shared.BaseService;
67
import edu.ucsb.nceas.metacat.shared.ServiceException;
68
import edu.ucsb.nceas.metacat.util.DocumentUtil;
69
import edu.ucsb.nceas.utilities.FileUtil;
70
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
71
/**
72
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
73
 */
74
public class HazelcastService extends BaseService
75
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener, ItemListener<Identifier> {
76
  
77
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
78

    
79
/* The instance of the logging class */
80
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
81
  
82
  /* The singleton instance of the hazelcast service */
83
  private static HazelcastService hzService = null;
84
  
85
  /* The Hazelcast configuration */
86
  private Config hzConfig;
87
  
88
  /* The instance of the Hazelcast client */
89
//  private HazelcastClient hzClient;
90

    
91
  /* The name of the DataONE Hazelcast cluster group */
92
  private String groupName;
93

    
94
  /* The name of the DataONE Hazelcast cluster password */
95
  private String groupPassword;
96
  
97
  /* The name of the DataONE Hazelcast cluster IP addresses */
98
  private String addressList;
99
  
100
  /* The name of the node map */
101
  private String nodeMap;
102

    
103
  /* The name of the system metadata map */
104
  private String systemMetadataMap;
105
  
106
  /* The Hazelcast distributed task id generator namespace */
107
  private String taskIds;
108
  
109
  /* The Hazelcast distributed node map */
110
  private IMap<NodeReference, Node> nodes;
111

    
112
  /* The Hazelcast distributed system metadata map */
113
  private IMap<Identifier, SystemMetadata> systemMetadata;
114
  
115
  /* The name of the identifiers set */
116
  private String identifiersSet;
117
  
118
  /* The Hazelcast distributed identifiers set */
119
  private ISet<Identifier> identifiers;
120
  
121
  /* The Hazelcast distributed missing identifiers set */
122
  private ISet<Identifier> missingIdentifiers;
123

    
124
  private HazelcastInstance hzInstance;
125
      
126
  /*
127
   * Constructor: Creates an instance of the hazelcast service. Since
128
   * this uses a singleton pattern, use getInstance() to gain the instance.
129
   */
130
  private HazelcastService() {
131
    
132
    super();
133
    _serviceName="HazelcastService";
134
    
135
    try {
136
      init();
137
      
138
    } catch (ServiceException se) {
139
      logMetacat.error("There was a problem creating the HazelcastService. " +
140
                       "The error message was: " + se.getMessage());
141
      
142
    }
143
    
144
  }
145
  
146
  /**
147
   *  Get the instance of the HazelcastService that has been instantiated,
148
   *  or instantiate one if it has not been already.
149
   *
150
   * @return hazelcastService - The instance of the hazelcast service
151
   */
152
  public static HazelcastService getInstance(){
153
    
154
    if ( hzService == null ) {
155
      
156
      hzService = new HazelcastService();
157
      
158
    }
159
    return hzService;
160
  }
161
  
162
  /**
163
   * Initializes the Hazelcast service
164
   */
165
  public void init() throws ServiceException {
166
    
167
    logMetacat.debug("HazelcastService.init() called.");
168
    
169
	String configFileName = null;
170
	Config config = null;
171
	try {
172
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
173
		config = new FileSystemXmlConfig(configFileName);
174
	} catch (Exception e) {
175
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
176
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
177
		// make sure we have the config
178
		try {
179
			config = new FileSystemXmlConfig(configFileName);
180
		} catch (FileNotFoundException e1) {
181
			String msg = e.getMessage();
182
			logMetacat.error(msg);
183
			throw new ServiceException(msg);
184
		}
185
	}
186

    
187
	Hazelcast.init(config);
188
	this.hzInstance = Hazelcast.getDefaultInstance();
189
  
190
  	logMetacat.debug("Initialized hzInstance");
191

    
192
    // Get configuration properties on instantiation
193
    try {
194
      groupName = 
195
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
196
      groupPassword = 
197
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
198
      addressList = 
199
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
200
      systemMetadataMap = 
201
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
202
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
203
//    nodeMap = 
204
//    PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
205
      // Become a DataONE-process cluster client
206
//      String[] addresses = addressList.split(",");
207
//      hzClient = 
208
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
209
//      nodes = hzClient.getMap(nodeMap);
210
      
211
      // Get a reference to the shared system metadata map as a cluster member
212
      // NOTE: this loads the map from the backing store and can take a long time for large collections
213
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
214
      
215
      logMetacat.debug("Initialized systemMetadata");
216

    
217
      // Get a reference to the shared identifiers set as a cluster member
218
      // NOTE: this takes a long time to complete
219
      logMetacat.warn("Retrieving hzIdentifiers from Hazelcast");
220
      identifiers = Hazelcast.getSet(identifiersSet);
221
      logMetacat.warn("Retrieved hzIdentifiers from Hazelcast");
222
      
223
      // for publishing the "PIDs Wanted" list
224
      missingIdentifiers = Hazelcast.getSet("hzMissingIdentifiersSet");
225
      
226
      missingIdentifiers.addItemListener(this, true);
227
      
228
      // Listen for changes to the system metadata map
229
      systemMetadata.addEntryListener(this, true);
230
      
231
      // Listen for members added/removed
232
      hzInstance.getCluster().addMembershipListener(this);
233
      
234
      // Listen for lifecycle state changes
235
      hzInstance.getLifecycleService().addLifecycleListener(this);
236
      
237
    } catch (PropertyNotFoundException e) {
238

    
239
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
240
        "The error message was: " + e.getMessage();
241
      logMetacat.error(msg);
242
      
243
    }
244
    
245
    // make sure we have all metadata locally
246
    try {
247
    	// synch on restart
248
        resynchInThread();
249
	} catch (Exception e) {
250
		String msg = "Problem resynchronizing system metadata. " + e.getMessage();
251
		logMetacat.error(msg, e);
252
	}
253
        
254
  }
255
  
256
  /**
257
   * Get the system metadata map
258
   * 
259
   * @return systemMetadata - the hazelcast map of system metadata
260
   * @param identifier - the identifier of the object as a string
261
   */
262
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
263
	  return systemMetadata;
264
  }
265
  
266
  /**
267
   * Get the identifiers set
268
   * @return identifiers - the set of unique DataONE identifiers in the cluster
269
   */
270
  public ISet<Identifier> getIdentifiers() {
271
      return identifiers;
272
      
273
  }
274

    
275
  /**
276
   * When Metacat changes the underlying store, we need to refresh the
277
   * in-memory representation of it.
278
   * @param guid
279
   */
280
  public void refreshSystemMetadataEntry(String guid) {
281
	Identifier identifier = new Identifier();
282
	identifier.setValue(guid);
283
	// force hazelcast to update system metadata in memory from the store
284
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
285
	
286
  }
287

    
288
  public Lock getLock(String identifier) {
289
    
290
    Lock lock = null;
291
    
292
    try {
293
        lock = getInstance().getHazelcastInstance().getLock(identifier);
294
        
295
    } catch (RuntimeException e) {
296
        logMetacat.info("Couldn't get a lock for identifier " + 
297
            identifier + " !!");
298
    }
299
    return lock;
300
      
301
  }
302
  
303
  /**
304
   * Get the DataONE hazelcast node map
305
   * @return nodes - the hazelcast map of nodes
306
   */
307
//  public IMap<NodeReference, Node> getNodesMap() {
308
//	  return nodes;
309
//  }
310
  
311
  /**
312
   * Indicate whether or not this service is refreshable.
313
   *
314
   * @return refreshable - the boolean refreshable status
315
   */
316
  public boolean refreshable() {
317
    // TODO: Determine the consequences of restarting the Hazelcast instance
318
    // Set this to true if it's okay to drop from the cluster, lose the maps,
319
    // and start back up again
320
    return false;
321
    
322
  }
323
  
324
  /**
325
   * Stop the HazelcastService. When stopped, the service will no longer
326
   * respond to requests.
327
   */
328
  public void stop() throws ServiceException {
329
    
330
    Hazelcast.getLifecycleService().shutdown();
331
    
332
  }
333

    
334
  public HazelcastInstance getHazelcastInstance() {
335
      return this.hzInstance;
336
      
337
  }
338
  
339
  /**
340
   * Refresh the Hazelcast service by restarting it
341
   */
342
  @Override
343
  protected void doRefresh() throws ServiceException {
344

    
345
    // TODO: verify that the correct config file is still used
346
    Hazelcast.getLifecycleService().restart();
347
    
348
  }
349
  
350
  /**
351
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
352
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
353
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
354
	 * 
355
	 * @param event - The EntryEvent that occurred
356
	 */
357
	@Override
358
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
359
	  
360
	  logMetacat.info("SystemMetadata entry added event on identifier " + 
361
	      event.getKey().getValue());
362
		// handle as update - that method will create if necessary
363
		entryUpdated(event);
364

    
365
	}
366

    
367
	/**
368
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
369
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
370
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
371
	 * 
372
	 * @param event - The EntryEvent that occurred
373
	 */
374
	@Override
375
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
376

    
377
      logMetacat.info("SystemMetadata entry evicted event on identifier " + 
378
          event.getKey().getValue());
379
      
380
	    // ensure identifiers are listed in the hzIdentifiers set
381
      if ( !identifiers.contains(event.getKey()) ) {
382
          identifiers.add(event.getKey());
383
      }
384
	  
385
	}
386
	
387
	/**
388
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
389
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
390
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
391
	 * 
392
	 * @param event - The EntryEvent that occurred
393
	 */
394
	@Override
395
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
396
		
397
    logMetacat.info("SystemMetadata entry removed event on identifier " + 
398
        event.getKey().getValue());
399

    
400
	  // we typically don't remove objects in Metacat, but can remove System Metadata
401
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
402

    
403
    // keep the hzIdentifiers set in sync with the systemmetadata table
404
    if ( identifiers.contains(event.getKey()) ) {
405
        identifiers.remove(event.getKey());
406
        
407
    }
408

    
409
	}
410
	
411
	/**
412
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
413
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
414
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
415
	 * 
416
	 * @param event - The EntryEvent that occurred
417
	 */
418
	@Override
419
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
420

    
421
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
422
		PartitionService partitionService = Hazelcast.getPartitionService();
423
		Partition partition = partitionService.getPartition(event.getKey());
424
		Member ownerMember = partition.getOwner();
425
		SystemMetadata sysmeta = event.getValue();
426
		if (!ownerMember.localMember()) {
427
			if (sysmeta == null) {
428
				logMetacat.warn("No SystemMetadata provided in the event, getting from shared map: " + event.getKey().getValue());
429
				sysmeta = getSystemMetadataMap().get(event.getKey());
430
				if (sysmeta == null) {
431
					// this is a problem
432
					logMetacat.error("Could not find SystemMetadata in shared map for: " + event.getKey().getValue());
433
					// TODO: should probably return at this point since the save will fail
434
				}
435
			}
436
			// need to pull the entry into the local store
437
			saveLocally(event.getValue());
438
		}
439

    
440
		// ensure identifiers are listed in the hzIdentifiers set
441
		if (!identifiers.contains(event.getKey())) {
442
			identifiers.add(event.getKey());
443
		}
444

    
445
	}
446
	
447
	/**
448
	 * Save SystemMetadata to local store if needed
449
	 * @param sm
450
	 */
451
	private void saveLocally(SystemMetadata sm) {
452
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
453
		try {
454

    
455
			IdentifierManager.getInstance().insertOrUpdateSystemMetadata(sm);
456

    
457
		} catch (McdbDocNotFoundException e) {
458
			logMetacat.error("Could not save System Metadata to local store.", e);
459
			
460
		} catch (SQLException e) {
461
	      logMetacat.error("Could not save System Metadata to local store.", e);
462
	      
463
	    } catch (InvalidSystemMetadata e) {
464
	        logMetacat.error("Could not save System Metadata to local store.", e);
465
	        
466
	    }
467
	}
468
	
469
	/**
470
	 * Checks the local backing store for missing SystemMetadata,
471
	 * retrieves those entries from the shared map if they exist,
472
	 * and saves them locally.
473
	 */
474
	private void synchronizeLocalStore() {
475
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true, -1);
476
		if (localIds != null) {
477
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
478
			for (String localId: localIds) {
479
				logMetacat.debug("Processing system metadata for localId: " + localId);
480
				try {
481
					String docid = DocumentUtil.getSmartDocId(localId);
482
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
483
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
484
					logMetacat.debug("Found mapped guid: " + guid);
485
					Identifier pid = new Identifier();
486
					pid.setValue(guid);
487
					SystemMetadata sm = systemMetadata.get(pid);
488
					logMetacat.debug("Found shared system metadata for guid: " + guid);
489
					saveLocally(sm);
490
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
491
				} catch (Exception e) {
492
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
493
				}
494
			}
495
		}
496
	}
497
	
498
	
499
	/**
500
	 * Make sure we have a copy of every entry in the shared map.
501
	 * We use lazy loading and therefore the CNs may not all be in sync when one
502
	 * comes back online after an extended period of being offline
503
	 * This method loops through the entries that a FULLY UP-TO-DATE CN has
504
	 * and makes sure each one is present on the shared map.
505
	 * It is meant to overcome a HZ weakness wherein ownership of a key results in 
506
	 * null values where the owner does not have a complete backing store.
507
	 * This will be an expensive routine and should be run in a background process so that
508
	 * the server can continue to service other requests during the synch
509
	 * @throws Exception
510
	 */
511
	private void resynchToRemote() {
512
		
513
		// the local identifiers not already present in the shared map
514
		Set<Identifier> localIdKeys = loadAllKeys();
515
		
516
		//  the PIDs missing locally
517
		Set<Identifier> missingIdKeys = new HashSet<Identifier>();
518
				
519
		// only contribute PIDs that are not already shared
520
		Iterator<Identifier> idIter = identifiers.iterator();
521
		while (idIter.hasNext()) {
522
			Identifier pid = idIter.next();
523
			if (localIdKeys.contains(pid)) {
524
				logMetacat.warn("Shared pid is already in local identifier set: " + pid.getValue());
525
				localIdKeys.remove(pid);
526
			} else {
527
				// we don't have this locally, so we should try to get it
528
				missingIdKeys.add(pid);
529
			}
530
		}
531
		logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size());
532

    
533
		//identifiers.addAll(idKeys);
534
		logMetacat.warn("Loading missing local keys into hzIdentifiers");
535
		for (Identifier key: localIdKeys) {
536
			if (!identifiers.contains(key)) {
537
				logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue());
538
				identifiers.add(key);
539
			}
540
		}
541
		logMetacat.warn("Initialized identifiers with missing local keys");
542
		
543
		logMetacat.warn("Processing missing SystemMetadata for shared pid count: " + identifiers.size());
544
		
545
		// loop through all the missing PIDs to find any null (missing) SM that needs to be resynched
546
		Iterator<Identifier> missingPids = missingIdKeys.iterator();
547
		while (missingPids.hasNext()) {
548
			Identifier pid = missingPids.next();
549
			logMetacat.trace("Processing missing pid: " + pid.getValue());
550
			SystemMetadata sm = systemMetadata.get(pid);
551
			if (sm == null)  {
552
				logMetacat.warn("Shared SystemMetadata is null for pid: " + pid.getValue());
553

    
554
				// publish that we need this SM entry
555
				logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue());
556
				missingIdentifiers.add(pid);
557
			} else {
558
				// or just republish the shared non-null entry (all SM listeners will then get it and save it locally)
559
				logMetacat.debug("Putting missing pid's SystemMetadata to shared map: " + pid.getValue());
560
				systemMetadata.put(pid, sm);
561
			}
562
		}
563
		
564
	}
565
	
566
	private void resynchInThread() {
567
		logMetacat.debug("launching system metadata resynch in a thread");
568
		ExecutorService executor = Executors.newSingleThreadExecutor();
569
		executor.execute(new Runnable() {
570
			@Override
571
			public void run() {
572
				try {
573
					// this is a push mechanism
574
					resynchToRemote();
575
				} catch (Exception e) {
576
					logMetacat.error("Error in resynchInThread: " + e.getMessage(), e);
577
				}
578
			}
579
		});
580
		executor.shutdown();
581
	}
582

    
583
	/**
584
	 * When there is missing SystemMetadata on the local member,
585
	 * we retrieve it from the shared map and add it to the local
586
	 * backing store for safe keeping.
587
	 */
588
	@Override
589
	public void memberAdded(MembershipEvent event) {
590
		Member member = event.getMember();
591
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
592
		boolean isLocal = member.localMember();
593
		if (isLocal) {
594
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
595
			synchronizeLocalStore();
596
		}
597
	}
598

    
599
	@Override
600
	public void memberRemoved(MembershipEvent event) {
601
		// TODO Auto-generated method stub
602
		
603
	}
604

    
605
	/**
606
	 * In cases where this cluster is paused, we want to 
607
	 * check that the local store accurately reflects the shared 
608
	 * SystemMetadata map
609
	 * @param event
610
	 */
611
	@Override
612
	public void stateChanged(LifecycleEvent event) {
613
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
614
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
615
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
616
			synchronizeLocalStore();
617
		}
618
	}
619

    
620
	/**
621
	 * Load all System Metadata keys from the backing store
622
	 * @return set of pids
623
	 */
624
	private Set<Identifier> loadAllKeys() {
625

    
626
		Set<Identifier> pids = new HashSet<Identifier>();
627
		
628
		try {
629
			
630
			// ALTERNATIVE 1: this has more overhead than just looking at the GUIDs
631
//			ObjectList ol = IdentifierManager.getInstance().querySystemMetadata(
632
//					null, //startTime, 
633
//					null, //endTime, 
634
//					null, //objectFormatId, 
635
//					false, //replicaStatus, 
636
//					0, //start, 
637
//					-1 //count
638
//					);
639
//			for (ObjectInfo o: ol.getObjectInfoList()) {
640
//				Identifier pid = o.getIdentifier();
641
//				if ( !pids.contains(pid) ) {
642
//					pids.add(pid);
643
//				}				
644
//			}
645
			
646
			// ALTERNATIVE method: look up all the Identifiers from the table
647
			List<String> guids = IdentifierManager.getInstance().getAllSystemMetadataGUIDs();
648
			logMetacat.warn("Local SystemMetadata pid count: " + guids.size());
649
			for (String guid: guids){
650
				Identifier pid = new Identifier();
651
				pid.setValue(guid);
652
				pids.add(pid);
653
			}
654
			
655
		} catch (Exception e) {
656
			throw new RuntimeException(e.getMessage(), e);
657
			
658
		}
659
		
660
		return pids;
661
	}
662

    
663
	@Override
664
	public void itemAdded(Identifier pid) {
665
		// publish the SM for the pid if we have it locally
666
		logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue());
667
		try {
668
			// look up the local copy of the SM
669
			SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue());
670
			if (sm != null) {
671
				// "publish" the system metadata to the shared map since it showed up on the missing queue
672
				logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue());
673
				systemMetadata.put(pid, sm);
674
				
675
				// remove the entry since we processed it
676
				missingIdentifiers.remove(pid);
677
			} else {
678
				logMetacat.warn("Local SystemMetadata was null for pid: " + pid.getValue());
679
			}
680
			
681
		} catch (Exception e) {
682
			logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue());
683
		}
684
		
685
	}
686

    
687
	@Override
688
	public void itemRemoved(Identifier arg0) {
689
		// do nothing since someone probably handled the wanted PID
690
		
691
	}
692

    
693
}
(1-1/3)