Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.PipedInputStream;
29
import java.io.PipedOutputStream;
30
import java.lang.reflect.InvocationTargetException;
31
import java.math.BigInteger;
32
import java.sql.SQLException;
33
import java.util.ArrayList;
34
import java.util.Calendar;
35
import java.util.Date;
36
import java.util.List;
37
import java.util.concurrent.locks.Lock;
38

    
39
import javax.servlet.http.HttpServletRequest;
40

    
41
import org.apache.commons.beanutils.BeanUtils;
42
import org.apache.commons.io.IOUtils;
43
import org.apache.log4j.Logger;
44
import org.dataone.client.v2.CNode;
45
import org.dataone.client.v2.MNode;
46
import org.dataone.client.v2.itk.D1Client;
47
import org.dataone.cn.index.messaging.IndexTaskMessagingClient;
48
import org.dataone.cn.index.messaging.IndexTaskMessagingClientFactory;
49
import org.dataone.cn.index.task.IndexTask;
50
import org.dataone.cn.index.task.IndexTaskGenerator;
51
import org.dataone.exceptions.MarshallingException;
52
import org.dataone.service.cn.v2.CNAuthorization;
53
import org.dataone.service.cn.v2.CNCore;
54
import org.dataone.service.cn.v2.CNRead;
55
import org.dataone.service.cn.v2.CNReplication;
56
import org.dataone.service.cn.v2.CNView;
57
import org.dataone.service.exceptions.BaseException;
58
import org.dataone.service.exceptions.IdentifierNotUnique;
59
import org.dataone.service.exceptions.InsufficientResources;
60
import org.dataone.service.exceptions.InvalidRequest;
61
import org.dataone.service.exceptions.InvalidSystemMetadata;
62
import org.dataone.service.exceptions.InvalidToken;
63
import org.dataone.service.exceptions.NotAuthorized;
64
import org.dataone.service.exceptions.NotFound;
65
import org.dataone.service.exceptions.NotImplemented;
66
import org.dataone.service.exceptions.ServiceFailure;
67
import org.dataone.service.exceptions.UnsupportedType;
68
import org.dataone.service.exceptions.VersionMismatch;
69
import org.dataone.service.types.v1.AccessPolicy;
70
import org.dataone.service.types.v1.Checksum;
71
import org.dataone.service.types.v1.ChecksumAlgorithmList;
72
import org.dataone.service.types.v1.Event;
73
import org.dataone.service.types.v1.Identifier;
74
import org.dataone.service.types.v1.NodeReference;
75
import org.dataone.service.types.v1.NodeType;
76
import org.dataone.service.types.v1.ObjectFormatIdentifier;
77
import org.dataone.service.types.v1.ObjectList;
78
import org.dataone.service.types.v1.ObjectLocationList;
79
import org.dataone.service.types.v1.Permission;
80
import org.dataone.service.types.v1.Replica;
81
import org.dataone.service.types.v1.ReplicationPolicy;
82
import org.dataone.service.types.v1.ReplicationStatus;
83
import org.dataone.service.types.v1.Session;
84
import org.dataone.service.types.v1.Subject;
85
import org.dataone.service.types.v1_1.QueryEngineDescription;
86
import org.dataone.service.types.v1_1.QueryEngineList;
87
import org.dataone.service.types.v2.Node;
88
import org.dataone.service.types.v2.NodeList;
89
import org.dataone.service.types.v2.ObjectFormat;
90
import org.dataone.service.types.v2.ObjectFormatList;
91
import org.dataone.service.types.v2.SystemMetadata;
92
import org.dataone.service.util.TypeMarshaller;
93

    
94
import edu.ucsb.nceas.metacat.DBUtil;
95
import edu.ucsb.nceas.metacat.EventLog;
96
import edu.ucsb.nceas.metacat.IdentifierManager;
97
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
98
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
99
import edu.ucsb.nceas.metacat.properties.PropertyService;
100
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
101

    
102
/**
103
 * Represents Metacat's implementation of the DataONE Coordinating Node 
104
 * service API. Methods implement the various CN* interfaces, and methods common
105
 * to both Member Node and Coordinating Node interfaces are found in the
106
 * D1NodeService super class.
107
 *
108
 */
109
public class CNodeService extends D1NodeService implements CNAuthorization,
110
    CNCore, CNRead, CNReplication, CNView {
111

    
112
  /* the logger instance */
113
  private Logger logMetacat = null;
114
  public final static String V2V1MISSMATCH = "The Coordinating Node is not authorized to make systemMetadata changes on this object. Please make changes directly on the authoritative Member Node.";
115

    
116
  private IndexTaskMessagingClient indexTaskClient = null;
117
  /**
118
   * singleton accessor
119
   */
120
  public static CNodeService getInstance(HttpServletRequest request) { 
121
    return new CNodeService(request);
122
  }
123
  
124
  /**
125
   * Constructor, private for singleton access
126
   */
127
  private CNodeService(HttpServletRequest request) {
128
    super(request);
129
    logMetacat = Logger.getLogger(CNodeService.class);
130
    try {
131
        indexTaskClient = IndexTaskMessagingClientManager.getInstance().getMessagingClient();
132
    } catch (Exception e) {
133
        logMetacat.warn("CNodeService.constructorr - the client for sumbitting the index tasks couldn't be initialized since "+e.getMessage(), e);
134
    }
135
  }
136
  
137
 
138
  
139
  /*
140
   * Submit a index task to the message broker.
141
   */
142
  private void submitAddIndexTask(SystemMetadata sysmeta, String methodName, boolean isData)  {
143
      try {
144
          if(indexTaskClient == null) {
145
              indexTaskClient = IndexTaskMessagingClientManager.getInstance().getMessagingClient();
146
          }
147
          String objectURI = null;
148
          if(!isData) {
149
              String localId = IdentifierManager.getInstance().getLocalId(sysmeta.getIdentifier().getValue());
150
              objectURI = IdentifierManager.getInstance().getObjectFilePath(localId, isScienceMetadata(sysmeta));
151
              logMetacat.info("CNodeService.sbmitAddIndexTask - the URI for the object "+sysmeta.getIdentifier().getValue()+" is "+objectURI);
152
          }
153
          IndexTaskGenerator generator = new IndexTaskGenerator();
154
          //The objectURI will be null for data file
155
          IndexTask task = generator.generateAddTask(sysmeta, objectURI);
156
          indexTaskClient.submit(task);
157
          logMetacat.info("CNodeService."+methodName+" - Metacat successfully submitted the index task for the object "+sysmeta.getIdentifier().getValue()+" to the message broker. But it is NOT guarranteed that the broker can get it.");
158
      } catch (Exception e) {
159
          logMetacat.warn("CNodeService."+methodName+" - the index task for the object "+sysmeta.getIdentifier().getValue()+" failed to be submitted to the message broker since "+e.getMessage(), e);
160
      }
161
  }
162
  
163
  /*
164
   * Submit an update index task to the message broker.
165
   */
166
  private void submitUpdateIndexTask(SystemMetadata sysmeta, String methodName) {
167
      try {
168
          if(indexTaskClient == null) {
169
              indexTaskClient = IndexTaskMessagingClientManager.getInstance().getMessagingClient();
170
          }
171
          String localId = IdentifierManager.getInstance().getLocalId(sysmeta.getIdentifier().getValue());
172
          String objectURI = IdentifierManager.getInstance().getObjectFilePath(localId, isScienceMetadata(sysmeta));
173
          logMetacat.info("CNodeService.sbmitUpdateIndexTask - the URI for the object "+sysmeta.getIdentifier().getValue()+" is "+objectURI);
174
          IndexTaskGenerator generator = new IndexTaskGenerator();
175
          IndexTask task = generator.generateUpdateTask(sysmeta, objectURI);
176
          indexTaskClient.submit(task);
177
          logMetacat.info("CNodeService."+methodName+" - Metacat successfully submitted the index task for the object "+sysmeta.getIdentifier().getValue()+" to the message broker. But it is NOT guarranteed that the broker can get it.");
178
      } catch (Exception e) {
179
          logMetacat.warn("CNodeService."+methodName+" - the index task for the object "+sysmeta.getIdentifier().getValue()+" failed to be submitted to the message broker since "+e.getMessage(), e);
180
      }
181
  }
182
  
183
  /*
184
   * Submit a delete index task to the message broker.
185
   */
186
  private void submitDeleteIndexTask(SystemMetadata sysmeta, String methodName) {
187
      try {
188
          if(indexTaskClient == null) {
189
              indexTaskClient = IndexTaskMessagingClientManager.getInstance().getMessagingClient();
190
          }
191
          IndexTaskGenerator generator = new IndexTaskGenerator();
192
          IndexTask task = generator.generateDeleteTask(sysmeta);
193
          indexTaskClient.submit(task);
194
          logMetacat.info("CNodeService."+methodName+" - Metacat successfully submitted the index task for the object "+sysmeta.getIdentifier().getValue()+" to the message broker. But it is NOT guarranteed that the broker can get it.");
195
      } catch (Exception e) {
196
          logMetacat.warn("CNodeService."+methodName+" - the index task for the object "+sysmeta.getIdentifier().getValue()+" failed to be submitted to the message broker since "+e.getMessage(), e);
197
      }
198
  }
199
    
200
  /**
201
   * Set the replication policy for an object given the object identifier
202
   * It only is applied to objects whose authoritative mn is a v1 node.
203
   * @param session - the Session object containing the credentials for the Subject
204
   * @param pid - the object identifier for the given object
205
   * @param policy - the replication policy to be applied
206
   * 
207
   * @return true or false
208
   * 
209
   * @throws NotImplemented
210
   * @throws NotAuthorized
211
   * @throws ServiceFailure
212
   * @throws InvalidRequest
213
   * @throws VersionMismatch
214
   * 
215
   */
216
  @Override
217
  public boolean setReplicationPolicy(Session session, Identifier pid,
218
      ReplicationPolicy policy, long serialVersion) 
219
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
220
      InvalidRequest, InvalidToken, VersionMismatch {
221
      
222
      // do we have a valid pid?
223
      if (pid == null || pid.getValue().trim().equals("")) {
224
          throw new InvalidRequest("4883", "The provided identifier was invalid.");
225
          
226
      }
227
      
228
      //only allow pid to be passed
229
      String serviceFailure = "4882";
230
      String notFound = "4884";
231
      checkV1SystemMetaPidExist(pid, serviceFailure, "The object for given PID "+pid.getValue()+" couldn't be identified if it exists",  notFound, 
232
              "No object could be found for given PID: "+pid.getValue());
233
      
234
      /*String serviceFailureCode = "4882";
235
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
236
      if(sid != null) {
237
          pid = sid;
238
      }*/
239
      // The lock to be used for this identifier
240
      Lock lock = null;
241
      
242
      // get the subject
243
      Subject subject = session.getSubject();
244
      
245
      // are we allowed to do this?
246
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
247
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
248
                  + " not allowed by " + subject.getValue() + " on "
249
                  + pid.getValue());
250
          
251
      }
