/**
 *  '$RCSfile$'
 *    Purpose: Implements a service for managing a Hazelcast cluster member
 *  Copyright: 2011 Regents of the University of California and the
 *             National Center for Ecological Analysis and Synthesis
 *    Authors: Christopher Jones
 *
 *   '$Author$'
 *     '$Date$'
 * '$Revision$'
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28 6398 cjones
29 6489 leinfelder
import java.io.FileNotFoundException;
30 6894 cjones
import java.sql.SQLException;
31 9116 tao
import java.text.DateFormat;
32
import java.text.SimpleDateFormat;
33
import java.util.Calendar;
34 7187 leinfelder
import java.util.HashSet;
35 7203 leinfelder
import java.util.Iterator;
36 7098 leinfelder
import java.util.List;
37 7187 leinfelder
import java.util.Set;
38 7201 leinfelder
import java.util.concurrent.ExecutorService;
39
import java.util.concurrent.Executors;
40 6859 cjones
import java.util.concurrent.locks.Lock;
41 6459 leinfelder
42 6398 cjones
import org.apache.log4j.Logger;
43 6904 cjones
import org.dataone.service.exceptions.InvalidSystemMetadata;
44 6446 leinfelder
import org.dataone.service.types.v1.Identifier;
45 8810 leinfelder
import org.dataone.service.types.v2.SystemMetadata;
46 6398 cjones
47 6401 cjones
import com.hazelcast.config.Config;
48 6437 leinfelder
import com.hazelcast.config.FileSystemXmlConfig;
49 6446 leinfelder
import com.hazelcast.core.EntryEvent;
50
import com.hazelcast.core.EntryListener;
51 6398 cjones
import com.hazelcast.core.Hazelcast;
52 6711 cjones
import com.hazelcast.core.HazelcastInstance;
53 7343 cjones
import com.hazelcast.core.ILock;
54 6446 leinfelder
import com.hazelcast.core.IMap;
55 6887 cjones
import com.hazelcast.core.ISet;
56 7419 cjones
import com.hazelcast.core.ItemEvent;
57 7340 leinfelder
import com.hazelcast.core.ItemListener;
58 7107 leinfelder
import com.hazelcast.core.LifecycleEvent;
59
import com.hazelcast.core.LifecycleListener;
60 6446 leinfelder
import com.hazelcast.core.Member;
61 7098 leinfelder
import com.hazelcast.core.MembershipEvent;
62
import com.hazelcast.core.MembershipListener;
63 6446 leinfelder
import com.hazelcast.partition.Partition;
64
import com.hazelcast.partition.PartitionService;
65 6398 cjones
66 6446 leinfelder
import edu.ucsb.nceas.metacat.IdentifierManager;
67
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
68 8464 leinfelder
import edu.ucsb.nceas.metacat.common.index.IndexTask;
69 7829 leinfelder
import edu.ucsb.nceas.metacat.common.index.event.IndexEvent;
70 6401 cjones
import edu.ucsb.nceas.metacat.properties.PropertyService;
71 6398 cjones
import edu.ucsb.nceas.metacat.shared.BaseService;
72
import edu.ucsb.nceas.metacat.shared.ServiceException;
73 7098 leinfelder
import edu.ucsb.nceas.metacat.util.DocumentUtil;
74 6489 leinfelder
import edu.ucsb.nceas.utilities.FileUtil;
75 6446 leinfelder
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
76 6398 cjones
/**
77
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
78
 */
