Project

General

Profile

« Previous | Next » 

Revision 7812

use an independent ISet<SystemMetadata> structure to communicate objects that should be indexed by metacat-index. https://projects.ecoinformatics.org/ecoinfo/issues/5943

View differences:

metacat-index/src/main/java/edu/ucsb/nceas/metacat/index/DistributedMapsFactory.java
58 58
    private static HazelcastClient hzClient = null;
59 59
    private static String hzSystemMetadata = null;
60 60
    private static String hzObjectPath = null;
61
    private static String hzIndexQueue = null;
61 62
    private static int waitingTime = IndexGenerator.WAITTIME;
62 63
    private static int maxAttempts = IndexGenerator.MAXWAITNUMBER;
63 64
    private static IMap<Identifier, SystemMetadata> systemMetadataMap = null;
64 65
    private static IMap<Identifier, String> objectPathMap = null;
66
    private static ISet<SystemMetadata> indexQueue = null;
65 67
    /* The name of the identifiers set */
66 68
    private static String identifiersSetName = IDENTIFIERSETNAME;
67 69
    /* The Hazelcast distributed identifiers set */
......
86 88
            log.warn("DistributedMapFactory.startHazelCastClient - couldn't read the name of the identifiersSet from the metacat.properties file since : "+e.getMessage()+". Default values will be used");
87 89
            identifiersSetName = IDENTIFIERSETNAME;
88 90
        }
89
     // get config values
91
        
92
        // the index queue name to listen to
93
        hzIndexQueue = Settings.getConfiguration().getString("index.hazelcast.indexqueue");        
94
        
95
        // get config values
90 96
        hzSystemMetadata = Settings.getConfiguration().getString(
91 97
                "dataone.hazelcast.storageCluster.systemMetadataMap");
92 98
        hzObjectPath = Settings.getConfiguration().getString(
......
231 237
        return identifiersSet;
232 238
    }
233 239
    
240
    /**
241
     * Get the indexQueue set from hazelcast client
242
     * @return the indexQueue
243
     * @throws FileNotFoundException
244
     * @throws ServiceFailure
245
     */
246
    public static ISet<SystemMetadata> getIndexQueue() throws FileNotFoundException, ServiceFailure {
247
        if(hzClient== null) {
248
            startHazelCastClient();
249
        }
250
        indexQueue = hzClient.getSet(hzIndexQueue);
251
        return indexQueue;
252
    }
253
    
