Project

General

Profile

« Previous | Next » 

Revision 6446

move bulk of the Hazelcast code into HazelcastService from CNodeService so that it is centrall located - easier to manage and configure

View differences:

test/edu/ucsb/nceas/metacat/dataone/hazelcast/HazelcastServiceTest.java
26 26
package edu.ucsb.nceas.metacat.dataone.hazelcast;
27 27

  
28 28
import java.io.ByteArrayInputStream;
29
import java.io.FileNotFoundException;
30 29
import java.io.InputStream;
31 30

  
32
import org.dataone.configuration.Settings;
31
import junit.framework.Test;
32
import junit.framework.TestSuite;
33

  
33 34
import org.dataone.service.types.v1.Identifier;
34 35
import org.dataone.service.types.v1.Session;
35 36
import org.dataone.service.types.v1.SystemMetadata;
36 37

  
37
import com.hazelcast.config.Config;
38
import com.hazelcast.config.FileSystemXmlConfig;
39 38
import com.hazelcast.core.Hazelcast;
40 39
import com.hazelcast.core.IMap;
41 40

  
42
import junit.framework.Test;
43
import junit.framework.TestSuite;
44 41
import edu.ucsb.nceas.MCTestCase;
45 42
import edu.ucsb.nceas.metacat.dataone.CNodeService;
46 43
import edu.ucsb.nceas.metacat.dataone.CNodeServiceTest;
......
57 54
			// initialize the configuration
58 55
			HazelcastService.getInstance();
59 56
			
60
			// initialize the entry listeners
61
			CNodeService.getInstance();