79
public class HazelcastService extends BaseService
80 7340 leinfelder
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener, ItemListener<Identifier> {
81 6398 cjones
82 7343 cjones
  private static final String MISSING_PID_PREFIX = "missing-";
83
84 6459 leinfelder
/* The instance of the logging class */
85 6398 cjones
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
86
87
  /* The singleton instance of the hazelcast service */
88 6401 cjones
  private static HazelcastService hzService = null;
89 6398 cjones
90 6401 cjones
  /* The Hazelcast configuration */
91
  private Config hzConfig;
92 6398 cjones
93 6446 leinfelder
  /* The name of the system metadata map */
94
  private String systemMetadataMap;
95
96
  /* The Hazelcast distributed system metadata map */
97
  private IMap<Identifier, SystemMetadata> systemMetadata;
98 6711 cjones
99 6887 cjones
  /* The name of the identifiers set */
100
  private String identifiersSet;
101
102
  /* The Hazelcast distributed identifiers set */
103
  private ISet<Identifier> identifiers;
104 7340 leinfelder
105
  /* The Hazelcast distributed missing identifiers set */
106
  private ISet<Identifier> missingIdentifiers;
107 7812 leinfelder
108
  /* The Hazelcast distributed index queue */
109
  private String hzIndexQueue;
110 8464 leinfelder
  private IMap<Identifier, IndexTask> indexQueue;
111 7829 leinfelder
112
  /* The Hazelcast distributed index event map */
113
  private String hzIndexEventMap;
114
  private IMap<Identifier, IndexEvent> indexEventMap;
115 6887 cjones
116 6711 cjones
  private HazelcastInstance hzInstance;
117 6702 cjones
118 6398 cjones
  /**
   * Constructor: Creates an instance of the hazelcast service. Since
   * this uses a singleton pattern, use getInstance() to gain the instance.
   *
   * A ServiceException raised by init() is logged (with its stack trace)
   * and not rethrown, so the instance may be only partially initialized.
   */
  private HazelcastService() {

    super();
    _serviceName = "HazelcastService";

    try {
      init();

    } catch (ServiceException se) {
      // pass the exception itself so the stack trace is not lost
      logMetacat.error("There was a problem creating the HazelcastService. " +
                       "The error message was: " + se.getMessage(), se);

    }

  }
137
138
  /**
139
   *  Get the instance of the HazelcastService that has been instantiated,
140
   *  or instantiate one if it has not been already.
141
   *
142
   * @return hazelcastService - The instance of the hazelcast service
143
   */
144
  public static HazelcastService getInstance(){
145
146 6401 cjones
    if ( hzService == null ) {
147 6398 cjones
148 6401 cjones
      hzService = new HazelcastService();
149 6398 cjones
150
    }
151 6401 cjones
    return hzService;
152 6398 cjones
  }
153
154
  /**
155 6401 cjones
   * Initializes the Hazelcast service
156 6398 cjones
   */
157 6401 cjones
  public void init() throws ServiceException {
158 6398 cjones
159 6963 leinfelder
    logMetacat.debug("HazelcastService.init() called.");
160 6398 cjones
161 6489 leinfelder
	String configFileName = null;
162
	try {
163
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
164 7421 cjones
		hzConfig = new FileSystemXmlConfig(configFileName);
165 6489 leinfelder
	} catch (Exception e) {
166
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
167 6963 leinfelder
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
168 6489 leinfelder
		// make sure we have the config
169
		try {
170 7421 cjones
			hzConfig = new FileSystemXmlConfig(configFileName);
171 6489 leinfelder
		} catch (FileNotFoundException e1) {
172
			String msg = e.getMessage();
173
			logMetacat.error(msg);
174
			throw new ServiceException(msg);
175
		}
176
	}
177
178 7421 cjones
	this.hzInstance = Hazelcast.newHazelcastInstance(hzConfig);
179 6711 cjones
180 7206 leinfelder
  	logMetacat.debug("Initialized hzInstance");
181
182 6446 leinfelder
    // Get configuration properties on instantiation
183
    try {
184
      systemMetadataMap =
185
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
186 6887 cjones
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
187 7421 cjones
188 6446 leinfelder
      // Get a reference to the shared system metadata map as a cluster member
189 7054 leinfelder
      // NOTE: this loads the map from the backing store and can take a long time for large collections
190 7421 cjones
      systemMetadata = this.hzInstance.getMap(systemMetadataMap);
191 6446 leinfelder
192 7206 leinfelder
      logMetacat.debug("Initialized systemMetadata");
193
194 6887 cjones
      // Get a reference to the shared identifiers set as a cluster member
195 7208 leinfelder
      // NOTE: this takes a long time to complete
196 7323 leinfelder
      logMetacat.warn("Retrieving hzIdentifiers from Hazelcast");
197 7421 cjones
      identifiers = this.hzInstance.getSet(identifiersSet);
198 7323 leinfelder
      logMetacat.warn("Retrieved hzIdentifiers from Hazelcast");
199 7209 leinfelder
200 7340 leinfelder
      // for publishing the "PIDs Wanted" list
201 7421 cjones
      missingIdentifiers = this.hzInstance.getSet("hzMissingIdentifiersSet");
202 7340 leinfelder
203
      missingIdentifiers.addItemListener(this, true);
204 7829 leinfelder
205
      // for index tasks
206 7812 leinfelder
      hzIndexQueue = PropertyService.getProperty("index.hazelcast.indexqueue");
207 8464 leinfelder
      indexQueue = this.hzInstance.getMap(hzIndexQueue);
208 7812 leinfelder
209 7829 leinfelder
      // for index events (failures)
210
      hzIndexEventMap = PropertyService.getProperty("index.hazelcast.indexeventmap");
211
      indexEventMap = this.hzInstance.getMap(hzIndexEventMap);
212
213 6446 leinfelder
      // Listen for changes to the system metadata map
214
      systemMetadata.addEntryListener(this, true);
215
216 7098 leinfelder
      // Listen for members added/removed
217
      hzInstance.getCluster().addMembershipListener(this);
218
219 7108 leinfelder
      // Listen for lifecycle state changes
220
      hzInstance.getLifecycleService().addLifecycleListener(this);
221
222 6446 leinfelder
    } catch (PropertyNotFoundException e) {
223
224
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
225
        "The error message was: " + e.getMessage();
226
      logMetacat.error(msg);
227
228 6401 cjones
    }
229 6459 leinfelder
230
    // make sure we have all metadata locally
231
    try {
232 7114 leinfelder
    	// synch on restart
233 7201 leinfelder
        resynchInThread();
234 6459 leinfelder
	} catch (Exception e) {
235 7201 leinfelder
		String msg = "Problem resynchronizing system metadata. " + e.getMessage();
236 6459 leinfelder
		logMetacat.error(msg, e);
237
	}
238 6446 leinfelder
239 6398 cjones
  }
240
241 6462 cjones
  /**
242
   * Get the system metadata map
243
   *
244
   * @return systemMetadata - the hazelcast map of system metadata
245 6703 cjones
   * @param identifier - the identifier of the object as a string
246 6462 cjones
   */
247 6446 leinfelder
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
248
	  return systemMetadata;
249
  }
