Project

General

Profile

1 6398 cjones
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 *
8
 *   '$Author$'
9
 *     '$Date$'
10
 * '$Revision$'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26
27 6399 leinfelder
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28 6398 cjones
29 6489 leinfelder
import java.io.FileNotFoundException;
30 6894 cjones
import java.sql.SQLException;
31 7187 leinfelder
import java.util.HashSet;
32 7203 leinfelder
import java.util.Iterator;
33 7098 leinfelder
import java.util.List;
34 7187 leinfelder
import java.util.Set;
35 7201 leinfelder
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.Executors;
37 6859 cjones
import java.util.concurrent.locks.Lock;
38 6459 leinfelder
39 6398 cjones
import org.apache.log4j.Logger;
40 6904 cjones
import org.dataone.service.exceptions.InvalidSystemMetadata;
41 6446 leinfelder
import org.dataone.service.types.v1.Identifier;
42
import org.dataone.service.types.v1.Node;
43
import org.dataone.service.types.v1.NodeReference;
44
import org.dataone.service.types.v1.SystemMetadata;
45 6398 cjones
46 6401 cjones
import com.hazelcast.config.Config;
47 6437 leinfelder
import com.hazelcast.config.FileSystemXmlConfig;
48 6446 leinfelder
import com.hazelcast.core.EntryEvent;
49
import com.hazelcast.core.EntryListener;
50 6398 cjones
import com.hazelcast.core.Hazelcast;
51 6711 cjones
import com.hazelcast.core.HazelcastInstance;
52 7343 cjones
import com.hazelcast.core.ILock;
53 6446 leinfelder
import com.hazelcast.core.IMap;
54 6887 cjones
import com.hazelcast.core.ISet;
55 7419 cjones
import com.hazelcast.core.ItemEvent;
56 7340 leinfelder
import com.hazelcast.core.ItemListener;
57 7107 leinfelder
import com.hazelcast.core.LifecycleEvent;
58
import com.hazelcast.core.LifecycleListener;
59 6446 leinfelder
import com.hazelcast.core.Member;
60 7098 leinfelder
import com.hazelcast.core.MembershipEvent;
61
import com.hazelcast.core.MembershipListener;
62 6446 leinfelder
import com.hazelcast.partition.Partition;
63
import com.hazelcast.partition.PartitionService;
64 6398 cjones
65 6446 leinfelder
import edu.ucsb.nceas.metacat.IdentifierManager;
66
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
67 6401 cjones
import edu.ucsb.nceas.metacat.properties.PropertyService;
68 6398 cjones
import edu.ucsb.nceas.metacat.shared.BaseService;
69
import edu.ucsb.nceas.metacat.shared.ServiceException;
70 7098 leinfelder
import edu.ucsb.nceas.metacat.util.DocumentUtil;
71 6489 leinfelder
import edu.ucsb.nceas.utilities.FileUtil;
72 6446 leinfelder
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
73 6398 cjones
/**
74
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
75
 */
