Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2011-11-29 15:43:43 -0800 (Tue, 29 Nov 2011) $'
10
 * '$Revision: 6714 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.util.Collection;
31
import java.util.Date;
32

    
33
import org.apache.log4j.Logger;
34
import org.dataone.service.types.v1.Identifier;
35
import org.dataone.service.types.v1.Node;
36
import org.dataone.service.types.v1.NodeReference;
37
import org.dataone.service.types.v1.SystemMetadata;
38

    
39
import com.hazelcast.client.HazelcastClient;
40
import com.hazelcast.config.Config;
41
import com.hazelcast.config.FileSystemXmlConfig;
42
import com.hazelcast.core.EntryEvent;
43
import com.hazelcast.core.EntryListener;
44
import com.hazelcast.core.Hazelcast;
45
import com.hazelcast.core.HazelcastInstance;
46
import com.hazelcast.core.ILock;
47
import com.hazelcast.core.IMap;
48
import com.hazelcast.core.InstanceEvent;
49
import com.hazelcast.core.InstanceListener;
50
import com.hazelcast.core.Member;
51
import com.hazelcast.partition.Partition;
52
import com.hazelcast.partition.PartitionService;
53
import com.hazelcast.query.EntryObject;
54
import com.hazelcast.query.Predicate;
55
import com.hazelcast.query.PredicateBuilder;
56

    
57
import edu.ucsb.nceas.metacat.IdentifierManager;
58
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
59
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
60
import edu.ucsb.nceas.metacat.properties.PropertyService;
61
import edu.ucsb.nceas.metacat.shared.BaseService;
62
import edu.ucsb.nceas.metacat.shared.ServiceException;
63
import edu.ucsb.nceas.utilities.FileUtil;
64
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
65
/**
66
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
67
 */
68
public class HazelcastService extends BaseService
69
  implements InstanceListener, EntryListener<Identifier, SystemMetadata> {
70
  
71
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
72

    
73
/* The instance of the logging class */
74
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
75
  
76
  /* The singleton instance of the hazelcast service */
77
  private static HazelcastService hzService = null;
78
  
79
  /* The Hazelcast configuration */
80
  private Config hzConfig;
81
  
82
  /* The instance of the Hazelcast client */
83
//  private HazelcastClient hzClient;
84

    
85
  /* The name of the DataONE Hazelcast cluster group */
86
  private String groupName;
87

    
88
  /* The name of the DataONE Hazelcast cluster password */
89
  private String groupPassword;
90
  
91
  /* The name of the DataONE Hazelcast cluster IP addresses */
92
  private String addressList;
93
  
94
  /* The name of the node map */
95
  private String nodeMap;
96

    
97
  /* The name of the system metadata map */
98
  private String systemMetadataMap;
99
  
100
  /* The Hazelcast distributed task id generator namespace */
101
  private String taskIds;
102
  
103
  /* The Hazelcast distributed node map */
104
  private IMap<NodeReference, Node> nodes;
105

    
106
  /* The Hazelcast distributed system metadata map */
107
  private IMap<Identifier, SystemMetadata> systemMetadata;
108
  
109
  private HazelcastInstance hzInstance;
110
      
111
  /*
112
   * Constructor: Creates an instance of the hazelcast service. Since
113
   * this uses a singleton pattern, use getInstance() to gain the instance.
114
   */
115
  private HazelcastService() {
116
    
117
    super();
118
    _serviceName="HazelcastService";
119
    
120
    try {
121
      init();
122
      
123
    } catch (ServiceException se) {
124
      logMetacat.error("There was a problem creating the HazelcastService. " +
125
                       "The error message was: " + se.getMessage());
126
      
127
    }
128
    
129
  }
130
  
131
  /**
132
   *  Get the instance of the HazelcastService that has been instantiated,
133
   *  or instantiate one if it has not been already.
134
   *
135
   * @return hazelcastService - The instance of the hazelcast service
136
   */
137
  public static HazelcastService getInstance(){
138
    
139
    if ( hzService == null ) {
140
      
141
      hzService = new HazelcastService();
142
      
143
    }
144
    return hzService;
145
  }
146
  
147
  /**
148
   * Initializes the Hazelcast service
149
   */
150
  public void init() throws ServiceException {
151
    
152
    logMetacat.debug("HazelcastService.doRefresh() called.");
153
    
154
	String configFileName = null;
155
	Config config = null;
156
	try {
157
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
158
		config = new FileSystemXmlConfig(configFileName);
159
	} catch (Exception e) {
160
		logMetacat.warn("Custom Hazelcast configuration not defined, using default.", e);
161
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
162
		// make sure we have the config
163
		try {
164
			config = new FileSystemXmlConfig(configFileName);
165
		} catch (FileNotFoundException e1) {
166
			String msg = e.getMessage();
167
			logMetacat.error(msg);
168
			throw new ServiceException(msg);
169
		}
170
	}
171

    
172
	Hazelcast.init(config);
173
  this.hzInstance = Hazelcast.getDefaultInstance();
174
  
175
    // Get configuration properties on instantiation
176
    try {
177
      groupName = 
178
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
179
      groupPassword = 
180
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
181
      addressList = 
182
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
183
//      nodeMap = 
184
//        PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
185
      systemMetadataMap = 
186
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
187

    
188
      // Become a DataONE-process cluster client
189
//      String[] addresses = addressList.split(",");
190
//      hzClient = 
191
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
192
//      nodes = hzClient.getMap(nodeMap);
193
      
194
      // Get a reference to the shared system metadata map as a cluster member
195
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
196
      
197
      // Listen for changes to the system metadata map
198
      systemMetadata.addEntryListener(this, true);
199
      
200
    } catch (PropertyNotFoundException e) {
201

    
202
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
203
        "The error message was: " + e.getMessage();
204
      logMetacat.error(msg);
205
      
206
    }
207
    
208
    // make sure we have all metadata locally
209
    try {
210
	    // add index for resynch() method
211
    	// can only be added once, TODO: figure out how this works
212
	    //systemMetadata.addIndex(SINCE_PROPERTY, true);
213
		//resynch();
214
	} catch (Exception e) {
215
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
216
		logMetacat.error(msg, e);
217
	}
218
        
219
  }
220
  
221
  /**
222
   * Get the system metadata map
223
   * 
224
   * @return systemMetadata - the hazelcast map of system metadata
225
   * @param identifier - the identifier of the object as a string
226
   */
227
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
228
	  return systemMetadata;
229
  }
