Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: cjones $'
9
 *     '$Date: 2012-01-13 11:17:49 -0800 (Fri, 13 Jan 2012) $'
10
 * '$Revision: 6904 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.sql.SQLException;
31
import java.util.Collection;
32
import java.util.Date;
33
import java.util.concurrent.locks.Lock;
34

    
35
import org.apache.log4j.Logger;
36
import org.dataone.service.exceptions.InvalidSystemMetadata;
37
import org.dataone.service.types.v1.Identifier;
38
import org.dataone.service.types.v1.Node;
39
import org.dataone.service.types.v1.NodeReference;
40
import org.dataone.service.types.v1.SystemMetadata;
41

    
42
import com.hazelcast.config.Config;
43
import com.hazelcast.config.FileSystemXmlConfig;
44
import com.hazelcast.core.EntryEvent;
45
import com.hazelcast.core.EntryListener;
46
import com.hazelcast.core.Hazelcast;
47
import com.hazelcast.core.HazelcastInstance;
48
import com.hazelcast.core.IMap;
49
import com.hazelcast.core.ISet;
50
import com.hazelcast.core.InstanceEvent;
51
import com.hazelcast.core.InstanceListener;
52
import com.hazelcast.core.Member;
53
import com.hazelcast.partition.Partition;
54
import com.hazelcast.partition.PartitionService;
55
import com.hazelcast.query.EntryObject;
56
import com.hazelcast.query.Predicate;
57
import com.hazelcast.query.PredicateBuilder;
58

    
59
import edu.ucsb.nceas.metacat.IdentifierManager;
60
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
61
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
62
import edu.ucsb.nceas.metacat.properties.PropertyService;
63
import edu.ucsb.nceas.metacat.shared.BaseService;
64
import edu.ucsb.nceas.metacat.shared.ServiceException;
65
import edu.ucsb.nceas.utilities.FileUtil;
66
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
67
/**
68
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
69
 */
70
public class HazelcastService extends BaseService
71
  implements InstanceListener, EntryListener<Identifier, SystemMetadata> {
72
  
73
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
74

    
75
/* The instance of the logging class */
76
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
77
  
78
  /* The singleton instance of the hazelcast service */
79
  private static HazelcastService hzService = null;
80
  
81
  /* The Hazelcast configuration */
82
  private Config hzConfig;
83
  
84
  /* The instance of the Hazelcast client */
85
//  private HazelcastClient hzClient;
86

    
87
  /* The name of the DataONE Hazelcast cluster group */
88
  private String groupName;
89

    
90
  /* The name of the DataONE Hazelcast cluster password */
91
  private String groupPassword;
92
  
93
  /* The name of the DataONE Hazelcast cluster IP addresses */
94
  private String addressList;
95
  
96
  /* The name of the node map */
97
  private String nodeMap;
98

    
99
  /* The name of the system metadata map */
100
  private String systemMetadataMap;
101
  
102
  /* The Hazelcast distributed task id generator namespace */
103
  private String taskIds;
104
  
105
  /* The Hazelcast distributed node map */
106
  private IMap<NodeReference, Node> nodes;
107

    
108
  /* The Hazelcast distributed system metadata map */
109
  private IMap<Identifier, SystemMetadata> systemMetadata;
110
  
111
  /* The name of the identifiers set */
112
  private String identifiersSet;
113
  
114
  /* The Hazelcast distributed identifiers set */
115
  private ISet<Identifier> identifiers;
116

    
117
  private HazelcastInstance hzInstance;
118
      
119
  /*
120
   * Constructor: Creates an instance of the hazelcast service. Since
121
   * this uses a singleton pattern, use getInstance() to gain the instance.
122
   */
123
  private HazelcastService() {
124
    
125
    super();
126
    _serviceName="HazelcastService";
127
    
128
    try {
129
      init();
130
      
131
    } catch (ServiceException se) {
132
      logMetacat.error("There was a problem creating the HazelcastService. " +
133
                       "The error message was: " + se.getMessage());
134
      
135
    }
136
    
137
  }
138
  
139
  /**
140
   *  Get the instance of the HazelcastService that has been instantiated,
141
   *  or instantiate one if it has not been already.
142
   *
143
   * @return hazelcastService - The instance of the hazelcast service
144
   */
145
  public static HazelcastService getInstance(){
146
    
147
    if ( hzService == null ) {
148
      
149
      hzService = new HazelcastService();
150
      
151
    }
152
    return hzService;
153
  }
154
  
155
  /**
156
   * Initializes the Hazelcast service
157
   */
158
  public void init() throws ServiceException {
159
    
160
    logMetacat.debug("HazelcastService.doRefresh() called.");
161
    
162
	String configFileName = null;
163
	Config config = null;
164
	try {
165
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
166
		config = new FileSystemXmlConfig(configFileName);
167
	} catch (Exception e) {
168
		logMetacat.warn("Custom Hazelcast configuration not defined, using default.", e);
169
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
170
		// make sure we have the config
171
		try {
172
			config = new FileSystemXmlConfig(configFileName);
173
		} catch (FileNotFoundException e1) {
174
			String msg = e.getMessage();
175
			logMetacat.error(msg);
176
			throw new ServiceException(msg);
177
		}
178
	}
179

    
180
	Hazelcast.init(config);
181
  this.hzInstance = Hazelcast.getDefaultInstance();
182
  
183
    // Get configuration properties on instantiation
184
    try {
185
      groupName = 
186
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
187
      groupPassword = 
188
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
189
      addressList = 
190
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
191
      systemMetadataMap = 
192
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
193
      identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet");
194
//    nodeMap = 
195
//    PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
196
      // Become a DataONE-process cluster client
197
//      String[] addresses = addressList.split(",");
198
//      hzClient = 
199
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
200
//      nodes = hzClient.getMap(nodeMap);
201
      
202
      // Get a reference to the shared system metadata map as a cluster member
203
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
204
      
205
      // Get a reference to the shared identifiers set as a cluster member
206
      identifiers = Hazelcast.getSet(identifiersSet);
207
      
208
      // Listen for changes to the system metadata map
209
      systemMetadata.addEntryListener(this, true);
210
      
211
    } catch (PropertyNotFoundException e) {
212

    
213
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
214
        "The error message was: " + e.getMessage();
215
      logMetacat.error(msg);
216
      
217
    }
218
    
219
    // make sure we have all metadata locally
220
    try {
221
	    // add index for resynch() method
222
    	// can only be added once, TODO: figure out how this works
223
	    //systemMetadata.addIndex(SINCE_PROPERTY, true);
224
		//resynch();
225
	} catch (Exception e) {
226
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
227
		logMetacat.error(msg, e);
228
	}
229
        
230
  }
231
  
232
  /**
233
   * Get the system metadata map
234
   * 
235
   * @return systemMetadata - the hazelcast map of system metadata
236
   * @param identifier - the identifier of the object as a string
237
   */
238
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
239
	  return systemMetadata;
240
  }
