Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: Implements a service for managing a Hazelcast cluster member
4
 *  Copyright: 2011 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Christopher Jones
7
 * 
8
 *   '$Author: cjones $'
9
 *     '$Date: 2011-11-29 08:55:35 -0800 (Tue, 29 Nov 2011) $'
10
 * '$Revision: 6702 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.dataone.hazelcast;
28

    
29
import java.io.FileNotFoundException;
30
import java.util.Collection;
31
import java.util.Date;
32

    
33
import org.apache.log4j.Logger;
34
import org.dataone.service.types.v1.Identifier;
35
import org.dataone.service.types.v1.Node;
36
import org.dataone.service.types.v1.NodeReference;
37
import org.dataone.service.types.v1.SystemMetadata;
38

    
39
import com.hazelcast.client.HazelcastClient;
40
import com.hazelcast.config.Config;
41
import com.hazelcast.config.FileSystemXmlConfig;
42
import com.hazelcast.core.EntryEvent;
43
import com.hazelcast.core.EntryListener;
44
import com.hazelcast.core.Hazelcast;
45
import com.hazelcast.core.ILock;
46
import com.hazelcast.core.IMap;
47
import com.hazelcast.core.InstanceEvent;
48
import com.hazelcast.core.InstanceListener;
49
import com.hazelcast.core.Member;
50
import com.hazelcast.partition.Partition;
51
import com.hazelcast.partition.PartitionService;
52
import com.hazelcast.query.EntryObject;
53
import com.hazelcast.query.Predicate;
54
import com.hazelcast.query.PredicateBuilder;
55

    
56
import edu.ucsb.nceas.metacat.IdentifierManager;
57
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
58
import edu.ucsb.nceas.metacat.dataone.D1NodeService;
59
import edu.ucsb.nceas.metacat.properties.PropertyService;
60
import edu.ucsb.nceas.metacat.shared.BaseService;
61
import edu.ucsb.nceas.metacat.shared.ServiceException;
62
import edu.ucsb.nceas.utilities.FileUtil;
63
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
64
/**
65
 * The Hazelcast service enables Metacat as a Hazelcast cluster member
66
 */
