Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: cjones $'
9
 *     '$Date: 2012-01-12 07:53:58 -0800 (Thu, 12 Jan 2012) $'
10
 * '$Revision: 6889 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.util.Collection;
31
import java.util.Date;
32
import java.util.concurrent.locks.Lock;
33

    
34
import org.apache.log4j.Logger;
35
import org.dataone.service.types.v1.Identifier;
36
import org.dataone.service.types.v1.Node;
37
import org.dataone.service.types.v1.NodeReference;
38
import org.dataone.service.types.v1.SystemMetadata;
39

    
40
import com.hazelcast.config.Config;
41
import com.hazelcast.config.FileSystemXmlConfig;
42
import com.hazelcast.core.EntryEvent;
43
import com.hazelcast.core.EntryListener;
44
import com.hazelcast.core.Hazelcast;
45
import com.hazelcast.core.HazelcastInstance;
46
import com.hazelcast.core.IMap;
47
import com.hazelcast.core.ISet;
48
import com.hazelcast.core.InstanceEvent;
49
import com.hazelcast.core.InstanceListener;
50
import com.hazelcast.core.Member;
51
import com.hazelcast.partition.Partition;
52
import com.hazelcast.partition.PartitionService;
53
import com.hazelcast.query.EntryObject;
54
import com.hazelcast.query.Predicate;
55
import com.hazelcast.query.PredicateBuilder;
56

    
57
import edu.ucsb.nceas.metacat.IdentifierManager;
58
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
59
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
60
import edu.ucsb.nceas.metacat.properties.PropertyService;
61
import edu.ucsb.nceas.metacat.shared.BaseService;
62
import edu.ucsb.nceas.metacat.shared.ServiceException;
63
import edu.ucsb.nceas.utilities.FileUtil;
64
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
65
/**
66
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
67
 */
68
public class HazelcastService extends BaseService
69
  implements InstanceListener, EntryListener<Identifier, SystemMetadata> {
70
  
71
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
72

    
73
/* The instance of the logging class */
74
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
75
  
76
  /* The singleton instance of the hazelcast service */
77
  private static HazelcastService hzService = null;
78
  
79
  /* The Hazelcast configuration */
80
  private Config hzConfig;
81
  
82
  /* The instance of the Hazelcast client */
83
//  private HazelcastClient hzClient;
84

    
85
  /* The name of the DataONE Hazelcast cluster group */
86
  private String groupName;
87

    
88
  /* The name of the DataONE Hazelcast cluster password */
89
  private String groupPassword;
90
  
91
  /* The name of the DataONE Hazelcast cluster IP addresses */
92
  private String addressList;
93
  
94
  /* The name of the node map */
95
  private String nodeMap;
96

    
97
  /* The name of the system metadata map */
98
  private String systemMetadataMap;
99
  
100
  /* The Hazelcast distributed task id generator namespace */
101
  private String taskIds;
102
  
103
  /* The Hazelcast distributed node map */
104
  private IMap<NodeReference, Node> nodes;
105

    
106
  /* The Hazelcast distributed system metadata map */
107
  private IMap<Identifier, SystemMetadata> systemMetadata;
108
  
109
  /* The name of the identifiers set */
110
  private String identifiersSet;
111
  
112
  /* The Hazelcast distributed identifiers set */
113
  private ISet<Identifier> identifiers;
114

    
115
  private HazelcastInstance hzInstance;
116
      
117
  /*
118
   * Constructor: Creates an instance of the hazelcast service. Since
119
   * this uses a singleton pattern, use getInstance() to gain the instance.
120
   */
121
  private HazelcastService() {
122
    
123
    super();
124
    _serviceName="HazelcastService";
125
    
126
    try {
127
      init();
128
      
129
    } catch (ServiceException se) {
130
      logMetacat.error("There was a problem creating the HazelcastService. " +
131
                       "The error message was: " + se.getMessage());
132
      
133
    }
134
    
135
  }
136
  
137
  /**
138
   *  Get the instance of the HazelcastService that has been instantiated,
139
   *  or instantiate one if it has not been already.
140
   *
141
   * @return hazelcastService - The instance of the hazelcast service
142
   */
143
  public static HazelcastService getInstance(){
144
    
145
    if ( hzService == null ) {
146
      
147
      hzService = new HazelcastService();
148
      
149
    }
150
    return hzService;
151
  }
152
  
153
  /**
154
   * Initializes the Hazelcast service
155
   */
156
  public void init() throws ServiceException {
157
    
158
    logMetacat.debug("HazelcastService.doRefresh() called.");
159
    
160
	String configFileName = null;
161
	Config config = null;
162
	try {
163
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
164
		config = new FileSystemXmlConfig(configFileName);
165
	} catch (Exception e) {
166
		logMetacat.warn("Custom Hazelcast configuration not defined, using default.", e);
167
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
168
		// make sure we have the config
169
		try {
170
			config = new FileSystemXmlConfig(configFileName);
171
		} catch (FileNotFoundException e1) {
172
			String msg = e.getMessage();
173
			logMetacat.error(msg);
174
			throw new ServiceException(msg);
175
		}
176
	}
177

    
178
	Hazelcast.init(config);
179
  this.hzInstance = Hazelcast.getDefaultInstance();
180
  
181
    // Get configuration properties on instantiation
182
    try {
183
      groupName = 
184
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
185
      groupPassword = 
186
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
187
      addressList = 
188
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
189
      systemMetadataMap = 
190
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
191
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
192
//    nodeMap = 
193
//    PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
194
      // Become a DataONE-process cluster client
195
//      String[] addresses = addressList.split(",");
196
//      hzClient = 
197
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
198
//      nodes = hzClient.getMap(nodeMap);
199
      
200
      // Get a reference to the shared system metadata map as a cluster member
201
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
202
      
203
      // Get a reference to the shared identifiers set as a cluster member
204
      identifiers = Hazelcast.getSet(identifiersSet);
205
      
206
      // Listen for changes to the system metadata map
207
      systemMetadata.addEntryListener(this, true);
208
      
209
    } catch (PropertyNotFoundException e) {
210

    
211
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
212
        "The error message was: " + e.getMessage();
213
      logMetacat.error(msg);
214
      
215
    }
216
    
217
    // make sure we have all metadata locally
218
    try {
219
	    // add index for resynch() method
220
    	// can only be added once, TODO: figure out how this works
221
	    //systemMetadata.addIndex(SINCE_PROPERTY, true);
222
		//resynch();
223
	} catch (Exception e) {
224
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
225
		logMetacat.error(msg, e);
226
	}
227
        
228
  }
229
  
230
  /**
231
   * Get the system metadata map
232
   * 
233
   * @return systemMetadata - the hazelcast map of system metadata
234
   * @param identifier - the identifier of the object as a string
235
   */
236
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
237
	  return systemMetadata;
238
  }