62 57
		} catch (Exception e) {
63 58
			e.printStackTrace();
64 59
			fail();
src/edu/ucsb/nceas/metacat/dataone/CNodeService.java
43 43
import org.dataone.service.exceptions.ServiceFailure;
44 44
import org.dataone.service.types.v1.Checksum;
45 45
import org.dataone.service.types.v1.Identifier;
46
import org.dataone.service.types.v1.Node;
47 46
import org.dataone.service.types.v1.NodeList;
48 47
import org.dataone.service.types.v1.NodeReference;
49 48
import org.dataone.service.types.v1.ObjectFormat;
......
59 58
import org.dataone.service.types.v1.Subject;
60 59
import org.dataone.service.types.v1.SystemMetadata;
61 60

  
62
import com.hazelcast.client.HazelcastClient;
63
import com.hazelcast.core.EntryEvent;
64
import com.hazelcast.core.EntryListener;
65
import com.hazelcast.core.Hazelcast;
66
import com.hazelcast.core.IMap;
67
import com.hazelcast.core.Member;
68
import com.hazelcast.partition.Partition;
69
import com.hazelcast.partition.PartitionService;
70 61
import com.hazelcast.query.SqlPredicate;
71 62

  
72 63
import edu.ucsb.nceas.metacat.EventLog;
73 64
import edu.ucsb.nceas.metacat.IdentifierManager;
74 65
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
75
import edu.ucsb.nceas.metacat.properties.PropertyService;
66
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
76 67
import edu.ucsb.nceas.metacat.replication.ForceReplicationSystemMetadataHandler;
77
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
78 68

  
79 69
/**
80 70
 * Represents Metacat's implementation of the DataONE Coordinating Node 
......
84 74
 *
85 75
 */
86 76
public class CNodeService extends D1NodeService implements CNAuthorization,
87
    CNCore, CNRead, CNReplication, EntryListener<Identifier, SystemMetadata> {
77
    CNCore, CNRead, CNReplication {
88 78

  
89 79
  /* the instance of the CNodeService object */
90 80
  private static CNodeService instance = null;
91
  
92
  /* The instance of the Hazelcast client */
93
  private HazelcastClient hzClient;
94 81

  
95
  /* The name of the DataONE Hazelcast cluster group */
96
  private String groupName;
97

  
98
  /* The name of the DataONE Hazelcast cluster password */
99
  private String groupPassword;
100
  
101
  /* The name of the DataONE Hazelcast cluster IP addresses */
102
  private String addressList;
103
  
104
  /* The name of the node map */
105
  private String nodeMap;
106

  
107
  /* The name of the system metadata map */
108
  private String systemMetadataMap;
109
  
110
  /* The Hazelcast distributed task id generator namespace */
111
  private String taskIds;
112
  
113
  /* The Hazelcast distributed system metadata map */
114
  private IMap<NodeReference, Node> nodes;
115

  
116
  /* The Hazelcast distributed system metadata map */
117
  private IMap<Identifier, SystemMetadata> systemMetadata;
118
  
119
  /* The Hazelcast distributed pending replication tasks map*/
120
  private IMap<String, CNReplicationTask> pendingReplicationTasks;
121

  
122 82
  /* the logger instance */
123 83
  private Logger logMetacat = null;
124 84

  
......
141 101
  private CNodeService() {
142 102
    super();
143 103
    logMetacat = Logger.getLogger(CNodeService.class);
144
    
145
    // Get configuration properties on instantiation
146
    try {
147
      groupName = 
148
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
149
      groupPassword = 
150
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
151
      addressList = 
152
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
153
      nodeMap = 
154
        PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
155
      systemMetadataMap = 
156
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
157

  
158
      // Become a DataONE-process cluster client
159
      //TODO: where should this be?
160
//      String[] addresses = addressList.split(",");
161
//      hzClient = 
162
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
163
//      nodes = hzClient.getMap(nodeMap);
164
//      pendingReplicationTasks = hzClient.getMap(pendingTasksQueue);
165
      
166
      // Get a reference to the shared system metadata map as a cluster member
167
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
168
      
169
      // Listen for changes to the system metadata map
170
      systemMetadata.addEntryListener(this, true);
171
      
172
    } catch (PropertyNotFoundException e) {
173

  
174
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
175
        "The error message was: " + e.getMessage();
176
      logMetacat.error(msg);
177
      
178
    }
179

  
180 104
        
181 105
  }
182 106
    
......
854 778
	    // target node subject, pid, and replicate permission
855 779
	    
856 780
	    Set<CNReplicationTask> tasks = 
857
	      (Set<CNReplicationTask>) this.pendingReplicationTasks.values(new SqlPredicate(query));
781
	      (Set<CNReplicationTask>) HazelcastService.getInstance().getPendingReplicationTasks().values(new SqlPredicate(query));
858 782
	    
859 783
	    // do we have a matching task?
860 784
	    if ( tasks.size() >= 1 ) {
......
866 790
    
867 791
  }
868 792

  
869
  /**
870
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
871
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
872
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
873
	 * 
874
	 * @param event - The EntryEvent that occurred
875
	 */
876
	@Override
877
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
878
		// handle as update - that method will create if necessary
879
		entryUpdated(event);
880
	}
881

  
882
  /**
883
   * Implement the EntryListener interface for Hazelcast, reponding to entry
884
   * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
885
   * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
886
   * 
887
   * @param event - The EntryEvent that occurred
888
   */
889
  @Override
890
  public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
891
    // nothing to do, entries are still in the backing store
892
    
893
  }
894

  
895
  /**
896
   * Implement the EntryListener interface for Hazelcast, reponding to entry
897
   * removed events in the hzSystemMetadata map.  Evaluate the entry and create
898
   * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
899
   * 
900
   * @param event - The EntryEvent that occurred
901
   */
902
  @Override
903
  public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
904
    // we don't remove objects
905
    
906
  }
907

  
908
  /**
909
   * Implement the EntryListener interface for Hazelcast, reponding to entry
910
   * updated events in the hzSystemMetadata map.  Evaluate the entry and create
911
   * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
912
   * 
913
   * @param event - The EntryEvent that occurred
914
   */
915
  @Override
916
  public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
917

  
918
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
919
		PartitionService partitionService = Hazelcast.getPartitionService();
920
		Partition partition = partitionService.getPartition(event.getKey());
