Project

General

Profile

1 6398 cjones
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 *
8
 *   '$Author$'
9
 *     '$Date$'
10
 * '$Revision$'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26
27 6399 leinfelder
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28 6398 cjones
29 6489 leinfelder
import java.io.FileNotFoundException;
30 6459 leinfelder
import java.util.Collection;
31
import java.util.Date;
32
33 6398 cjones
import org.apache.log4j.Logger;
34 6446 leinfelder
import org.dataone.service.types.v1.Identifier;
35
import org.dataone.service.types.v1.Node;
36
import org.dataone.service.types.v1.NodeReference;
37
import org.dataone.service.types.v1.SystemMetadata;
38 6398 cjones
39 6446 leinfelder
import com.hazelcast.client.HazelcastClient;
40 6401 cjones
import com.hazelcast.config.Config;
41 6437 leinfelder
import com.hazelcast.config.FileSystemXmlConfig;
42 6446 leinfelder
import com.hazelcast.core.EntryEvent;
43
import com.hazelcast.core.EntryListener;
44 6398 cjones
import com.hazelcast.core.Hazelcast;
45 6446 leinfelder
import com.hazelcast.core.IMap;
46 6398 cjones
import com.hazelcast.core.InstanceEvent;
47
import com.hazelcast.core.InstanceListener;
48 6446 leinfelder
import com.hazelcast.core.Member;
49
import com.hazelcast.partition.Partition;
50
import com.hazelcast.partition.PartitionService;
51 6459 leinfelder
import com.hazelcast.query.EntryObject;
52
import com.hazelcast.query.Predicate;
53
import com.hazelcast.query.PredicateBuilder;
54 6398 cjones
55 6446 leinfelder
import edu.ucsb.nceas.metacat.IdentifierManager;
56
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
57
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
58 6401 cjones
import edu.ucsb.nceas.metacat.properties.PropertyService;
59 6398 cjones
import edu.ucsb.nceas.metacat.shared.BaseService;
60
import edu.ucsb.nceas.metacat.shared.ServiceException;
61 6489 leinfelder
import edu.ucsb.nceas.utilities.FileUtil;
62 6446 leinfelder
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
63 6398 cjones
/**
64
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
65
 */
