Project

General

Profile

« Previous | Next » 

Revision 6446

move bulk of the Hazelcast code into HazelcastService from CNodeService so that it is centrall located - easier to manage and configure

View differences:

CNodeService.java
43 43
import org.dataone.service.exceptions.ServiceFailure;
44 44
import org.dataone.service.types.v1.Checksum;
45 45
import org.dataone.service.types.v1.Identifier;
46
import org.dataone.service.types.v1.Node;
47 46
import org.dataone.service.types.v1.NodeList;
48 47
import org.dataone.service.types.v1.NodeReference;
49 48
import org.dataone.service.types.v1.ObjectFormat;
......
59 58
import org.dataone.service.types.v1.Subject;
60 59
import org.dataone.service.types.v1.SystemMetadata;
61 60

  
62
import com.hazelcast.client.HazelcastClient;
63
import com.hazelcast.core.EntryEvent;
64
import com.hazelcast.core.EntryListener;
65
import com.hazelcast.core.Hazelcast;
66
import com.hazelcast.core.IMap;
67
import com.hazelcast.core.Member;
68
import com.hazelcast.partition.Partition;
69
import com.hazelcast.partition.PartitionService;
70 61
import com.hazelcast.query.SqlPredicate;
71 62

  
72 63
import edu.ucsb.nceas.metacat.EventLog;
73 64
import edu.ucsb.nceas.metacat.IdentifierManager;
74 65
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
75
import edu.ucsb.nceas.metacat.properties.PropertyService;
66
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
76 67
import edu.ucsb.nceas.metacat.replication.ForceReplicationSystemMetadataHandler;
77
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
78 68

  
79 69
/**
80 70
 * Represents Metacat's implementation of the DataONE Coordinating Node 
......
84 74
 *
85 75
 */
86 76
public class CNodeService extends D1NodeService implements CNAuthorization,
87
    CNCore, CNRead, CNReplication, EntryListener<Identifier, SystemMetadata> {
77
    CNCore, CNRead, CNReplication {
88 78

  
89 79
  /* the instance of the CNodeService object */
90 80
  private static CNodeService instance = null;
91
  
92
  /* The instance of the Hazelcast client */
93
  private HazelcastClient hzClient;
94 81

  
95
  /* The name of the DataONE Hazelcast cluster group */
96
  private String groupName;
97

  
98
  /* The name of the DataONE Hazelcast cluster password */
99
  private String groupPassword;
100
  
101
  /* The name of the DataONE Hazelcast cluster IP addresses */
102
  private String addressList;
103
  
104
  /* The name of the node map */
105
  private String nodeMap;
106

  
107
  /* The name of the system metadata map */
108
  private String systemMetadataMap;
109
  
110
  /* The Hazelcast distributed task id generator namespace */
111
  private String taskIds;
112
  
113
  /* The Hazelcast distributed system metadata map */
114
  private IMap<NodeReference, Node> nodes;
115

  
116
  /* The Hazelcast distributed system metadata map */
117
  private IMap<Identifier, SystemMetadata> systemMetadata;
118
  
119
  /* The Hazelcast distributed pending replication tasks map*/
120
  private IMap<String, CNReplicationTask> pendingReplicationTasks;
121

  
122 82
  /* the logger instance */
123 83
  private Logger logMetacat = null;
124 84

  
......
141 101
  private CNodeService() {
142 102
    super();
143 103
    logMetacat = Logger.getLogger(CNodeService.class);
144
    
145
    // Get configuration properties on instantiation
146
    try {
147
      groupName = 
148
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
149
      groupPassword = 
150
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
151
      addressList = 
152
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
153
      nodeMap = 
154
        PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
155
      systemMetadataMap = 
156
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
157

  
158
      // Become a DataONE-process cluster client
159
      //TODO: where should this be?
160
//      String[] addresses = addressList.split(",");
161
//      hzClient = 
162
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
163
//      nodes = hzClient.getMap(nodeMap);
164
//      pendingReplicationTasks = hzClient.getMap(pendingTasksQueue);
165
      
166
      // Get a reference to the shared system metadata map as a cluster member
167
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
168
      
169
      // Listen for changes to the system metadata map
170
      systemMetadata.addEntryListener(this, true);
171
      
172
    } catch (PropertyNotFoundException e) {
173

  
174
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
175
        "The error message was: " + e.getMessage();
176
      logMetacat.error(msg);
177
      
178
    }
179

  
180 104
        
181 105
  }
182 106
    
......
854 778
	    // target node subject, pid, and replicate permission
855 779
	    
856 780
	    Set<CNReplicationTask> tasks = 
857
	      (Set<CNReplicationTask>) this.pendingReplicationTasks.values(new SqlPredicate(query));
781
	      (Set<CNReplicationTask>) HazelcastService.getInstance().getPendingReplicationTasks().values(new SqlPredicate(query));
858 782
	    
859 783
	    // do we have a matching task?
860 784
	    if ( tasks.size() >= 1 ) {
......
866 790
    
867 791
  }
868 792

  
869
  /**
870
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
871
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
872
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
873
	 * 
874
	 * @param event - The EntryEvent that occurred
875
	 */
876
	@Override
877
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
878
		// handle as update - that method will create if necessary
879
		entryUpdated(event);
880
	}
881

  
882
  /**
883
   * Implement the EntryListener interface for Hazelcast, reponding to entry
884
   * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
885
   * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
886
   * 
887
   * @param event - The EntryEvent that occurred
888
   */
889
  @Override
890
  public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
891
    // nothing to do, entries are still in the backing store
892
    
893
  }
894

  
895
  /**
896
   * Implement the EntryListener interface for Hazelcast, reponding to entry
897
   * removed events in the hzSystemMetadata map.  Evaluate the entry and create
898
   * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
899
   * 
900
   * @param event - The EntryEvent that occurred
901
   */
902
  @Override
903
  public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
904
    // we don't remove objects
905
    
906
  }
907

  
908
  /**
909
   * Implement the EntryListener interface for Hazelcast, reponding to entry
910
   * updated events in the hzSystemMetadata map.  Evaluate the entry and create
911
   * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
912
   * 
913
   * @param event - The EntryEvent that occurred
914
   */
915
  @Override
916
  public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
917

  
918
		logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
919
		PartitionService partitionService = Hazelcast.getPartitionService();
920
		Partition partition = partitionService.getPartition(event.getKey());
921
		Member ownerMember = partition.getOwner();
922
		if (!ownerMember.localMember()) {
923
			// need to pull the entry into the local store
924
			logMetacat.debug("Saving entry locally: " + event.getKey().getValue());
925
			try {
926
				if (!IdentifierManager.getInstance().identifierExists(event.getKey().getValue())) {
927
					IdentifierManager.getInstance().createSystemMetadata(event.getValue());
928
				} else {
929
					IdentifierManager.getInstance().updateSystemMetadata(event.getValue());
930
				}
931
			} catch (McdbDocNotFoundException e) {
932
				logMetacat.error(
933
						"Could not save System Metadata to local store.", e);
934
			}
935
		}
936

  
937
		// TODO evaluate the type of system metadata change, decide if it
938
		// warrants a replication event, what type (DATA, METADATA, RESOURCE),
939
		// iteratively lock the PID, create and submit the tasks, and expect a
940
		// result back. Deal with exceptions.
941
		boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
942
		// TODO: do we need to do anything explicit here?
943
    
944
  }
945

  
946 793
}

Also available in: Unified diff