230
  
231
  /**
232
   * When Metacat changes the underlying store, we need to refresh the
233
   * in-memory representation of it.
234
   * @param guid
235
   */
236
  public void refreshSystemMetadataEntry(String guid) {
237
	Identifier identifier = new Identifier();
238
	identifier.setValue(guid);
239
	// force hazelcast to update system metadata in memory from the store
240
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
241
	
242
  }
243

    
244
  public ILock getLock(String identifier) {
245
    
246
    ILock lock = null;
247
    
248
    try {
249
        lock = getInstance().getHazelcastInstance().getLock(identifier);
250
        
251
    } catch (RuntimeException e) {
252
        logMetacat.info("Couldn't get a lock for identifier " + 
253
            identifier + " !!");
254
    }
255
    return lock;
256
      
257
  }
258
  
259
  /**
260
   * Get the DataONE hazelcast node map
261
   * @return nodes - the hazelcast map of nodes
262
   */
263
//  public IMap<NodeReference, Node> getNodesMap() {
264
//	  return nodes;
265
//  }
266
  
267
  /**
268
   * Indicate whether or not this service is refreshable.
269
   *
270
   * @return refreshable - the boolean refreshable status
271
   */
272
  public boolean refreshable() {
273
    // TODO: Determine the consequences of restarting the Hazelcast instance
274
    // Set this to true if it's okay to drop from the cluster, lose the maps,
275
    // and start back up again
276
    return false;
277
    
278
  }
279
  
280
  /**
281
   * Stop the HazelcastService. When stopped, the service will no longer
282
   * respond to requests.
283
   */
284
  public void stop() throws ServiceException {
285
    
286
    Hazelcast.getLifecycleService().shutdown();
287
    
288
  }
289

    
290
  /**
291
   * Listen for new Hazelcast member events
292
   */