66
public class HazelcastService extends BaseService
67 6446 leinfelder
  implements InstanceListener, EntryListener<Identifier, SystemMetadata> {
68 6398 cjones
69 6490 leinfelder
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
70 6459 leinfelder
71
/* The instance of the logging class */
72 6398 cjones
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
73
74
  /* The singleton instance of the hazelcast service */
75 6401 cjones
  private static HazelcastService hzService = null;
76 6398 cjones
77 6401 cjones
  /* The Hazelcast configuration */
78
  private Config hzConfig;
79 6398 cjones
80 6446 leinfelder
  /* The instance of the Hazelcast client */
81 6483 cjones
//  private HazelcastClient hzClient;
82 6446 leinfelder
83
  /* The name of the DataONE Hazelcast cluster group */
84
  private String groupName;
85
86
  /* The name of the DataONE Hazelcast cluster password */
87
  private String groupPassword;
88
89
  /* The name of the DataONE Hazelcast cluster IP addresses */
90
  private String addressList;
91
92
  /* The name of the node map */
93
  private String nodeMap;
94
95
  /* The name of the system metadata map */
96
  private String systemMetadataMap;
97
98
  /* The Hazelcast distributed task id generator namespace */
99
  private String taskIds;
100
101 6462 cjones
  /* The Hazelcast distributed node map */
102 6446 leinfelder
  private IMap<NodeReference, Node> nodes;
103
104
  /* The Hazelcast distributed system metadata map */
105
  private IMap<Identifier, SystemMetadata> systemMetadata;
106 6465 cjones
107 6398 cjones
  /*
108
   * Constructor: Creates an instance of the hazelcast service. Since
109
   * this uses a singleton pattern, use getInstance() to gain the instance.
110
   */
111
  private HazelcastService() {
112
113
    super();
114
    _serviceName="HazelcastService";
115
116
    try {
117 6401 cjones
      init();
118 6398 cjones
119
    } catch (ServiceException se) {
120 6437 leinfelder
      logMetacat.error("There was a problem creating the HazelcastService. " +
121 6398 cjones
                       "The error message was: " + se.getMessage());
122
123
    }
124
125
  }
126
127
  /**
128
   *  Get the instance of the HazelcastService that has been instantiated,
129
   *  or instantiate one if it has not been already.
130
   *
131
   * @return hazelcastService - The instance of the hazelcast service
132
   */
133
  public static HazelcastService getInstance(){
134
135 6401 cjones
    if ( hzService == null ) {
136 6398 cjones
137 6401 cjones
      hzService = new HazelcastService();
138 6398 cjones
139
    }
140 6401 cjones
    return hzService;
141 6398 cjones
  }
142
143
  /**
144 6401 cjones
   * Initializes the Hazelcast service
145 6398 cjones
   */
146 6401 cjones
  public void init() throws ServiceException {
147 6398 cjones
148
    logMetacat.debug("HazelcastService.doRefresh() called.");
149
150 6489 leinfelder
	String configFileName = null;
151
	Config config = null;
152
	try {
153
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
154
		config = new FileSystemXmlConfig(configFileName);
155
	} catch (Exception e) {
156
		logMetacat.warn("Custom Hazelcast configuration not defined, using default.", e);
157
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
158
		// make sure we have the config
159
		try {
160
			config = new FileSystemXmlConfig(configFileName);
161
		} catch (FileNotFoundException e1) {
162
			String msg = e.getMessage();
163
			logMetacat.error(msg);
164
			throw new ServiceException(msg);
165
		}
166
	}
167
168
	Hazelcast.init(config);
169 6398 cjones
170 6446 leinfelder
    // Get configuration properties on instantiation
171
    try {
172
      groupName =
173
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
174
      groupPassword =
175
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
176
      addressList =
177
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
178 6483 cjones
//      nodeMap =
179
//        PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
180 6446 leinfelder
      systemMetadataMap =
181
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
182
183
      // Become a DataONE-process cluster client
184 6483 cjones
//      String[] addresses = addressList.split(",");
185
//      hzClient =
186
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
187
//      nodes = hzClient.getMap(nodeMap);
188 6446 leinfelder
189
      // Get a reference to the shared system metadata map as a cluster member
190
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
191
192
      // Listen for changes to the system metadata map
193
      systemMetadata.addEntryListener(this, true);
194
195
    } catch (PropertyNotFoundException e) {
196
197
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
198
        "The error message was: " + e.getMessage();
199
      logMetacat.error(msg);
200
201 6401 cjones
    }
202 6459 leinfelder
203
    // make sure we have all metadata locally
204
    try {
205
	    // add index for resynch() method
206 6490 leinfelder
    	// can only be added once, TODO: figure out how this works
207
	    //systemMetadata.addIndex(SINCE_PROPERTY, true);
208
		//resynch();
209 6459 leinfelder
	} catch (Exception e) {
210
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
211
		logMetacat.error(msg, e);
212
	}
213 6446 leinfelder
214 6398 cjones
  }
215
216 6462 cjones
  /**
217
   * Get the system metadata map
218
   *
219
   * @return systemMetadata - the hazelcast map of system metadata
220
   */
221 6446 leinfelder
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
222
	  return systemMetadata;
223
  }
224
225 6462 cjones
  /**
226
   * Get the DataONE hazelcast node map
227
   * @return nodes - the hazelcast map of nodes
228
   */
229 6483 cjones
//  public IMap<NodeReference, Node> getNodesMap() {
230
//	  return nodes;
231
//  }
232 6446 leinfelder
233 6398 cjones
  /**
234
   * Indicate whether or not this service is refreshable.
235
   *
236
   * @return refreshable - the boolean refreshable status
237
   */
238
  public boolean refreshable() {
239 6401 cjones
    // TODO: Determine the consequences of restarting the Hazelcast instance
240 6407 cjones
    // Set this to true if it's okay to drop from the cluster, lose the maps,
241
    // and start back up again
242
    return false;
243 6398 cjones
244
  }
245
246
  /**
247
   * Stop the HazelcastService. When stopped, the service will no longer
248
   * respond to requests.
249
   */
250
  public void stop() throws ServiceException {
251
252
    Hazelcast.getLifecycleService().shutdown();
253
254
  }
255
256
  /**
257
   * Listen for new Hazelcast member events
258
   */
