Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: cjones $'
9
 *     '$Date: 2012-01-06 07:23:59 -0800 (Fri, 06 Jan 2012) $'
10
 * '$Revision: 6859 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.util.Collection;
31
import java.util.Date;
32
import java.util.concurrent.locks.Lock;
33

    
34
import org.apache.log4j.Logger;
35
import org.dataone.service.types.v1.Identifier;
36
import org.dataone.service.types.v1.Node;
37
import org.dataone.service.types.v1.NodeReference;
38
import org.dataone.service.types.v1.SystemMetadata;
39

    
40
import com.hazelcast.config.Config;
41
import com.hazelcast.config.FileSystemXmlConfig;
42
import com.hazelcast.core.EntryEvent;
43
import com.hazelcast.core.EntryListener;
44
import com.hazelcast.core.Hazelcast;
45
import com.hazelcast.core.HazelcastInstance;
46
import com.hazelcast.core.IMap;
47
import com.hazelcast.core.InstanceEvent;
48
import com.hazelcast.core.InstanceListener;
49
import com.hazelcast.core.Member;
50
import com.hazelcast.partition.Partition;
51
import com.hazelcast.partition.PartitionService;
52
import com.hazelcast.query.EntryObject;
53
import com.hazelcast.query.Predicate;
54
import com.hazelcast.query.PredicateBuilder;
55

    
56
import edu.ucsb.nceas.metacat.IdentifierManager;
57
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
58
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
59
import edu.ucsb.nceas.metacat.properties.PropertyService;
60
import edu.ucsb.nceas.metacat.shared.BaseService;
61
import edu.ucsb.nceas.metacat.shared.ServiceException;
62
import edu.ucsb.nceas.utilities.FileUtil;
63
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
64
/**
65
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
66
 */
67
public class HazelcastService extends BaseService
68
  implements InstanceListener, EntryListener<Identifier, SystemMetadata> {
69
  
70
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
71

    
72
/* The instance of the logging class */
73
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
74
  
75
  /* The singleton instance of the hazelcast service */
76
  private static HazelcastService hzService = null;
77
  
78
  /* The Hazelcast configuration */
79
  private Config hzConfig;
80
  
81
  /* The instance of the Hazelcast client */
82
//  private HazelcastClient hzClient;
83

    
84
  /* The name of the DataONE Hazelcast cluster group */
85
  private String groupName;
86

    
87
  /* The name of the DataONE Hazelcast cluster password */
88
  private String groupPassword;
89
  
90
  /* The name of the DataONE Hazelcast cluster IP addresses */
91
  private String addressList;
92
  
93
  /* The name of the node map */
94
  private String nodeMap;
95

    
96
  /* The name of the system metadata map */
97
  private String systemMetadataMap;
98
  
99
  /* The Hazelcast distributed task id generator namespace */
100
  private String taskIds;
101
  
102
  /* The Hazelcast distributed node map */
103
  private IMap<NodeReference, Node> nodes;
104

    
105
  /* The Hazelcast distributed system metadata map */
106
  private IMap<Identifier, SystemMetadata> systemMetadata;
107
  
108
  private HazelcastInstance hzInstance;
109
      
110
  /*
111
   * Constructor: Creates an instance of the hazelcast service. Since
112
   * this uses a singleton pattern, use getInstance() to gain the instance.
113
   */
114
  private HazelcastService() {
115
    
116
    super();
117
    _serviceName="HazelcastService";
118
    
119
    try {
120
      init();
121
      
122
    } catch (ServiceException se) {
123
      logMetacat.error("There was a problem creating the HazelcastService. " +
124
                       "The error message was: " + se.getMessage());
125
      
126
    }
127
    
128
  }
129
  
130
  /**
131
   *  Get the instance of the HazelcastService that has been instantiated,
132
   *  or instantiate one if it has not been already.
133
   *
134
   * @return hazelcastService - The instance of the hazelcast service
135
   */
136
  public static HazelcastService getInstance(){
137
    
138
    if ( hzService == null ) {
139
      
140
      hzService = new HazelcastService();
141
      
142
    }
143
    return hzService;
144
  }
145
  
146
  /**
147
   * Initializes the Hazelcast service
148
   */
149
  public void init() throws ServiceException {
150
    
151
    logMetacat.debug("HazelcastService.doRefresh() called.");
152
    
153
	String configFileName = null;
154
	Config config = null;
155
	try {
156
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
157
		config = new FileSystemXmlConfig(configFileName);
158
	} catch (Exception e) {
159
		logMetacat.warn("Custom Hazelcast configuration not defined, using default.", e);
160
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
161
		// make sure we have the config
162
		try {
163
			config = new FileSystemXmlConfig(configFileName);
164
		} catch (FileNotFoundException e1) {
165
			String msg = e.getMessage();
166
			logMetacat.error(msg);
167
			throw new ServiceException(msg);
168
		}
169
	}
170

    
171
	Hazelcast.init(config);
172
  this.hzInstance = Hazelcast.getDefaultInstance();
173
  
174
    // Get configuration properties on instantiation
175
    try {
176
      groupName = 
177
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
178
      groupPassword = 
179
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
180
      addressList = 
181
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
182
//      nodeMap = 
183
//        PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
184
      systemMetadataMap = 
185
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
186

    
187
      // Become a DataONE-process cluster client
188
//      String[] addresses = addressList.split(",");
189
//      hzClient = 
190
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
191
//      nodes = hzClient.getMap(nodeMap);
192
      
193
      // Get a reference to the shared system metadata map as a cluster member
194
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
195
      
196
      // Listen for changes to the system metadata map
197
      systemMetadata.addEntryListener(this, true);
198
      
199
    } catch (PropertyNotFoundException e) {
200

    
201
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
202
        "The error message was: " + e.getMessage();
203
      logMetacat.error(msg);
204
      
205
    }
206
    
207
    // make sure we have all metadata locally
208
    try {
209
	    // add index for resynch() method
210
    	// can only be added once, TODO: figure out how this works
211
	    //systemMetadata.addIndex(SINCE_PROPERTY, true);
212
		//resynch();
213
	} catch (Exception e) {
214
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
215
		logMetacat.error(msg, e);
216
	}
217
        
218
  }
219
  
220
  /**
221
   * Get the system metadata map
222
   * 
223
   * @return systemMetadata - the hazelcast map of system metadata
224
   * @param identifier - the identifier of the object as a string
225
   */
226
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
227
	  return systemMetadata;
228
  }