67
public class HazelcastService extends BaseService
68
  implements InstanceListener, EntryListener<Identifier, SystemMetadata> {
69
  
70
  private static final String SINCE_PROPERTY = "dateSysMetadataModified";
71

    
72
/* The instance of the logging class */
73
  private static Logger logMetacat = Logger.getLogger(HazelcastService.class);
74
  
75
  /* The singleton instance of the hazelcast service */
76
  private static HazelcastService hzService = null;
77
  
78
  /* The Hazelcast configuration */
79
  private Config hzConfig;
80
  
81
  /* The instance of the Hazelcast client */
82
//  private HazelcastClient hzClient;
83

    
84
  /* The name of the DataONE Hazelcast cluster group */
85
  private String groupName;
86

    
87
  /* The name of the DataONE Hazelcast cluster password */
88
  private String groupPassword;
89
  
90
  /* The name of the DataONE Hazelcast cluster IP addresses */
91
  private String addressList;
92
  
93
  /* The name of the node map */
94
  private String nodeMap;
95

    
96
  /* The name of the system metadata map */
97
  private String systemMetadataMap;
98
  
99
  /* The Hazelcast distributed task id generator namespace */
100
  private String taskIds;
101
  
102
  /* The Hazelcast distributed node map */
103
  private IMap<NodeReference, Node> nodes;
104

    
105
  /* The Hazelcast distributed system metadata map */
106
  private IMap<Identifier, SystemMetadata> systemMetadata;
107
      
108
  /*
109
   * Constructor: Creates an instance of the hazelcast service. Since
110
   * this uses a singleton pattern, use getInstance() to gain the instance.
111
   */
112
  private HazelcastService() {
113
    
114
    super();
115
    _serviceName="HazelcastService";
116
    
117
    try {
118
      init();
119
      
120
    } catch (ServiceException se) {
121
      logMetacat.error("There was a problem creating the HazelcastService. " +
122
                       "The error message was: " + se.getMessage());
123
      
124
    }
125
    
126
  }
127
  
128
  /**
129
   *  Get the instance of the HazelcastService that has been instantiated,
130
   *  or instantiate one if it has not been already.
131
   *
132
   * @return hazelcastService - The instance of the hazelcast service
133
   */
134
  public static HazelcastService getInstance(){
135
    
136
    if ( hzService == null ) {
137
      
138
      hzService = new HazelcastService();
139
      
140
    }
141
    return hzService;
142
  }
143
  
144
  /**
145
   * Initializes the Hazelcast service
146
   */
147
  public void init() throws ServiceException {
148
    
149
    logMetacat.debug("HazelcastService.doRefresh() called.");
150
    
151
	String configFileName = null;
152
	Config config = null;
153
	try {
154
		configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath");
155
		config = new FileSystemXmlConfig(configFileName);
156
	} catch (Exception e) {
157
		logMetacat.warn("Custom Hazelcast configuration not defined, using default.", e);
158
		configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml";
159
		// make sure we have the config
160
		try {
161
			config = new FileSystemXmlConfig(configFileName);
162
		} catch (FileNotFoundException e1) {
163
			String msg = e.getMessage();
164
			logMetacat.error(msg);
165
			throw new ServiceException(msg);
166
		}
167
	}
168

    
169
	Hazelcast.init(config);
170
    
171
    // Get configuration properties on instantiation
172
    try {
173
      groupName = 
174
        PropertyService.getProperty("dataone.hazelcast.processCluster.groupName");
175
      groupPassword = 
176
        PropertyService.getProperty("dataone.hazelcast.processCluster.password");
177
      addressList = 
178
        PropertyService.getProperty("dataone.hazelcast.processCluster.instances");
179
//      nodeMap = 
180
//        PropertyService.getProperty("dataone.hazelcast.processCluster.nodesMap");
181
      systemMetadataMap = 
182
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap");
183

    
184
      // Become a DataONE-process cluster client
185
//      String[] addresses = addressList.split(",");
186
//      hzClient = 
187
//        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
188
//      nodes = hzClient.getMap(nodeMap);
189
      
190
      // Get a reference to the shared system metadata map as a cluster member
191
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
192
      
193
      // Listen for changes to the system metadata map
194
      systemMetadata.addEntryListener(this, true);
195
      
196
    } catch (PropertyNotFoundException e) {
197

    
198
      String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " +
199
        "The error message was: " + e.getMessage();
200
      logMetacat.error(msg);
201
      
202
    }
203
    
204
    // make sure we have all metadata locally
205
    try {
206
	    // add index for resynch() method
207
    	// can only be added once, TODO: figure out how this works
208
	    //systemMetadata.addIndex(SINCE_PROPERTY, true);
209
		//resynch();
210
	} catch (Exception e) {
211
		String msg = "Problem synchronizing system metadata. " + e.getMessage();
212
		logMetacat.error(msg, e);
213
	}
214
        
215
  }
216
  
217
  /**
218
   * Get the system metadata map
219
   * 
220
   * @return systemMetadata - the hazelcast map of system metadata
221
   */
222
  public IMap<Identifier,SystemMetadata> getSystemMetadataMap() {
223
	  return systemMetadata;
224
  }
225

    
226
  public ILock getLock(Identifier identifier) {
227
    
228
    ILock lock = null;
229
    
230
    try {
231
        lock = getInstance().getLock(identifier);
232
        
233
    } catch (RuntimeException e) {
234
        logMetacat.info("Couldn't get a lock for identifier " + 
235
            identifier.getValue() + " !!");
236
    }
237
    return lock;
238
      
239
  }
240
  
241
  /**
242
   * Get the DataONE hazelcast node map
243
   * @return nodes - the hazelcast map of nodes
244
   */
245
//  public IMap<NodeReference, Node> getNodesMap() {
246
//	  return nodes;
247
//  }
248
  
249
  /**
250
   * Indicate whether or not this service is refreshable.
251
   *
252
   * @return refreshable - the boolean refreshable status
253
   */
254
  public boolean refreshable() {
255
    // TODO: Determine the consequences of restarting the Hazelcast instance
256
    // Set this to true if it's okay to drop from the cluster, lose the maps,
257
    // and start back up again
258
    return false;
259
    
260
  }
261
  
262
  /**
263
   * Stop the HazelcastService. When stopped, the service will no longer
264
   * respond to requests.
265
   */
266
  public void stop() throws ServiceException {
267
    
268
    Hazelcast.getLifecycleService().shutdown();
269
    
270
  }
271

    
272
  /**
273
   * Listen for new Hazelcast member events
274
   */
275
  @Override
276
  public void instanceCreated(InstanceEvent event) {
277
    logMetacat.info("New Hazelcast instance created: " +
278
      event.getInstance().getId() + ", " +
279
      event.getInstance().getInstanceType());
280
    
281
  }
282

    
283
  @Override
284
  public void instanceDestroyed(InstanceEvent event) {
285
    logMetacat.info("Hazelcast instance removed: " +
286
        event.getInstance().getId() + ", " +
287
        event.getInstance().getInstanceType());
288
    
289
  }
290
  
291
  /**
292
   * Refresh the Hazelcast service by restarting it
293
   */
294
  @Override
295
  protected void doRefresh() throws ServiceException {
296

    
297
    // TODO: verify that the correct config file is still used
298
    Hazelcast.getLifecycleService().restart();
299
    
300
  }
301
  
302
  /**
303
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
304
	 * added events in the hzSystemMetadata map. Evaluate the entry and create
305
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
306
	 * 
307
	 * @param event - The EntryEvent that occurred
308
	 */
309
	@Override
310
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> event) {
311
		// handle as update - that method will create if necessary
312
		entryUpdated(event);
313
	}