252
      
253
      SystemMetadata systemMetadata = null;
254
      try {
255
          lock = HazelcastService.getInstance().getLock(pid.getValue());
256
          lock.lock();
257
          logMetacat.debug("Locked identifier " + pid.getValue());
258

    
259
          try {
260
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
261
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
262
                  
263
              }
264
              
265
              // did we get it correctly?
266
              if ( systemMetadata == null ) {
267
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
268
                  
269
              }
270
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
271
              String version = checker.getVersion("MNRead");
272
              if(version == null) {
273
                  throw new ServiceFailure("4882", "Couldn't determine the MNRead version of the authoritative member node for the pid "+pid.getValue());
274
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
275
                  //we don't apply this method to an object whose authoritative node is v2
276
                  throw new NotAuthorized("4881", V2V1MISSMATCH);
277
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
278
                  //we don't understand this version (it is not v1 or v2)
279
                  throw new InvalidRequest("4883", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
280
              }
281
              // does the request have the most current system metadata?
282
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
283
                 String msg = "The requested system metadata version number " + 
284
                     serialVersion + " differs from the current version at " +
285
                     systemMetadata.getSerialVersion().longValue() +
286
                     ". Please get the latest copy in order to modify it.";
287
                 throw new VersionMismatch("4886", msg);
288
                 
289
              }
290
              
291
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
292
              throw new NotFound("4884", "No record found for: " + pid.getValue());
293
            
294
          }
295
          
296
          // set the new policy
297
          systemMetadata.setReplicationPolicy(policy);
298
          
299
          // update the metadata
300
          try {
301
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
302
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
303
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
304
              //submit the index task
305
              String methodName ="setReplicationPolicy";
306
              submitUpdateIndexTask(systemMetadata, methodName);
307
              notifyReplicaNodes(systemMetadata);
308
              
309
          } catch (RuntimeException e) {
310
              throw new ServiceFailure("4882", e.getMessage());
311
          
312
          }
313
          
314
      } catch (RuntimeException e) {
315
          throw new ServiceFailure("4882", e.getMessage());
316
          
317
      } finally {
318
          lock.unlock();
319
          logMetacat.debug("Unlocked identifier " + pid.getValue());
320
          
321
      }
322
    
323
      return true;
324
  }
325

    
326
  /**
327
   * Deletes the replica from the given Member Node
328
   * NOTE: MN.delete() may be an "archive" operation. TBD.
329
   * @param session
330
   * @param pid
331
   * @param nodeId
332
   * @param serialVersion
333
   * @return
334
   * @throws InvalidToken
335
   * @throws ServiceFailure
336
   * @throws NotAuthorized
337
   * @throws NotFound
338
   * @throws NotImplemented
339
   * @throws VersionMismatch
340
   */
341
  @Override
342
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
343
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
344
	  
345
	  	// The lock to be used for this identifier
346
		Lock lock = null;
347

    
348
		// get the subject
349
		Subject subject = session.getSubject();
350

    
351
		// are we allowed to do this?
352
		/*boolean isAuthorized = false;
353
		try {
354
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
355
		} catch (InvalidRequest e) {
356
			throw new ServiceFailure("4882", e.getDescription());
357
		}
358
		if (!isAuthorized) {
359
			throw new NotAuthorized("4881", Permission.WRITE
360
					+ " not allowed by " + subject.getValue() + " on "
361
					+ pid.getValue());
362

    
363
		}*/
364
		if(session == null) {
365
		    throw new NotAuthorized("4882", "Session cannot be null. It is not authorized for deleting the replication metadata of the object "+pid.getValue());
366
		} else {
367
		    if(!isCNAdmin(session)) {
368
		        throw new NotAuthorized("4882", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for deleting the replication metadata of the object "+pid.getValue());
369
		    }
370
		}
371

    
372
		SystemMetadata systemMetadata = null;
373
		try {
374
			lock = HazelcastService.getInstance().getLock(pid.getValue());
375
			lock.lock();
376
			logMetacat.debug("Locked identifier " + pid.getValue());
377

    
378
			try {
379
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
380
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
381
				}
382

    
383
				// did we get it correctly?
384
				if (systemMetadata == null) {
385
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
386
				}
387

    
388
				// does the request have the most current system metadata?
389
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
390
					String msg = "The requested system metadata version number "
391
							+ serialVersion
392
							+ " differs from the current version at "
393
							+ systemMetadata.getSerialVersion().longValue()
394
							+ ". Please get the latest copy in order to modify it.";
395
					throw new VersionMismatch("4886", msg);
396

    
397
				}
398

    
399
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
400
				throw new NotFound("4884", "No record found for: " + pid.getValue());
401

    
402
			}
403
			  
404
			// check permissions
405
			// TODO: is this necessary?
406
			/*List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
407
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
408
			if (!isAllowed) {
409
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
410
			}*/
411
			  
412
			// delete the replica from the given node
413
			// CSJ: use CN.delete() to truly delete a replica, semantically
414
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
415
			//D1Client.getMN(nodeId).delete(session, pid);
416
			  
417
			// reflect that change in the system metadata
418
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
419
			for (Replica r: systemMetadata.getReplicaList()) {
420
				  if (r.getReplicaMemberNode().equals(nodeId)) {
421
					  updatedReplicas.remove(r);
422
					  break;
423
				  }
424
			}
425
			systemMetadata.setReplicaList(updatedReplicas);
426

    
427
			// update the metadata
428
			try {
429
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
430
				//we don't need to update the modification date.
431
				//systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
432
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
433
				//submit the index task
434
	            String methodName ="deleteReplicationMetadata";
435
	            submitUpdateIndexTask(systemMetadata, methodName);
436
			} catch (RuntimeException e) {
437
				throw new ServiceFailure("4882", e.getMessage());
438
			}
439

    
440
		} catch (RuntimeException e) {
441
			throw new ServiceFailure("4882", e.getMessage());
442
		} finally {
443
			lock.unlock();
444
			logMetacat.debug("Unlocked identifier " + pid.getValue());
445
		}
446

    
447
		return true;	  
448
	  
449
  }
450
  
451
  /**
452
   * Deletes an object from the Coordinating Node
453
   * 
454
   * @param session - the Session object containing the credentials for the Subject
455
   * @param pid - The object identifier to be deleted
456
   * 
457
   * @return pid - the identifier of the object used for the deletion
458
   * 
459
   * @throws InvalidToken
460
   * @throws ServiceFailure
461
   * @throws NotAuthorized
462
   * @throws NotFound
463
   * @throws NotImplemented
464
   * @throws InvalidRequest
465
   */
466
  @Override
467
  public Identifier delete(Session session, Identifier pid) 
468
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
469
      
470
      String localId = null;      // The corresponding docid for this pid
471
	  Lock lock = null;           // The lock to be used for this identifier
472
      CNode cn = null;            // a reference to the CN to get the node list    
473
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
474
      List<Node> nodeList = null; // the list of nodes in this CN environment
475

    
476
      // check for a valid session
477
      if (session == null) {
478
        	throw new InvalidToken("4963", "No session has been provided");
479
        	
480
      }
481

    
482
      // do we have a valid pid?
483
      if (pid == null || pid.getValue().trim().equals("")) {
484
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
485
          
486
      }
487
      
488
      String serviceFailureCode = "4962";
489
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
490
      if(sid != null) {
491
          pid = sid;
492
      }
493

    
494
	  // check that it is CN/admin
495
	  boolean allowed = isAdminAuthorized(session);
496
	  
497
	  // additional check if it is the authoritative node if it is not the admin
498
      if(!allowed) {
499
          allowed = isAuthoritativeMNodeAdmin(session, pid);
500
          
501
      }
502
	  
503
	  if (!allowed) {
504
		  String msg = "The subject " + session.getSubject().getValue() + 
505
			  " is not allowed to call delete() on a Coordinating Node.";
506
		  logMetacat.info(msg);
507
		  throw new NotAuthorized("4960", msg);
508
		  
509
	  }
510
	  
511
	  // Don't defer to superclass implementation without a locally registered identifier
512
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
513
      // Check for the existing identifier
514
      try {
515
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
516
          super.delete(session, pid);
517
          //submit the index task
518
          String methodName="delete";
519
          submitDeleteIndexTask(systemMetadata, methodName);
520
      } catch (McdbDocNotFoundException e) {
521
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
522
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
523
    	  
524
          try {
525
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
526
  			  lock.lock();
527
  			  logMetacat.debug("Locked identifier " + pid.getValue());
528

    
529
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
530
			  if ( sysMeta != null ) {
531
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
532
				sysMeta.setArchived(true);
533
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
534
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
535
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
536
	            //since this is cn, we don't need worry about the mn solr index.
537
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
538
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
539
	            //submit the index task
540
	            String methodName="delete";
541
	            submitDeleteIndexTask(sysMeta, methodName);
542
	            String username = session.getSubject().getValue();//just for logging purpose
543
                //since data objects were not registered in the identifier table, we use pid as the docid
544
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
545
				
546
			  } else {
547
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
548
					  ". Couldn't obtain the system metadata record.");
549
				  
550
			  }
551
			  
552
		  } catch (RuntimeException re) {
553
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
554
				  ". The error message was: " + re.getMessage());
555
			  
556
		  } finally {
557
			  lock.unlock();
558
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
559

    
560
		  }
561

    
562
          // NOTE: cannot log the delete without localId
563
//          EventLog.getInstance().log(request.getRemoteAddr(), 
564
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
565
//                  pid.getValue(), Event.DELETE.xmlValue());
566

    
567
      } catch (SQLException e) {
568
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
569
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
570
      }
571
      
572

    
573
      // get the node list
574
      try {
575
          cn = D1Client.getCN();
576
          nodeList = cn.listNodes().getNodeList();
577
          
578
      } catch (Exception e) { // handle BaseException and other I/O issues
579
          
580
          // swallow errors since the call is not critical
581
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
582
              " due to communication issues with the CN: " + e.getMessage());
583
          
584
      }
585

    
586
	  // notify the replicas
587
	  if (systemMetadata.getReplicaList() != null) {
588
		  for (Replica replica: systemMetadata.getReplicaList()) {
589
			  NodeReference replicaNode = replica.getReplicaMemberNode();
590
			  try {
591
                  if (nodeList != null) {
592
                      // find the node type
593
                      for (Node node : nodeList) {
594
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
595
                              nodeType = node.getType();
596
                              break;
597
              
598
                          }
599
                      }
600
                  }
601
                  
602
                  // only send call MN.delete() to avoid an infinite loop with the CN
603
                  if (nodeType != null && nodeType == NodeType.MN) {
604
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
605
                  }
606
                  
607
			  } catch (Exception e) {
608
				  // all we can really do is log errors and carry on with life
609
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
610
					  " from replica MN: " + replicaNode.getValue(), e);
611
			}