76
public class HazelcastService extends BaseService
77 7340 leinfelder
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener, ItemListener<Identifier> {
78 6398 cjones
79 6490 leinfelder
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
80 6459 leinfelder
81 7343 cjones
  private static final String MISSING_PID_PREFIX = "missing-";
82
83 6459 leinfelder
/* The instance of the logging class */
84 6398 cjones
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
85
86
  /* The singleton instance of the hazelcast service */
87 6401 cjones
  private static HazelcastService hzService = null;
88 6398 cjones
89 6401 cjones
  /* The Hazelcast configuration */
90
  private Config hzConfig;
91 6398 cjones
92 6446 leinfelder
  /* The name of the system metadata map */
93
  private String systemMetadataMap;
94
95
  /* The Hazelcast distributed system metadata map */
96
  private IMap<Identifier, SystemMetadata> systemMetadata;
97 6711 cjones
98 6887 cjones
  /* The name of the identifiers set */
99
  private String identifiersSet;
100
101
  /* The Hazelcast distributed identifiers set */
102
  private ISet<Identifier> identifiers;
103 7340 leinfelder
104
  /* The Hazelcast distributed missing identifiers set */
105
  private ISet<Identifier> missingIdentifiers;
106 6887 cjones
107 6711 cjones
  private HazelcastInstance hzInstance;
108 6702 cjones
109 6398 cjones
  /*
110
   * Constructor: Creates an instance of the hazelcast service. Since
111
   * this uses a singleton pattern, use getInstance() to gain the instance.
112
   */
113
  private HazelcastService() {
114
115
    super();
116
    _serviceName="HazelcastService";
117
118
    try {
119 6401 cjones
      init();
120 6398 cjones
121
    } catch (ServiceException se) {
122 6437 leinfelder
      logMetacat.error("There was a problem creating the HazelcastService. " +
123 6398 cjones
                       "The error message was: " + se.getMessage());
124
125
    }
126
127
  }
128
129
  /**
130
   *  Get the instance of the HazelcastService that has been instantiated,
131
   *  or instantiate one if it has not been already.
132
   *
133
   * @return hazelcastService - The instance of the hazelcast service
134
   */
135
  public static HazelcastService getInstance(){
136
137 6401 cjones
    if ( hzService == null ) {
138 6398 cjones
139 6401 cjones
      hzService = new HazelcastService();
140 6398 cjones
141
    }
142 6401 cjones
    return hzService;
143 6398 cjones
  }
144
145
  /**
146 6401 cjones
   * Initializes the Hazelcast service
147 6398 cjones
   */
148 6401 cjones
  public void init() throws ServiceException {
149 6398 cjones
150 6963 leinfelder
    logMetacat.debug("HazelcastService.init() called.");
151 6398 cjones
152 6489 leinfelder
	String configFileName = null;
153
	try {
154
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
155 7421 cjones
		hzConfig = new FileSystemXmlConfig(configFileName);
156 6489 leinfelder
	} catch (Exception e) {
157
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
158 6963 leinfelder
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
159 6489 leinfelder
		// make sure we have the config
160
		try {
161 7421 cjones
			hzConfig = new FileSystemXmlConfig(configFileName);
162 6489 leinfelder
		} catch (FileNotFoundException e1) {
163
			String msg = e.getMessage();
164
			logMetacat.error(msg);
165
			throw new ServiceException(msg);
166
		}
167
	}
168
169 7421 cjones
	this.hzInstance = Hazelcast.newHazelcastInstance(hzConfig);
170 6711 cjones
171 7206 leinfelder
  	logMetacat.debug("Initialized hzInstance");
172
173 6446 leinfelder
    // Get configuration properties on instantiation
174
    try {
175
      systemMetadataMap =
176
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
177 6887 cjones
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
178 7421 cjones
179 6446 leinfelder
      // Get a reference to the shared system metadata map as a cluster member
180 7054 leinfelder
      // NOTE: this loads the map from the backing store and can take a long time for large collections
181 7421 cjones
      systemMetadata = this.hzInstance.getMap(systemMetadataMap);
182 6446 leinfelder
183 7206 leinfelder
      logMetacat.debug("Initialized systemMetadata");
184
185 6887 cjones
      // Get a reference to the shared identifiers set as a cluster member
186 7208 leinfelder
      // NOTE: this takes a long time to complete
187 7323 leinfelder
      logMetacat.warn("Retrieving hzIdentifiers from Hazelcast");
188 7421 cjones
      identifiers = this.hzInstance.getSet(identifiersSet);
189 7323 leinfelder
      logMetacat.warn("Retrieved hzIdentifiers from Hazelcast");
190 7209 leinfelder
191 7340 leinfelder
      // for publishing the "PIDs Wanted" list
192 7421 cjones
      missingIdentifiers = this.hzInstance.getSet("hzMissingIdentifiersSet");
193 7340 leinfelder
194
      missingIdentifiers.addItemListener(this, true);
195
196 6446 leinfelder
      // Listen for changes to the system metadata map
197
      systemMetadata.addEntryListener(this, true);
198
199 7098 leinfelder
      // Listen for members added/removed
200
      hzInstance.getCluster().addMembershipListener(this);
201
202 7108 leinfelder
      // Listen for lifecycle state changes
203
      hzInstance.getLifecycleService().addLifecycleListener(this);
204
205 6446 leinfelder
    } catch (PropertyNotFoundException e) {
206
207
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
208
        "The error message was: " + e.getMessage();
209
      logMetacat.error(msg);
210
211 6401 cjones
    }
212 6459 leinfelder
213
    // make sure we have all metadata locally
214
    try {
215 7114 leinfelder
    	// synch on restart
216 7201 leinfelder
        resynchInThread();
217 6459 leinfelder
	} catch (Exception e) {
218 7201 leinfelder
		String msg = "Problem resynchronizing system metadata. " + e.getMessage();
219 6459 leinfelder
		logMetacat.error(msg, e);
220
	}
221 6446 leinfelder
222 6398 cjones
  }
223
224 6462 cjones
  /**
225
   * Get the system metadata map
226
   *
227
   * @return systemMetadata - the hazelcast map of system metadata
228 6703 cjones
   * @param identifier - the identifier of the object as a string
229 6462 cjones
   */
230 6446 leinfelder
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
231
	  return systemMetadata;
232
  }