234 254
}
metacat-index/src/main/java/edu/ucsb/nceas/metacat/index/SystemMetadataEventListener.java
38 38
import org.dataone.service.types.v1.Identifier;
39 39
import org.dataone.service.types.v1.SystemMetadata;
40 40

  
41
import com.hazelcast.core.EntryEvent;
42
import com.hazelcast.core.EntryListener;
43 41
import com.hazelcast.core.IMap;
42
import com.hazelcast.core.ISet;
43
import com.hazelcast.core.ItemEvent;
44
import com.hazelcast.core.ItemListener;
44 45

  
45
public class SystemMetadataEventListener implements EntryListener<Identifier, SystemMetadata> {
46
public class SystemMetadataEventListener implements ItemListener<SystemMetadata> {
46 47
	
47 48
	private static Log log = LogFactory.getLog(SystemMetadataEventListener.class);
48 49
	
......
94 95
    	
95 96
        // get shared structures and add listener
96 97
        IMap<Identifier, String> objectPathMap = DistributedMapsFactory.getObjectPathMap();
97
        IMap<Identifier, SystemMetadata> systemMetadataMap = DistributedMapsFactory.getSystemMetadataMap();
98
		systemMetadataMap.addEntryListener(this, true);
99
        log.info("System Metadata size: " + systemMetadataMap.size());
98
        ISet<SystemMetadata> indexQueue = DistributedMapsFactory.getIndexQueue();
99
        indexQueue.addItemListener(this, true);
100
        log.info("System Metadata size: " + indexQueue.size());
100 101
        log.info("Object path size:" + objectPathMap.size());
101 102
    }
102 103

  
......
107 108
     */
108 109
    public void stop() throws FileNotFoundException, ServiceFailure {
109 110
    	log.info("stopping index entry listener...");
110
    	DistributedMapsFactory.getSystemMetadataMap().removeEntryListener(this);
111
    	DistributedMapsFactory.getIndexQueue().removeItemListener(this);
111 112
    }
112 113
    
113 114
    /**
......
135 136
        return obsoletes;
136 137
    }
137 138
    
138
    
139
  
140
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> entryEvent) {
141
	    //System.out.println("===================================calling entryAdded method ");
142
	    log.info("===================================calling entryAdded method ");
143
		// use the same implementation for insert/update for now
144
		this.entryUpdated(entryEvent);
145 139

  
146
	}
147

  
148
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> entryEvent) {
149
		// remove from the index for now, this may be a temporary eviction
150
		this.entryRemoved(entryEvent);
151
		
152
	}
153

  
154
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> entryEvent) {
140
	public void itemRemoved(ItemEvent<SystemMetadata> entryEvent) {
155 141
		// remove from the index
156
		Identifier pid = entryEvent.getKey();		
142
		Identifier pid = entryEvent.getItem().getIdentifier();		
157 143
		try {
158 144
			solrIndex.remove(pid.getValue());
159 145
		} catch (Exception e) {
......
163 149
		
164 150
	}
165 151

  
166
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> entryEvent) {
152
	public void itemAdded(ItemEvent<SystemMetadata> entryEvent) {
167 153
	    //System.out.println("===================================calling entryUpdated method ");
168 154
	    log.info("===================================calling entryUpdated method ");
169 155
		// add to the index
170
		Identifier pid = entryEvent.getKey();
156
		Identifier pid = entryEvent.getItem().getIdentifier();
171 157
		//System.out.println("===================================update the document "+pid.getValue());
172 158
		log.info("===================================update the document "+pid.getValue());
173
		SystemMetadata systemMetadata = entryEvent.getValue();
159
		SystemMetadata systemMetadata = entryEvent.getItem();
174 160
		Identifier obsoletes = systemMetadata.getObsoletes();
175 161
		List<String> obsoletesChain = null;
176 162
		if (obsoletes != null) {
lib/metacat.properties
595 595
#If the inerval is less than 0, the thread would not run.
596 596
index.regenerate.interval=7200000
597 597
index.evenlog.classname=edu.ucsb.nceas.metacat.index.event.IndexEventFileLog
598
index.hazelcast.indexqueue=hzIndexQueue
598 599

  
599 600
############# SOLR Search Section ###########################################
600 601
#Embedded (default):
src/edu/ucsb/nceas/metacat/DocumentImpl.java
3421 3421
            	sysMeta.setArchived(true);
3422 3422
            	sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
3423 3423
				HazelcastService.getInstance().getSystemMetadataMap().put(guid, sysMeta);
3424
				// submit for indexing
3425
                HazelcastService.getInstance().getIndexQueue().add(sysMeta);
3424 3426
            }
3425 3427
            
3426 3428
            // clear cache after inserting or updating a document
src/edu/ucsb/nceas/metacat/MetacatHandler.java
1857 1857
                    // save it to the map
1858 1858
                    HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
1859 1859
                    
1860
                    // submit for indexing
1861
                    HazelcastService.getInstance().getIndexQueue().add(sysMeta);
1862
                    
1860 1863
                  } catch ( McdbDocNotFoundException dnfe ) {
1861 1864
                    logMetacat.debug(
1862 1865
                      "There was a problem finding the localId " +
......
3036 3039
							
3037 3040
					        // manage it in the store
3038 3041
                            HazelcastService.getInstance().getSystemMetadataMap().put(sm.getIdentifier(), sm);
3042
                            
3043
                            // submit for indexing
3044
                            HazelcastService.getInstance().getIndexQueue().add(sm);
3039 3045
					        
3040 3046
                        } catch (Exception ee) {
3041 3047
                            // If the file did not exist before this method was 
src/edu/ucsb/nceas/metacat/dataone/MNodeService.java
1191 1191
            // update the local copy of system metadata for the pid
1192 1192
            try {
1193 1193
                HazelcastService.getInstance().getSystemMetadataMap().put(newSysMeta.getIdentifier(), newSysMeta);
1194
                // submit for indexing
1195
                HazelcastService.getInstance().getIndexQueue().add(newSysMeta);
1194 1196
                logMetacat.info("Updated local copy of system metadata for pid " +
1195 1197
                    pid.getValue() + " after change notification from the CN.");
1196 1198
                
src/edu/ucsb/nceas/metacat/dataone/D1NodeService.java
424 424
    
425 425
    // save the sysmeta
426 426
    try {
427
      // lock and unlock of the pid happens in the subclass
427
    	// lock and unlock of the pid happens in the subclass
428 428
    	HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
429
    	
429
    	// submit for indexing
430
        HazelcastService.getInstance().getIndexQueue().add(sysmeta);
430 431
    } catch (Exception e) {
431 432
    	logMetacat.error("Problem creating system metadata: " + pid.getValue(), e);
432 433
        throw new ServiceFailure("1190", e.getMessage());
......
1232 1233
      try {
1233 1234
        // note: the calling subclass handles the map hazelcast lock/unlock
1234 1235
      	HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1235
      	
1236
      	// submit for indexing
1237
        HazelcastService.getInstance().getIndexQueue().add(sysmeta);
1236 1238
      } catch (Exception e) {
1237 1239
          throw new ServiceFailure("1190", e.getMessage());
1238 1240
          
......
1252 1254
        try {
1253 1255
            HazelcastService.getInstance().getSystemMetadataMap().lock(sysMeta.getIdentifier());
1254 1256
            HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
1255

  
1257
            // submit for indexing
1258
            HazelcastService.getInstance().getIndexQueue().add(sysMeta);
1256 1259
        } catch (Exception e) {
1257 1260
            throw new ServiceFailure("4862", e.getMessage());
1258 1261

  
......
1429 1432
              sysMeta.setArchived(true);
1430 1433
              sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1431 1434
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
1435
              // submit for indexing
1436
              HazelcastService.getInstance().getIndexQueue().add(sysMeta);
1432 1437
              
1433 1438
          } catch (McdbDocNotFoundException e) {
1434 1439
              throw new NotFound("1340", "The provided identifier was invalid.");
src/edu/ucsb/nceas/metacat/dataone/hazelcast/HazelcastService.java
39 39
import org.apache.log4j.Logger;
40 40
import org.dataone.service.exceptions.InvalidSystemMetadata;
41 41
import org.dataone.service.types.v1.Identifier;
42
import org.dataone.service.types.v1.Node;
43
import org.dataone.service.types.v1.NodeReference;
44 42
import org.dataone.service.types.v1.SystemMetadata;
45 43

  
46 44
import com.hazelcast.config.Config;
......
103 101
  
104 102
  /* The Hazelcast distributed missing identifiers set */
105 103
  private ISet<Identifier> missingIdentifiers;
104
  
105
  /* The Hazelcast distributed index queue */
106
  private String hzIndexQueue;
107
  private ISet<SystemMetadata> indexQueue;
106 108

  
107 109
  private HazelcastInstance hzInstance;
108 110
      
......
193 195
      
194 196
      missingIdentifiers.addItemListener(this, true);
195 197
      
198
      hzIndexQueue = PropertyService.getProperty("index.hazelcast.indexqueue");
199
      indexQueue = this.hzInstance.getSet(hzIndexQueue);
200

  
196 201
      // Listen for changes to the system metadata map
197 202
      systemMetadata.addEntryListener(this, true);
198 203
      
......
241 246
  }