612
			  
613
		  }
614
	  }
615
	  
616
	  return pid;
617
      
618
  }
619
  
620
  /**
621
   * Archives an object from the Coordinating Node
622
   * 
623
   * @param session - the Session object containing the credentials for the Subject
624
   * @param pid - The object identifier to be deleted
625
   * 
626
   * @return pid - the identifier of the object used for the deletion
627
   * 
628
   * @throws InvalidToken
629
   * @throws ServiceFailure
630
   * @throws NotAuthorized
631
   * @throws NotFound
632
   * @throws NotImplemented
633
   * @throws InvalidRequest
634
   */
635
  @Override
636
  public Identifier archive(Session session, Identifier pid) 
637
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
638

    
639
      String localId = null; // The corresponding docid for this pid
640
	  Lock lock = null;      // The lock to be used for this identifier
641
      CNode cn = null;            // a reference to the CN to get the node list    
642
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
643
      List<Node> nodeList = null; // the list of nodes in this CN environment
644
      
645

    
646
      // check for a valid session
647
      if (session == null) {
648
        	throw new InvalidToken("4973", "No session has been provided");
649
        	
650
      }
651

    
652
      // do we have a valid pid?
653
      if (pid == null || pid.getValue().trim().equals("")) {
654
          throw new InvalidToken("4973", "The provided identifier was invalid.");
655
          
656
      }
657

    
658
	  // check that it is CN/admin
659
	  boolean allowed = isAdminAuthorized(session);
660
	  
661
	  String serviceFailureCode = "4972";
662
	  Identifier sid = getPIDForSID(pid, serviceFailureCode);
663
	  if(sid != null) {
664
	        pid = sid;
665
	  }
666
	  
667
	  //check if it is the authoritative member node
668
	  if(!allowed) {
669
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
670
	  }
671
	  
672
	  //check if the session has the change permission
673
	  if(!allowed) {
674
	      try {
675
	          allowed = userHasPermission(session, pid, Permission.CHANGE_PERMISSION);
676
	      } catch (InvalidRequest e) {
677
	          throw new InvalidToken("4973","CN.archive method couldn't determine if the client has the change permssion on this pid "+pid.getValue()+ " since "+e.getMessage());
678
	      }
679
	      
680
	  }
681
	  if (!allowed) {
682
		  String msg = "The subject " + session.getSubject().getValue() + 
683
				  " doesn't have the change permission to archive the object "+pid.getValue();
684
		  logMetacat.warn(msg);
685
		  throw new NotAuthorized("4970", msg);
686
	  }
687
	  
688
      try {
689
          HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
690
          logMetacat.debug("CNodeService.archive - lock the system metadata for "+pid.getValue());
691
          SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
692
          D1NodeVersionChecker checker = new D1NodeVersionChecker(sysMeta.getAuthoritativeMemberNode());
693
          String version = checker.getVersion("MNRead");
694
          if(version == null) {
695
              throw new ServiceFailure("4972", "Couldn't determine the MNRead version of the authoritative member node for the pid "+pid.getValue());
696
          } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
697
              //we don't apply this method to an object whose authoritative node is v2
698
              throw new NotAuthorized("4970", V2V1MISSMATCH);
699
          } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
700
              //we don't understand this version (it is not v1 or v2)
701
              throw new NotImplemented("4974", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
702
          }
703
          boolean needModifyDate = true;
704
          archiveCNObjectWithNotificationReplica(session, pid, sysMeta, needModifyDate);
705
      
706
      } finally {
707
          HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
708
          logMetacat.debug("CNodeService.archive - unlock the system metadata for "+pid.getValue());
709
      }
710

    
711
	  return pid;
712
      
713
  }
714
  
715
  
716
  /**
717
   * Archive a object on cn and notify the replica. This method doesn't lock the system metadata map. The caller should lock it.
718
   * This method doesn't check the authorization; this method only accept a pid.
719
   * @param session
720
   * @param pid
721
   * @param sysMeta
722
   * @param notifyReplica
723
   * @return
724
   * @throws InvalidToken
725
   * @throws ServiceFailure
726
   * @throws NotAuthorized
727
   * @throws NotFound
728
   * @throws NotImplemented
729
   */
730
  private Identifier archiveCNObjectWithNotificationReplica(Session session, Identifier pid, SystemMetadata sysMeta, boolean needModifyDate) 
731
                  throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
732
          boolean logArchive = true;
733
          archiveCNObject(logArchive, session, pid, sysMeta, needModifyDate);
734
          //submit the delete index task
735
          String methodName = "archive";
736
          submitDeleteIndexTask(sysMeta, methodName);
737
          // notify the replicas
738
          notifyReplicaNodes(sysMeta);
739
          return pid;
740
    }
741
  
742
  
743
  
744
  /**
745
   * Set the obsoletedBy attribute in System Metadata
746
   * @param session
747
   * @param pid
748
   * @param obsoletedByPid
749
   * @param serialVersion
750
   * @return
751
   * @throws NotImplemented
752
   * @throws NotFound
753
   * @throws NotAuthorized
754
   * @throws ServiceFailure
755
   * @throws InvalidRequest
756
   * @throws InvalidToken
757
   * @throws VersionMismatch
758
   */
759
  @Override
760
  public boolean setObsoletedBy(Session session, Identifier pid,
761
			Identifier obsoletedByPid, long serialVersion)
762
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
763
			InvalidRequest, InvalidToken, VersionMismatch {
764
      
765
      // do we have a valid pid?
766
      if (pid == null || pid.getValue().trim().equals("")) {
767
          throw new InvalidRequest("4942", "The provided identifier was invalid.");
768
          
769
      }
770
      
771
      /*String serviceFailureCode = "4941";
772
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
773
      if(sid != null) {
774
          pid = sid;
775
      }*/
776
      
777
      // do we have a valid pid?
778
      if (obsoletedByPid == null || obsoletedByPid.getValue().trim().equals("")) {
779
          throw new InvalidRequest("4942", "The provided obsoletedByPid was invalid.");
780
          
781
      }
782
      
783
      try {
784
          if(IdentifierManager.getInstance().systemMetadataSIDExists(obsoletedByPid)) {
785
              throw new InvalidRequest("4942", "The provided obsoletedByPid "+obsoletedByPid.getValue()+" is an existing SID. However, it must NOT be an SID.");
786
          }
787
      } catch (SQLException ee) {
788
          throw new ServiceFailure("4941", "Couldn't determine if the obsoletedByPid "+obsoletedByPid.getValue()+" is an SID or not. The id shouldn't be an SID.");
789
      }
790
      
791

    
792
		// The lock to be used for this identifier
793
		Lock lock = null;
794

    
795
		// get the subject
796
		Subject subject = session.getSubject();
797

    
798
		// are we allowed to do this?
799
		if (!isAuthorized(session, pid, Permission.WRITE)) {
800
			throw new NotAuthorized("4881", Permission.WRITE
801
					+ " not allowed by " + subject.getValue() + " on "
802
					+ pid.getValue());
803

    
804
		}
805

    
806

    
807
		SystemMetadata systemMetadata = null;
808
		try {
809
			lock = HazelcastService.getInstance().getLock(pid.getValue());
810
			lock.lock();
811
			logMetacat.debug("Locked identifier " + pid.getValue());
812

    
813
			try {
814
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
815
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
816
				}
817

    
818
				// did we get it correctly?
819
				if (systemMetadata == null) {
820
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
821
				}
822

    
823
				// does the request have the most current system metadata?
824
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
825
					String msg = "The requested system metadata version number "
826
							+ serialVersion
827
							+ " differs from the current version at "
828
							+ systemMetadata.getSerialVersion().longValue()
829
							+ ". Please get the latest copy in order to modify it.";
830
					throw new VersionMismatch("4886", msg);
831

    
832
				}
833
				
834
				 //only apply to the object whose authoritative member node is v1.
835
	              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
836
	              String version = checker.getVersion("MNRead");
837
	              if(version == null) {
838
	                  throw new ServiceFailure("4941", "Couldn't determine the MNRead version of the authoritative member node for the pid "+pid.getValue());
839
	              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
840
	                  //we don't apply this method to an object whose authoritative node is v2
841
	                  throw new NotAuthorized("4945", V2V1MISSMATCH);
842
	              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
843
	                  //we don't understand this version (it is not v1 or v2)
844
	                  throw new InvalidRequest("4942", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
845
	              }
846

    
847
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
848
				throw new NotFound("4884", "No record found for: " + pid.getValue());
849

    
850
			}
851

    
852
			// set the new policy
853
			systemMetadata.setObsoletedBy(obsoletedByPid);
854

    
855
			// update the metadata
856
			try {
857
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
858
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
859
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
860
			} catch (RuntimeException e) {
861
				throw new ServiceFailure("4882", e.getMessage());
862
			}
863
			//submit an update index task
864
			String methodName="setObsoletedBy";
865
			submitUpdateIndexTask(systemMetadata, methodName);
866

    
867
		} catch (RuntimeException e) {
868
			throw new ServiceFailure("4882", e.getMessage());
869
		} finally {
870
			lock.unlock();
871
			logMetacat.debug("Unlocked identifier " + pid.getValue());
872
		}
873

    
874
		return true;
875
	}
876
  
877
  
878
  /**
879
   * Set the replication status for an object given the object identifier
880
   * 
881
   * @param session - the Session object containing the credentials for the Subject
882
   * @param pid - the object identifier for the given object
883
   * @param status - the replication status to be applied
884
   * 
885
   * @return true or false
886
   * 
887
   * @throws NotImplemented
888
   * @throws NotAuthorized
889
   * @throws ServiceFailure
890
   * @throws InvalidRequest
891
   * @throws InvalidToken
892
   * @throws NotFound
893
   * 
894
   */
895
  @Override