250 6714 leinfelder
251
  /**
252 6887 cjones
   * Get the identifiers set
253
   * @return identifiers - the set of unique DataONE identifiers in the cluster
254
   */
255
  public ISet<Identifier> getIdentifiers() {
256
      return identifiers;
257
258
  }
259
260
  /**
261 7812 leinfelder
   * Get the index queue
262
   * @return the set of SystemMetadata to be indexed
263
   */
264 8464 leinfelder
  public IMap<Identifier, IndexTask> getIndexQueue() {
265 7812 leinfelder
      return indexQueue;
266
  }
267
268
  /**
269 7829 leinfelder
   * Get the index event map
270
   * @return indexEventMap - the hazelcast map of index events
271
   */
272
  public IMap<Identifier, IndexEvent> getIndexEventMap() {
273
	  return indexEventMap;
274
  }
275
276
  /**
277 6714 leinfelder
   * When Metacat changes the underlying store, we need to refresh the
278
   * in-memory representation of it.
279
   * @param guid
280
   */
281
  public void refreshSystemMetadataEntry(String guid) {
282
	Identifier identifier = new Identifier();
283
	identifier.setValue(guid);
284
	// force hazelcast to update system metadata in memory from the store
285
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
286
  }
287 6702 cjones
288 6859 cjones
  public Lock getLock(String identifier) {
289 6702 cjones
290 6859 cjones
    Lock lock = null;
291 6702 cjones
292
    try {
293 6711 cjones
        lock = getInstance().getHazelcastInstance().getLock(identifier);
294 6702 cjones
295
    } catch (RuntimeException e) {
296
        logMetacat.info("Couldn't get a lock for identifier " +
297 6703 cjones
            identifier + " !!");
298 6702 cjones
    }
299
    return lock;
300
301
  }
