Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2012-03-30 17:23:19 -0700 (Fri, 30 Mar 2012) $'
10
 * '$Revision: 7107 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.Collection;
32
import java.util.Date;
33
import java.util.List;
34
import java.util.concurrent.locks.Lock;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.service.exceptions.InvalidSystemMetadata;
38
import org.dataone.service.types.v1.Identifier;
39
import org.dataone.service.types.v1.Node;
40
import org.dataone.service.types.v1.NodeReference;
41
import org.dataone.service.types.v1.SystemMetadata;
42

    
43
import com.hazelcast.config.Config;
44
import com.hazelcast.config.FileSystemXmlConfig;
45
import com.hazelcast.core.EntryEvent;
46
import com.hazelcast.core.EntryListener;
47
import com.hazelcast.core.Hazelcast;
48
import com.hazelcast.core.HazelcastInstance;
49
import com.hazelcast.core.IMap;
50
import com.hazelcast.core.ISet;
51
import com.hazelcast.core.LifecycleEvent;
52
import com.hazelcast.core.LifecycleListener;
53
import com.hazelcast.core.Member;
54
import com.hazelcast.core.MembershipEvent;
55
import com.hazelcast.core.MembershipListener;
56
import com.hazelcast.partition.Partition;
57
import com.hazelcast.partition.PartitionService;
58
import com.hazelcast.query.EntryObject;
59
import com.hazelcast.query.Predicate;
60
import com.hazelcast.query.PredicateBuilder;
61

    
62
import edu.ucsb.nceas.metacat.IdentifierManager;
63
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
64
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
65
import edu.ucsb.nceas.metacat.properties.PropertyService;
66
import edu.ucsb.nceas.metacat.shared.BaseService;
67
import edu.ucsb.nceas.metacat.shared.ServiceException;
68
import edu.ucsb.nceas.metacat.util.DocumentUtil;
69
import edu.ucsb.nceas.utilities.FileUtil;
70
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
71
/**
72
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
73
 */