896
  public boolean setReplicationStatus(Session session, Identifier pid,
897
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
898
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
899
      InvalidRequest, NotFound {
900
	  
901
	  // cannot be called by public
902
	  if (session == null) {
903
		  throw new NotAuthorized("4720", "Session cannot be null");
904
	  } 
905
	  
906
	  /*else {
907
	      if(!isCNAdmin(session)) {
908
              throw new NotAuthorized("4720", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for setting the replication status of the object "+pid.getValue());
909
        }
910
	  }*/
911
	  
912
	// do we have a valid pid?
913
      if (pid == null || pid.getValue().trim().equals("")) {
914
          throw new InvalidRequest("4730", "The provided identifier was invalid.");
915
          
916
      }
917
      
918
      /*String serviceFailureCode = "4700";
919
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
920
      if(sid != null) {
921
          pid = sid;
922
      }*/
923
      
924
      // The lock to be used for this identifier
925
      Lock lock = null;
926
      
927
      boolean allowed = false;
928
      int replicaEntryIndex = -1;
929
      List<Replica> replicas = null;
930
      // get the subject
931
      Subject subject = session.getSubject();
932
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
933
          " is " + status.toString());
934
      
935
      SystemMetadata systemMetadata = null;
936

    
937
      try {
938
          lock = HazelcastService.getInstance().getLock(pid.getValue());
939
          lock.lock();
940
          logMetacat.debug("Locked identifier " + pid.getValue());
941

    
942
          try {      
943
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
944

    
945
              // did we get it correctly?
946
              if ( systemMetadata == null ) {
947
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
948
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
949
                  
950
              }
951
              replicas = systemMetadata.getReplicaList();
952
              int count = 0;
953
              
954
              // was there a failure? log it
955
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
956
                 String msg = "The replication request of the object identified by " + 
957
                     pid.getValue() + " failed.  The error message was " +
958
                     failure.getMessage() + ".";
959
                 logMetacat.error(msg);
960
              }
961
              
962
              if (replicas.size() > 0 && replicas != null) {
963
                  // find the target replica index in the replica list
964
                  for (Replica replica : replicas) {
965
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
966
                      String targetNodeStr = targetNode.getValue();
967
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
968
                  
969
                      if (replicaNodeStr.equals(targetNodeStr)) {
970
                          replicaEntryIndex = count;
971
                          logMetacat.debug("replica entry index is: "
972
                                  + replicaEntryIndex);
973
                          break;
974
                      }
975
                      count++;
976
                  
977
                  }
978
              }
979
              // are we allowed to do this? only CNs and target MNs are allowed
980
              CNode cn = D1Client.getCN();
981
              List<Node> nodes = cn.listNodes().getNodeList();
982
              
983
              // find the node in the node list
984
              for ( Node node : nodes ) {
985
                  
986
                  NodeReference nodeReference = node.getIdentifier();
987
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
988
                      nodeReference.getValue());
989
                  
990
                  // allow target MN certs
991
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
992
                      node.getType().equals(NodeType.MN)) {
993
                      List<Subject> nodeSubjects = node.getSubjectList();
994
                      
995
                      // check if the session subject is in the node subject list
996
                      for (Subject nodeSubject : nodeSubjects) {
997
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
998
                                  nodeSubject.getValue() + " and " + subject.getValue());
999
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
1000
                              
1001
                              // lastly limit to COMPLETED, INVALIDATED,
1002
                              // and FAILED status updates from MNs only
1003
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
1004
                                   status.equals(ReplicationStatus.INVALIDATED) ||
1005
                                   status.equals(ReplicationStatus.FAILED)) {
1006
                                  allowed = true;
1007
                                  break;
1008
                                  
1009
                              }                              
1010
                          }
1011
                      }                 
1012
                  }
1013
              }
1014

    
1015
              if ( !allowed ) {
1016
                  //check for CN admin access
1017
                  //allowed = isAuthorized(session, pid, Permission.WRITE);
1018
                  allowed = isCNAdmin(session);
1019
                  
1020
              }              
1021
              
1022
              if ( !allowed ) {
1023
                  String msg = "The subject identified by "
1024
                          + subject.getValue()
1025
                          + " is not a CN or MN, and does not have permission to set the replication status for "
1026
                          + "the replica identified by "
1027
                          + targetNode.getValue() + ".";
1028
                  logMetacat.info(msg);
1029
                  throw new NotAuthorized("4720", msg);
1030
                  
1031
              }
1032

    
1033
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1034
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
1035
                " : " + e.getMessage());
1036
            
1037
          }
1038
          
1039
          Replica targetReplica = new Replica();
1040
          // set the status for the replica
1041
          if ( replicaEntryIndex != -1 ) {
1042
              targetReplica = replicas.get(replicaEntryIndex);
1043
              
1044
              // don't allow status to change from COMPLETED to anything other
1045
              // than INVALIDATED: prevents overwrites from race conditions
1046
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
1047
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
1048
            	  throw new InvalidRequest("4730", "Status state change from " +
1049
            			  targetReplica.getReplicationStatus() + " to " +
1050
            			  status.toString() + "is prohibited for identifier " +
1051
            			  pid.getValue() + " and target node " + 
1052
            			  targetReplica.getReplicaMemberNode().getValue());
1053
              }
1054
              
1055
              if(targetReplica.getReplicationStatus().equals(status)) {
1056
                  //There is no change in the status, we do nothing.
1057
                  return true;
1058
              }
1059
              
1060
              targetReplica.setReplicationStatus(status);
1061
              
1062
              logMetacat.debug("Set the replication status for " + 
1063
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
1064
                  targetReplica.getReplicationStatus() + " for identifier " +
1065
                  pid.getValue());
1066
              
1067
          } else {
1068
              // this is a new entry, create it
1069
              targetReplica.setReplicaMemberNode(targetNode);
1070
              targetReplica.setReplicationStatus(status);
1071
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
1072
              replicas.add(targetReplica);
1073
              
1074
          }
1075
          
1076
          systemMetadata.setReplicaList(replicas);
1077
                
1078
          // update the metadata
1079
          try {
1080
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1081
              // Based on CN behavior discussion 9/16/15, we no longer want to 
1082
              // update the modified date for changes to the replica list
1083
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1084
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1085
              
1086
              //submit the index task
1087
              String methodName ="setReplicationStatus";
1088
              submitUpdateIndexTask(systemMetadata, methodName);
1089

    
1090
              if ( !status.equals(ReplicationStatus.QUEUED) && 
1091
            	   !status.equals(ReplicationStatus.REQUESTED)) {
1092
                  
1093
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
1094
                          "\tNODE:\t" + targetNode.getValue() + 
1095
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
1096
                
1097
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
1098
                          "\tPID:\t"  + pid.getValue() + 
1099
                          "\tNODE:\t" + targetNode.getValue() + 
1100
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
1101
              }
1102

    
1103
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
1104
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
1105
                      " on target node " + targetNode + ". The exception was: " +
1106
                      failure.getMessage());
1107
              }
1108
              
1109
			  // update the replica nodes about the completed replica when complete, failed or invalid
1110
              if (status.equals(ReplicationStatus.COMPLETED) || status.equals(ReplicationStatus.FAILED) ||
1111
                      status.equals(ReplicationStatus.INVALIDATED)) {
1112
				notifyReplicaNodes(systemMetadata);
1113
			}
1114
          
1115
          } catch (RuntimeException e) {
1116
              throw new ServiceFailure("4700", e.getMessage());
1117
          
1118
          }
1119
          
1120
    } catch (RuntimeException e) {
1121
        String msg = "There was a RuntimeException getting the lock for " +
1122
            pid.getValue();
1123
        logMetacat.info(msg);
1124
        
1125
    } finally {
1126
        lock.unlock();
1127
        logMetacat.debug("Unlocked identifier " + pid.getValue());
1128
        
1129
    }
1130
      
1131
      return true;
1132
  }
1133
  
1134
/**
1135
   * Return the checksum of the object given the identifier 
1136
   * 
1137
   * @param session - the Session object containing the credentials for the Subject
1138
   * @param pid - the object identifier for the given object
1139
   * 
1140
   * @return checksum - the checksum of the object
1141
   * 
1142
   * @throws InvalidToken
1143
   * @throws ServiceFailure
1144
   * @throws NotAuthorized
1145
   * @throws NotFound
1146
   * @throws NotImplemented
1147
   */
1148
  @Override
1149
  public Checksum getChecksum(Session session, Identifier pid)
1150
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
1151
    NotImplemented {
1152
    
1153
	boolean isAuthorized = false;
1154
	try {
1155
		isAuthorized = isAuthorized(session, pid, Permission.READ);
1156
	} catch (InvalidRequest e) {
1157
		throw new ServiceFailure("1410", e.getDescription());
1158
	}  
1159
    if (!isAuthorized) {
1160
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
1161
    }
1162
    
1163
    SystemMetadata systemMetadata = null;
1164
    Checksum checksum = null;
1165
    
1166
    try {
1167
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
1168

    
1169
        if (systemMetadata == null ) {
1170
            String error ="";
1171
            String localId = null;
1172
            try {
1173
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1174
              
1175
             } catch (Exception e) {
1176
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1177
            }
1178
            
1179
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1180
                error = DELETEDMESSAGE;
1181
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1182
                error = DELETEDMESSAGE;
1183
            }
1184
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1185
        }
1186
        checksum = systemMetadata.getChecksum();
1187
        
1188
    } catch (RuntimeException e) {
1189
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
1190
            pid.getValue() + ". The error message was: " + e.getMessage());
1191
      
1192
    }
1193
    
1194
    return checksum;
1195
  }
1196

    
1197
  /**
1198
   * Resolve the location of a given object
1199
   * 
1200
   * @param session - the Session object containing the credentials for the Subject
1201
   * @param pid - the object identifier for the given object
1202
   * 
1203
   * @return objectLocationList - the list of nodes known to contain the object
1204
   * 
1205
   * @throws InvalidToken
1206
   * @throws ServiceFailure
1207
   * @throws NotAuthorized
1208
   * @throws NotFound
1209
   * @throws NotImplemented
1210
   */
1211
  @Override
1212
  public ObjectLocationList resolve(Session session, Identifier pid)
1213
    throws InvalidToken, ServiceFailure, NotAuthorized,
1214
    NotFound, NotImplemented {
1215

    
1216
    throw new NotImplemented("4131", "resolve not implemented");
1217

    
1218
  }
1219

    
1220
  /**
1221
   * Metacat does not implement this method at the CN level
1222
   */
1223
  @Override
1224
  public ObjectList search(Session session, String queryType, String query)
1225
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1226
    NotImplemented {
1227

    
1228
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1229
	  
1230
//    ObjectList objectList = null;
1231
//    try {
1232
//        objectList = 
1233
//          IdentifierManager.getInstance().querySystemMetadata(
1234
//              null, //startTime, 
1235
//              null, //endTime,
1236
//              null, //objectFormat, 
1237
//              false, //replicaStatus, 
1238
//              0, //start, 
1239
//              1000 //count
1240
//              );
1241
//        
1242
//    } catch (Exception e) {
1243
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1244
//    }
1245
//
1246
//      return objectList;
1247
		  
1248
  }
1249
  
1250
  /**
1251
   * Returns the object format registered in the DataONE Object Format 
1252
   * Vocabulary for the given format identifier
1253
   * 
1254
   * @param fmtid - the identifier of the format requested
1255
   * 
1256
   * @return objectFormat - the object format requested
1257
   * 
1258
   * @throws ServiceFailure
1259
   * @throws NotFound
1260
   * @throws InsufficientResources
1261
   * @throws NotImplemented
1262
   */
1263
  @Override
1264
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1265
    throws ServiceFailure, NotFound, NotImplemented {
1266
     
1267
      return ObjectFormatService.getInstance().getFormat(fmtid);
1268
      
1269
  }
1270

    
1271
    @Override
