Project

General

Profile

1 6398 cjones
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 *
8
 *   '$Author$'
9
 *     '$Date$'
10
 * '$Revision$'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26
27 6399 leinfelder
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28 6398 cjones
29 6459 leinfelder
import java.util.Collection;
30
import java.util.Date;
31
32 6398 cjones
import org.apache.log4j.Logger;
33 6446 leinfelder
import org.dataone.service.types.v1.Identifier;
34
import org.dataone.service.types.v1.Node;
35
import org.dataone.service.types.v1.NodeReference;
36
import org.dataone.service.types.v1.SystemMetadata;
37 6398 cjones
38 6446 leinfelder
import com.hazelcast.client.HazelcastClient;
39 6401 cjones
import com.hazelcast.config.Config;
40 6437 leinfelder
import com.hazelcast.config.FileSystemXmlConfig;
41 6446 leinfelder
import com.hazelcast.core.EntryEvent;
42
import com.hazelcast.core.EntryListener;
43 6398 cjones
import com.hazelcast.core.Hazelcast;
44 6446 leinfelder
import com.hazelcast.core.IMap;
45 6398 cjones
import com.hazelcast.core.InstanceEvent;
46
import com.hazelcast.core.InstanceListener;
47 6446 leinfelder
import com.hazelcast.core.Member;
48
import com.hazelcast.partition.Partition;
49
import com.hazelcast.partition.PartitionService;
50 6459 leinfelder
import com.hazelcast.query.EntryObject;
51
import com.hazelcast.query.Predicate;
52
import com.hazelcast.query.PredicateBuilder;
53 6398 cjones
54 6446 leinfelder
import edu.ucsb.nceas.metacat.IdentifierManager;
55
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
56
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
57 6401 cjones
import edu.ucsb.nceas.metacat.properties.PropertyService;
58 6398 cjones
import edu.ucsb.nceas.metacat.shared.BaseService;
59
import edu.ucsb.nceas.metacat.shared.ServiceException;
60 6446 leinfelder
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
61 6398 cjones
/**
62
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
63
 */