74
public class HazelcastService extends BaseService
75
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener, LifecycleListener {
76
  
77
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
78

    
79
/* The instance of the logging class */
80
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
81
  
82
  /* The singleton instance of the hazelcast service */
83
  private static HazelcastService hzService = null;
84
  
85
  /* The Hazelcast configuration */
86
  private Config hzConfig;
87
  
88
  /* The instance of the Hazelcast client */
89
//  private HazelcastClient hzClient;
90

    
91
  /* The name of the DataONE Hazelcast cluster group */
92
  private String groupName;
93

    
94
  /* The name of the DataONE Hazelcast cluster password */
95
  private String groupPassword;
96
  
97
  /* The name of the DataONE Hazelcast cluster IP addresses */
98
  private String addressList;
99
  
100
  /* The name of the node map */
101
  private String nodeMap;
102

    
103
  /* The name of the system metadata map */
104
  private String systemMetadataMap;
105
  
106
  /* The Hazelcast distributed task id generator namespace */
107
  private String taskIds;
108
  
109
  /* The Hazelcast distributed node map */
110
  private IMap<NodeReference, Node> nodes;
111

    
112
  /* The Hazelcast distributed system metadata map */
113
  private IMap<Identifier, SystemMetadata> systemMetadata;
114
  
115
  /* The name of the identifiers set */
116
  private String identifiersSet;
117
  
118
  /* The Hazelcast distributed identifiers set */
119
  private ISet<Identifier> identifiers;
120

    
121
  private HazelcastInstance hzInstance;
122
      
123
  /*
124
   * Constructor: Creates an instance of the hazelcast service. Since
125
   * this uses a singleton pattern, use getInstance() to gain the instance.
126
   */
127
  private HazelcastService() {
128
    
129
    super();
130
    _serviceName="HazelcastService";
131
    
132
    try {
133
      init();
134
      
135
    } catch (ServiceException se) {
136
      logMetacat.error("There was a problem creating the HazelcastService. " +
137
                       "The error message was: " + se.getMessage());
138
      
139
    }
140
    
141
  }
142
  
143
  /**
144
   *  Get the instance of the HazelcastService that has been instantiated,
145
   *  or instantiate one if it has not been already.
146
   *
147
   * @return hazelcastService - The instance of the hazelcast service
148
   */
149
  public static HazelcastService getInstance(){
150
    
151
    if ( hzService == null ) {
152
      
153
      hzService = new HazelcastService();
154
      
155
    }
156
    return hzService;
157
  }
158
  
159
  /**
160
   * Initializes the Hazelcast service
161
   */
162
  public void init() throws ServiceException {
163
    
164
    logMetacat.debug("HazelcastService.init() called.");
165
    
166
	String configFileName = null;
167
	Config config = null;
168
	try {
169
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
170
		config = new FileSystemXmlConfig(configFileName);
171
	} catch (Exception e) {
172
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
173
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
174
		// make sure we have the config
175
		try {
176
			config = new FileSystemXmlConfig(configFileName);
177
		} catch (FileNotFoundException e1) {
178
			String msg = e.getMessage();
179
			logMetacat.error(msg);
180
			throw new ServiceException(msg);
181
		}
182
	}
183

    
184
	Hazelcast.init(config);
185
  this.hzInstance = Hazelcast.getDefaultInstance();
186
  
187
    // Get configuration properties on instantiation
188
    try {
189
      groupName = 
190
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
191
      groupPassword = 
192
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
193
      addressList = 
194
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
195
      systemMetadataMap = 
196
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
197
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
198
//    nodeMap = 
199
//    PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
200
      // Become a DataONE-process cluster client
201
//      String[] addresses = addressList.split(",");
202
//      hzClient = 
203
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
204
//      nodes = hzClient.getMap(nodeMap);
205
      
206
      // Get a reference to the shared system metadata map as a cluster member
207
      // NOTE: this loads the map from the backing store and can take a long time for large collections
208
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
209
      
210
      // Get a reference to the shared identifiers set as a cluster member
211
      identifiers = Hazelcast.getSet(identifiersSet);
212
      
213
      // Listen for changes to the system metadata map
214
      systemMetadata.addEntryListener(this, true);
215
      
216
      // Listen for members added/removed
217
      hzInstance.getCluster().addMembershipListener(this);
218
      
219
    } catch (PropertyNotFoundException e) {
220

    
221
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
222
        "The error message was: " + e.getMessage();
223
      logMetacat.error(msg);
224
      
225
    }
226
    
227
    // make sure we have all metadata locally
228
    try {
229
	    // add index for resynch() method
230
    	// can only be added once, TODO: figure out how this works
231
	    //systemMetadata.addIndex(SINCE_PROPERTY, true);
232
		//resynch();
233
	} catch (Exception e) {
234
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
235
		logMetacat.error(msg, e);
236
	}
237
        
238
  }
239
  
240
  /**
241
   * Get the system metadata map
242
   * 
243
   * @return systemMetadata - the hazelcast map of system metadata
244
   * @param identifier - the identifier of the object as a string
245
   */
246
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
247
	  return systemMetadata;
248
  }
249
  
250
  /**
251
   * Get the identifiers set
252
   * @return identifiers - the set of unique DataONE identifiers in the cluster
253
   */
254
  public ISet<Identifier> getIdentifiers() {
255
      return identifiers;
256
      
257
  }
258

    
259
  /**
260
   * When Metacat changes the underlying store, we need to refresh the
261
   * in-memory representation of it.
262
   * @param guid
263
   */
264
  public void refreshSystemMetadataEntry(String guid) {
265
	Identifier identifier = new Identifier();
266
	identifier.setValue(guid);
267
	// force hazelcast to update system metadata in memory from the store
268
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
269
	
270
  }
271

    
272
  public Lock getLock(String identifier) {
273
    
274
    Lock lock = null;
275
    
276
    try {
277
        lock = getInstance().getHazelcastInstance().getLock(identifier);
278
        
279
    } catch (RuntimeException e) {
280
        logMetacat.info("Couldn't get a lock for identifier " + 
281
            identifier + " !!");
282
    }
283
    return lock;
284
      
285
  }