921
		Member ownerMember = partition.getOwner();
922
		if (!ownerMember.localMember()) {
923
			// need to pull the entry into the local store
924
			logMetacat.debug("Saving entry locally: " + event.getKey().getValue());
925
			try {
926
				if (!IdentifierManager.getInstance().identifierExists(event.getKey().getValue())) {
927
					IdentifierManager.getInstance().createSystemMetadata(event.getValue());
928
				} else {
929
					IdentifierManager.getInstance().updateSystemMetadata(event.getValue());
930
				}
931
			} catch (McdbDocNotFoundException e) {
932
				logMetacat.error(
933
						"Could not save System Metadata to local store.", e);
934
			}
935
		}
936

  
937
		// TODO evaluate the type of system metadata change, decide if it
938
		// warrants a replication event, what type (DATA, METADATA, RESOURCE),
939
		// iteratively lock the PID, create and submit the tasks, and expect a
940
		// result back. Deal with exceptions.
941
		boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
942
		// TODO: do we need to do anything explicit here?
943
    
944
  }
945

  
946 793
}
src/edu/ucsb/nceas/metacat/dataone/hazelcast/HazelcastService.java
27 27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28 28

  
29 29
import org.apache.log4j.Logger;
30
import org.dataone.service.types.v1.Identifier;
31
import org.dataone.service.types.v1.Node;
32
import org.dataone.service.types.v1.NodeReference;
33
import org.dataone.service.types.v1.SystemMetadata;
30 34

  
35
import com.hazelcast.client.HazelcastClient;
31 36
import com.hazelcast.config.Config;
32 37
import com.hazelcast.config.FileSystemXmlConfig;
38
import com.hazelcast.core.EntryEvent;
39
import com.hazelcast.core.EntryListener;
33 40
import com.hazelcast.core.Hazelcast;
41
import com.hazelcast.core.IMap;
34 42
import com.hazelcast.core.InstanceEvent;
35 43
import com.hazelcast.core.InstanceListener;
44
import com.hazelcast.core.Member;
45
import com.hazelcast.partition.Partition;
46
import com.hazelcast.partition.PartitionService;
36 47

  
48
import edu.ucsb.nceas.metacat.IdentifierManager;
49
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
50
import edu.ucsb.nceas.metacat.dataone.CNReplicationTask;
51
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
37 52
import edu.ucsb.nceas.metacat.properties.PropertyService;
38 53
import edu.ucsb.nceas.metacat.shared.BaseService;
39 54
import edu.ucsb.nceas.metacat.shared.ServiceException;
55
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
40 56
/**
41 57
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
42 58
 */