1272
    public ObjectFormatIdentifier addFormat(Session session, ObjectFormatIdentifier formatId, ObjectFormat format)
1273
            throws ServiceFailure, NotFound, NotImplemented, NotAuthorized, InvalidToken {
1274

    
1275
        logMetacat.debug("CNodeService.addFormat() called.\n" + 
1276
                "format ID: " + format.getFormatId() + "\n" + 
1277
                "format name: " + format.getFormatName() + "\n" + 
1278
                "format type: " + format.getFormatType() );
1279
        
1280
        // FIXME remove:
1281
        if (true)
1282
            throw new NotImplemented("0000", "Implementation underway... Will need testing too...");
1283
        
1284
        if (!isAdminAuthorized(session))
1285
            throw new NotAuthorized("0000", "Not authorized to call addFormat()");
1286

    
1287
        String separator = ".";
1288
        try {
1289
            separator = PropertyService.getProperty("document.accNumSeparator");
1290
        } catch (PropertyNotFoundException e) {
1291
            logMetacat.warn("Unable to find property \"document.accNumSeparator\"\n" + e.getMessage());
1292
        }
1293

    
1294
        // find pids of last and next ObjectFormatList
1295
        String OBJECT_FORMAT_DOCID = ObjectFormatService.OBJECT_FORMAT_DOCID;
1296
        int lastRev = -1;
1297
        try {
1298
            lastRev = DBUtil.getLatestRevisionInDocumentTable(OBJECT_FORMAT_DOCID);
1299
        } catch (SQLException e) {
1300
            throw new ServiceFailure("0000", "Unable to locate last revision of the object format list.\n" + e.getMessage());
1301
        }
1302
        int nextRev = lastRev + 1;
1303
        String lastDocID = OBJECT_FORMAT_DOCID + separator + lastRev;
1304
        String nextDocID = OBJECT_FORMAT_DOCID + separator + nextRev;
1305
        
1306
        Identifier lastPid = new Identifier();
1307
        lastPid.setValue(lastDocID);
1308
        Identifier nextPid = new Identifier();
1309
        nextPid.setValue(nextDocID);
1310
        
1311
        logMetacat.debug("Last ObjectFormatList document ID: " + lastDocID + "\n" 
1312
                + "Next ObjectFormatList document ID: " + nextDocID);
1313
        
1314
        // add new format to the current ObjectFormatList
1315
        ObjectFormatList objectFormatList = ObjectFormatService.getInstance().listFormats();
1316
        List<ObjectFormat> innerList = objectFormatList.getObjectFormatList();
1317
        innerList.add(format);
1318

    
1319
        // get existing (last) sysmeta and make a copy
1320
        SystemMetadata lastSysmeta = getSystemMetadata(session, lastPid);
1321
        SystemMetadata nextSysmeta = new SystemMetadata();
1322
        try {
1323
            BeanUtils.copyProperties(nextSysmeta, lastSysmeta);
1324
        } catch (IllegalAccessException | InvocationTargetException e) {
1325
            throw new ServiceFailure("0000", "Unable to create system metadata for updated object format list.\n" + e.getMessage());
1326
        }
1327
        
1328
        // create the new object format list, and update the old sysmeta with obsoletedBy
1329
        createNewObjectFormatList(session, lastPid, nextPid, objectFormatList, nextSysmeta);
1330
        updateOldObjectFormatList(session, lastPid, nextPid, lastSysmeta);
1331
        
1332
        // TODO add to ObjectFormatService local cache?
1333
        
1334
        return formatId;
1335
    }
1336

    
1337
    /**
1338
     * Creates the object for the next / updated version of the ObjectFormatList.
1339
     * 
1340
     * @param session
1341
     * @param lastPid
1342
     * @param nextPid
1343
     * @param objectFormatList
1344
     * @param lastSysmeta
1345
     */
1346
    private void createNewObjectFormatList(Session session, Identifier lastPid, Identifier nextPid,
1347
            ObjectFormatList objectFormatList, SystemMetadata lastSysmeta) 
1348
                    throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented {
1349
        
1350
        PipedInputStream is = new PipedInputStream();
1351
        PipedOutputStream os = null;
1352
        
1353
        try {
1354
            os = new PipedOutputStream(is);
1355
            TypeMarshaller.marshalTypeToOutputStream(objectFormatList, os);
1356
        } catch (MarshallingException | IOException e) {
1357
            throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + e.getMessage());
1358
        } finally {
1359
            try {
1360
                os.flush();
1361
                os.close();
1362
            } catch (IOException ioe) {
1363
                throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + ioe.getMessage());
1364
            }
1365
        }
1366
        
1367
        BigInteger docSize = lastSysmeta.getSize();
1368
        try {
1369
            docSize = BigInteger.valueOf(is.available());
1370
        } catch (IOException e) {
1371
            logMetacat.warn("Unable to set an accurate size for the new object format list.", e);
1372
        }
1373
        
1374
        lastSysmeta.setIdentifier(nextPid);
1375
        lastSysmeta.setObsoletes(lastPid);
1376
        lastSysmeta.setSize(docSize); 
1377
        lastSysmeta.setSubmitter(session.getSubject());
1378
        lastSysmeta.setDateUploaded(new Date());
1379
        
1380
        // create new object format list
1381
        try {
1382
            create(session, nextPid, is, lastSysmeta);
1383
        } catch (IdentifierNotUnique | UnsupportedType | InsufficientResources
1384
                | InvalidSystemMetadata | InvalidRequest e) {
1385
            throw new ServiceFailure("0000", "Unable to create() new object format list" + e.getMessage());
1386
        }
1387
    }
1388
  
1389
    /**
1390
     * Updates the SystemMetadata for the old version of the ObjectFormatList
1391
     * by setting the obsoletedBy value to the pid of the new version of the 
1392
     * ObjectFormatList.
1393
     * 
1394
     * @param session
1395
     * @param lastPid
1396
     * @param obsoletedByPid
1397
     * @param lastSysmeta
1398
     * @throws ServiceFailure
1399
     */
1400
    private void updateOldObjectFormatList(Session session, Identifier lastPid, Identifier obsoletedByPid, SystemMetadata lastSysmeta) 
1401
            throws ServiceFailure {
1402
        
1403
        lastSysmeta.setObsoletedBy(obsoletedByPid);
1404
        
1405
        try {
1406
            this.updateSystemMetadata(session, lastPid, lastSysmeta);
1407
        } catch (NotImplemented | NotAuthorized | ServiceFailure | InvalidRequest
1408
                | InvalidSystemMetadata | InvalidToken e) {
1409
            throw new ServiceFailure("0000", "Unable to update metadata of old object format list.\n" + e.getMessage());
1410
        }
1411
    }
1412
  /**
1413
   * Returns a list of all object formats registered in the DataONE Object 
1414
   * Format Vocabulary
1415
    * 
1416
   * @return objectFormatList - The list of object formats registered in 
1417
   *                            the DataONE Object Format Vocabulary
1418
   * 
1419
   * @throws ServiceFailure
1420
   * @throws NotImplemented
1421
   * @throws InsufficientResources
1422
   */
1423
  @Override
1424
  public ObjectFormatList listFormats() 
1425
    throws ServiceFailure, NotImplemented {
1426

    
1427
    return ObjectFormatService.getInstance().listFormats();
1428
  }
1429

    
1430
  /**
1431
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1432
    * 
1433
   * @return nodeList - List of nodes from the registry
1434
   * 
1435
   * @throws ServiceFailure
1436
   * @throws NotImplemented
1437
   */
1438
  @Override
1439
  public NodeList listNodes() 
1440
    throws NotImplemented, ServiceFailure {
1441

    
1442
    throw new NotImplemented("4800", "listNodes not implemented");
1443
  }
1444

    
1445
  /**
1446
   * Provides a mechanism for adding system metadata independently of its 
1447
   * associated object, such as when adding system metadata for data objects.
1448
    * 
1449
   * @param session - the Session object containing the credentials for the Subject
1450
   * @param pid - The identifier of the object to register the system metadata against
1451
   * @param sysmeta - The system metadata to be registered
1452
   * 
1453
   * @return true if the registration succeeds
1454
   * 
1455
   * @throws NotImplemented
1456
   * @throws NotAuthorized
1457
   * @throws ServiceFailure
1458
   * @throws InvalidRequest
1459
   * @throws InvalidSystemMetadata
1460
   */
1461
  @Override
1462
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1463
      SystemMetadata sysmeta) 
1464
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1465
      InvalidSystemMetadata {
1466

    
1467
      // The lock to be used for this identifier
1468
      Lock lock = null;
1469

    
1470
      // TODO: control who can call this?
1471
      if (session == null) {
1472
          //TODO: many of the thrown exceptions do not use the correct error codes
1473
          //check these against the docs and correct them
1474
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1475
                  "  If you are not logged in, please do so and retry the request.");
1476
      } else {
1477
          //only CN is allwoed
1478
          if(!isCNAdmin(session)) {
1479
                throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for registering the system metadata of the object "+pid.getValue());
1480
          }
1481
      }
1482
      // the identifier can't be an SID
1483
      try {
1484
          if(IdentifierManager.getInstance().systemMetadataSIDExists(pid)) {
1485
              throw new InvalidRequest("4863", "The provided identifier "+pid.getValue()+" is a series id which is not allowed.");
1486
          }
1487
      } catch (SQLException sqle) {
1488
          throw new ServiceFailure("4862", "Couldn't determine if the pid "+pid.getValue()+" is a series id since "+sqle.getMessage());
1489
      }
1490
      
1491
      // verify that guid == SystemMetadata.getIdentifier()
1492
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1493
          "|" + sysmeta.getIdentifier().getValue());
1494
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1495
          throw new InvalidRequest("4863", 
1496
              "The identifier in method call (" + pid.getValue() + 
1497
              ") does not match identifier in system metadata (" +
1498
              sysmeta.getIdentifier().getValue() + ").");
1499
      }
1500
      
1501
      //check if the sid is legitimate in the system metadata
1502
      //checkSidInModifyingSystemMetadata(sysmeta, "4864", "4862");
1503
      Identifier sid = sysmeta.getSeriesId();
1504
      if(sid != null) {
1505
          if (!isValidIdentifier(sid)) {
1506
              throw new InvalidRequest("4863", "The series id in the system metadata is invalid in the request.");
1507
          }
1508
      }
1509

    
1510
      try {
1511
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1512
          lock.lock();
1513
          logMetacat.debug("Locked identifier " + pid.getValue());
1514
          logMetacat.debug("Checking if identifier exists...");
1515
          // Check that the identifier does not already exist
1516
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1517
              throw new InvalidRequest("4863", 
1518
                  "The identifier is already in use by an existing object.");
1519
          
1520
          }
1521
          
1522
          // insert the system metadata into the object store
1523
          logMetacat.debug("Starting to insert SystemMetadata...");