64
public class HazelcastService extends BaseService
65 6446 leinfelder
  implements InstanceListener, EntryListener<Identifier, SystemMetadata> {
66 6398 cjones
67 6459 leinfelder
  private static final String SINCE_PROPERTY = "dateSysMetaModified";
68
69
/* The instance of the logging class */
70 6398 cjones
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
71
72
  /* The singleton instance of the hazelcast service */
73 6401 cjones
  private static HazelcastService hzService = null;
74 6398 cjones
75 6401 cjones
  /* The Hazelcast configuration */
76
  private Config hzConfig;
77 6398 cjones
78 6446 leinfelder
  /* The instance of the Hazelcast client */
79
  private HazelcastClient hzClient;
80
81
  /* The name of the DataONE Hazelcast cluster group */
82
  private String groupName;
83
84
  /* The name of the DataONE Hazelcast cluster password */
85
  private String groupPassword;
86
87
  /* The name of the DataONE Hazelcast cluster IP addresses */
88
  private String addressList;
89
90
  /* The name of the node map */
91
  private String nodeMap;
92
93
  /* The name of the system metadata map */
94
  private String systemMetadataMap;
95
96
  /* The Hazelcast distributed task id generator namespace */
97
  private String taskIds;
98
99
  /* The Hazelcast distributed system metadata map */
100
  private IMap<NodeReference, Node> nodes;
101
102
  /* The Hazelcast distributed system metadata map */
103
  private IMap<Identifier, SystemMetadata> systemMetadata;
104
105
  /* The Hazelcast distributed pending replication tasks map*/
106
  private IMap<String, CNReplicationTask> pendingReplicationTasks;
107
108
109 6398 cjones
  /*
110
   * Constructor: Creates an instance of the hazelcast service. Since
111
   * this uses a singleton pattern, use getInstance() to gain the instance.
112
   */
113
  private HazelcastService() {
114
115
    super();
116
    _serviceName="HazelcastService";
117
118
    try {
119 6401 cjones
      init();
120 6398 cjones
121
    } catch (ServiceException se) {
122 6437 leinfelder
      logMetacat.error("There was a problem creating the HazelcastService. " +
123 6398 cjones
                       "The error message was: " + se.getMessage());
124
125
    }
126
127
  }
128
129
  /**
130
   *  Get the instance of the HazelcastService that has been instantiated,
131
   *  or instantiate one if it has not been already.
132
   *
133
   * @return hazelcastService - The instance of the hazelcast service
134
   */
135
  public static HazelcastService getInstance(){
136
137 6401 cjones
    if ( hzService == null ) {
138 6398 cjones
139 6401 cjones
      hzService = new HazelcastService();
140 6398 cjones
141
    }
142 6401 cjones
    return hzService;
143 6398 cjones
  }
144
145
  /**
146 6401 cjones
   * Initializes the Hazelcast service
147 6398 cjones
   */
148 6401 cjones
  public void init() throws ServiceException {
149 6398 cjones
150
    logMetacat.debug("HazelcastService.doRefresh() called.");
151
152 6401 cjones
    try {
153 6437 leinfelder
    	String configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
154
//    	System.setProperty("hazelcast.config", configFileName);
155
		Config config = new FileSystemXmlConfig(configFileName);
156
		Hazelcast.init(config);
157
    } catch (Exception e) {
158
      String msg = e.getMessage();
159 6407 cjones
      logMetacat.error(msg);
160
      throw new ServiceException(msg);
161 6446 leinfelder
    }
162 6398 cjones
163 6446 leinfelder
    // Get configuration properties on instantiation
164
    try {
165
      groupName =
166
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
167
      groupPassword =
168
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
169
      addressList =
170
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
171
      nodeMap =
172
        PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
173
      systemMetadataMap =
174
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
175
176
      // Become a DataONE-process cluster client
177
      //TODO: where should this be?
178
//      String[] addresses = addressList.split(",");
179
//      hzClient =
180
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
181
//      nodes = hzClient.getMap(nodeMap);
182
//      pendingReplicationTasks = hzClient.getMap(pendingTasksQueue);
183
184
      // Get a reference to the shared system metadata map as a cluster member
185
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
186
187
      // Listen for changes to the system metadata map
188
      systemMetadata.addEntryListener(this, true);
189
190
    } catch (PropertyNotFoundException e) {
191
192
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
193
        "The error message was: " + e.getMessage();
194
      logMetacat.error(msg);
195
196 6401 cjones
    }
197 6459 leinfelder
198
    // make sure we have all metadata locally
199
    try {
200
	    // add index for resynch() method
201
	    systemMetadata.addIndex(SINCE_PROPERTY, true);
202
		resynch();
203
	} catch (Exception e) {
204
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
205
		logMetacat.error(msg, e);
206
	}
207 6446 leinfelder
208 6398 cjones
  }
209
210 6446 leinfelder
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
211
	  return systemMetadata;
212
  }
213
214
  public IMap<String,CNReplicationTask> getPendingReplicationTasks() {
215
	  return pendingReplicationTasks;
216
  }
217
218 6398 cjones
  /**
219
   * Indicate whether or not this service is refreshable.
220
   *
221
   * @return refreshable - the boolean refreshable status
222
   */
223
  public boolean refreshable() {
224 6401 cjones
    // TODO: Determine the consequences of restarting the Hazelcast instance
225 6407 cjones
    // Set this to true if it's okay to drop from the cluster, lose the maps,
226
    // and start back up again
227
    return false;
228 6398 cjones
229
  }
230
231
  /**
232
   * Stop the HazelcastService. When stopped, the service will no longer
233
   * respond to requests.
234
   */
235
  public void stop() throws ServiceException {
236
237
    Hazelcast.getLifecycleService().shutdown();
238
239
  }
240
241
  /**
242
   * Listen for new Hazelcast member events
243
   */
244 6407 cjones
  @Override