242 247

  
243 248
  /**
249
   * Get the index queue
250
   * @return the set of SystemMetadata to be indexed
251
   */
252
  public ISet<SystemMetadata> getIndexQueue() {
253
      return indexQueue;
254
      
255
  }
256
  
257
  /**
244 258
   * When Metacat changes the underlying store, we need to refresh the
245 259
   * in-memory representation of it.
246 260
   * @param guid
src/edu/ucsb/nceas/metacat/replication/ReplicationHandler.java
401 401
      	  // save the system metadata
402 402
    	  logReplication.debug("Saving SystemMetadata to shared map: " + sysMeta.getIdentifier().getValue());
403 403
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
404
      	  // submit for indexing
405
          HazelcastService.getInstance().getIndexQueue().add(sysMeta);
404 406
      }
405 407
   	  
406 408
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
......
578 580
    	  }
579 581
    	  // save the system metadata
580 582
    	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
583
    	  // submit for indexing
584
          HazelcastService.getInstance().getIndexQueue().add(sysMeta);
581 585

  
582 586
      }
583 587
   	  
......
875 879
								new ByteArrayInputStream(systemMetadataXML
876 880
										.getBytes("UTF-8")));
877 881
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
882
				// submit for indexing
883
                HazelcastService.getInstance().getIndexQueue().add(sysMeta);
878 884
			}
879 885

  
880 886
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
src/edu/ucsb/nceas/metacat/replication/ReplicationService.java
588 588
		      	}
589 589
				// save the system metadata
590 590
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
591
				// submit for indexing
592
                HazelcastService.getInstance().getIndexQueue().add(sysMeta);
591 593
			}
592 594
      
593 595
			// dates
......
883 885
	      	  }
884 886
	      	  // save the system metadata
885 887
	      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
888
	      	  // submit for indexing
889
              HazelcastService.getInstance().getIndexQueue().add(sysMeta);
886 890
	        }
887 891
	        
888 892
	        // process the access control
......
1153 1157
							SystemMetadata.class,
1154 1158
							new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
1155 1159
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
1160
				// submit for indexing
1161
                HazelcastService.getInstance().getIndexQueue().add(sysMeta);
1156 1162
			}
1157 1163
      
1158 1164
			logReplication.info("ReplicationService.handleForceReplicateSystemMetadataRequest - processed guid: " + guid);

Also available in: Unified diff