Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2012-03-27 14:25:03 -0700 (Tue, 27 Mar 2012) $'
10
 * '$Revision: 7098 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.Collection;
32
import java.util.Date;
33
import java.util.List;
34
import java.util.concurrent.locks.Lock;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.service.exceptions.InvalidSystemMetadata;
38
import org.dataone.service.types.v1.Identifier;
39
import org.dataone.service.types.v1.Node;
40
import org.dataone.service.types.v1.NodeReference;
41
import org.dataone.service.types.v1.SystemMetadata;
42

    
43
import com.hazelcast.config.Config;
44
import com.hazelcast.config.FileSystemXmlConfig;
45
import com.hazelcast.core.EntryEvent;
46
import com.hazelcast.core.EntryListener;
47
import com.hazelcast.core.Hazelcast;
48
import com.hazelcast.core.HazelcastInstance;
49
import com.hazelcast.core.IMap;
50
import com.hazelcast.core.ISet;
51
import com.hazelcast.core.Member;
52
import com.hazelcast.core.MembershipEvent;
53
import com.hazelcast.core.MembershipListener;
54
import com.hazelcast.partition.Partition;
55
import com.hazelcast.partition.PartitionService;
56
import com.hazelcast.query.EntryObject;
57
import com.hazelcast.query.Predicate;
58
import com.hazelcast.query.PredicateBuilder;
59

    
60
import edu.ucsb.nceas.metacat.IdentifierManager;
61
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
62
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
63
import edu.ucsb.nceas.metacat.properties.PropertyService;
64
import edu.ucsb.nceas.metacat.shared.BaseService;
65
import edu.ucsb.nceas.metacat.shared.ServiceException;
66
import edu.ucsb.nceas.metacat.util.DocumentUtil;
67
import edu.ucsb.nceas.utilities.FileUtil;
68
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
69
/**
70
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
71
 */
