Revision 7812
Added by ben leinfelder over 11 years ago
metacat-index/src/main/java/edu/ucsb/nceas/metacat/index/DistributedMapsFactory.java | ||
---|---|---|
58 | 58 |
private static HazelcastClient hzClient = null; |
59 | 59 |
private static String hzSystemMetadata = null; |
60 | 60 |
private static String hzObjectPath = null; |
61 |
private static String hzIndexQueue = null; |
|
61 | 62 |
private static int waitingTime = IndexGenerator.WAITTIME; |
62 | 63 |
private static int maxAttempts = IndexGenerator.MAXWAITNUMBER; |
63 | 64 |
private static IMap<Identifier, SystemMetadata> systemMetadataMap = null; |
64 | 65 |
private static IMap<Identifier, String> objectPathMap = null; |
66 |
private static ISet<SystemMetadata> indexQueue = null; |
|
65 | 67 |
/* The name of the identifiers set */ |
66 | 68 |
private static String identifiersSetName = IDENTIFIERSETNAME; |
67 | 69 |
/* The Hazelcast distributed identifiers set */ |
... | ... | |
86 | 88 |
log.warn("DistributedMapFactory.startHazelCastClient - couldn't read the name of the identifiersSet from the metacat.properties file since : "+e.getMessage()+". Default values will be used"); |
87 | 89 |
identifiersSetName = IDENTIFIERSETNAME; |
88 | 90 |
} |
89 |
// get config values |
|
91 |
|
|
92 |
// the index queue name to listen to |
|
93 |
hzIndexQueue = Settings.getConfiguration().getString("index.hazelcast.indexqueue"); |
|
94 |
|
|
95 |
// get config values |
|
90 | 96 |
hzSystemMetadata = Settings.getConfiguration().getString( |
91 | 97 |
"dataone.hazelcast.storageCluster.systemMetadataMap"); |
92 | 98 |
hzObjectPath = Settings.getConfiguration().getString( |
... | ... | |
231 | 237 |
return identifiersSet; |
232 | 238 |
} |
233 | 239 |
|
240 |
/** |
|
241 |
* Get the indexQueue set from hazelcast client |
|
242 |
* @return the indexQueue |
|
243 |
* @throws FileNotFoundException |
|
244 |
* @throws ServiceFailure |
|
245 |
*/ |
|
246 |
public static ISet<SystemMetadata> getIndexQueue() throws FileNotFoundException, ServiceFailure { |
|
247 |
if(hzClient== null) { |
|
248 |
startHazelCastClient(); |
|
249 |
} |
|
250 |
indexQueue = hzClient.getSet(hzIndexQueue); |
|
251 |
return indexQueue; |
|
252 |
} |
|
253 |
|
|
234 | 254 |
} |
metacat-index/src/main/java/edu/ucsb/nceas/metacat/index/SystemMetadataEventListener.java | ||
---|---|---|
38 | 38 |
import org.dataone.service.types.v1.Identifier; |
39 | 39 |
import org.dataone.service.types.v1.SystemMetadata; |
40 | 40 |
|
41 |
import com.hazelcast.core.EntryEvent; |
|
42 |
import com.hazelcast.core.EntryListener; |
|
43 | 41 |
import com.hazelcast.core.IMap; |
42 |
import com.hazelcast.core.ISet; |
|
43 |
import com.hazelcast.core.ItemEvent; |
|
44 |
import com.hazelcast.core.ItemListener; |
|
44 | 45 |
|
45 |
public class SystemMetadataEventListener implements EntryListener<Identifier, SystemMetadata> {
|
|
46 |
public class SystemMetadataEventListener implements ItemListener<SystemMetadata> {
|
|
46 | 47 |
|
47 | 48 |
private static Log log = LogFactory.getLog(SystemMetadataEventListener.class); |
48 | 49 |
|
... | ... | |
94 | 95 |
|
95 | 96 |
// get shared structures and add listener |
96 | 97 |
IMap<Identifier, String> objectPathMap = DistributedMapsFactory.getObjectPathMap(); |
97 |
IMap<Identifier, SystemMetadata> systemMetadataMap = DistributedMapsFactory.getSystemMetadataMap();
|
|
98 |
systemMetadataMap.addEntryListener(this, true);
|
|
99 |
log.info("System Metadata size: " + systemMetadataMap.size());
|
|
98 |
ISet<SystemMetadata> indexQueue = DistributedMapsFactory.getIndexQueue();
|
|
99 |
indexQueue.addItemListener(this, true);
|
|
100 |
log.info("System Metadata size: " + indexQueue.size());
|
|
100 | 101 |
log.info("Object path size:" + objectPathMap.size()); |
101 | 102 |
} |
102 | 103 |
|
... | ... | |
107 | 108 |
*/ |
108 | 109 |
public void stop() throws FileNotFoundException, ServiceFailure { |
109 | 110 |
log.info("stopping index entry listener..."); |
110 |
DistributedMapsFactory.getSystemMetadataMap().removeEntryListener(this);
|
|
111 |
DistributedMapsFactory.getIndexQueue().removeItemListener(this);
|
|
111 | 112 |
} |
112 | 113 |
|
113 | 114 |
/** |
... | ... | |
135 | 136 |
return obsoletes; |
136 | 137 |
} |
137 | 138 |
|
138 |
|
|
139 |
|
|
140 |
public void entryAdded(EntryEvent<Identifier, SystemMetadata> entryEvent) { |
|
141 |
//System.out.println("===================================calling entryAdded method "); |
|
142 |
log.info("===================================calling entryAdded method "); |
|
143 |
// use the same implementation for insert/update for now |
|
144 |
this.entryUpdated(entryEvent); |
|
145 | 139 |
|
146 |
} |
|
147 |
|
|
148 |
public void entryEvicted(EntryEvent<Identifier, SystemMetadata> entryEvent) { |
|
149 |
// remove from the index for now, this may be a temporary eviction |
|
150 |
this.entryRemoved(entryEvent); |
|
151 |
|
|
152 |
} |
|
153 |
|
|
154 |
public void entryRemoved(EntryEvent<Identifier, SystemMetadata> entryEvent) { |
|
140 |
public void itemRemoved(ItemEvent<SystemMetadata> entryEvent) { |
|
155 | 141 |
// remove from the index |
156 |
Identifier pid = entryEvent.getKey();
|
|
142 |
Identifier pid = entryEvent.getItem().getIdentifier();
|
|
157 | 143 |
try { |
158 | 144 |
solrIndex.remove(pid.getValue()); |
159 | 145 |
} catch (Exception e) { |
... | ... | |
163 | 149 |
|
164 | 150 |
} |
165 | 151 |
|
166 |
public void entryUpdated(EntryEvent<Identifier, SystemMetadata> entryEvent) {
|
|
152 |
public void itemAdded(ItemEvent<SystemMetadata> entryEvent) {
|
|
167 | 153 |
//System.out.println("===================================calling entryUpdated method "); |
168 | 154 |
log.info("===================================calling entryUpdated method "); |
169 | 155 |
// add to the index |
170 |
Identifier pid = entryEvent.getKey();
|
|
156 |
Identifier pid = entryEvent.getItem().getIdentifier();
|
|
171 | 157 |
//System.out.println("===================================update the document "+pid.getValue()); |
172 | 158 |
log.info("===================================update the document "+pid.getValue()); |
173 |
SystemMetadata systemMetadata = entryEvent.getValue();
|
|
159 |
SystemMetadata systemMetadata = entryEvent.getItem();
|
|
174 | 160 |
Identifier obsoletes = systemMetadata.getObsoletes(); |
175 | 161 |
List<String> obsoletesChain = null; |
176 | 162 |
if (obsoletes != null) { |
lib/metacat.properties | ||
---|---|---|
595 | 595 |
#If the inerval is less than 0, the thread would not run. |
596 | 596 |
index.regenerate.interval=7200000 |
597 | 597 |
index.evenlog.classname=edu.ucsb.nceas.metacat.index.event.IndexEventFileLog |
598 |
index.hazelcast.indexqueue=hzIndexQueue |
|
598 | 599 |
|
599 | 600 |
############# SOLR Search Section ########################################### |
600 | 601 |
#Embedded (default): |
src/edu/ucsb/nceas/metacat/DocumentImpl.java | ||
---|---|---|
3421 | 3421 |
sysMeta.setArchived(true); |
3422 | 3422 |
sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime()); |
3423 | 3423 |
HazelcastService.getInstance().getSystemMetadataMap().put(guid, sysMeta); |
3424 |
// submit for indexing |
|
3425 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
3424 | 3426 |
} |
3425 | 3427 |
|
3426 | 3428 |
// clear cache after inserting or updating a document |
src/edu/ucsb/nceas/metacat/MetacatHandler.java | ||
---|---|---|
1857 | 1857 |
// save it to the map |
1858 | 1858 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta); |
1859 | 1859 |
|
1860 |
// submit for indexing |
|
1861 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
1862 |
|
|
1860 | 1863 |
} catch ( McdbDocNotFoundException dnfe ) { |
1861 | 1864 |
logMetacat.debug( |
1862 | 1865 |
"There was a problem finding the localId " + |
... | ... | |
3036 | 3039 |
|
3037 | 3040 |
// manage it in the store |
3038 | 3041 |
HazelcastService.getInstance().getSystemMetadataMap().put(sm.getIdentifier(), sm); |
3042 |
|
|
3043 |
// submit for indexing |
|
3044 |
HazelcastService.getInstance().getIndexQueue().add(sm); |
|
3039 | 3045 |
|
3040 | 3046 |
} catch (Exception ee) { |
3041 | 3047 |
// If the file did not exist before this method was |
src/edu/ucsb/nceas/metacat/dataone/MNodeService.java | ||
---|---|---|
1191 | 1191 |
// update the local copy of system metadata for the pid |
1192 | 1192 |
try { |
1193 | 1193 |
HazelcastService.getInstance().getSystemMetadataMap().put(newSysMeta.getIdentifier(), newSysMeta); |
1194 |
// submit for indexing |
|
1195 |
HazelcastService.getInstance().getIndexQueue().add(newSysMeta); |
|
1194 | 1196 |
logMetacat.info("Updated local copy of system metadata for pid " + |
1195 | 1197 |
pid.getValue() + " after change notification from the CN."); |
1196 | 1198 |
|
src/edu/ucsb/nceas/metacat/dataone/D1NodeService.java | ||
---|---|---|
424 | 424 |
|
425 | 425 |
// save the sysmeta |
426 | 426 |
try { |
427 |
// lock and unlock of the pid happens in the subclass
|
|
427 |
// lock and unlock of the pid happens in the subclass
|
|
428 | 428 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta); |
429 |
|
|
429 |
// submit for indexing |
|
430 |
HazelcastService.getInstance().getIndexQueue().add(sysmeta); |
|
430 | 431 |
} catch (Exception e) { |
431 | 432 |
logMetacat.error("Problem creating system metadata: " + pid.getValue(), e); |
432 | 433 |
throw new ServiceFailure("1190", e.getMessage()); |
... | ... | |
1232 | 1233 |
try { |
1233 | 1234 |
// note: the calling subclass handles the map hazelcast lock/unlock |
1234 | 1235 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta); |
1235 |
|
|
1236 |
// submit for indexing |
|
1237 |
HazelcastService.getInstance().getIndexQueue().add(sysmeta); |
|
1236 | 1238 |
} catch (Exception e) { |
1237 | 1239 |
throw new ServiceFailure("1190", e.getMessage()); |
1238 | 1240 |
|
... | ... | |
1252 | 1254 |
try { |
1253 | 1255 |
HazelcastService.getInstance().getSystemMetadataMap().lock(sysMeta.getIdentifier()); |
1254 | 1256 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta); |
1255 |
|
|
1257 |
// submit for indexing |
|
1258 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
1256 | 1259 |
} catch (Exception e) { |
1257 | 1260 |
throw new ServiceFailure("4862", e.getMessage()); |
1258 | 1261 |
|
... | ... | |
1429 | 1432 |
sysMeta.setArchived(true); |
1430 | 1433 |
sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime()); |
1431 | 1434 |
HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta); |
1435 |
// submit for indexing |
|
1436 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
1432 | 1437 |
|
1433 | 1438 |
} catch (McdbDocNotFoundException e) { |
1434 | 1439 |
throw new NotFound("1340", "The provided identifier was invalid."); |
src/edu/ucsb/nceas/metacat/dataone/hazelcast/HazelcastService.java | ||
---|---|---|
39 | 39 |
import org.apache.log4j.Logger; |
40 | 40 |
import org.dataone.service.exceptions.InvalidSystemMetadata; |
41 | 41 |
import org.dataone.service.types.v1.Identifier; |
42 |
import org.dataone.service.types.v1.Node; |
|
43 |
import org.dataone.service.types.v1.NodeReference; |
|
44 | 42 |
import org.dataone.service.types.v1.SystemMetadata; |
45 | 43 |
|
46 | 44 |
import com.hazelcast.config.Config; |
... | ... | |
103 | 101 |
|
104 | 102 |
/* The Hazelcast distributed missing identifiers set */ |
105 | 103 |
private ISet<Identifier> missingIdentifiers; |
104 |
|
|
105 |
/* The Hazelcast distributed index queue */ |
|
106 |
private String hzIndexQueue; |
|
107 |
private ISet<SystemMetadata> indexQueue; |
|
106 | 108 |
|
107 | 109 |
private HazelcastInstance hzInstance; |
108 | 110 |
|
... | ... | |
193 | 195 |
|
194 | 196 |
missingIdentifiers.addItemListener(this, true); |
195 | 197 |
|
198 |
hzIndexQueue = PropertyService.getProperty("index.hazelcast.indexqueue"); |
|
199 |
indexQueue = this.hzInstance.getSet(hzIndexQueue); |
|
200 |
|
|
196 | 201 |
// Listen for changes to the system metadata map |
197 | 202 |
systemMetadata.addEntryListener(this, true); |
198 | 203 |
|
... | ... | |
241 | 246 |
} |
242 | 247 |
|
243 | 248 |
/** |
249 |
* Get the index queue |
|
250 |
* @return the set of SystemMetadata to be indexed |
|
251 |
*/ |
|
252 |
public ISet<SystemMetadata> getIndexQueue() { |
|
253 |
return indexQueue; |
|
254 |
|
|
255 |
} |
|
256 |
|
|
257 |
/** |
|
244 | 258 |
* When Metacat changes the underlying store, we need to refresh the |
245 | 259 |
* in-memory representation of it. |
246 | 260 |
* @param guid |
src/edu/ucsb/nceas/metacat/replication/ReplicationHandler.java | ||
---|---|---|
401 | 401 |
// save the system metadata |
402 | 402 |
logReplication.debug("Saving SystemMetadata to shared map: " + sysMeta.getIdentifier().getValue()); |
403 | 403 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta); |
404 |
// submit for indexing |
|
405 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
404 | 406 |
} |
405 | 407 |
|
406 | 408 |
docinfoParser.parse(new InputSource(new StringReader(docInfoStr))); |
... | ... | |
578 | 580 |
} |
579 | 581 |
// save the system metadata |
580 | 582 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta); |
583 |
// submit for indexing |
|
584 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
581 | 585 |
|
582 | 586 |
} |
583 | 587 |
|
... | ... | |
875 | 879 |
new ByteArrayInputStream(systemMetadataXML |
876 | 880 |
.getBytes("UTF-8"))); |
877 | 881 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta); |
882 |
// submit for indexing |
|
883 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
878 | 884 |
} |
879 | 885 |
|
880 | 886 |
logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: " |
src/edu/ucsb/nceas/metacat/replication/ReplicationService.java | ||
---|---|---|
588 | 588 |
} |
589 | 589 |
// save the system metadata |
590 | 590 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta); |
591 |
// submit for indexing |
|
592 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
591 | 593 |
} |
592 | 594 |
|
593 | 595 |
// dates |
... | ... | |
883 | 885 |
} |
884 | 886 |
// save the system metadata |
885 | 887 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta); |
888 |
// submit for indexing |
|
889 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
886 | 890 |
} |
887 | 891 |
|
888 | 892 |
// process the access control |
... | ... | |
1153 | 1157 |
SystemMetadata.class, |
1154 | 1158 |
new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8"))); |
1155 | 1159 |
HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta); |
1160 |
// submit for indexing |
|
1161 |
HazelcastService.getInstance().getIndexQueue().add(sysMeta); |
|
1156 | 1162 |
} |
1157 | 1163 |
|
1158 | 1164 |
logReplication.info("ReplicationService.handleForceReplicateSystemMetadataRequest - processed guid: " + guid); |
Also available in: Unified diff
use an independent ISet<SystemMetadata> structure to communicate objects that should be indexed by metacat-index. https://projects.ecoinformatics.org/ecoinfo/issues/5943