Project

General

Profile

« Previous | Next » 

Revision 7340

change the system metadata resynch approach: nodes will publish PIDs that they are missing after inspecting the shared identifier set. other nodes will be listening for the "wanted" pids and will put their local copy of SystemMetadata on the shared SM map. This should dramatically decrease the hazelcast chatter during a resynch and targets only the pids that are missing from any of the various nodes.

View differences:

src/edu/ucsb/nceas/metacat/dataone/hazelcast/HazelcastService.java
51 51
import com.hazelcast.core.HazelcastInstance;
52 52
import com.hazelcast.core.IMap;
53 53
import com.hazelcast.core.ISet;
54
import com.hazelcast.core.ItemListener;
54 55
import com.hazelcast.core.LifecycleEvent;
55 56
import com.hazelcast.core.LifecycleListener;
56 57
import com.hazelcast.core.Member;
......
71 72
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
72 73
 */
73 74
public class HazelcastService extends BaseService
74
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener {
75
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener, ItemListener<Identifier> {
75 76
  
76 77
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
77 78

  
......
116 117
  
117 118
  /* The Hazelcast distributed identifiers set */
118 119
  private ISet<Identifier> identifiers;
120
  
121
  /* The Hazelcast distributed missing identifiers set */
122
  private ISet<Identifier> missingIdentifiers;
119 123

  
120 124
  private HazelcastInstance hzInstance;
121 125
      
......
216 220
      identifiers = Hazelcast.getSet(identifiersSet);
217 221
      logMetacat.warn("Retrieved hzIdentifiers from Hazelcast");
218 222
      
223
      // for publishing the "PIDs Wanted" list
224
      missingIdentifiers = Hazelcast.getSet("hzMissingIdentifiersSet");
225
      
226
      missingIdentifiers.addItemListener(this, true);
227
      
219 228
      // Listen for changes to the system metadata map
220 229
      systemMetadata.addEntryListener(this, true);
221 230
      
......
501 510
	 */
502 511
	private void resynchToRemote() {
503 512
		
504
		// add any identifiers not already present in the shared map
505
		Set<Identifier> idKeys = loadAllKeys();
513
		// the local identifiers not already present in the shared map
514
		Set<Identifier> localIdKeys = loadAllKeys();
515
		
516
		//  the PIDs missing locally
517
		Set<Identifier> missingIdKeys = new HashSet<Identifier>();
506 518
				
507
		// only contribute what are not already shared
519
		// only contribute PIDs that are not already shared
508 520
		Iterator<Identifier> idIter = identifiers.iterator();
509 521
		while (idIter.hasNext()) {
510 522
			Identifier pid = idIter.next();
511
			if (idKeys.contains(pid)) {
523
			if (localIdKeys.contains(pid)) {
512 524
				logMetacat.warn("Shared pid is already in local identifier set: " + pid.getValue());
513
				idKeys.remove(pid);
525
				localIdKeys.remove(pid);
526
			} else {
527
				// we don't have this locally, so we should try to get it
528
				missingIdKeys.add(pid);
514 529
			}
515 530
		}
516
		logMetacat.warn("local pid count not yet shared: " + idKeys.size() + ", shared pid count: " + identifiers.size());
531
		logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size());
517 532

  
518 533
		//identifiers.addAll(idKeys);
519 534
		logMetacat.warn("Loading missing local keys into hzIdentifiers");
520
		for (Identifier key: idKeys) {
535
		for (Identifier key: localIdKeys) {
521 536
			if (!identifiers.contains(key)) {
522 537
				logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue());
523 538
				identifiers.add(key);
......
525 540
		}
526 541
		logMetacat.warn("Initialized identifiers with missing local keys");
527 542
		
528
		logMetacat.warn("Processing SystemMetadata for shared pid count: " + identifiers.size());
543
		logMetacat.warn("Processing missing SystemMetadata for shared pid count: " + identifiers.size());
529 544
		
530
		//loop through all the pids to find any null SM that needs to be synched
531
		Iterator<Identifier> sharedPids = identifiers.iterator();
532
		while (sharedPids.hasNext()) {
533
			Identifier pid = sharedPids.next();
534
			logMetacat.trace("looking up shared value for pid: " + pid.getValue());
545
		// loop through all the missing PIDs to find any null (missing) SM that needs to be resynched
546
		Iterator<Identifier> missingPids = missingIdKeys.iterator();
547
		while (missingPids.hasNext()) {
548
			Identifier pid = missingPids.next();
549
			logMetacat.trace("Processing missing pid: " + pid.getValue());
535 550
			SystemMetadata sm = systemMetadata.get(pid);
536 551
			if (sm == null)  {
537
				logMetacat.warn("shared SystemMetadata for pid is null: " + pid.getValue());
538
				// look up owner of the pid
539
//				Partition partition = hzInstance.getPartitionService().getPartition(pid);
540
//				Member owner = partition.getOwner();
541
//				boolean isLocalPid = owner.localMember();
542
//				logMetacat.debug("owner of pid: " + pid.getValue() + " isLocal: " + isLocalPid);
543
				// if we don't own it, we can look it up locally in hopes that we have our own copy
544
				if (true) {
545
					// get directly from backing store
546
					try {
547
						sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue());
548
					} catch (McdbDocNotFoundException e) {
549
						// log and move on, there's nothing more we can do
550
						logMetacat.error("Could not find local SystemMetadata for pid: " + pid.getValue(), e);
551
						continue;
552
					}
553
				}
554
			}
555
			// check again, but hopefully we can add it to the map now (again) so that it is propgated to all listening members
556
			if (sm != null)  {
557
				logMetacat.debug("saving local SystemMetadata to shared map for pid: " + pid.getValue());
552
				logMetacat.warn("Shared SystemMetadata is null for pid: " + pid.getValue());
553

  
554
				// publish that we need this SM entry
555
				logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue());
556
				missingIdentifiers.add(pid);
557
			} else {
558
				// or just republish the shared non-null entry (all SM listeners will then get it and save it locally)
559
				logMetacat.debug("Putting missing pid's SystemMetadata to shared map: " + pid.getValue());
558 560
				systemMetadata.put(pid, sm);
559
			} else {
560
				logMetacat.error("local SystemMetadata is null for pid: " + pid.getValue());
561 561
			}
562 562
		}
563 563
		
......
570 570
			@Override
571 571
			public void run() {
572 572
				try {
573
					// this is a pull mechanism
574
					//resynch();
575 573
					// this is a push mechanism
576 574
					resynchToRemote();
577 575
				} catch (Exception e) {
......
662 660
		return pids;
663 661
	}
664 662

  
663
	@Override
664
	public void itemAdded(Identifier pid) {
665
		// publish the SM for the pid if we have it locally
666
		logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue());
667
		try {
668
			// look up the local copy of the SM
669
			SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue());
670
			if (sm != null) {
671
				// "publish" the system metadata to the shared map since it showed up on the missing queue
672
				logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue());
673
				systemMetadata.put(pid, sm);
674
				
675
				// remove the entry since we processed it
676
				missingIdentifiers.remove(pid);
677
			} else {
678
				logMetacat.warn("Local SystemMetadata was null for pid: " + pid.getValue());
679
			}
680
			
681
		} catch (Exception e) {
682
			logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue());
683
		}
684
		
685
	}
686

  
687
	@Override
688
	public void itemRemoved(Identifier arg0) {
689
		// do nothing since someone probably handled the wanted PID
690
		
691
	}
692

  
665 693
}

Also available in: Unified diff