233 6714 leinfelder
234
  /**
235 6887 cjones
   * Get the identifiers set
236
   * @return identifiers - the set of unique DataONE identifiers in the cluster
237
   */
238
  public ISet<Identifier> getIdentifiers() {
239
      return identifiers;
240
241
  }
242
243
  /**
244 6714 leinfelder
   * When Metacat changes the underlying store, we need to refresh the
245
   * in-memory representation of it.
246
   * @param guid
247
   */
248
  public void refreshSystemMetadataEntry(String guid) {
249
	Identifier identifier = new Identifier();
250
	identifier.setValue(guid);
251
	// force hazelcast to update system metadata in memory from the store
252
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
253 7755 leinfelder
	HazelcastService.getInstance().getSystemMetadataMap().get(identifier);
254 6714 leinfelder
  }
255 6702 cjones
256 6859 cjones
  public Lock getLock(String identifier) {
257 6702 cjones
258 6859 cjones
    Lock lock = null;
259 6702 cjones
260
    try {
261 6711 cjones
        lock = getInstance().getHazelcastInstance().getLock(identifier);
262 6702 cjones
263
    } catch (RuntimeException e) {
264
        logMetacat.info("Couldn't get a lock for identifier " +
265 6703 cjones
            identifier + " !!");
266 6702 cjones
    }
267
    return lock;
268
269
  }
270 6446 leinfelder
271 6462 cjones
  /**
272
   * Get the DataONE hazelcast node map
273
   * @return nodes - the hazelcast map of nodes
274
   */
275 6483 cjones
//  public IMap<NodeReference, Node> getNodesMap() {
276
//	  return nodes;
277
//  }
278 6446 leinfelder
279 6398 cjones
  /**
280
   * Indicate whether or not this service is refreshable.
281
   *
282
   * @return refreshable - the boolean refreshable status
283
   */
284
  public boolean refreshable() {
285 6401 cjones
    // TODO: Determine the consequences of restarting the Hazelcast instance
286 6407 cjones
    // Set this to true if it's okay to drop from the cluster, lose the maps,
287
    // and start back up again
288
    return false;
289 6398 cjones
290
  }
291
292
  /**
293
   * Stop the HazelcastService. When stopped, the service will no longer
294
   * respond to requests.
295
   */
296
  public void stop() throws ServiceException {
297
298 7421 cjones
	  this.hzInstance.getLifecycleService().shutdown();
299 6398 cjones
300
  }
301
302 6711 cjones
  public HazelcastInstance getHazelcastInstance() {
303
      return this.hzInstance;
304
305
  }
306 6407 cjones
307
  /**
308
   * Refresh the Hazelcast service by restarting it
309
   */
310 6471 jones
  @Override
311 6401 cjones
  protected void doRefresh() throws ServiceException {
312
313 6407 cjones
    // TODO: verify that the correct config file is still used
314 7421 cjones
	  this.hzInstance.getLifecycleService().restart();
315 6407 cjones
316 6401 cjones
  }
317 6446 leinfelder
318
  /**
319
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
320
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
321
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
322
	 *
323
	 * @param event - The EntryEvent that occurred
324
	 */
325 6471 jones
	@Override
326 6446 leinfelder
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
327 7089 cjones
328
	  logMetacat.info("SystemMetadata entry added event on identifier " +
329
	      event.getKey().getValue());
330 6446 leinfelder
		// handle as update - that method will create if necessary
331
		entryUpdated(event);
332 7089 cjones
333 6446 leinfelder
	}
334 6401 cjones
335 6446 leinfelder
	/**
336
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
337
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
338
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
339
	 *
340
	 * @param event - The EntryEvent that occurred
341
	 */
342 6471 jones
	@Override
343 6446 leinfelder
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
344 7089 cjones
345
      logMetacat.info("SystemMetadata entry evicted event on identifier " +
346
          event.getKey().getValue());