314

    
315
	/**
316
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
317
	 * evicted events in the hzSystemMetadata map.  Evaluate the entry and create
318
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
319
	 * 
320
	 * @param event - The EntryEvent that occurred
321
	 */
322
	@Override
323
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> event) {
324
	  // nothing to do, entries are still in the backing store
325
	  
326
	}
327
	
328
	/**
329
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
330
	 * removed events in the hzSystemMetadata map.  Evaluate the entry and create
331
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
332
	 * 
333
	 * @param event - The EntryEvent that occurred
334
	 */
335
	@Override
336
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> event) {
337
		// we typically don't remove objects in Metacat, but can remove System Metadata
338
		IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue());
339
	  
340
	}
341
	
342
	/**
343
	 * Implement the EntryListener interface for Hazelcast, reponding to entry
344
	 * updated events in the hzSystemMetadata map.  Evaluate the entry and create
345
	 * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE)
346
	 * 
347
	 * @param event - The EntryEvent that occurred
348
	 */
349
	@Override
350
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> event) {
351
	
352
			logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue());
353
			PartitionService partitionService = Hazelcast.getPartitionService();
354
			Partition partition = partitionService.getPartition(event.getKey());
355
			Member ownerMember = partition.getOwner();
356
			if (!ownerMember.localMember()) {
357
				// need to pull the entry into the local store
358
				saveLocally(event.getValue());
359
			}
360
	
361
			// TODO evaluate the type of system metadata change, decide if it
362
			// warrants a replication event, what type (DATA, METADATA, RESOURCE),
363
			// iteratively lock the PID, create and submit the tasks, and expect a
364
			// result back. Deal with exceptions.
365
			SystemMetadata sysmeta = event.getValue();
366
			if (sysmeta != null) {
367
				boolean isMetadata = D1NodeService.isScienceMetadata(event.getValue());
368
				// TODO: do we need to do anything explicit here?
369
			}
370
	  
371
	}
372
	
373
	/**
374
	 * Save SystemMetadata to local store if needed
375
	 * @param sm
376
	 */
377
	private void saveLocally(SystemMetadata sm) {
378
		logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue());
379
		try {
380
			if (!IdentifierManager.getInstance().systemMetadataExists(sm.getIdentifier().getValue())) {
381
				IdentifierManager.getInstance().createSystemMetadata(sm);
382
			} else {
383
				IdentifierManager.getInstance().updateSystemMetadata(sm);
384
			}
385
		} catch (McdbDocNotFoundException e) {
386
			logMetacat.error(
387
					"Could not save System Metadata to local store.", e);
388
		}
389
	}
390
	
391
	public void resynch() throws Exception {
392
		
393
		// get the CN that is online
394
		// TODO: do we even need to use a specific CN?
395
		// All the System Metadata records should be available via the shared map
396
//		NodeList allNodes = CNodeService.getInstance().listNodes();
397
//		Node onlineCN = null;
398
//		for (Node node: allNodes.getNodeList()) {
399
//			if (node.getType().equals(NodeType.CN)) {
400
//				if (node.getState().equals(NodeState.UP)) {
401
//					onlineCN = node;
402
//					break;
403
//				}
404
//			}
405
//		}
406
		
407
		// get the list of items that have changed since X
408
		Date since = IdentifierManager.getInstance().getLastModifiedDate();
409
		EntryObject e = new PredicateBuilder().getEntryObject();
410
		Predicate predicate = e.get(SINCE_PROPERTY).greaterEqual(since);
411
		Collection<SystemMetadata> updatedSystemMetadata = getSystemMetadataMap().values(predicate);
412
		for (SystemMetadata sm: updatedSystemMetadata) {
413
			saveLocally(sm);
414
		}
415
	}
416

    
417
}
(1-1/3)