302 6446 leinfelder
303 6462 cjones
  /**
304
   * Get the DataONE hazelcast node map
305
   * @return nodes - the hazelcast map of nodes
306
   */
307 6483 cjones
//  public IMap<NodeReference, Node> getNodesMap() {
308
//	  return nodes;
309
//  }
310 6446 leinfelder
311 6398 cjones
  /**
312
   * Indicate whether or not this service is refreshable.
313
   *
314
   * @return refreshable - the boolean refreshable status
315
   */
316
  public boolean refreshable() {
317 6401 cjones
    // TODO: Determine the consequences of restarting the Hazelcast instance
318 6407 cjones
    // Set this to true if it's okay to drop from the cluster, lose the maps,
319
    // and start back up again
320
    return false;
321 6398 cjones
322
  }
323
324
  /**
325
   * Stop the HazelcastService. When stopped, the service will no longer
326
   * respond to requests.
327
   */
328
  public void stop() throws ServiceException {
329
330 7421 cjones
	  this.hzInstance.getLifecycleService().shutdown();
331 6398 cjones
332
  }
333
334 6711 cjones
  public HazelcastInstance getHazelcastInstance() {
335
      return this.hzInstance;
336
337
  }
338 6407 cjones
339
  /**
340
   * Refresh the Hazelcast service by restarting it
341
   */
342 6471 jones
  @Override
343 6401 cjones
  protected void doRefresh() throws ServiceException {
344
345 6407 cjones
    // TODO: verify that the correct config file is still used
346 7421 cjones
	  this.hzInstance.getLifecycleService().restart();
347 6407 cjones
348 6401 cjones
  }
349 6446 leinfelder
350
  /**
351
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
352
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
353
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
354
	 *
355
	 * @param event - The EntryEvent that occurred
356
	 */
357 6471 jones
	@Override
358 6446 leinfelder
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
359 7089 cjones
360
	  logMetacat.info("SystemMetadata entry added event on identifier " +
361
	      event.getKey().getValue());
362 6446 leinfelder
		// handle as update - that method will create if necessary
363
		entryUpdated(event);
364 7089 cjones
365 6446 leinfelder
	}
366 6401 cjones
367 6446 leinfelder
	/**
368
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
369
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
370
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
371
	 *
372
	 * @param event - The EntryEvent that occurred
373
	 */
374 6471 jones
	@Override
375 6446 leinfelder
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
376 7089 cjones
377
      logMetacat.info("SystemMetadata entry evicted event on identifier " +
378
          event.getKey().getValue());
379
380 6889 cjones
	    // ensure identifiers are listed in the hzIdentifiers set
381
      if ( !identifiers.contains(event.getKey()) ) {
382
          identifiers.add(event.getKey());
383
      }
384 6446 leinfelder
385
	}
386
387
	/**
388
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
389
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
390
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
391
	 *
392
	 * @param event - The EntryEvent that occurred
393
	 */
394 6471 jones
	@Override
395 6446 leinfelder
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
396 7089 cjones
397
    logMetacat.info("SystemMetadata entry removed event on identifier " +
398
        event.getKey().getValue());
399
400
	  // we typically don't remove objects in Metacat, but can remove System Metadata
401 6648 leinfelder
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
402 6889 cjones
403
    // keep the hzIdentifiers set in sync with the systemmetadata table
404
    if ( identifiers.contains(event.getKey()) ) {
405
        identifiers.remove(event.getKey());
406
407
    }
408
409 6446 leinfelder
	}
410
411
	/**
412
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
413
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
414
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
415
	 *
416
	 * @param event - The EntryEvent that occurred
417
	 */
418 6471 jones
	@Override
419 6446 leinfelder
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
420 7117 leinfelder
421
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
422 7421 cjones
		PartitionService partitionService = this.hzInstance.getPartitionService();
423 7117 leinfelder
		Partition partition = partitionService.getPartition(event.getKey());
424
		Member ownerMember = partition.getOwner();