241
  
242
  /**
243
   * Get the identifiers set
244
   * @return identifiers - the set of unique DataONE identifiers in the cluster
245
   */
246
  public ISet<Identifier> getIdentifiers() {
247
      return identifiers;
248
      
249
  }
250

    
251
  /**
252
   * When Metacat changes the underlying store, we need to refresh the
253
   * in-memory representation of it.
254
   * @param guid
255
   */
256
  public void refreshSystemMetadataEntry(String guid) {
257
	Identifier identifier = new Identifier();
258
	identifier.setValue(guid);
259
	// force hazelcast to update system metadata in memory from the store
260
	HazelcastService.getInstance().getSystemMetadataMap().evict(identifier);
261
	
262
  }
263

    
264
  public Lock getLock(String identifier) {
265
    
266
    Lock lock = null;
267
    
268
    try {
269
        lock = getInstance().getHazelcastInstance().getLock(identifier);
270
        
271
    } catch (RuntimeException e) {
272
        logMetacat.info("Couldn't get a lock for identifier " + 
273
            identifier + " !!");
274
    }
275
    return lock;
276
      
277
  }
278
  
279
  /**
280
   * Get the DataONE hazelcast node map
281
   * @return nodes - the hazelcast map of nodes
282
   */
283
//  public IMap<NodeReference, Node> getNodesMap() {
284
//	  return nodes;
285
//  }
286
  
287
  /**
288
   * Indicate whether or not this service is refreshable.
289
   *
290
   * @return refreshable - the boolean refreshable status
291
   */
292
  public boolean refreshable() {
293
    // TODO: Determine the consequences of restarting the Hazelcast instance
294
    // Set this to true if it's okay to drop from the cluster, lose the maps,
295
    // and start back up again
296
    return false;
297
    
298
  }
299
  
300
  /**
301
   * Stop the HazelcastService. When stopped, the service will no longer
302
   * respond to requests.
303
   */
304
  public void stop() throws ServiceException {
305
    
306
    Hazelcast.getLifecycleService().shutdown();
307
    
308
  }
309

    
310
  /**
311
   * Listen for new Hazelcast member events
312
   */
313
  @Override
314
  public void instanceCreated(InstanceEvent event) {
315
    logMetacat.info("New Hazelcast instance created: " +
316
      event.getInstance().getId() + ", " +
317
      event.getInstance().getInstanceType());
318
    
319
  }