43 59
public class HazelcastService extends BaseService
44
  implements InstanceListener {
60
  implements InstanceListener, EntryListener<Identifier, SystemMetadata> {
45 61
  
46 62
  /* The instance of the logging class */
47 63
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
......
52 68
  /* The Hazelcast configuration */
53 69
  private Config hzConfig;
54 70
  
71
  /* The instance of the Hazelcast client */
72
  private HazelcastClient hzClient;
73

  
74
  /* The name of the DataONE Hazelcast cluster group */
75
  private String groupName;
76

  
77
  /* The name of the DataONE Hazelcast cluster password */
78
  private String groupPassword;
79
  
80
  /* The name of the DataONE Hazelcast cluster IP addresses */
81
  private String addressList;
82
  
83
  /* The name of the node map */
84
  private String nodeMap;
85

  
86
  /* The name of the system metadata map */
87
  private String systemMetadataMap;
88
  
89
  /* The Hazelcast distributed task id generator namespace */
90
  private String taskIds;
91
  
92
  /* The Hazelcast distributed system metadata map */
93
  private IMap<NodeReference, Node> nodes;
94

  
95
  /* The Hazelcast distributed system metadata map */
96
  private IMap<Identifier, SystemMetadata> systemMetadata;
97
  
98
  /* The Hazelcast distributed pending replication tasks map*/
99
  private IMap<String, CNReplicationTask> pendingReplicationTasks;
100
  
101
  
55 102
  /*
56 103
   * Constructor: Creates an instance of the hazelcast service. Since
57 104
   * this uses a singleton pattern, use getInstance() to gain the instance.
......
104 151
      String msg = e.getMessage();
105 152
      logMetacat.error(msg);
106 153
      throw new ServiceException(msg);
154
    }
107 155
    
156
    // Get configuration properties on instantiation
157
    try {
158
      groupName = 
159
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
160
      groupPassword = 
161
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
162
      addressList = 
163
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
164
      nodeMap = 
165
        PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
166
      systemMetadataMap = 
167
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
168

  
169
      // Become a DataONE-process cluster client
170
      //TODO: where should this be?
171
//      String[] addresses = addressList.split(",");
172
//      hzClient = 
173
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
174
//      nodes = hzClient.getMap(nodeMap);
175
//      pendingReplicationTasks = hzClient.getMap(pendingTasksQueue);
176
      
177
      // Get a reference to the shared system metadata map as a cluster member
178
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
179
      
180
      // Listen for changes to the system metadata map
181
      systemMetadata.addEntryListener(this, true);
182
      
183
    } catch (PropertyNotFoundException e) {
184

  
185
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
186
        "The error message was: " + e.getMessage();
187
      logMetacat.error(msg);
188
      
108 189
    }
109
    
110
    return;
111
    
190
        
112 191
  }
113 192
  
193
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
194
	  return systemMetadata;
195
  }
196
  
197
  public IMap<String,CNReplicationTask> getPendingReplicationTasks() {
198
	  return pendingReplicationTasks;
199
  }
200
  
114 201
  /**
115 202
   * Indicate whether or not this service is refreshable.
116 203
   *
......
163 250
    Hazelcast.getLifecycleService().restart();
164 251
    
165 252
  }
253
  
254
  /**
255
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
256
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
257
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
258
	 * 
259
	 * @param event - The EntryEvent that occurred
260
	 */
261
	@Override
262
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
263
		// handle as update - that method will create if necessary
264
		entryUpdated(event);
265
	}
166 266

  
267
	/**
268
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
269
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
270
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
271
	 * 
272
	 * @param event - The EntryEvent that occurred
273
	 */
274
	@Override
275
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
276
	  // nothing to do, entries are still in the backing store
277
	  
278
	}
279
	
280
	/**
281
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
282
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
283
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
284
	 * 
285
	 * @param event - The EntryEvent that occurred
286
	 */
287
	@Override
288
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
289
	  // we don't remove objects
290
	  
291
	}
292
	
293
	/**
294
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
295
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
296
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
297
	 * 
298
	 * @param event - The EntryEvent that occurred
299
	 */
300
	@Override
301
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
302
	
303
			logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
304
			PartitionService partitionService = Hazelcast.getPartitionService();
305
			Partition partition = partitionService.getPartition(event.getKey());
306
			Member ownerMember = partition.getOwner();
307
			if (!ownerMember.localMember()) {
308
				// need to pull the entry into the local store
309
				logMetacat.debug("Saving entry locally: " + event.getKey().getValue());
310
				try {
311
					if (!IdentifierManager.getInstance().identifierExists(event.getKey().getValue())) {
312
						IdentifierManager.getInstance().createSystemMetadata(event.getValue());
313
					} else {
314
						IdentifierManager.getInstance().updateSystemMetadata(event.getValue());
315
					}
316
				} catch (McdbDocNotFoundException e) {
317
					logMetacat.error(
318
							"Could not save System Metadata to local store.", e);
319
				}
320
			}
321
	
322
			// TODO evaluate the type of system metadata change, decide if it
323
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
324
			// iteratively lock the PID, create and submit the tasks, and expect a
325
			// result back. Deal with exceptions.
326
			boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
327
			// TODO: do we need to do anything explicit here?
328
	  
329
	}
330

  
167 331
}

Also available in: Unified diff