1524
          try {
1525
              //for the object whose authoriative mn is v1. we need reset the modification date.
1526
              //d1-sync already set the serial version. so we don't need do again.
1527
              D1NodeVersionChecker checker = new D1NodeVersionChecker(sysmeta.getAuthoritativeMemberNode());
1528
              String version = checker.getVersion("MNRead");
1529
              if(version != null && version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1530
                  sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1531
              }
1532
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1533
              
1534
          } catch (RuntimeException e) {
1535
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1536
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1537
                  e.getClass() + ": " + e.getMessage());
1538
              
1539
          }
1540
          
1541
      } catch (RuntimeException e) {
1542
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1543
                  e.getClass() + ": " + e.getMessage());
1544
          
1545
      }  finally {
1546
          lock.unlock();
1547
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1548
          
1549
      }
1550

    
1551
      
1552
      logMetacat.debug("Returning from registerSystemMetadata");
1553
      //submit the index task
1554
      String methodName="registerSystemMetadata";
1555
      boolean isData = true;
1556
      submitAddIndexTask(sysmeta, methodName, isData);
1557
      
1558
      try {
1559
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1560
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1561
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1562
    	          localId, "registerSystemMetadata");
1563
      } catch (McdbDocNotFoundException e) {
1564
    	  // do nothing, no localId to log with
1565
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1566
      } catch (SQLException ee) {
1567
          // do nothing, no localId to log with
1568
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1569
      }
1570
      
1571
      
1572
      return pid;
1573
  }
1574
  
1575
  /**
1576
   * Given an optional scope and format, reserves and returns an identifier 
1577
   * within that scope and format that is unique and will not be 
1578
   * used by any other sessions. 
1579
    * 
1580
   * @param session - the Session object containing the credentials for the Subject
1581
   * @param pid - The identifier of the object to register the system metadata against
1582
   * @param scope - An optional string to be used to qualify the scope of 
1583
   *                the identifier namespace, which is applied differently 
1584
   *                depending on the format requested. If scope is not 
1585
   *                supplied, a default scope will be used.
1586
   * @param format - The optional name of the identifier format to be used, 
1587
   *                  drawn from a DataONE-specific vocabulary of identifier 
1588
   *                 format names, including several common syntaxes such 
1589
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1590
   *                 format is not supplied by the caller, the CN service 
1591
   *                 will use a default identifier format, which may change 
1592
   *                 over time.
1593
   * 
1594
   * @return true if the registration succeeds
1595
   * 
1596
   * @throws InvalidToken
1597
   * @throws ServiceFailure
1598
   * @throws NotAuthorized
1599
   * @throws IdentifierNotUnique
1600
   * @throws NotImplemented
1601
   */
1602
  @Override
1603
  public Identifier reserveIdentifier(Session session, Identifier pid)
1604
  throws InvalidToken, ServiceFailure,
1605
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1606

    
1607
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1608
  }
1609
  
1610
  @Override
1611
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1612
  throws InvalidToken, ServiceFailure,
1613
        NotAuthorized, NotImplemented, InvalidRequest {
1614
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1615
  }
1616
  
1617
  /**
1618
    * Checks whether the pid is reserved by the subject in the session param
1619
    * If the reservation is held on the pid by the subject, we return true.
1620
    * 
1621
   * @param session - the Session object containing the Subject
1622
   * @param pid - The identifier to check
1623
   * 
1624
   * @return true if the reservation exists for the subject/pid
1625
   * 
1626
   * @throws InvalidToken
1627
   * @throws ServiceFailure
1628
   * @throws NotFound - when the pid is not found (in use or in reservation)
1629
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1630
   * @throws IdentifierNotUnique - when the pid is in use
1631
   * @throws NotImplemented
1632
   */
1633

    
1634
  @Override
1635
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1636
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1637
      NotImplemented, InvalidRequest {
1638
  
1639
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1640
  }
1641

    
1642
  /**
1643
   * Changes ownership (RightsHolder) of the specified object to the 
1644
   * subject specified by userId
1645
    * 
1646
   * @param session - the Session object containing the credentials for the Subject
1647
   * @param pid - Identifier of the object to be modified
1648
   * @param userId - The subject that will be taking ownership of the specified object.
1649
   *
1650
   * @return pid - the identifier of the modified object
1651
   * 
1652
   * @throws ServiceFailure
1653
   * @throws InvalidToken
1654
   * @throws NotFound
1655
   * @throws NotAuthorized
1656
   * @throws NotImplemented
1657
   * @throws InvalidRequest
1658
   */  
1659
  @Override
1660
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1661
      long serialVersion)
1662
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1663
      NotImplemented, InvalidRequest, VersionMismatch {
1664
      
1665
      // The lock to be used for this identifier
1666
      Lock lock = null;
1667
      
1668
      // do we have a valid pid?
1669
      if (pid == null || pid.getValue().trim().equals("")) {
1670
          throw new InvalidRequest("4442", "The provided identifier was invalid.");
1671
          
1672
      }
1673

    
1674
      // get the subject
1675
      Subject subject = session.getSubject();
1676
      
1677
      String serviceFailureCode = "4490";
1678
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1679
      if(sid != null) {
1680
          pid = sid;
1681
      }
1682
      
1683
      // are we allowed to do this?
1684
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1685
          throw new NotAuthorized("4440", "not allowed by "
1686
                  + subject.getValue() + " on " + pid.getValue());
1687
          
1688
      }
1689
      
1690
      SystemMetadata systemMetadata = null;
1691
      try {
1692
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1693
          lock.lock();
1694
          logMetacat.debug("Locked identifier " + pid.getValue());
1695

    
1696
          try {
1697
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1698
              
1699
              if(systemMetadata == null) {
1700
                  throw new NotFound("4460", "The object "+pid.getValue()+" doesn't exist in the node.");
1701
              }
1702
              // does the request have the most current system metadata?
1703
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1704
                 String msg = "The requested system metadata version number " + 
1705
                     serialVersion + " differs from the current version at " +
1706
                     systemMetadata.getSerialVersion().longValue() +
1707
                     ". Please get the latest copy in order to modify it.";
1708
                 throw new VersionMismatch("4443", msg);
1709
              }
1710
              
1711
              //only apply to the object whose authoritative member node is v1.
1712
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
1713
              String version = checker.getVersion("MNRead");
1714
              if(version == null) {
1715
                  throw new ServiceFailure("4490", "Couldn't determine the MNRead version of the authoritative member node storage version for the pid "+pid.getValue());
1716
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
1717
                  //we don't apply this method to an object whose authoritative node is v2
1718
                  throw new NotAuthorized("4440", V2V1MISSMATCH);
1719
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1720
                  //we don't understand this version (it is not v1 or v2)
1721
                  throw new InvalidRequest("4442", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
1722
              }
1723
              
1724
              
1725
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1726
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1727
              
1728
          }
1729
              
1730
          // set the new rights holder
1731
          systemMetadata.setRightsHolder(userId);
1732
          
1733
          // update the metadata
1734
          try {
1735
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1736
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1737
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1738
              //submit the index task
1739
              String methodName ="setRightsHolder";
1740
              submitUpdateIndexTask(systemMetadata, methodName);
1741
              notifyReplicaNodes(systemMetadata);
1742
              
1743
          } catch (RuntimeException e) {
1744
              throw new ServiceFailure("4490", e.getMessage());
1745
          
1746
          }
1747
          
1748
      } catch (RuntimeException e) {
1749
          throw new ServiceFailure("4490", e.getMessage());
1750
          
1751
      } finally {
1752
          lock.unlock();
1753
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1754
      
1755
      }
1756
      
1757
      return pid;
1758
  }
1759

    
1760
  /**
1761
   * Verify that a replication task is authorized by comparing the target node's
1762
   * Subject (from the X.509 certificate-derived Session) with the list of 
1763
   * subjects in the known, pending replication tasks map.
1764
   * 
1765
   * @param originatingNodeSession - Session information that contains the 
1766
   *                                 identity of the calling user
1767
   * @param targetNodeSubject - Subject identifying the target node
1768
   * @param pid - the identifier of the object to be replicated
1769
   * @param replicatePermission - the execute permission to be granted
1770
   * 
1771
   * @throws ServiceFailure
1772
   * @throws NotImplemented
1773
   * @throws InvalidToken
1774
   * @throws NotAuthorized
1775
   * @throws InvalidRequest
1776
   * @throws NotFound
1777
   */
1778
  @Override
1779
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1780
    Subject targetNodeSubject, Identifier pid) 
1781
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1782
    NotFound, InvalidRequest {
1783
    
1784
    boolean isAllowed = false;
1785
    SystemMetadata sysmeta = null;
1786
    NodeReference targetNode = null;
1787
    
1788
    try {
1789
      // get the target node reference from the nodes list
1790
      CNode cn = D1Client.getCN();
1791
      List<Node> nodes = cn.listNodes().getNodeList();
1792
      
1793
      if ( nodes != null ) {
1794
        for (Node node : nodes) {
1795
            
1796
        	if (node.getSubjectList() != null) {
1797
        		
1798
	            for (Subject nodeSubject : node.getSubjectList()) {
1799
	            	
1800
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1801
	                    targetNode = node.getIdentifier();
1802
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1803
	                    break;
1804
	                }
1805
	            }
1806
        	}
1807
            
1808
            if ( targetNode != null) { break; }
1809
        }
1810
        
1811
      } else {
1812
          String msg = "Couldn't get the node list from the CN";
1813
          logMetacat.debug(msg);
1814
          throw new ServiceFailure("4872", msg);
1815
          
1816
      }
1817
      
1818
      // can't find a node listed with the given subject
1819
      if ( targetNode == null ) {
1820
          String msg = "There is no Member Node registered with a node subject " +
1821
              "matching " + targetNodeSubject.getValue();
1822
          logMetacat.info(msg);
1823
          throw new NotAuthorized("4871", msg);
1824
          
1825
      }
1826
      
1827
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1828
      
1829
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1830

    
1831
      if ( sysmeta != null ) {
1832
          
1833
          List<Replica> replicaList = sysmeta.getReplicaList();
1834
          
1835
          if ( replicaList != null ) {
1836
              
1837
              // find the replica with the status set to 'requested'
1838
              for (Replica replica : replicaList) {
1839
                  ReplicationStatus status = replica.getReplicationStatus();
1840
                  NodeReference listedNode = replica.getReplicaMemberNode();
1841
                  if ( listedNode != null && targetNode != null ) {
1842
                      logMetacat.debug("Comparing " + listedNode.getValue()
1843
                              + " to " + targetNode.getValue());
1844
                      
1845
                      if (listedNode.getValue().equals(targetNode.getValue())
1846
                              && status.equals(ReplicationStatus.REQUESTED)) {
1847
                          isAllowed = true;
1848
                          break;
1849

    
1850
                      }
1851
                  }
1852
              }
1853
          }
1854
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1855
              "to replicate: " + isAllowed + " for " + pid.getValue());
1856

    
1857
          
1858
      } else {
1859
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1860
          " is null.");