239
  
240
  /**
241
   * Get the identifiers set
242
   * @return identifiers - the set of unique DataONE identifiers in the cluster
243
   */
244
  public ISet<Identifier> getIdentifiers() {
245
      return identifiers;
246
      
247
  }
248

    
249
  /**
250
   * When Metacat changes the underlying store, we need to refresh the
251
   * in-memory representation of it.
252
   * @param guid
253
   */
254
  public void refreshSystemMetadataEntry(String guid) {
255
	Identifier identifier = new Identifier();
256
	identifier.setValue(guid);
257
	// force hazelcast to update system metadata in memory from the store
258
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
259
	
260
  }
261

    
262
  public Lock getLock(String identifier) {
263
    
264
    Lock lock = null;
265
    
266
    try {
267
        lock = getInstance().getHazelcastInstance().getLock(identifier);
268
        
269
    } catch (RuntimeException e) {
270
        logMetacat.info("Couldn't get a lock for identifier " + 
271
            identifier + " !!");
272
    }
273
    return lock;
274
      
275
  }
276
  
277
  /**
278
   * Get the DataONE hazelcast node map
279
   * @return nodes - the hazelcast map of nodes
280
   */
281
//  public IMap<NodeReference, Node> getNodesMap() {
282
//	  return nodes;
283
//  }
284
  
285
  /**
286
   * Indicate whether or not this service is refreshable.
287
   *
288
   * @return refreshable - the boolean refreshable status
289
   */
290
  public boolean refreshable() {
291
    // TODO: Determine the consequences of restarting the Hazelcast instance
292
    // Set this to true if it's okay to drop from the cluster, lose the maps,
293
    // and start back up again
294
    return false;
295
    
296
  }
297
  
298
  /**
299
   * Stop the HazelcastService. When stopped, the service will no longer
300
   * respond to requests.
301
   */
302
  public void stop() throws ServiceException {
303
    
304
    Hazelcast.getLifecycleService().shutdown();
305
    
306
  }
307

    
308
  /**
309
   * Listen for new Hazelcast member events
310
   */
311
  @Override