259 6471 jones
  @Override
260 6398 cjones
  public void instanceCreated(InstanceEvent event) {
261 6407 cjones
    logMetacat.info("New Hazelcast instance created: " +
262
      event.getInstance().getId() + ", " +
263
      event.getInstance().getInstanceType());
264
265 6398 cjones
  }
266
267 6471 jones
  @Override
268 6398 cjones
  public void instanceDestroyed(InstanceEvent event) {
269 6407 cjones
    logMetacat.info("Hazelcast instance removed: " +
270
        event.getInstance().getId() + ", " +
271
        event.getInstance().getInstanceType());
272
273 6398 cjones
  }
274 6407 cjones
275
  /**
276
   * Refresh the Hazelcast service by restarting it
277
   */
278 6471 jones
  @Override
279 6401 cjones
  protected void doRefresh() throws ServiceException {
280
281 6407 cjones
    // TODO: verify that the correct config file is still used
282 6401 cjones
    Hazelcast.getLifecycleService().restart();
283 6407 cjones
284 6401 cjones
  }
285 6446 leinfelder
286
  /**
287
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
288
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
289
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
290
	 *
291
	 * @param event - The EntryEvent that occurred
292
	 */
293 6471 jones
	@Override
294 6446 leinfelder
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
295
		// handle as update - that method will create if necessary
296
		entryUpdated(event);
297
	}
298 6401 cjones
299 6446 leinfelder
	/**
300
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
301
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
302
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
303
	 *
304
	 * @param event - The EntryEvent that occurred
305
	 */
306 6471 jones
	@Override
307 6446 leinfelder
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
308
	  // nothing to do, entries are still in the backing store
309
310
	}
311
312
	/**
313
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
314
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
315
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
316
	 *
317
	 * @param event - The EntryEvent that occurred
318
	 */
319 6471 jones
	@Override
320 6446 leinfelder
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
321
	  // we don't remove objects
322
323
	}
324
325
	/**
326
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
327
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
328
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
329
	 *
330
	 * @param event - The EntryEvent that occurred
331
	 */
332 6471 jones
	@Override
333 6446 leinfelder
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
334
335
			logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
336
			PartitionService partitionService = Hazelcast.getPartitionService();
337
			Partition partition = partitionService.getPartition(event.getKey());
338
			Member ownerMember = partition.getOwner();
339
			if (!ownerMember.localMember()) {
340
				// need to pull the entry into the local store
341 6459 leinfelder
				saveLocally(event.getValue());
342 6446 leinfelder
			}
343
344
			// TODO evaluate the type of system metadata change, decide if it
345
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
346
			// iteratively lock the PID, create and submit the tasks, and expect a
347
			// result back. Deal with exceptions.
348
			boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
349
			// TODO: do we need to do anything explicit here?
350
351
	}
352 6459 leinfelder
353
	/**
354
	 * Save SystemMetadata to local store if needed
355
	 * @param sm
356
	 */
357
	private void saveLocally(SystemMetadata sm) {
358
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
359
		try {
360
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
361
				IdentifierManager.getInstance().createSystemMetadata(sm);
362
			} else {
363
				IdentifierManager.getInstance().updateSystemMetadata(sm);
364
			}
365
		} catch (McdbDocNotFoundException e) {
366
			logMetacat.error(
367
					"Could not save System Metadata to local store.", e);
368
		}
369
	}
370
371
	public void resynch() throws Exception {
372
373
		// get the CN that is online
374
		// TODO: do we even need to use a specific CN?
375
		// All the System Metadata records should be available via the shared map
376
//		NodeList allNodes = CNodeService.getInstance().listNodes();
377
//		Node onlineCN = null;
378
//		for (Node node: allNodes.getNodeList()) {
379
//			if (node.getType().equals(NodeType.CN)) {
380
//				if (node.getState().equals(NodeState.UP)) {
381
//					onlineCN = node;
382
//					break;
383
//				}
384
//			}
385
//		}
386
387
		// get the list of items that have changed since X
388
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
389
		EntryObject e = new PredicateBuilder().getEntryObject();
390
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
391
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
392
		for (SystemMetadata sm: updatedSystemMetadata) {
393
			saveLocally(sm);
394
		}
395
	}
396 6446 leinfelder
397 6398 cjones
}