286
  
287
  /**
288
   * Get the DataONE hazelcast node map
289
   * @return nodes - the hazelcast map of nodes
290
   */
291
//  public IMap<NodeReference, Node> getNodesMap() {
292
//	  return nodes;
293
//  }
294
  
295
  /**
296
   * Indicate whether or not this service is refreshable.
297
   *
298
   * @return refreshable - the boolean refreshable status
299
   */
300
  public boolean refreshable() {
301
    // TODO: Determine the consequences of restarting the Hazelcast instance
302
    // Set this to true if it's okay to drop from the cluster, lose the maps,
303
    // and start back up again
304
    return false;
305
    
306
  }
307
  
308
  /**
309
   * Stop the HazelcastService. When stopped, the service will no longer
310
   * respond to requests.
311
   */
312
  public void stop() throws ServiceException {
313
    
314
    Hazelcast.getLifecycleService().shutdown();
315
    
316
  }
317

    
318
  public HazelcastInstance getHazelcastInstance() {
319
      return this.hzInstance;
320
      
321
  }
322
  
323
  /**
324
   * Refresh the Hazelcast service by restarting it
325
   */
326
  @Override
327
  protected void doRefresh() throws ServiceException {
328

    
329
    // TODO: verify that the correct config file is still used
330
    Hazelcast.getLifecycleService().restart();
331
    
332
  }
333
  
334
  /**
335
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
336
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
337
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
338
	 * 
339
	 * @param event - The EntryEvent that occurred
340
	 */
341
	@Override
342
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
343
	  
344
	  logMetacat.info("SystemMetadata entry added event on identifier " + 
345
	      event.getKey().getValue());
346
		// handle as update - that method will create if necessary
347
		entryUpdated(event);
348

    
349
	}
350

    
351
	/**
352
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
353
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
354
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
355
	 * 
356
	 * @param event - The EntryEvent that occurred
357
	 */
358
	@Override
359
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
360

    
361
      logMetacat.info("SystemMetadata entry evicted event on identifier " + 
362
          event.getKey().getValue());
363
      
364
	    // ensure identifiers are listed in the hzIdentifiers set
365
      if ( !identifiers.contains(event.getKey()) ) {
366
          identifiers.add(event.getKey());
367
      }
368
	  
369
	}
370
	
371
	/**
372
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
373
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
374
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
375
	 * 
376
	 * @param event - The EntryEvent that occurred
377
	 */
378
	@Override
379
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
380
		
381
    logMetacat.info("SystemMetadata entry removed event on identifier " + 
382
        event.getKey().getValue());
383

    
384
	  // we typically don't remove objects in Metacat, but can remove System Metadata
385
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
386

    
387
    // keep the hzIdentifiers set in sync with the systemmetadata table
388
    if ( identifiers.contains(event.getKey()) ) {
389
        identifiers.remove(event.getKey());
390
        
391
    }
392

    
393
	}
394
	
395
	/**
396
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
397
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
398
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
399
	 * 
400
	 * @param event - The EntryEvent that occurred
401
	 */
402
	@Override
403
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
404
	
405
			logMetacat.debug("Entry added/updated to System Metadata map: " + 
406
			    event.getKey().getValue());
407
			PartitionService partitionService = Hazelcast.getPartitionService();
408
			Partition partition = partitionService.getPartition(event.getKey());
409
			Member ownerMember = partition.getOwner();
410
			if (!ownerMember.localMember()) {
411
				// need to pull the entry into the local store
412
				saveLocally(event.getValue());
413
			}
414
	
415
			// TODO evaluate the type of system metadata change, decide if it
416
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
417
			// iteratively lock the PID, create and submit the tasks, and expect a
418
			// result back. Deal with exceptions.
419
			SystemMetadata sysmeta = event.getValue();
420
			if (sysmeta != null) {
421
				boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
422
				// TODO: do we need to do anything explicit here?
423
			}
424

    
425
	     // ensure identifiers are listed in the hzIdentifiers set