1861
          String error ="";
1862
          String localId = null;
1863
          try {
1864
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1865
            
1866
           } catch (Exception e) {
1867
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1868
          }
1869
          
1870
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1871
              error = DELETEDMESSAGE;
1872
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1873
              error = DELETEDMESSAGE;
1874
          }
1875
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1876
          
1877
      }
1878

    
1879
    } catch (RuntimeException e) {
1880
    	  ServiceFailure sf = new ServiceFailure("4872", 
1881
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1882
                e.getMessage());
1883
    	  sf.initCause(e);
1884
        throw sf;
1885
        
1886
    }
1887
      
1888
    return isAllowed;
1889
    
1890
  }
1891

    
1892
  /**
1893
   * Adds a new object to the Node, where the object is a science metadata object.
1894
   * 
1895
   * @param session - the Session object containing the credentials for the Subject
1896
   * @param pid - The object identifier to be created
1897
   * @param object - the object bytes
1898
   * @param sysmeta - the system metadata that describes the object  
1899
   * 
1900
   * @return pid - the object identifier created
1901
   * 
1902
   * @throws InvalidToken
1903
   * @throws ServiceFailure
1904
   * @throws NotAuthorized
1905
   * @throws IdentifierNotUnique
1906
   * @throws UnsupportedType
1907
   * @throws InsufficientResources
1908
   * @throws InvalidSystemMetadata
1909
   * @throws NotImplemented
1910
   * @throws InvalidRequest
1911
   */
1912
  public Identifier create(Session session, Identifier pid, InputStream object,
1913
    SystemMetadata sysmeta) 
1914
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1915
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1916
    NotImplemented, InvalidRequest {
1917
    
1918
    try {
1919
      // verify the pid is valid format
1920
      if (!isValidIdentifier(pid)) {
1921
          throw new InvalidRequest("4891", "The provided identifier is invalid.");
1922
      }
1923
      logMetacat.debug("CN.create -start to create the object with pid "+pid.getValue());
1924
      // The lock to be used for this identifier
1925
      Lock lock = null;
1926

    
1927
      try {
1928
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1929
          lock.lock();
1930
          // are we allowed?
1931
          boolean isAllowed = false;
1932
          isAllowed = isAdminAuthorized(session);
1933
          
1934
          // additional check if it is the authoritative node if it is not the admin
1935
          if(!isAllowed) {
1936
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1937
          }
1938

    
1939
          // proceed if we're called by a CN
1940
          if ( isAllowed ) {
1941
              //check if the series id is legitimate. It uses the same rules of the method registerSystemMetadata
1942
              //checkSidInModifyingSystemMetadata(sysmeta, "4896", "4893");
1943
              Identifier sid = sysmeta.getSeriesId();
1944
              if(sid != null) {
1945
                  if (!isValidIdentifier(sid)) {
1946
                      throw new InvalidRequest("4891", "The series id in the system metadata is invalid in the request.");
1947
                  }
1948
              }
1949
              // create the coordinating node version of the document      
1950
              logMetacat.debug("CN.create - after locking identifier, passing authorization check, continue to create the object " + pid.getValue());
1951
              sysmeta.setSerialVersion(BigInteger.ONE);
1952
              //for the object whose authoritative mn is v1. we need reset the modification date.
1953
              //for the object whose authoritative mn is v2. we just accept the modification date.
1954
              D1NodeVersionChecker checker = new D1NodeVersionChecker(sysmeta.getAuthoritativeMemberNode());
1955
              String version = checker.getVersion("MNRead");
1956
              if(version != null && version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1957
                  sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1958
              }
1959
              //sysmeta.setArchived(false); // this is a create op, not update
1960
              
1961
              // the CN should have set the origin and authoritative member node fields
1962
              try {
1963
                  sysmeta.getOriginMemberNode().getValue();
1964
                  sysmeta.getAuthoritativeMemberNode().getValue();
1965
                  
1966
              } catch (NullPointerException npe) {
1967
                  throw new InvalidSystemMetadata("4896", 
1968
                      "Both the origin and authoritative member node identifiers need to be set.");
1969
                  
1970
              }
1971
              pid = super.create(session, pid, object, sysmeta);
1972
              // submit the index task to the message broker.
1973
              String methodName="create";
1974
              boolean isData = false;
1975
              submitAddIndexTask(sysmeta, methodName, isData);
1976
          } else {
1977
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1978
                  " isn't allowed to call create() on a Coordinating Node for pid "+pid.getValue();
1979
              logMetacat.error(msg);
1980
              throw new NotAuthorized("1100", msg);
1981
          }
1982
          
1983
      } catch (RuntimeException e) {
1984
          // Convert Hazelcast runtime exceptions to service failures
1985
          String msg = "There was a problem creating the object identified by " +
1986
              pid.getValue() + ". There error message was: " + e.getMessage();
1987
          throw new ServiceFailure("4893", msg);
1988
          
1989
      } finally {
1990
    	  if (lock != null) {
1991
	          lock.unlock();
1992
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1993
    	  }
1994
      }
1995
    } finally {
1996
        IOUtils.closeQuietly(object);
1997
    }
1998
    return pid;
1999

    
2000
  }
2001

    
2002
  /**
2003
   * Set access for a given object using the object identifier and a Subject
2004
   * under a given Session.
2005
   * This method only applies the objects whose authoritative mn is a v1 node.
2006
   * @param session - the Session object containing the credentials for the Subject
2007
   * @param pid - the object identifier for the given object to apply the policy
2008
   * @param policy - the access policy to be applied
2009
   * 
2010
   * @return true if the application of the policy succeeds
2011
   * @throws InvalidToken
2012
   * @throws ServiceFailure
2013
   * @throws NotFound
2014
   * @throws NotAuthorized
2015
   * @throws NotImplemented
2016
   * @throws InvalidRequest
2017
   */
2018
  public boolean setAccessPolicy(Session session, Identifier pid, 
2019
      AccessPolicy accessPolicy, long serialVersion) 
2020
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
2021
      NotImplemented, InvalidRequest, VersionMismatch {
2022
      
2023
   // do we have a valid pid?
2024
      if (pid == null || pid.getValue().trim().equals("")) {
2025
          throw new InvalidRequest("4402", "The provided identifier was invalid.");
2026
          
2027
      }
2028
      
2029
      String serviceFailureCode = "4430";
2030
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
2031
      if(sid != null) {
2032
          pid = sid;
2033
      }
2034
      // The lock to be used for this identifier
2035
      Lock lock = null;
2036
      SystemMetadata systemMetadata = null;
2037
      
2038
      boolean success = false;
2039
      
2040
      // get the subject
2041
      Subject subject = session.getSubject();
2042
      
2043
      // are we allowed to do this?
2044
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
2045
          throw new NotAuthorized("4420", "not allowed by "
2046
                  + subject.getValue() + " on " + pid.getValue());
2047
      }
2048
      
2049
      try {
2050
          lock = HazelcastService.getInstance().getLock(pid.getValue());
2051
          lock.lock();
2052
          logMetacat.debug("Locked identifier " + pid.getValue());
2053

    
2054
          try {
2055
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2056

    
2057
              if ( systemMetadata == null ) {
2058
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
2059
                  
2060
              }
2061
              // does the request have the most current system metadata?
2062
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
2063
                 String msg = "The requested system metadata version number " + 
2064
                     serialVersion + " differs from the current version at " +
2065
                     systemMetadata.getSerialVersion().longValue() +
2066
                     ". Please get the latest copy in order to modify it.";
2067
                 throw new VersionMismatch("4402", msg);
2068
                 
2069
              }
2070
              
2071
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
2072
              String version = checker.getVersion("MNRead");
2073
              if(version == null) {
2074
                  throw new ServiceFailure("4430", "Couldn't determine the version of MNRead of the authoritative member node for the pid "+pid.getValue());
2075
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
2076
                  //we don't apply this method to an object whose authoritative node is v2
2077
                  throw new NotAuthorized("4420", V2V1MISSMATCH);
2078
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
2079
                  //we don't understand this version (it is not v1 or v2)
2080
                  throw new InvalidRequest("4402", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
2081
              }
2082
              
2083
          } catch (RuntimeException e) {
2084
              // convert Hazelcast RuntimeException to NotFound
2085
              throw new NotFound("4400", "No record found for: " + pid);
2086
            
2087
          }
2088
              
2089
          // set the access policy
2090
          systemMetadata.setAccessPolicy(accessPolicy);
2091
          
2092
          // update the system metadata
2093
          try {
2094
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
2095
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
2096
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
2097
              //submit the index task
2098
              String methodName ="setAccessPolicy";
2099
              submitUpdateIndexTask(systemMetadata, methodName);
2100
              notifyReplicaNodes(systemMetadata);
2101
              
2102
          } catch (RuntimeException e) {
2103
              // convert Hazelcast RuntimeException to ServiceFailure
2104
              throw new ServiceFailure("4430", e.getMessage());
2105
            
2106
          }
2107
          
2108
      } catch (RuntimeException e) {
2109
          throw new ServiceFailure("4430", e.getMessage());
2110
          
2111
      } finally {
2112
          lock.unlock();
2113
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2114
        
2115
      }
2116

    
2117
    
2118
    // TODO: how do we know if the map was persisted?
2119
    success = true;
2120
    
2121
    return success;
2122
  }
2123

    
2124
  /**
2125
   * Full replacement of replication metadata in the system metadata for the 
2126
   * specified object, changes date system metadata modified
2127
   * 
2128
   * @param session - the Session object containing the credentials for the Subject
2129
   * @param pid - the object identifier for the given object to apply the policy
2130
   * @param replica - the replica to be updated
2131
   * @return
2132
   * @throws NotImplemented
2133
   * @throws NotAuthorized
2134
   * @throws ServiceFailure
2135
   * @throws InvalidRequest
2136
   * @throws NotFound
2137
   * @throws VersionMismatch
2138
   */
2139
  @Override
2140
  public boolean updateReplicationMetadata(Session session, Identifier pid,
2141
      Replica replica, long serialVersion) 