425
		SystemMetadata sysmeta = event.getValue();
426
		if (!ownerMember.localMember()) {
427
			if (sysmeta == null) {
428
				logMetacat.warn("No SystemMetadata provided in the event, getting from shared map: " + event.getKey().getValue());
429
				sysmeta = getSystemMetadataMap().get(event.getKey());
430 7116 leinfelder
				if (sysmeta == null) {
431 7117 leinfelder
					// this is a problem
432
					logMetacat.error("Could not find SystemMetadata in shared map for: " + event.getKey().getValue());
433
					// TODO: should probably return at this point since the save will fail
434 7116 leinfelder
				}
435 6446 leinfelder
			}
436 7117 leinfelder
			// need to pull the entry into the local store
437
			saveLocally(event.getValue());
438
		}
439 6889 cjones
440 7117 leinfelder
		// ensure identifiers are listed in the hzIdentifiers set
441
		if (!identifiers.contains(event.getKey())) {
442
			identifiers.add(event.getKey());
443
		}
444 6889 cjones
445 6446 leinfelder
	}
446 6459 leinfelder
447
	/**
448
	 * Save SystemMetadata to local store if needed
449
	 * @param sm
450
	 */
451
	private void saveLocally(SystemMetadata sm) {
452
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
453
		try {
454 7188 leinfelder
455
			IdentifierManager.getInstance().insertOrUpdateSystemMetadata(sm);
456
457 6459 leinfelder
		} catch (McdbDocNotFoundException e) {
458 6904 cjones
			logMetacat.error("Could not save System Metadata to local store.", e);
459
460 6894 cjones
		} catch (SQLException e) {
461 6904 cjones
	      logMetacat.error("Could not save System Metadata to local store.", e);
462
463 7106 leinfelder
	    } catch (InvalidSystemMetadata e) {
464
	        logMetacat.error("Could not save System Metadata to local store.", e);
465
466
	    }
467 6459 leinfelder
	}
468
469 7106 leinfelder
	/**
470
	 * Checks the local backing store for missing SystemMetadata,
471
	 * retrieves those entries from the shared map if they exist,
472
	 * and saves them locally.
473
	 */
474
	private void synchronizeLocalStore() {
475 7168 leinfelder
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true, -1);
476 7106 leinfelder
		if (localIds != null) {
477
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
478
			for (String localId: localIds) {
479
				logMetacat.debug("Processing system metadata for localId: " + localId);
480
				try {
481
					String docid = DocumentUtil.getSmartDocId(localId);
482
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
483
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
484
					logMetacat.debug("Found mapped guid: " + guid);
485
					Identifier pid = new Identifier();
486
					pid.setValue(guid);
487
					SystemMetadata sm = systemMetadata.get(pid);
488
					logMetacat.debug("Found shared system metadata for guid: " + guid);
489
					saveLocally(sm);
490
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
491
				} catch (Exception e) {
492
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
493
				}
494
			}
495
		}
496
	}
497
498 7201 leinfelder
499 7213 leinfelder
	/**
500
	 * Make sure we have a copy of every entry in the shared map.
501
	 * We use lazy loading and therefore the CNs may not all be in sync when one
502
	 * comes back online after an extended period of being offline
503
	 * This method loops through the entries that a FULLY UP-TO-DATE CN has
504
	 * and makes sure each one is present on the shared map.
505
	 * It is meant to overcome a HZ weakness wherein ownership of a key results in
506
	 * null values where the owner does not have a complete backing store.
507
	 * This will be an expensive routine and should be run in a background process so that
508
	 * the server can continue to service other requests during the synch
509
	 * @throws Exception
510
	 */