312
  public void instanceCreated(InstanceEvent event) {
313
    logMetacat.info("New Hazelcast instance created: " +
314
      event.getInstance().getId() + ", " +
315
      event.getInstance().getInstanceType());
316
    
317
  }
318

    
319
  @Override
320
  public void instanceDestroyed(InstanceEvent event) {
321
    logMetacat.info("Hazelcast instance removed: " +
322
        event.getInstance().getId() + ", " +
323
        event.getInstance().getInstanceType());
324
    
325
  }
326

    
327
  public HazelcastInstance getHazelcastInstance() {
328
      return this.hzInstance;
329
      
330
  }
331
  
332
  /**
333
   * Refresh the Hazelcast service by restarting it
334
   */
335
  @Override
336
  protected void doRefresh() throws ServiceException {
337

    
338
    // TODO: verify that the correct config file is still used
339
    Hazelcast.getLifecycleService().restart();
340
    
341
  }
342
  
343
  /**
344
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
345
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
346
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
347
	 * 
348
	 * @param event - The EntryEvent that occurred
349
	 */
350
	@Override
351
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
352
		// handle as update - that method will create if necessary
353
		entryUpdated(event);
354
	}
355

    
356
	/**
357
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
358
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
359
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
360
	 * 
361
	 * @param event - The EntryEvent that occurred
362
	 */
363
	@Override
364
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
365
	    // ensure identifiers are listed in the hzIdentifiers set
366
      if ( !identifiers.contains(event.getKey()) ) {
367
          identifiers.add(event.getKey());
368
      }
369
	  
370
	}
371
	
372
	/**
373
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
374
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
375
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
376
	 * 
377
	 * @param event - The EntryEvent that occurred
378
	 */
379
	@Override
380
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
381
		// we typically don't remove objects in Metacat, but can remove System Metadata
382
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
383

    
384
    // keep the hzIdentifiers set in sync with the systemmetadata table
385
    if ( identifiers.contains(event.getKey()) ) {
386
        identifiers.remove(event.getKey());
387
        
388
    }
389

    
390
	}
391
	
392
	/**
393
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
394
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
395
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
396
	 * 
397
	 * @param event - The EntryEvent that occurred
398
	 */
399
	@Override
400
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
401
	
402
			logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
403
			PartitionService partitionService = Hazelcast.getPartitionService();
404
			Partition partition = partitionService.getPartition(event.getKey());
405
			Member ownerMember = partition.getOwner();
406
			if (!ownerMember.localMember()) {
407
				// need to pull the entry into the local store
408
				saveLocally(event.getValue());
409
			}
410
	
411
			// TODO evaluate the type of system metadata change, decide if it
412
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
413
			// iteratively lock the PID, create and submit the tasks, and expect a
414
			// result back. Deal with exceptions.
415
			SystemMetadata sysmeta = event.getValue();
416
			if (sysmeta != null) {
417
				boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
418
				// TODO: do we need to do anything explicit here?
419
			}
420

    
421
	     // ensure identifiers are listed in the hzIdentifiers set
422
      if ( !identifiers.contains(event.getKey()) ) {
423
          identifiers.add(event.getKey());
424
      }
425

    
426
	}
427
	
428
	/**
429
	 * Save SystemMetadata to local store if needed
430
	 * @param sm
431
	 */
432
	private void saveLocally(SystemMetadata sm) {
433
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
434
		try {
435
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
436
				IdentifierManager.getInstance().createSystemMetadata(sm);
437
			} else {
438
				IdentifierManager.getInstance().updateSystemMetadata(sm);
439
			}
440
		} catch (McdbDocNotFoundException e) {
441
			logMetacat.error(
442
					"Could not save System Metadata to local store.", e);
443
		}
444
	}
445
	
446
	public void resynch() throws Exception {
447
		
448
		// get the CN that is online
449
		// TODO: do we even need to use a specific CN?
450
		// All the System Metadata records should be available via the shared map
451
//		NodeList allNodes = CNodeService.getInstance().listNodes();
452
//		Node onlineCN = null;
453
//		for (Node node: allNodes.getNodeList()) {
454
//			if (node.getType().equals(NodeType.CN)) {
455
//				if (node.getState().equals(NodeState.UP)) {
456
//					onlineCN = node;
457
//					break;
458
//				}
459
//			}
460
//		}
461
		
462
		// get the list of items that have changed since X
463
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
464
		EntryObject e = new PredicateBuilder().getEntryObject();
465
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
466
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
467
		for (SystemMetadata sm: updatedSystemMetadata) {
468
			saveLocally(sm);
469
		}
470
	}
471

    
472
}
(1-1/3)