347
348 6889 cjones
	    // ensure identifiers are listed in the hzIdentifiers set
349
      if ( !identifiers.contains(event.getKey()) ) {
350
          identifiers.add(event.getKey());
351
      }
352 6446 leinfelder
353
	}
354
355
	/**
356
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
357
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
358
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
359
	 *
360
	 * @param event - The EntryEvent that occurred
361
	 */
362 6471 jones
	@Override
363 6446 leinfelder
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
364 7089 cjones
365
    logMetacat.info("SystemMetadata entry removed event on identifier " +
366
        event.getKey().getValue());
367
368
	  // we typically don't remove objects in Metacat, but can remove System Metadata
369 6648 leinfelder
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
370 6889 cjones
371
    // keep the hzIdentifiers set in sync with the systemmetadata table
372
    if ( identifiers.contains(event.getKey()) ) {
373
        identifiers.remove(event.getKey());
374
375
    }
376
377 6446 leinfelder
	}
378
379
	/**
380
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
381
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
382
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
383
	 *
384
	 * @param event - The EntryEvent that occurred
385
	 */
386 6471 jones
	@Override
387 6446 leinfelder
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
388 7117 leinfelder
389
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
390 7421 cjones
		PartitionService partitionService = this.hzInstance.getPartitionService();
391 7117 leinfelder
		Partition partition = partitionService.getPartition(event.getKey());
392
		Member ownerMember = partition.getOwner();
393
		SystemMetadata sysmeta = event.getValue();
394
		if (!ownerMember.localMember()) {
395
			if (sysmeta == null) {
396
				logMetacat.warn("No SystemMetadata provided in the event, getting from shared map: " + event.getKey().getValue());
397
				sysmeta = getSystemMetadataMap().get(event.getKey());
398 7116 leinfelder
				if (sysmeta == null) {
399 7117 leinfelder
					// this is a problem
400
					logMetacat.error("Could not find SystemMetadata in shared map for: " + event.getKey().getValue());
401
					// TODO: should probably return at this point since the save will fail
402 7116 leinfelder
				}
403 6446 leinfelder
			}
404 7117 leinfelder
			// need to pull the entry into the local store
405
			saveLocally(event.getValue());
406
		}
407 6889 cjones
408 7117 leinfelder
		// ensure identifiers are listed in the hzIdentifiers set
409
		if (!identifiers.contains(event.getKey())) {
410
			identifiers.add(event.getKey());
411
		}
412 6889 cjones
413 6446 leinfelder
	}
414 6459 leinfelder
415
	/**
416
	 * Save SystemMetadata to local store if needed
417
	 * @param sm
418
	 */
419
	private void saveLocally(SystemMetadata sm) {
420
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
421
		try {
422 7188 leinfelder
423
			IdentifierManager.getInstance().insertOrUpdateSystemMetadata(sm);
424
425 6459 leinfelder
		} catch (McdbDocNotFoundException e) {
426 6904 cjones
			logMetacat.error("Could not save System Metadata to local store.", e);
427
428 6894 cjones
		} catch (SQLException e) {
429 6904 cjones
	      logMetacat.error("Could not save System Metadata to local store.", e);
430
431 7106 leinfelder
	    } catch (InvalidSystemMetadata e) {
432
	        logMetacat.error("Could not save System Metadata to local store.", e);
433
434
	    }
435 6459 leinfelder
	}
436
437 7106 leinfelder
	/**
438
	 * Checks the local backing store for missing SystemMetadata,
439
	 * retrieves those entries from the shared map if they exist,
440
	 * and saves them locally.
441
	 */
442
	private void synchronizeLocalStore() {
443 7168 leinfelder
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true, -1);
444 7106 leinfelder
		if (localIds != null) {
445
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
446
			for (String localId: localIds) {
447
				logMetacat.debug("Processing system metadata for localId: " + localId);
448
				try {
449
					String docid = DocumentUtil.getSmartDocId(localId);
450
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
451
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
452
					logMetacat.debug("Found mapped guid: " + guid);
453
					Identifier pid = new Identifier();
454
					pid.setValue(guid);
455
					SystemMetadata sm = systemMetadata.get(pid);
456
					logMetacat.debug("Found shared system metadata for guid: " + guid);
457
					saveLocally(sm);
458
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
459
				} catch (Exception e) {
460
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
461
				}
462
			}
463
		}