2142
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
2143
      NotFound, VersionMismatch {
2144
      
2145
      // The lock to be used for this identifier
2146
      Lock lock = null;
2147
      
2148
      // get the subject
2149
      Subject subject = session.getSubject();
2150
      
2151
      // are we allowed to do this?
2152
      if(session == null) {
2153
          throw new NotAuthorized("4851", "Session cannot be null. It is not authorized for updating the replication metadata of the object "+pid.getValue());
2154
      } else {
2155
          if(!isCNAdmin(session)) {
2156
              throw new NotAuthorized("4851", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for updating the replication metadata of the object "+pid.getValue());
2157
        }
2158
      }
2159
      /*try {
2160

    
2161
          // what is the controlling permission?
2162
          if (!isAuthorized(session, pid, Permission.WRITE)) {
2163
              throw new NotAuthorized("4851", "not allowed by "
2164
                      + subject.getValue() + " on " + pid.getValue());
2165
          }
2166

    
2167
        
2168
      } catch (InvalidToken e) {
2169
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
2170
                  " on " + pid.getValue());  
2171
          
2172
      }*/
2173

    
2174
      SystemMetadata systemMetadata = null;
2175
      try {
2176
          lock = HazelcastService.getInstance().getLock(pid.getValue());
2177
          lock.lock();
2178
          logMetacat.debug("Locked identifier " + pid.getValue());
2179

    
2180
          try {      
2181
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2182

    
2183
              // does the request have the most current system metadata?
2184
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
2185
                 String msg = "The requested system metadata version number " + 
2186
                     serialVersion + " differs from the current version at " +
2187
                     systemMetadata.getSerialVersion().longValue() +
2188
                     ". Please get the latest copy in order to modify it.";
2189
                 throw new VersionMismatch("4855", msg);
2190
              }
2191
              
2192
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
2193
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
2194
                  " : " + e.getMessage());
2195
            
2196
          }
2197
              
2198
          // set the status for the replica
2199
          List<Replica> replicas = systemMetadata.getReplicaList();
2200
          NodeReference replicaNode = replica.getReplicaMemberNode();
2201
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
2202
          int index = 0;
2203
          for (Replica listedReplica: replicas) {
2204
              
2205
              // remove the replica that we are replacing
2206
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
2207
                      // don't allow status to change from COMPLETED to anything other
2208
                      // than INVALIDATED: prevents overwrites from race conditions
2209
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
2210
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
2211
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
2212
                	  throw new InvalidRequest("4853", "Status state change from " +
2213
                			  listedReplica.getReplicationStatus() + " to " +
2214
                			  replicaStatus.toString() + "is prohibited for identifier " +
2215
                			  pid.getValue() + " and target node " + 
2216
                			  listedReplica.getReplicaMemberNode().getValue());
2217

    
2218
            	  }
2219
                  replicas.remove(index);
2220
                  break;
2221
                  
2222
              }
2223
              index++;
2224
          }
2225
          
2226
          // add the new replica item
2227
          replicas.add(replica);
2228
          systemMetadata.setReplicaList(replicas);
2229
          
2230
          // update the metadata
2231
          try {
2232
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
2233
              // Based on CN behavior discussion 9/16/15, we no longer want to 
2234
              // update the modified date for changes to the replica list
2235
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
2236
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
2237
              
2238
              //submit the index task
2239
              String methodName ="updateReplicationMetadata";
2240
              submitUpdateIndexTask(systemMetadata, methodName);
2241
              
2242
              // inform replica nodes of the change if the status is complete
2243
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
2244
            	  notifyReplicaNodes(systemMetadata);
2245
            	  
2246
              }
2247
          } catch (RuntimeException e) {
2248
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2249
              throw new ServiceFailure("4852", e.getMessage());
2250
          
2251
          }
2252
          
2253
      } catch (RuntimeException e) {
2254
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2255
          throw new ServiceFailure("4852", e.getMessage());
2256
      
2257
      } finally {
2258
          lock.unlock();
2259
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2260
          
2261
      }
2262
    
2263
      return true;
2264
      
2265
  }
2266
  
2267
  /**
2268
   * 
2269
   */
2270
  @Override
2271
  public ObjectList listObjects(Session session, Date startTime, 
2272
      Date endTime, ObjectFormatIdentifier formatid, NodeReference nodeId,Identifier identifier,
2273
      Integer start, Integer count)
2274
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
2275
      ServiceFailure {
2276

    
2277
      return super.listObjects(session, startTime, endTime, formatid, identifier, nodeId, start, count);
2278
  }
2279

    
2280
  
2281
 	/**
2282
 	 * Returns a list of checksum algorithms that are supported by DataONE.
2283
 	 * @return cal  the list of checksum algorithms
2284
 	 * 
2285
 	 * @throws ServiceFailure
2286
 	 * @throws NotImplemented
2287
 	 */
2288
  @Override
2289
  public ChecksumAlgorithmList listChecksumAlgorithms()
2290
			throws ServiceFailure, NotImplemented {
2291
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
2292
		cal.addAlgorithm("MD5");
2293
		cal.addAlgorithm("SHA-1");
2294
		return cal;
2295
		
2296
	}
2297

    
2298
  /**
2299
   * Notify replica Member Nodes of system metadata changes for a given pid
2300
   * 
2301
   * @param currentSystemMetadata - the up to date system metadata
2302
   */
2303
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
2304
      
2305
      Session session = null;
2306
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
2307
      //MNode mn = null;
2308
      NodeReference replicaNodeRef = null;
2309
      CNode cn = null;
2310
      NodeType nodeType = null;
2311
      List<Node> nodeList = null;
2312
      
2313
      try {
2314
          cn = D1Client.getCN();
2315
          nodeList = cn.listNodes().getNodeList();
2316
          
2317
      } catch (Exception e) { // handle BaseException and other I/O issues
2318
          
2319
          // swallow errors since the call is not critical
2320
          logMetacat.error("Can't inform MNs of system metadata changes due " +
2321
              "to communication issues with the CN: " + e.getMessage());
2322
          
2323
      }
2324
      
2325
      if ( replicaList != null ) {
2326
          
2327
          // iterate through the replicas and inform  MN replica nodes
2328
          for (Replica replica : replicaList) {
2329
              String replicationVersion = null;
2330
              replicaNodeRef = replica.getReplicaMemberNode();
2331
              try {
2332
                  if (nodeList != null) {
2333
                      // find the node type
2334
                      for (Node node : nodeList) {
2335
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
2336
                              nodeType = node.getType();
2337
                              D1NodeVersionChecker checker = new D1NodeVersionChecker(replicaNodeRef);
2338
                              replicationVersion = checker.getVersion("MNRead");
2339
                              break;
2340
              
2341
                          }
2342
                      }
2343
                  }
2344
              
2345
                  // notify only MNs
2346
                  if (replicationVersion != null && nodeType != null && nodeType == NodeType.MN) {
2347
                      if(replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
2348
                          //connect to a v2 mn
2349
                          MNode mn = D1Client.getMN(replicaNodeRef);
2350
                          mn.systemMetadataChanged(session, 
2351
                              currentSystemMetadata.getIdentifier(), 
2352
                              currentSystemMetadata.getSerialVersion().longValue(),
2353
                              currentSystemMetadata.getDateSysMetadataModified());
2354
                      } else if (replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
2355
                          //connect to a v1 mn
2356
                          org.dataone.client.v1.MNode mn = org.dataone.client.v1.itk.D1Client.getMN(replicaNodeRef);
2357
                          mn.systemMetadataChanged(session, 
2358
                                  currentSystemMetadata.getIdentifier(), 
2359
                                  currentSystemMetadata.getSerialVersion().longValue(),
2360
                                  currentSystemMetadata.getDateSysMetadataModified());
2361
                      }
2362
                      
2363
                  }
2364
              
2365
              } catch (Exception e) { // handle BaseException and other I/O issues
2366
              
2367
                  // swallow errors since the call is not critical
2368
                  logMetacat.error("Can't inform "
2369
                          + replicaNodeRef.getValue()
2370
                          + " of system metadata changes due "
2371
                          + "to communication issues with the CN: "
2372
                          + e.getMessage());
2373
              
2374
              }
2375
          }
2376
      }
2377
  }
2378
  
2379
  /**
2380
   * Update the system metadata of the specified pid.
2381
   * Note: the serial version and the replica list in the new system metadata will be ignored and the old values will be kept.
2382
   */
2383
  @Override
2384
  public boolean updateSystemMetadata(Session session, Identifier pid,
2385
          SystemMetadata sysmeta) throws NotImplemented, NotAuthorized,
2386
          ServiceFailure, InvalidRequest, InvalidSystemMetadata, InvalidToken {
2387
   if(sysmeta == null) {
2388
       throw  new InvalidRequest("4863", "The system metadata object should NOT be null in the updateSystemMetadata request.");
2389
   }
2390
   if(pid == null || pid.getValue() == null) {
2391
       throw new InvalidRequest("4863", "Please specify the id in the updateSystemMetadata request ") ;
2392
   }
2393

    
2394
   if (session == null) {
2395
       //TODO: many of the thrown exceptions do not use the correct error codes
2396
       //check these against the docs and correct them
2397
       throw new NotAuthorized("4861", "No Session - could not authorize for updating system metadata." +
2398
               "  If you are not logged in, please do so and retry the request.");
2399
   } else {
2400
         //only CN is allwoed
2401
         if(!isCNAdmin(session)) {
2402
               throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not authorized for updating the system metadata of the object "+pid.getValue());
2403
         }
2404
   }
2405

    
2406
    //update the system metadata locally
2407
    boolean success = false;
2408
    try {
2409
        HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
2410
        SystemMetadata currentSysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2411
       
2412
        if(currentSysmeta == null) {
2413
            throw  new InvalidRequest("4863", "We can't find the current system metadata on the member node for the id "+pid.getValue());
2414
        }
2415
        // CN will ignore the comming serial version and replica list fields from the mn node. 
2416
        BigInteger currentSerialVersion = currentSysmeta.getSerialVersion();
2417
        sysmeta.setSerialVersion(currentSerialVersion);
2418
        List<Replica> replicas = currentSysmeta.getReplicaList();
2419
        sysmeta.setReplicaList(replicas);
2420
        boolean needUpdateModificationDate = false;//cn doesn't need to change the modification date.
2421
        boolean fromCN = true;
2422
        success = updateSystemMetadata(session, pid, sysmeta, needUpdateModificationDate, currentSysmeta, fromCN);
2423
        //submit the index task
2424
        String methodName="updateSystemMetadata";
2425
        submitUpdateIndexTask(sysmeta, methodName);
2426
    } finally {
2427
        HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
2428
    }
2429
    return success;
2430
  }
2431
  
2432
    @Override
2433
    public boolean synchronize(Session session, Identifier pid) throws NotAuthorized, InvalidRequest, NotImplemented{
2434
        throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2435

    
2436
    }
2437

    
2438
	@Override
2439
	public QueryEngineDescription getQueryEngineDescription(Session session,
2440
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
2441
			NotImplemented, NotFound {
2442
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2443

    
2444
	}
2445
	
2446
	@Override
2447
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
2448
			ServiceFailure, NotAuthorized, NotImplemented {
2449
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2450

    
2451
	}
2452
	
2453
	@Override
2454
	public InputStream query(Session session, String queryEngine, String query)
2455
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
2456
			NotImplemented, NotFound {
2457
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2458

    
2459
	}
2460
	
2461
	@Override
2462
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
2463
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
2464
	}
2465

    
2466
}
(1-1/9)