72
public class HazelcastService extends BaseService
73
  implements EntryListener<Identifier, SystemMetadata>, MembershipListener {
74
  
75
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
76

    
77
/* The instance of the logging class */
78
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
79
  
80
  /* The singleton instance of the hazelcast service */
81
  private static HazelcastService hzService = null;
82
  
83
  /* The Hazelcast configuration */
84
  private Config hzConfig;
85
  
86
  /* The instance of the Hazelcast client */
87
//  private HazelcastClient hzClient;
88

    
89
  /* The name of the DataONE Hazelcast cluster group */
90
  private String groupName;
91

    
92
  /* The name of the DataONE Hazelcast cluster password */
93
  private String groupPassword;
94
  
95
  /* The name of the DataONE Hazelcast cluster IP addresses */
96
  private String addressList;
97
  
98
  /* The name of the node map */
99
  private String nodeMap;
100

    
101
  /* The name of the system metadata map */
102
  private String systemMetadataMap;
103
  
104
  /* The Hazelcast distributed task id generator namespace */
105
  private String taskIds;
106
  
107
  /* The Hazelcast distributed node map */
108
  private IMap<NodeReference, Node> nodes;
109

    
110
  /* The Hazelcast distributed system metadata map */
111
  private IMap<Identifier, SystemMetadata> systemMetadata;
112
  
113
  /* The name of the identifiers set */
114
  private String identifiersSet;
115
  
116
  /* The Hazelcast distributed identifiers set */
117
  private ISet<Identifier> identifiers;
118

    
119
  private HazelcastInstance hzInstance;
120
      
121
  /*
122
   * Constructor: Creates an instance of the hazelcast service. Since
123
   * this uses a singleton pattern, use getInstance() to gain the instance.
124
   */
125
  private HazelcastService() {
126
    
127
    super();
128
    _serviceName="HazelcastService";
129
    
130
    try {
131
      init();
132
      
133
    } catch (ServiceException se) {
134
      logMetacat.error("There was a problem creating the HazelcastService. " +
135
                       "The error message was: " + se.getMessage());
136
      
137
    }
138
    
139
  }
140
  
141
  /**
142
   *  Get the instance of the HazelcastService that has been instantiated,
143
   *  or instantiate one if it has not been already.
144
   *
145
   * @return hazelcastService - The instance of the hazelcast service
146
   */
147
  public static HazelcastService getInstance(){
148
    
149
    if ( hzService == null ) {
150
      
151
      hzService = new HazelcastService();
152
      
153
    }
154
    return hzService;
155
  }
156
  
157
  /**
158
   * Initializes the Hazelcast service
159
   */
160
  public void init() throws ServiceException {
161
    
162
    logMetacat.debug("HazelcastService.init() called.");
163
    
164
	String configFileName = null;
165
	Config config = null;
166
	try {
167
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
168
		config = new FileSystemXmlConfig(configFileName);
169
	} catch (Exception e) {
170
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
171
		logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName);
172
		// make sure we have the config
173
		try {
174
			config = new FileSystemXmlConfig(configFileName);
175
		} catch (FileNotFoundException e1) {
176
			String msg = e.getMessage();
177
			logMetacat.error(msg);
178
			throw new ServiceException(msg);
179
		}
180
	}
181

    
182
	Hazelcast.init(config);
183
  this.hzInstance = Hazelcast.getDefaultInstance();
184
  
185
    // Get configuration properties on instantiation
186
    try {
187
      groupName = 
188
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
189
      groupPassword = 
190
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
191
      addressList = 
192
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
193
      systemMetadataMap = 
194
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
195
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
196
//    nodeMap = 
197
//    PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
198
      // Become a DataONE-process cluster client
199
//      String[] addresses = addressList.split(",");
200
//      hzClient = 
201
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
202
//      nodes = hzClient.getMap(nodeMap);
203
      
204
      // Get a reference to the shared system metadata map as a cluster member
205
      // NOTE: this loads the map from the backing store and can take a long time for large collections
206
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
207
      
208
      // Get a reference to the shared identifiers set as a cluster member
209
      identifiers = Hazelcast.getSet(identifiersSet);
210
      
211
      // Listen for changes to the system metadata map
212
      systemMetadata.addEntryListener(this, true);
213
      
214
      // Listen for members added/removed
215
      hzInstance.getCluster().addMembershipListener(this);
216
      
217
    } catch (PropertyNotFoundException e) {
218

    
219
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
220
        "The error message was: " + e.getMessage();
221
      logMetacat.error(msg);
222
      
223
    }
224
    
225
    // make sure we have all metadata locally
226
    try {
227
	    // add index for resynch() method
228
    	// can only be added once, TODO: figure out how this works
229
	    //systemMetadata.addIndex(SINCE_PROPERTY, true);
230
		//resynch();
231
	} catch (Exception e) {
232
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
233
		logMetacat.error(msg, e);
234
	}
235
        
236
  }
237
  
238
  /**
239
   * Get the system metadata map
240
   * 
241
   * @return systemMetadata - the hazelcast map of system metadata
242
   * @param identifier - the identifier of the object as a string
243
   */
244
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
245
	  return systemMetadata;
246
  }
247
  
248
  /**
249
   * Get the identifiers set
250
   * @return identifiers - the set of unique DataONE identifiers in the cluster
251
   */
252
  public ISet<Identifier> getIdentifiers() {
253
      return identifiers;
254
      
255
  }
256

    
257
  /**
258
   * When Metacat changes the underlying store, we need to refresh the
259
   * in-memory representation of it.
260
   * @param guid
261
   */
262
  public void refreshSystemMetadataEntry(String guid) {
263
	Identifier identifier = new Identifier();
264
	identifier.setValue(guid);
265
	// force hazelcast to update system metadata in memory from the store
266
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
267
	
268
  }