464
	}
465
466 7201 leinfelder
467 7213 leinfelder
	/**
468
	 * Make sure we have a copy of every entry in the shared map.
469
	 * We use lazy loading and therefore the CNs may not all be in sync when one
470
	 * comes back online after an extended period of being offline
471
	 * This method loops through the entries that a FULLY UP-TO-DATE CN has
472
	 * and makes sure each one is present on the shared map.
473
	 * It is meant to overcome a HZ weakness wherein ownership of a key results in
474
	 * null values where the owner does not have a complete backing store.
475
	 * This will be an expensive routine and should be run in a background process so that
476
	 * the server can continue to service other requests during the synch
477
	 * @throws Exception
478
	 */
479 7295 leinfelder
	private void resynchToRemote() {
480 7325 leinfelder
481 7340 leinfelder
		// the local identifiers not already present in the shared map
482
		Set<Identifier> localIdKeys = loadAllKeys();
483
484
		//  the PIDs missing locally
485
		Set<Identifier> missingIdKeys = new HashSet<Identifier>();
486 7326 leinfelder
487 7340 leinfelder
		// only contribute PIDs that are not already shared
488 7339 leinfelder
		Iterator<Identifier> idIter = identifiers.iterator();
489 7411 leinfelder
		int processedCount = 0;
490 7339 leinfelder
		while (idIter.hasNext()) {
491
			Identifier pid = idIter.next();
492 7340 leinfelder
			if (localIdKeys.contains(pid)) {
493 7345 leinfelder
				logMetacat.debug("Shared pid is already in local identifier set: " + pid.getValue());
494 7340 leinfelder
				localIdKeys.remove(pid);
495
			} else {
496
				// we don't have this locally, so we should try to get it
497
				missingIdKeys.add(pid);
498 7339 leinfelder
			}
499 7411 leinfelder
			processedCount++;
500 7339 leinfelder
		}
501 7411 leinfelder
		logMetacat.warn("processedCount (identifiers from iterator): " + processedCount);
502
503 7340 leinfelder
		logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size());
504 7326 leinfelder
505 7325 leinfelder
		//identifiers.addAll(idKeys);
506 7326 leinfelder
		logMetacat.warn("Loading missing local keys into hzIdentifiers");
507 7340 leinfelder
		for (Identifier key: localIdKeys) {
508 7325 leinfelder
			if (!identifiers.contains(key)) {
509
				logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue());
510
				identifiers.add(key);
511
			}
512
		}
513 7326 leinfelder
		logMetacat.warn("Initialized identifiers with missing local keys");
514 7213 leinfelder
515 7341 leinfelder
		logMetacat.warn("Processing missing SystemMetadata for missing pid count: " + missingIdKeys.size());
516 7326 leinfelder
517 7340 leinfelder
		// loop through all the missing PIDs to find any null (missing) SM that needs to be resynched
518
		Iterator<Identifier> missingPids = missingIdKeys.iterator();
519
		while (missingPids.hasNext()) {
520
			Identifier pid = missingPids.next();
521 7342 leinfelder
			// publish that we need this SM entry
522
			logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue());
523
			missingIdentifiers.add(pid);
524 7213 leinfelder
		}
525 7216 leinfelder
526 7213 leinfelder
	}
527
528 7346 leinfelder
	public void resynchInThread() {
529 7204 leinfelder
		logMetacat.debug("launching system metadata resynch in a thread");
530 7201 leinfelder
		ExecutorService executor = Executors.newSingleThreadExecutor();
531
		executor.execute(new Runnable() {
532
			@Override
533
			public void run() {
534
				try {
535 7213 leinfelder
					// this is a push mechanism
536
					resynchToRemote();
537 7201 leinfelder
				} catch (Exception e) {
538
					logMetacat.error("Error in resynchInThread: " + e.getMessage(), e);
539
				}
540
			}
541
		});
542
		executor.shutdown();
543
	}
544 6446 leinfelder
545 7098 leinfelder
	/**
546
	 * When there is missing SystemMetadata on the local member,
547
	 * we retrieve it from the shared map and add it to the local
548
	 * backing store for safe keeping.
549
	 */
550
	@Override
551
	public void memberAdded(MembershipEvent event) {
552
		Member member = event.getMember();
553
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
554
		boolean isLocal = member.localMember();
555
		if (isLocal) {
556
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
557 7106 leinfelder
			synchronizeLocalStore();
558 7098 leinfelder
		}
559
	}