245 6398 cjones
  public void instanceCreated(InstanceEvent event) {
246 6407 cjones
    logMetacat.info("New Hazelcast instance created: " +
247
      event.getInstance().getId() + ", " +
248
      event.getInstance().getInstanceType());
249
250 6398 cjones
  }
251
252 6407 cjones
  @Override
253 6398 cjones
  public void instanceDestroyed(InstanceEvent event) {
254 6407 cjones
    logMetacat.info("Hazelcast instance removed: " +
255
        event.getInstance().getId() + ", " +
256
        event.getInstance().getInstanceType());
257
258 6398 cjones
  }
259 6407 cjones
260
  /**
261
   * Refresh the Hazelcast service by restarting it
262
   */
263
  @Override
264 6401 cjones
  protected void doRefresh() throws ServiceException {
265
266 6407 cjones
    // TODO: verify that the correct config file is still used
267 6401 cjones
    Hazelcast.getLifecycleService().restart();
268 6407 cjones
269 6401 cjones
  }
270 6446 leinfelder
271
  /**
272
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
273
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
274
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
275
	 *
276
	 * @param event - The EntryEvent that occurred
277
	 */
278
	@Override
279
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
280
		// handle as update - that method will create if necessary
281
		entryUpdated(event);
282
	}
283 6401 cjones
284 6446 leinfelder
	/**
285
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
286
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
287
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
288
	 *
289
	 * @param event - The EntryEvent that occurred
290
	 */
291
	@Override
292
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
293
	  // nothing to do, entries are still in the backing store
294
295
	}
296
297
	/**
298
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
299
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
300
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
301
	 *
302
	 * @param event - The EntryEvent that occurred
303
	 */
304
	@Override
305
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
306
	  // we don't remove objects
307
308
	}
309
310
	/**
311
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
312
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
313
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
314
	 *
315
	 * @param event - The EntryEvent that occurred
316
	 */
317
	@Override
318
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
319
320
			logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
321
			PartitionService partitionService = Hazelcast.getPartitionService();
322
			Partition partition = partitionService.getPartition(event.getKey());
323
			Member ownerMember = partition.getOwner();
324
			if (!ownerMember.localMember()) {
325
				// need to pull the entry into the local store
326 6459 leinfelder
				saveLocally(event.getValue());
327 6446 leinfelder
			}
328
329
			// TODO evaluate the type of system metadata change, decide if it
330
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
331
			// iteratively lock the PID, create and submit the tasks, and expect a
332
			// result back. Deal with exceptions.
333
			boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
334
			// TODO: do we need to do anything explicit here?
335
336
	}
337 6459 leinfelder
338
	/**
339
	 * Save SystemMetadata to local store if needed
340
	 * @param sm
341
	 */
342
	private void saveLocally(SystemMetadata sm) {
343
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
344
		try {
345
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
346
				IdentifierManager.getInstance().createSystemMetadata(sm);
347
			} else {
348
				IdentifierManager.getInstance().updateSystemMetadata(sm);
349
			}
350
		} catch (McdbDocNotFoundException e) {
351
			logMetacat.error(
352
					"Could not save System Metadata to local store.", e);
353
		}
354
	}
355
356
	public void resynch() throws Exception {
357
358
		// get the CN that is online
359
		// TODO: do we even need to use a specific CN?
360
		// All the System Metadata records should be available via the shared map
361
//		NodeList allNodes = CNodeService.getInstance().listNodes();
362
//		Node onlineCN = null;
363
//		for (Node node: allNodes.getNodeList()) {
364
//			if (node.getType().equals(NodeType.CN)) {
365
//				if (node.getState().equals(NodeState.UP)) {
366
//					onlineCN = node;
367
//					break;
368
//				}
369
//			}
370
//		}
371
372
		// get the list of items that have changed since X
373
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
374
		EntryObject e = new PredicateBuilder().getEntryObject();
375
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
376
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
377
		for (SystemMetadata sm: updatedSystemMetadata) {
378
			saveLocally(sm);
379
		}
380
	}
381 6446 leinfelder
382 6398 cjones
}