269

    
270
  public Lock getLock(String identifier) {
271
    
272
    Lock lock = null;
273
    
274
    try {
275
        lock = getInstance().getHazelcastInstance().getLock(identifier);
276
        
277
    } catch (RuntimeException e) {
278
        logMetacat.info("Couldn't get a lock for identifier " + 
279
            identifier + " !!");
280
    }
281
    return lock;
282
      
283
  }
284
  
285
  /**
286
   * Get the DataONE hazelcast node map
287
   * @return nodes - the hazelcast map of nodes
288
   */
289
//  public IMap<NodeReference, Node> getNodesMap() {
290
//	  return nodes;
291
//  }
292
  
293
  /**
294
   * Indicate whether or not this service is refreshable.
295
   *
296
   * @return refreshable - the boolean refreshable status
297
   */
298
  public boolean refreshable() {
299
    // TODO: Determine the consequences of restarting the Hazelcast instance
300
    // Set this to true if it's okay to drop from the cluster, lose the maps,
301
    // and start back up again
302
    return false;
303
    
304
  }
305
  
306
  /**
307
   * Stop the HazelcastService. When stopped, the service will no longer
308
   * respond to requests.
309
   */
310
  public void stop() throws ServiceException {
311
    
312
    Hazelcast.getLifecycleService().shutdown();
313
    
314
  }
315

    
316
  public HazelcastInstance getHazelcastInstance() {
317
      return this.hzInstance;
318
      
319
  }
320
  
321
  /**
322
   * Refresh the Hazelcast service by restarting it
323
   */
324
  @Override
325
  protected void doRefresh() throws ServiceException {
326

    
327
    // TODO: verify that the correct config file is still used
328
    Hazelcast.getLifecycleService().restart();
329
    
330
  }
331
  
332
  /**
333
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
334
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
335
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
336
	 * 
337
	 * @param event - The EntryEvent that occurred
338
	 */
339
	@Override
340
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
341
	  
342
	  logMetacat.info("SystemMetadata entry added event on identifier " + 
343
	      event.getKey().getValue());
344
		// handle as update - that method will create if necessary
345
		entryUpdated(event);
346

    
347
	}
348

    
349
	/**
350
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
351
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
352
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
353
	 * 
354
	 * @param event - The EntryEvent that occurred
355
	 */
356
	@Override
357
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
358

    
359
      logMetacat.info("SystemMetadata entry evicted event on identifier " + 
360
          event.getKey().getValue());
361
      
362
	    // ensure identifiers are listed in the hzIdentifiers set
363
      if ( !identifiers.contains(event.getKey()) ) {
364
          identifiers.add(event.getKey());
365
      }
366
	  
367
	}
368
	
369
	/**
370
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
371
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
372
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
373
	 * 
374
	 * @param event - The EntryEvent that occurred
375
	 */
376
	@Override
377
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
378
		
379
    logMetacat.info("SystemMetadata entry removed event on identifier " + 
380
        event.getKey().getValue());
381

    
382
	  // we typically don't remove objects in Metacat, but can remove System Metadata
383
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
384

    
385
    // keep the hzIdentifiers set in sync with the systemmetadata table
386
    if ( identifiers.contains(event.getKey()) ) {
387
        identifiers.remove(event.getKey());
388
        
389
    }
390

    
391
	}
392
	
393
	/**
394
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
395
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
396
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
397
	 * 
398
	 * @param event - The EntryEvent that occurred
399
	 */
400
	@Override
401
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
402
	
403
			logMetacat.debug("Entry added/updated to System Metadata map: " + 
404
			    event.getKey().getValue());
405
			PartitionService partitionService = Hazelcast.getPartitionService();
406
			Partition partition = partitionService.getPartition(event.getKey());
407
			Member ownerMember = partition.getOwner();
408
			if (!ownerMember.localMember()) {
409
				// need to pull the entry into the local store
410
				saveLocally(event.getValue());
411
			}
412
	
413
			// TODO evaluate the type of system metadata change, decide if it