560
561
	@Override
562
	public void memberRemoved(MembershipEvent event) {
563
		// TODO Auto-generated method stub
564
565
	}
566
567 7107 leinfelder
	/**
568
	 * In cases where this cluster is paused, we want to
569
	 * check that the local store accurately reflects the shared
570
	 * SystemMetadata map
571
	 * @param event
572
	 */
573
	@Override
574
	public void stateChanged(LifecycleEvent event) {
575
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
576
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
577
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
578
			synchronizeLocalStore();
579
		}
580
	}
581
582 7187 leinfelder
	/**
583
	 * Load all System Metadata keys from the backing store
584
	 * @return set of pids
585
	 */
586
	private Set<Identifier> loadAllKeys() {
587
588
		Set<Identifier> pids = new HashSet<Identifier>();
589
590
		try {
591
592
			// ALTERNATIVE 1: this has more overhead than just looking at the GUIDs
593
//			ObjectList ol = IdentifierManager.getInstance().querySystemMetadata(
594
//					null, //startTime,
595
//					null, //endTime,
596
//					null, //objectFormatId,
597
//					false, //replicaStatus,
598
//					0, //start,
599
//					-1 //count
600
//					);
601
//			for (ObjectInfo o: ol.getObjectInfoList()) {
602
//				Identifier pid = o.getIdentifier();
603
//				if ( !pids.contains(pid) ) {
604
//					pids.add(pid);
605
//				}
606
//			}
607
608
			// ALTERNATIVE method: look up all the Identifiers from the table
609
			List<String> guids = IdentifierManager.getInstance().getAllSystemMetadataGUIDs();
610 7211 leinfelder
			logMetacat.warn("Local SystemMetadata pid count: " + guids.size());
611 7187 leinfelder
			for (String guid: guids){
612
				Identifier pid = new Identifier();
613
				pid.setValue(guid);
614
				pids.add(pid);
615
			}
616
617
		} catch (Exception e) {
618
			throw new RuntimeException(e.getMessage(), e);
619
620
		}
621
622
		return pids;
623
	}
624
625 7343 cjones
	/**
626
	 * Respond to itemAdded events on the hzMissingIdentifiers Set.  Uses a
627
	 * distributed ILock to try to prevent multiple put calls on hzSystemMetadata
628
	 *
629
	 * @param pid   the identifier of the event
630
	 */
631 7340 leinfelder
	@Override
632 7419 cjones
	public void itemAdded(ItemEvent<Identifier> event) {
633
634
		Identifier pid = (Identifier) event.getItem();
635 7340 leinfelder
		// publish the SM for the pid if we have it locally
636
		logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue());
637 7344 leinfelder
638
		// lock this event, only if we have a local copy to contribute
639 7343 cjones
		ILock lock = null;
640 7340 leinfelder
		try {
641 7344 leinfelder
			// look up the local copy of the SM
642
			SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue());
643
			if (sm != null) {
644
				lock = hzInstance.getLock(MISSING_PID_PREFIX + pid.getValue());
645
646
				if ( lock.tryLock() ) {
647
			        // "publish" the system metadata to the shared map since it showed up on the missing queue
648
			        logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue());
649
			        systemMetadata.put(pid, sm);
650
651
			        // remove the entry since we processed it
652
			        missingIdentifiers.remove(pid);
653
654
				  } else {
655
				      logMetacat.debug(MISSING_PID_PREFIX + pid.getValue() + " was already locked. Skipping.");
656
				  }
657
			} else {
658
				// can't help here
659
				logMetacat.warn("Local system metadata not found for pid: " + pid.getValue());
660
			}
661 7340 leinfelder
		} catch (Exception e) {
662
			logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue());
663 7343 cjones
		} finally {
664 7344 leinfelder
			if ( lock != null ) {
665
				lock.unlock();
666
			}
667 7343 cjones
        }
668 7340 leinfelder
	}
669
670 7343 cjones
	/**
671
   * Respond to itemRemoved events on the hzMissingIdentifiers Set
672
   *
673
   * @param pid   the identifier of the event
674
   */
675 7340 leinfelder
	@Override
676 7419 cjones
	public void itemRemoved(ItemEvent<Identifier> event) {
677 7340 leinfelder
		// do nothing since someone probably handled the wanted PID
678
679
	}
680
681 6398 cjones
}