Revision 7340
Added by ben leinfelder over 12 years ago
src/edu/ucsb/nceas/metacat/dataone/hazelcast/HazelcastService.java | ||
---|---|---|
51 | 51 |
import com.hazelcast.core.HazelcastInstance; |
52 | 52 |
import com.hazelcast.core.IMap; |
53 | 53 |
import com.hazelcast.core.ISet; |
54 |
import com.hazelcast.core.ItemListener; |
|
54 | 55 |
import com.hazelcast.core.LifecycleEvent; |
55 | 56 |
import com.hazelcast.core.LifecycleListener; |
56 | 57 |
import com.hazelcast.core.Member; |
... | ... | |
71 | 72 |
* The Hazelcast service enables Metacat as a Hazelcast cluster member |
72 | 73 |
*/ |
73 | 74 |
public class HazelcastService extends BaseService |
74 |
implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener { |
|
75 |
implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener, ItemListener<Identifier> {
|
|
75 | 76 |
|
76 | 77 |
private static final String SINCE_PROPERTY = "dateSysMetadataModified"; |
77 | 78 |
|
... | ... | |
116 | 117 |
|
117 | 118 |
/* The Hazelcast distributed identifiers set */ |
118 | 119 |
private ISet<Identifier> identifiers; |
120 |
|
|
121 |
/* The Hazelcast distributed missing identifiers set */ |
|
122 |
private ISet<Identifier> missingIdentifiers; |
|
119 | 123 |
|
120 | 124 |
private HazelcastInstance hzInstance; |
121 | 125 |
|
... | ... | |
216 | 220 |
identifiers = Hazelcast.getSet(identifiersSet); |
217 | 221 |
logMetacat.warn("Retrieved hzIdentifiers from Hazelcast"); |
218 | 222 |
|
223 |
// for publishing the "PIDs Wanted" list |
|
224 |
missingIdentifiers = Hazelcast.getSet("hzMissingIdentifiersSet"); |
|
225 |
|
|
226 |
missingIdentifiers.addItemListener(this, true); |
|
227 |
|
|
219 | 228 |
// Listen for changes to the system metadata map |
220 | 229 |
systemMetadata.addEntryListener(this, true); |
221 | 230 |
|
... | ... | |
501 | 510 |
*/ |
502 | 511 |
private void resynchToRemote() { |
503 | 512 |
|
504 |
// add any identifiers not already present in the shared map |
|
505 |
Set<Identifier> idKeys = loadAllKeys(); |
|
513 |
// the local identifiers not already present in the shared map |
|
514 |
Set<Identifier> localIdKeys = loadAllKeys(); |
|
515 |
|
|
516 |
// the PIDs missing locally |
|
517 |
Set<Identifier> missingIdKeys = new HashSet<Identifier>(); |
|
506 | 518 |
|
507 |
// only contribute what are not already shared
|
|
519 |
// only contribute PIDs that are not already shared
|
|
508 | 520 |
Iterator<Identifier> idIter = identifiers.iterator(); |
509 | 521 |
while (idIter.hasNext()) { |
510 | 522 |
Identifier pid = idIter.next(); |
511 |
if (idKeys.contains(pid)) {
|
|
523 |
if (localIdKeys.contains(pid)) {
|
|
512 | 524 |
logMetacat.warn("Shared pid is already in local identifier set: " + pid.getValue()); |
513 |
idKeys.remove(pid); |
|
525 |
localIdKeys.remove(pid); |
|
526 |
} else { |
|
527 |
// we don't have this locally, so we should try to get it |
|
528 |
missingIdKeys.add(pid); |
|
514 | 529 |
} |
515 | 530 |
} |
516 |
logMetacat.warn("local pid count not yet shared: " + idKeys.size() + ", shared pid count: " + identifiers.size());
|
|
531 |
logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size());
|
|
517 | 532 |
|
518 | 533 |
//identifiers.addAll(idKeys); |
519 | 534 |
logMetacat.warn("Loading missing local keys into hzIdentifiers"); |
520 |
for (Identifier key: idKeys) {
|
|
535 |
for (Identifier key: localIdKeys) {
|
|
521 | 536 |
if (!identifiers.contains(key)) { |
522 | 537 |
logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue()); |
523 | 538 |
identifiers.add(key); |
... | ... | |
525 | 540 |
} |
526 | 541 |
logMetacat.warn("Initialized identifiers with missing local keys"); |
527 | 542 |
|
528 |
logMetacat.warn("Processing SystemMetadata for shared pid count: " + identifiers.size()); |
|
543 |
logMetacat.warn("Processing missing SystemMetadata for shared pid count: " + identifiers.size());
|
|
529 | 544 |
|
530 |
//loop through all the pids to find any null SM that needs to be synched
|
|
531 |
Iterator<Identifier> sharedPids = identifiers.iterator();
|
|
532 |
while (sharedPids.hasNext()) {
|
|
533 |
Identifier pid = sharedPids.next();
|
|
534 |
logMetacat.trace("looking up shared value for pid: " + pid.getValue());
|
|
545 |
// loop through all the missing PIDs to find any null (missing) SM that needs to be resynched
|
|
546 |
Iterator<Identifier> missingPids = missingIdKeys.iterator();
|
|
547 |
while (missingPids.hasNext()) {
|
|
548 |
Identifier pid = missingPids.next();
|
|
549 |
logMetacat.trace("Processing missing pid: " + pid.getValue());
|
|
535 | 550 |
SystemMetadata sm = systemMetadata.get(pid); |
536 | 551 |
if (sm == null) { |
537 |
logMetacat.warn("shared SystemMetadata for pid is null: " + pid.getValue()); |
|
538 |
// look up owner of the pid |
|
539 |
// Partition partition = hzInstance.getPartitionService().getPartition(pid); |
|
540 |
// Member owner = partition.getOwner(); |
|
541 |
// boolean isLocalPid = owner.localMember(); |
|
542 |
// logMetacat.debug("owner of pid: " + pid.getValue() + " isLocal: " + isLocalPid); |
|
543 |
// if we don't own it, we can look it up locally in hopes that we have our own copy |
|
544 |
if (true) { |
|
545 |
// get directly from backing store |
|
546 |
try { |
|
547 |
sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue()); |
|
548 |
} catch (McdbDocNotFoundException e) { |
|
549 |
// log and move on, there's nothing more we can do |
|
550 |
logMetacat.error("Could not find local SystemMetadata for pid: " + pid.getValue(), e); |
|
551 |
continue; |
|
552 |
} |
|
553 |
} |
|
554 |
} |
|
555 |
// check again, but hopefully we can add it to the map now (again) so that it is propgated to all listening members |
|
556 |
if (sm != null) { |
|
557 |
logMetacat.debug("saving local SystemMetadata to shared map for pid: " + pid.getValue()); |
|
552 |
logMetacat.warn("Shared SystemMetadata is null for pid: " + pid.getValue()); |
|
553 |
|
|
554 |
// publish that we need this SM entry |
|
555 |
logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue()); |
|
556 |
missingIdentifiers.add(pid); |
|
557 |
} else { |
|
558 |
// or just republish the shared non-null entry (all SM listeners will then get it and save it locally) |
|
559 |
logMetacat.debug("Putting missing pid's SystemMetadata to shared map: " + pid.getValue()); |
|
558 | 560 |
systemMetadata.put(pid, sm); |
559 |
} else { |
|
560 |
logMetacat.error("local SystemMetadata is null for pid: " + pid.getValue()); |
|
561 | 561 |
} |
562 | 562 |
} |
563 | 563 |
|
... | ... | |
570 | 570 |
@Override |
571 | 571 |
public void run() { |
572 | 572 |
try { |
573 |
// this is a pull mechanism |
|
574 |
//resynch(); |
|
575 | 573 |
// this is a push mechanism |
576 | 574 |
resynchToRemote(); |
577 | 575 |
} catch (Exception e) { |
... | ... | |
662 | 660 |
return pids; |
663 | 661 |
} |
664 | 662 |
|
663 |
@Override |
|
664 |
public void itemAdded(Identifier pid) { |
|
665 |
// publish the SM for the pid if we have it locally |
|
666 |
logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue()); |
|
667 |
try { |
|
668 |
// look up the local copy of the SM |
|
669 |
SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue()); |
|
670 |
if (sm != null) { |
|
671 |
// "publish" the system metadata to the shared map since it showed up on the missing queue |
|
672 |
logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue()); |
|
673 |
systemMetadata.put(pid, sm); |
|
674 |
|
|
675 |
// remove the entry since we processed it |
|
676 |
missingIdentifiers.remove(pid); |
|
677 |
} else { |
|
678 |
logMetacat.warn("Local SystemMetadata was null for pid: " + pid.getValue()); |
|
679 |
} |
|
680 |
|
|
681 |
} catch (Exception e) { |
|
682 |
logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue()); |
|
683 |
} |
|
684 |
|
|
685 |
} |
|
686 |
|
|
687 |
@Override |
|
688 |
public void itemRemoved(Identifier arg0) { |
|
689 |
// do nothing since someone probably handled the wanted PID |
|
690 |
|
|
691 |
} |
|
692 |
|
|
665 | 693 |
} |
Also available in: Unified diff
change the system metadata resynch approach: nodes will publish PIDs that they are missing after inspecting the shared identifier set. other nodes will be listening for the "wanted" pids and will put their local copy of SystemMetadata on the shared SM map. This should dramatically decrease the hazelcast chatter during a resynch and targets only the pids that are missing from any of the various nodes.