426
      if ( !identifiers.contains(event.getKey()) ) {
427
          identifiers.add(event.getKey());
428
      }
429

    
430
	}
431
	
432
	/**
433
	 * Save SystemMetadata to local store if needed
434
	 * @param sm
435
	 */
436
	private void saveLocally(SystemMetadata sm) {
437
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
438
		try {
439
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
440
				IdentifierManager.getInstance().insertSystemMetadata(sm);
441
				
442
			} else {
443
				IdentifierManager.getInstance().updateSystemMetadata(sm);
444
				
445
			}
446
		} catch (McdbDocNotFoundException e) {
447
			logMetacat.error("Could not save System Metadata to local store.", e);
448
			
449
		} catch (SQLException e) {
450
	      logMetacat.error("Could not save System Metadata to local store.", e);
451
	      
452
	    } catch (InvalidSystemMetadata e) {
453
	        logMetacat.error("Could not save System Metadata to local store.", e);
454
	        
455
	    }
456
	}
457
	
458
	/**
459
	 * Checks the local backing store for missing SystemMetadata,
460
	 * retrieves those entries from the shared map if they exist,
461
	 * and saves them locally.
462
	 */
463
	private void synchronizeLocalStore() {
464
		List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true);
465
		if (localIds != null) {
466
			logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
467
			for (String localId: localIds) {
468
				logMetacat.debug("Processing system metadata for localId: " + localId);
469
				try {
470
					String docid = DocumentUtil.getSmartDocId(localId);
471
					int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
472
					String guid = IdentifierManager.getInstance().getGUID(docid, rev);
473
					logMetacat.debug("Found mapped guid: " + guid);
474
					Identifier pid = new Identifier();
475
					pid.setValue(guid);
476
					SystemMetadata sm = systemMetadata.get(pid);
477
					logMetacat.debug("Found shared system metadata for guid: " + guid);
478
					saveLocally(sm);
479
					logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
480
				} catch (Exception e) {
481
					logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
482
				}
483
			}
484
		}
485
	}
486
	
487
	public void resynch() throws Exception {
488
		
489
		// get the CN that is online
490
		// TODO: do we even need to use a specific CN?
491
		// All the System Metadata records should be available via the shared map
492
//		NodeList allNodes = CNodeService.getInstance().listNodes();
493
//		Node onlineCN = null;
494
//		for (Node node: allNodes.getNodeList()) {
495
//			if (node.getType().equals(NodeType.CN)) {
496
//				if (node.getState().equals(NodeState.UP)) {
497
//					onlineCN = node;
498
//					break;
499
//				}
500
//			}
501
//		}
502
		
503
		// get the list of items that have changed since X
504
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
505
		EntryObject e = new PredicateBuilder().getEntryObject();
506
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
507
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
508
		for (SystemMetadata sm: updatedSystemMetadata) {
509
			saveLocally(sm);
510
		}
511
	}
512

    
513
	/**
514
	 * When there is missing SystemMetadata on the local member,
515
	 * we retrieve it from the shared map and add it to the local
516
	 * backing store for safe keeping.
517
	 */
518
	@Override
519
	public void memberAdded(MembershipEvent event) {
520
		Member member = event.getMember();
521
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
522
		boolean isLocal = member.localMember();
523
		if (isLocal) {
524
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
525
			synchronizeLocalStore();
526
		}
527
	}
528

    
529
	@Override
530
	public void memberRemoved(MembershipEvent event) {
531
		// TODO Auto-generated method stub
532
		
533
	}
534

    
535
	/**
536
	 * In cases where this cluster is paused, we want to 
537
	 * check that the local store accurately reflects the shared 
538
	 * SystemMetadata map
539
	 * @param event
540
	 */
541
	@Override
542
	public void stateChanged(LifecycleEvent event) {
543
		logMetacat.debug("HZ LifecycleEvent.state: " + event.getState());
544
		if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) {
545
			logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()");
546
			synchronizeLocalStore();
547
		}
548
	}
549

    
550
}
(1-1/3)