511 7295 leinfelder
	private void resynchToRemote() {
512 7325 leinfelder
513 7340 leinfelder
		// the local identifiers not already present in the shared map
514
		Set<Identifier> localIdKeys = loadAllKeys();
515
516
		//  the PIDs missing locally
517
		Set<Identifier> missingIdKeys = new HashSet<Identifier>();
518 7326 leinfelder
519 7340 leinfelder
		// only contribute PIDs that are not already shared
520 7339 leinfelder
		Iterator<Identifier> idIter = identifiers.iterator();
521 7411 leinfelder
		int processedCount = 0;
522 7339 leinfelder
		while (idIter.hasNext()) {
523
			Identifier pid = idIter.next();
524 7340 leinfelder
			if (localIdKeys.contains(pid)) {
525 7345 leinfelder
				logMetacat.debug("Shared pid is already in local identifier set: " + pid.getValue());
526 7340 leinfelder
				localIdKeys.remove(pid);
527
			} else {
528
				// we don't have this locally, so we should try to get it
529
				missingIdKeys.add(pid);
530 7339 leinfelder
			}
531 7411 leinfelder
			processedCount++;
532 7339 leinfelder
		}
533 7411 leinfelder
		logMetacat.warn("processedCount (identifiers from iterator): " + processedCount);
534
535 7340 leinfelder
		logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size());
536 7326 leinfelder
537 7325 leinfelder
		//identifiers.addAll(idKeys);
538 7326 leinfelder
		logMetacat.warn("Loading missing local keys into hzIdentifiers");
539 7340 leinfelder
		for (Identifier key: localIdKeys) {
540 7325 leinfelder
			if (!identifiers.contains(key)) {
541
				logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue());
542
				identifiers.add(key);
543
			}
544
		}
545 7326 leinfelder
		logMetacat.warn("Initialized identifiers with missing local keys");
546 7213 leinfelder
547 7341 leinfelder
		logMetacat.warn("Processing missing SystemMetadata for missing pid count: " + missingIdKeys.size());
548 7326 leinfelder
549 7340 leinfelder
		// loop through all the missing PIDs to find any null (missing) SM that needs to be resynched
550
		Iterator<Identifier> missingPids = missingIdKeys.iterator();
551
		while (missingPids.hasNext()) {
552
			Identifier pid = missingPids.next();
553 7342 leinfelder
			// publish that we need this SM entry
554
			logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue());
555
			missingIdentifiers.add(pid);
556 7213 leinfelder
		}
557 7216 leinfelder
558 7213 leinfelder
	}
559
560 7346 leinfelder
	public void resynchInThread() {
561 7204 leinfelder
		logMetacat.debug("launching system metadata resynch in a thread");
562 7201 leinfelder
		ExecutorService executor = Executors.newSingleThreadExecutor();
563
		executor.execute(new Runnable() {
564
			@Override
565
			public void run() {
566
				try {
567 7213 leinfelder
					// this is a push mechanism
568 9116 tao
				    DateFormat dateFormat = new SimpleDateFormat("MMM dd, yyyy HH:mm:ss aaa");
569
				    System.out.println(dateFormat.format(Calendar.getInstance().getTime())+" Start the hazelcast synchronization");
570 9078 tao
				    logMetacat.warn("Start the hazelcast synchronization");
571 7213 leinfelder
					resynchToRemote();
572 9116 tao
					System.out.println(dateFormat.format(Calendar.getInstance().getTime())+" End the hazelcast synchronization");
573 9078 tao
					logMetacat.warn("End the hazelcast synchronization");
574 7201 leinfelder
				} catch (Exception e) {
575
					logMetacat.error("Error in resynchInThread: " + e.getMessage(), e);
576
				}
577
			}
578
		});
579
		executor.shutdown();
580
	}
581 6446 leinfelder
582 7098 leinfelder
	/**
583
	 * When there is missing SystemMetadata on the local member,
584
	 * we retrieve it from the shared map and add it to the local
585
	 * backing store for safe keeping.
586
	 */
587
	@Override
588
	public void memberAdded(MembershipEvent event) {
589
		Member member = event.getMember();
590
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
591
		boolean isLocal = member.localMember();
592
		if (isLocal) {
593
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
594 7106 leinfelder
			synchronizeLocalStore();
595 7098 leinfelder
		}
596
	}
597
598
	@Override
599
	public void memberRemoved(MembershipEvent event) {
600
		// TODO Auto-generated method stub
601
602
	}