293
  @Override
294
  public void instanceCreated(InstanceEvent event) {
295
    logMetacat.info("New Hazelcast instance created: " +
296
      event.getInstance().getId() + ", " +
297
      event.getInstance().getInstanceType());
298
    
299
  }
300

    
301
  @Override
302
  public void instanceDestroyed(InstanceEvent event) {
303
    logMetacat.info("Hazelcast instance removed: " +
304
        event.getInstance().getId() + ", " +
305
        event.getInstance().getInstanceType());
306
    
307
  }
308

    
309
  public HazelcastInstance getHazelcastInstance() {
310
      return this.hzInstance;
311
      
312
  }
313
  
314
  /**
315
   * Refresh the Hazelcast service by restarting it
316
   */
317
  @Override
318
  protected void doRefresh() throws ServiceException {
319

    
320
    // TODO: verify that the correct config file is still used
321
    Hazelcast.getLifecycleService().restart();
322
    
323
  }
324
  
325
  /**
326
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
327
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
328
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
329
	 * 
330
	 * @param event - The EntryEvent that occurred
331
	 */
332
	@Override
333
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
334
		// handle as update - that method will create if necessary
335
		entryUpdated(event);
336
	}
337

    
338
	/**
339
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
340
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
341
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
342
	 * 
343
	 * @param event - The EntryEvent that occurred
344
	 */
345
	@Override
346
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
347
	  // nothing to do, entries are still in the backing store
348
	  
349
	}
350
	
351
	/**
352
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
353
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
354
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
355
	 * 
356
	 * @param event - The EntryEvent that occurred
357
	 */
358
	@Override
359
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
360
		// we typically don't remove objects in Metacat, but can remove System Metadata
361
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
362
	  
363
	}
364
	
365
	/**
366
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
367
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
368
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
369
	 * 
370
	 * @param event - The EntryEvent that occurred
371
	 */
372
	@Override
373
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
374
	
375
			logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
376
			PartitionService partitionService = Hazelcast.getPartitionService();
377
			Partition partition = partitionService.getPartition(event.getKey());
378
			Member ownerMember = partition.getOwner();
379
			if (!ownerMember.localMember()) {
380
				// need to pull the entry into the local store
381
				saveLocally(event.getValue());
382
			}
383
	
384
			// TODO evaluate the type of system metadata change, decide if it
385
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
386
			// iteratively lock the PID, create and submit the tasks, and expect a
387
			// result back. Deal with exceptions.
388
			SystemMetadata sysmeta = event.getValue();
389
			if (sysmeta != null) {
390
				boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
391
				// TODO: do we need to do anything explicit here?
392
			}
393
	  
394
	}
395
	
396
	/**
397
	 * Save SystemMetadata to local store if needed
398
	 * @param sm
399
	 */
400
	private void saveLocally(SystemMetadata sm) {
401
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
402
		try {
403
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
404
				IdentifierManager.getInstance().createSystemMetadata(sm);
405
			} else {
406
				IdentifierManager.getInstance().updateSystemMetadata(sm);
407
			}
408
		} catch (McdbDocNotFoundException e) {
409
			logMetacat.error(
410
					"Could not save System Metadata to local store.", e);
411
		}
412
	}
413
	
414
	public void resynch() throws Exception {
415
		
416
		// get the CN that is online
417
		// TODO: do we even need to use a specific CN?
418
		// All the System Metadata records should be available via the shared map
419
//		NodeList allNodes = CNodeService.getInstance().listNodes();
420
//		Node onlineCN = null;
421
//		for (Node node: allNodes.getNodeList()) {
422
//			if (node.getType().equals(NodeType.CN)) {
423
//				if (node.getState().equals(NodeState.UP)) {
424
//					onlineCN = node;
425
//					break;
426
//				}
427
//			}
428
//		}
429
		
430
		// get the list of items that have changed since X
431
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
432
		EntryObject e = new PredicateBuilder().getEntryObject();
433
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
434
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
435
		for (SystemMetadata sm: updatedSystemMetadata) {
436
			saveLocally(sm);
437
		}
438
	}
439

    
440
}
(1-1/3)