229
  
230
  /**
231
   * When Metacat changes the underlying store, we need to refresh the
232
   * in-memory representation of it.
233
   * @param guid
234
   */
235
  public void refreshSystemMetadataEntry(String guid) {
236
	Identifier identifier = new Identifier();
237
	identifier.setValue(guid);
238
	// force hazelcast to update system metadata in memory from the store
239
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
240
	
241
  }
242

    
243
  public Lock getLock(String identifier) {
244
    
245
    Lock lock = null;
246
    
247
    try {
248
        lock = getInstance().getHazelcastInstance().getLock(identifier);
249
        
250
    } catch (RuntimeException e) {
251
        logMetacat.info("Couldn't get a lock for identifier " + 
252
            identifier + " !!");
253
    }
254
    return lock;
255
      
256
  }
257
  
258
  /**
259
   * Get the DataONE hazelcast node map
260
   * @return nodes - the hazelcast map of nodes
261
   */
262
//  public IMap<NodeReference, Node> getNodesMap() {
263
//	  return nodes;
264
//  }
265
  
266
  /**
267
   * Indicate whether or not this service is refreshable.
268
   *
269
   * @return refreshable - the boolean refreshable status
270
   */
271
  public boolean refreshable() {
272
    // TODO: Determine the consequences of restarting the Hazelcast instance
273
    // Set this to true if it's okay to drop from the cluster, lose the maps,
274
    // and start back up again
275
    return false;
276
    
277
  }
278
  
279
  /**
280
   * Stop the HazelcastService. When stopped, the service will no longer
281
   * respond to requests.
282
   */
283
  public void stop() throws ServiceException {
284
    
285
    Hazelcast.getLifecycleService().shutdown();
286
    
287
  }
288

    
289
  /**
290
   * Listen for new Hazelcast member events
291
   */