603
604 7107 leinfelder
	/**
605
	 * In cases where this cluster is paused, we want to
606
	 * check that the local store accurately reflects the shared
607
	 * SystemMetadata map
608
	 * @param event
609
	 */
610
	@Override
611
	public void stateChanged(LifecycleEvent event) {
612
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
613
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
614
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
615
			synchronizeLocalStore();
616
		}
617
	}
618
619 7187 leinfelder
	/**
620
	 * Load all System Metadata keys from the backing store
621
	 * @return set of pids
622
	 */
623
	private Set<Identifier> loadAllKeys() {
624
625
		Set<Identifier> pids = new HashSet<Identifier>();
626
627
		try {
628
629
			// ALTERNATIVE 1: this has more overhead than just looking at the GUIDs
630
//			ObjectList ol = IdentifierManager.getInstance().querySystemMetadata(
631
//					null, //startTime,
632
//					null, //endTime,
633
//					null, //objectFormatId,
634
//					false, //replicaStatus,
635
//					0, //start,
636
//					-1 //count
637
//					);
638
//			for (ObjectInfo o: ol.getObjectInfoList()) {
639
//				Identifier pid = o.getIdentifier();
640
//				if ( !pids.contains(pid) ) {
641
//					pids.add(pid);
642
//				}
643
//			}
644
645
			// ALTERNATIVE method: look up all the Identifiers from the table
646
			List<String> guids = IdentifierManager.getInstance().getAllSystemMetadataGUIDs();
647 7211 leinfelder
			logMetacat.warn("Local SystemMetadata pid count: " + guids.size());
648 7187 leinfelder
			for (String guid: guids){
649
				Identifier pid = new Identifier();
650
				pid.setValue(guid);
651
				pids.add(pid);
652
			}
653
654
		} catch (Exception e) {
655
			throw new RuntimeException(e.getMessage(), e);
656
657
		}
658
659
		return pids;
660
	}
661
662 7343 cjones
	/**
663
	 * Respond to itemAdded events on the hzMissingIdentifiers Set.  Uses a
664
	 * distributed ILock to try to prevent multiple put calls on hzSystemMetadata
665
	 *
666
	 * @param pid   the identifier of the event
667
	 */
668 7340 leinfelder
	@Override
669 7419 cjones
	public void itemAdded(ItemEvent<Identifier> event) {
670
671
		Identifier pid = (Identifier) event.getItem();
672 7340 leinfelder
		// publish the SM for the pid if we have it locally
673
		logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue());
674 7344 leinfelder
675
		// lock this event, only if we have a local copy to contribute
676 7343 cjones
		ILock lock = null;
677 7340 leinfelder
		try {
678 7344 leinfelder
			// look up the local copy of the SM
679
			SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue());
680
			if (sm != null) {
681
				lock = hzInstance.getLock(MISSING_PID_PREFIX + pid.getValue());
682
683
				if ( lock.tryLock() ) {
684
			        // "publish" the system metadata to the shared map since it showed up on the missing queue
685
			        logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue());
686
			        systemMetadata.put(pid, sm);
687
688
			        // remove the entry since we processed it
689
			        missingIdentifiers.remove(pid);
690
691
				  } else {
692
				      logMetacat.debug(MISSING_PID_PREFIX + pid.getValue() + " was already locked. Skipping.");
693
				  }
694
			} else {
695
				// can't help here
696
				logMetacat.warn("Local system metadata not found for pid: " + pid.getValue());
697
			}
698 7340 leinfelder
		} catch (Exception e) {
699
			logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue());
700 7343 cjones
		} finally {
701 7344 leinfelder
			if ( lock != null ) {
702
				lock.unlock();
703
			}
704 7343 cjones
        }
705 7340 leinfelder
	}
706
707 7343 cjones
	/**
708
   * Respond to itemRemoved events on the hzMissingIdentifiers Set
709
   *
710
   * @param pid   the identifier of the event
711
   */
712 7340 leinfelder
	@Override
713 7419 cjones
	public void itemRemoved(ItemEvent<Identifier> event) {
714 7340 leinfelder
		// do nothing since someone probably handled the wanted PID
715
716
	}
717
718 6398 cjones
}