320

    
321
  @Override
322
  public void instanceDestroyed(InstanceEvent event) {
323
    logMetacat.info("Hazelcast instance removed: " +
324
        event.getInstance().getId() + ", " +
325
        event.getInstance().getInstanceType());
326
    
327
  }
328

    
329
  public HazelcastInstance getHazelcastInstance() {
330
      return this.hzInstance;
331
      
332
  }
333
  
334
  /**
335
   * Refresh the Hazelcast service by restarting it
336
   */
337
  @Override
338
  protected void doRefresh() throws ServiceException {
339

    
340
    // TODO: verify that the correct config file is still used
341
    Hazelcast.getLifecycleService().restart();
342
    
343
  }
344
  
345
  /**
346
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
347
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
348
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
349
	 * 
350
	 * @param event - The EntryEvent that occurred
351
	 */
352
	@Override
353
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
354
		// handle as update - that method will create if necessary
355
		entryUpdated(event);
356
	}
357

    
358
	/**
359
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
360
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
361
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
362
	 * 
363
	 * @param event - The EntryEvent that occurred
364
	 */
365
	@Override
366
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
367
	    // ensure identifiers are listed in the hzIdentifiers set
368
      if ( !identifiers.contains(event.getKey()) ) {
369
          identifiers.add(event.getKey());
370
      }
371
	  
372
	}
373
	
374
	/**
375
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
376
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
377
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
378
	 * 
379
	 * @param event - The EntryEvent that occurred
380
	 */
381
	@Override
382
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
383
		// we typically don't remove objects in Metacat, but can remove System Metadata
384
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
385

    
386
    // keep the hzIdentifiers set in sync with the systemmetadata table
387
    if ( identifiers.contains(event.getKey()) ) {
388
        identifiers.remove(event.getKey());
389
        
390
    }
391

    
392
	}
393
	
394
	/**
395
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
396
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
397
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
398
	 * 
399
	 * @param event - The EntryEvent that occurred
400
	 */
401
	@Override
402
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
403
	
404
			logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
405
			PartitionService partitionService = Hazelcast.getPartitionService();
406
			Partition partition = partitionService.getPartition(event.getKey());
407
			Member ownerMember = partition.getOwner();
408
			if (!ownerMember.localMember()) {
409
				// need to pull the entry into the local store
410
				saveLocally(event.getValue());
411
			}
412
	
413
			// TODO evaluate the type of system metadata change, decide if it
414
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
415
			// iteratively lock the PID, create and submit the tasks, and expect a
416
			// result back. Deal with exceptions.
417
			SystemMetadata sysmeta = event.getValue();
418
			if (sysmeta != null) {
419
				boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
420
				// TODO: do we need to do anything explicit here?
421
			}
422

    
423
	     // ensure identifiers are listed in the hzIdentifiers set
424
      if ( !identifiers.contains(event.getKey()) ) {
425
          identifiers.add(event.getKey());
426
      }
427

    
428
	}
429
	
430
	/**
431
	 * Save SystemMetadata to local store if needed
432
	 * @param sm
433
	 */
434
	private void saveLocally(SystemMetadata sm) {
435
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
436
		try {
437
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
438
				IdentifierManager.getInstance().createSystemMetadata(sm);
439
				
440
			} else {
441
				IdentifierManager.getInstance().updateSystemMetadata(sm);
442
				
443
			}
444
		} catch (McdbDocNotFoundException e) {
445
			logMetacat.error("Could not save System Metadata to local store.", e);
446
			
447
		} catch (SQLException e) {
448
	      logMetacat.error("Could not save System Metadata to local store.", e);
449
	      
450
    } catch (InvalidSystemMetadata e) {
451
        logMetacat.error("Could not save System Metadata to local store.", e);
452
        
453
    }
454
	}
455
	
456
	public void resynch() throws Exception {
457
		
458
		// get the CN that is online
459
		// TODO: do we even need to use a specific CN?
460
		// All the System Metadata records should be available via the shared map
461
//		NodeList allNodes = CNodeService.getInstance().listNodes();
462
//		Node onlineCN = null;
463
//		for (Node node: allNodes.getNodeList()) {
464
//			if (node.getType().equals(NodeType.CN)) {
465
//				if (node.getState().equals(NodeState.UP)) {
466
//					onlineCN = node;
467
//					break;
468
//				}
469
//			}
470
//		}
471
		
472
		// get the list of items that have changed since X
473
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
474
		EntryObject e = new PredicateBuilder().getEntryObject();
475
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
476
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
477
		for (SystemMetadata sm: updatedSystemMetadata) {
478
			saveLocally(sm);
479
		}
480
	}
481

    
482
}
(1-1/3)