292
  @Override
293
  public void instanceCreated(InstanceEvent event) {
294
    logMetacat.info("New Hazelcast instance created: " +
295
      event.getInstance().getId() + ", " +
296
      event.getInstance().getInstanceType());
297
    
298
  }
299

    
300
  @Override
301
  public void instanceDestroyed(InstanceEvent event) {
302
    logMetacat.info("Hazelcast instance removed: " +
303
        event.getInstance().getId() + ", " +
304
        event.getInstance().getInstanceType());
305
    
306
  }
307

    
308
  public HazelcastInstance getHazelcastInstance() {
309
      return this.hzInstance;
310
      
311
  }
312
  
313
  /**
314
   * Refresh the Hazelcast service by restarting it
315
   */
316
  @Override
317
  protected void doRefresh() throws ServiceException {
318

    
319
    // TODO: verify that the correct config file is still used
320
    Hazelcast.getLifecycleService().restart();
321
    
322
  }
323
  
324
  /**
325
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
326
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
327
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
328
	 * 
329
	 * @param event - The EntryEvent that occurred
330
	 */
331
	@Override
332
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
333
		// handle as update - that method will create if necessary
334
		entryUpdated(event);
335
	}
336

    
337
	/**
338
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
339
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
340
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
341
	 * 
342
	 * @param event - The EntryEvent that occurred
343
	 */
344
	@Override
345
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
346
	  // nothing to do, entries are still in the backing store
347
	  
348
	}
349
	
350
	/**
351
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
352
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
353
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
354
	 * 
355
	 * @param event - The EntryEvent that occurred
356
	 */
357
	@Override
358
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
359
		// we typically don't remove objects in Metacat, but can remove System Metadata
360
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
361
	  
362
	}
363
	
364
	/**
365
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
366
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
367
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
368
	 * 
369
	 * @param event - The EntryEvent that occurred
370
	 */
371
	@Override
372
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
373
	
374
			logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
375
			PartitionService partitionService = Hazelcast.getPartitionService();
376
			Partition partition = partitionService.getPartition(event.getKey());
377
			Member ownerMember = partition.getOwner();
378
			if (!ownerMember.localMember()) {
379
				// need to pull the entry into the local store
380
				saveLocally(event.getValue());
381
			}
382
	
383
			// TODO evaluate the type of system metadata change, decide if it
384
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
385
			// iteratively lock the PID, create and submit the tasks, and expect a
386
			// result back. Deal with exceptions.
387
			SystemMetadata sysmeta = event.getValue();
388
			if (sysmeta != null) {
389
				boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
390
				// TODO: do we need to do anything explicit here?
391
			}
392
	  
393
	}
394
	
395
	/**
396
	 * Save SystemMetadata to local store if needed
397
	 * @param sm
398
	 */
399
	private void saveLocally(SystemMetadata sm) {
400
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
401
		try {
402
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
403
				IdentifierManager.getInstance().createSystemMetadata(sm);
404
			} else {
405
				IdentifierManager.getInstance().updateSystemMetadata(sm);
406
			}
407
		} catch (McdbDocNotFoundException e) {
408
			logMetacat.error(
409
					"Could not save System Metadata to local store.", e);
410
		}
411
	}
412
	
413
	public void resynch() throws Exception {
414
		
415
		// get the CN that is online
416
		// TODO: do we even need to use a specific CN?
417
		// All the System Metadata records should be available via the shared map
418
//		NodeList allNodes = CNodeService.getInstance().listNodes();
419
//		Node onlineCN = null;
420
//		for (Node node: allNodes.getNodeList()) {
421
//			if (node.getType().equals(NodeType.CN)) {
422
//				if (node.getState().equals(NodeState.UP)) {
423
//					onlineCN = node;
424
//					break;
425
//				}
426
//			}
427
//		}
428
		
429
		// get the list of items that have changed since X
430
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
431
		EntryObject e = new PredicateBuilder().getEntryObject();
432
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
433
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
434
		for (SystemMetadata sm: updatedSystemMetadata) {
435
			saveLocally(sm);
436
		}
437
	}
438

    
439
}
(1-1/3)