414
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
415
			// iteratively lock the PID, create and submit the tasks, and expect a
416
			// result back. Deal with exceptions.
417
			SystemMetadata sysmeta = event.getValue();
418
			if (sysmeta != null) {
419
				boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
420
				// TODO: do we need to do anything explicit here?
421
			}
422

    
423
	     // ensure identifiers are listed in the hzIdentifiers set
424
      if ( !identifiers.contains(event.getKey()) ) {
425
          identifiers.add(event.getKey());
426
      }
427

    
428
	}
429
	
430
	/**
431
	 * Save SystemMetadata to local store if needed
432
	 * @param sm
433
	 */
434
	private void saveLocally(SystemMetadata sm) {
435
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
436
		try {
437
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
438
				IdentifierManager.getInstance().insertSystemMetadata(sm);
439
				
440
			} else {
441
				IdentifierManager.getInstance().updateSystemMetadata(sm);
442
				
443
			}
444
		} catch (McdbDocNotFoundException e) {
445
			logMetacat.error("Could not save System Metadata to local store.", e);
446
			
447
		} catch (SQLException e) {
448
	      logMetacat.error("Could not save System Metadata to local store.", e);
449
	      
450
    } catch (InvalidSystemMetadata e) {
451
        logMetacat.error("Could not save System Metadata to local store.", e);
452
        
453
    }
454
	}
455
	
456
	public void resynch() throws Exception {
457
		
458
		// get the CN that is online
459
		// TODO: do we even need to use a specific CN?
460
		// All the System Metadata records should be available via the shared map
461
//		NodeList allNodes = CNodeService.getInstance().listNodes();
462
//		Node onlineCN = null;
463
//		for (Node node: allNodes.getNodeList()) {
464
//			if (node.getType().equals(NodeType.CN)) {
465
//				if (node.getState().equals(NodeState.UP)) {
466
//					onlineCN = node;
467
//					break;
468
//				}
469
//			}
470
//		}
471
		
472
		// get the list of items that have changed since X
473
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
474
		EntryObject e = new PredicateBuilder().getEntryObject();
475
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
476
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
477
		for (SystemMetadata sm: updatedSystemMetadata) {
478
			saveLocally(sm);
479
		}
480
	}
481

    
482
	/**
483
	 * When there is missing SystemMetadata on the local member,
484
	 * we retrieve it from the shared map and add it to the local
485
	 * backing store for safe keeping.
486
	 */
487
	@Override
488
	public void memberAdded(MembershipEvent event) {
489
		Member member = event.getMember();
490
		logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress());
491
		boolean isLocal = member.localMember();
492
		if (isLocal) {
493
			logMetacat.debug("Member islocal: " + member.getInetSocketAddress());
494
			List<String> localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true);
495
			if (localIds != null) {
496
				logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size());
497
				for (String localId: localIds) {
498
					logMetacat.debug("Processing system metadata for localId: " + localId);
499
					try {
500
						String docid = DocumentUtil.getSmartDocId(localId);
501
						int rev = DocumentUtil.getRevisionFromAccessionNumber(localId);
502
						String guid = IdentifierManager.getInstance().getGUID(docid, rev);
503
						logMetacat.debug("Found mapped guid: " + guid);
504
						Identifier pid = new Identifier();
505
						pid.setValue(guid);
506
						SystemMetadata sm = systemMetadata.get(pid);
507
						logMetacat.debug("Found shared system metadata for guid: " + guid);
508
						saveLocally(sm);
509
						logMetacat.debug("Saved shared system metadata locally for guid: " + guid);
510
					} catch (Exception e) {
511
						logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e);
512
					}
513
				}
514
			}
515
		}
516
	}
517

    
518
	@Override
519
	public void memberRemoved(MembershipEvent event) {
520
		// TODO Auto-generated method stub
521
		
522
	}
523

    
524
}
(1-1/3)