Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.PipedInputStream;
29
import java.io.PipedOutputStream;
30
import java.lang.reflect.InvocationTargetException;
31
import java.math.BigInteger;
32
import java.sql.SQLException;
33
import java.util.ArrayList;
34
import java.util.Calendar;
35
import java.util.Date;
36
import java.util.List;
37
import java.util.concurrent.locks.Lock;
38

    
39
import javax.servlet.http.HttpServletRequest;
40

    
41
import org.apache.commons.beanutils.BeanUtils;
42
import org.apache.log4j.Logger;
43
import org.dataone.client.v2.CNode;
44
import org.dataone.client.v2.MNode;
45
import org.dataone.client.v2.itk.D1Client;
46
import org.dataone.service.cn.v2.CNAuthorization;
47
import org.dataone.service.cn.v2.CNCore;
48
import org.dataone.service.cn.v2.CNRead;
49
import org.dataone.service.cn.v2.CNReplication;
50
import org.dataone.service.cn.v2.CNView;
51
import org.dataone.service.exceptions.BaseException;
52
import org.dataone.service.exceptions.IdentifierNotUnique;
53
import org.dataone.service.exceptions.InsufficientResources;
54
import org.dataone.service.exceptions.InvalidRequest;
55
import org.dataone.service.exceptions.InvalidSystemMetadata;
56
import org.dataone.service.exceptions.InvalidToken;
57
import org.dataone.service.exceptions.NotAuthorized;
58
import org.dataone.service.exceptions.NotFound;
59
import org.dataone.service.exceptions.NotImplemented;
60
import org.dataone.service.exceptions.ServiceFailure;
61
import org.dataone.service.exceptions.UnsupportedType;
62
import org.dataone.service.exceptions.VersionMismatch;
63
import org.dataone.service.types.v1.AccessPolicy;
64
import org.dataone.service.types.v1.Checksum;
65
import org.dataone.service.types.v1.ChecksumAlgorithmList;
66
import org.dataone.service.types.v1.Event;
67
import org.dataone.service.types.v1.Identifier;
68
import org.dataone.service.types.v1.NodeReference;
69
import org.dataone.service.types.v1.NodeType;
70
import org.dataone.service.types.v1.ObjectFormatIdentifier;
71
import org.dataone.service.types.v1.ObjectList;
72
import org.dataone.service.types.v1.ObjectLocationList;
73
import org.dataone.service.types.v1.Permission;
74
import org.dataone.service.types.v1.Replica;
75
import org.dataone.service.types.v1.ReplicationPolicy;
76
import org.dataone.service.types.v1.ReplicationStatus;
77
import org.dataone.service.types.v1.Session;
78
import org.dataone.service.types.v1.Subject;
79
import org.dataone.service.types.v1_1.QueryEngineDescription;
80
import org.dataone.service.types.v1_1.QueryEngineList;
81
import org.dataone.service.types.v2.Node;
82
import org.dataone.service.types.v2.NodeList;
83
import org.dataone.service.types.v2.ObjectFormat;
84
import org.dataone.service.types.v2.ObjectFormatList;
85
import org.dataone.service.types.v2.SystemMetadata;
86
import org.dataone.service.types.v2.util.ServiceMethodRestrictionUtil;
87
import org.dataone.service.util.TypeMarshaller;
88
import org.jibx.runtime.JiBXException;
89

    
90
import edu.ucsb.nceas.metacat.DBUtil;
91
import edu.ucsb.nceas.metacat.EventLog;
92
import edu.ucsb.nceas.metacat.IdentifierManager;
93
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
94
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
95
import edu.ucsb.nceas.metacat.properties.PropertyService;
96
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
97

    
98
/**
99
 * Represents Metacat's implementation of the DataONE Coordinating Node 
100
 * service API. Methods implement the various CN* interfaces, and methods common
101
 * to both Member Node and Coordinating Node interfaces are found in the
102
 * D1NodeService super class.
103
 *
104
 */
105
public class CNodeService extends D1NodeService implements CNAuthorization,
106
    CNCore, CNRead, CNReplication, CNView {
107

    
108
  /* the logger instance */
109
  private Logger logMetacat = null;
110
  public final static String V2V1MISSMATCH = "The Coordinating Node is not authorized to make systemMetadata changes on this object. Please make changes directly on the authoritative Member Node.";
111

    
112
  /**
113
   * singleton accessor
114
   */
115
  public static CNodeService getInstance(HttpServletRequest request) { 
116
    return new CNodeService(request);
117
  }
118
  
119
  /**
120
   * Constructor, private for singleton access
121
   */
122
  private CNodeService(HttpServletRequest request) {
123
    super(request);
124
    logMetacat = Logger.getLogger(CNodeService.class);
125
        
126
  }
127
    
128
  /**
129
   * Set the replication policy for an object given the object identifier
130
   * It only is applied to objects whose authoritative mn is a v1 node.
131
   * @param session - the Session object containing the credentials for the Subject
132
   * @param pid - the object identifier for the given object
133
   * @param policy - the replication policy to be applied
134
   * 
135
   * @return true or false
136
   * 
137
   * @throws NotImplemented
138
   * @throws NotAuthorized
139
   * @throws ServiceFailure
140
   * @throws InvalidRequest
141
   * @throws VersionMismatch
142
   * 
143
   */
144
  @Override
145
  public boolean setReplicationPolicy(Session session, Identifier pid,
146
      ReplicationPolicy policy, long serialVersion) 
147
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
148
      InvalidRequest, InvalidToken, VersionMismatch {
149
      
150
      // do we have a valid pid?
151
      if (pid == null || pid.getValue().trim().equals("")) {
152
          throw new InvalidRequest("4883", "The provided identifier was invalid.");
153
          
154
      }
155
      
156
      //only allow pid to be passed
157
      String serviceFailure = "4882";
158
      String notFound = "4884";
159
      checkV1SystemMetaPidExist(pid, serviceFailure, "The object for given PID "+pid.getValue()+" couldn't be identified if it exists",  notFound, 
160
              "No object could be found for given PID: "+pid.getValue());
161
      
162
      /*String serviceFailureCode = "4882";
163
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
164
      if(sid != null) {
165
          pid = sid;
166
      }*/
167
      // The lock to be used for this identifier
168
      Lock lock = null;
169
      
170
      // get the subject
171
      Subject subject = session.getSubject();
172
      
173
      // are we allowed to do this?
174
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
175
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
176
                  + " not allowed by " + subject.getValue() + " on "
177
                  + pid.getValue());
178
          
179
      }
180
      
181
      SystemMetadata systemMetadata = null;
182
      try {
183
          lock = HazelcastService.getInstance().getLock(pid.getValue());
184
          lock.lock();
185
          logMetacat.debug("Locked identifier " + pid.getValue());
186

    
187
          try {
188
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
189
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
190
                  
191
              }
192
              
193
              // did we get it correctly?
194
              if ( systemMetadata == null ) {
195
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
196
                  
197
              }
198
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
199
              String version = checker.getVersion("MNStorage");
200
              if(version == null) {
201
                  throw new ServiceFailure("4882", "Couldn't determine the version of the MNStorge for the "+pid.getValue());
202
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
203
                  //we don't apply this method to an object whose authoritative node is v2
204
                  throw new NotAuthorized("4881", V2V1MISSMATCH);
205
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
206
                  //we don't understand this version (it is not v1 or v2)
207
                  throw new InvalidRequest("4883", "The version of the MNStorage is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
208
              }
209
              // does the request have the most current system metadata?
210
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
211
                 String msg = "The requested system metadata version number " + 
212
                     serialVersion + " differs from the current version at " +
213
                     systemMetadata.getSerialVersion().longValue() +
214
                     ". Please get the latest copy in order to modify it.";
215
                 throw new VersionMismatch("4886", msg);
216
                 
217
              }
218
              
219
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
220
              throw new NotFound("4884", "No record found for: " + pid.getValue());
221
            
222
          }
223
          
224
          // set the new policy
225
          systemMetadata.setReplicationPolicy(policy);
226
          
227
          // update the metadata
228
          try {
229
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
230
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
231
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
232
              notifyReplicaNodes(systemMetadata);
233
              
234
          } catch (RuntimeException e) {
235
              throw new ServiceFailure("4882", e.getMessage());
236
          
237
          }
238
          
239
      } catch (RuntimeException e) {
240
          throw new ServiceFailure("4882", e.getMessage());
241
          
242
      } finally {
243
          lock.unlock();
244
          logMetacat.debug("Unlocked identifier " + pid.getValue());
245
          
246
      }
247
    
248
      return true;
249
  }
250

    
251
  /**
252
   * Deletes the replica from the given Member Node
253
   * NOTE: MN.delete() may be an "archive" operation. TBD.
254
   * @param session
255
   * @param pid
256
   * @param nodeId
257
   * @param serialVersion
258
   * @return
259
   * @throws InvalidToken
260
   * @throws ServiceFailure
261
   * @throws NotAuthorized
262
   * @throws NotFound
263
   * @throws NotImplemented
264
   * @throws VersionMismatch
265
   */
266
  @Override
267
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
268
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
269
	  
270
	  	// The lock to be used for this identifier
271
		Lock lock = null;
272

    
273
		// get the subject
274
		Subject subject = session.getSubject();
275

    
276
		// are we allowed to do this?
277
		/*boolean isAuthorized = false;
278
		try {
279
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
280
		} catch (InvalidRequest e) {
281
			throw new ServiceFailure("4882", e.getDescription());
282
		}
283
		if (!isAuthorized) {
284
			throw new NotAuthorized("4881", Permission.WRITE
285
					+ " not allowed by " + subject.getValue() + " on "
286
					+ pid.getValue());
287

    
288
		}*/
289
		if(session == null) {
290
		    throw new NotAuthorized("4882", "Session cannot be null. It is not authorized for deleting the replication metadata of the object "+pid.getValue());
291
		} else {
292
		    if(!isCNAdmin(session)) {
293
		        throw new NotAuthorized("4882", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for deleting the replication metadata of the object "+pid.getValue());
294
		    }
295
		}
296

    
297
		SystemMetadata systemMetadata = null;
298
		try {
299
			lock = HazelcastService.getInstance().getLock(pid.getValue());
300
			lock.lock();
301
			logMetacat.debug("Locked identifier " + pid.getValue());
302

    
303
			try {
304
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
305
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
306
				}
307

    
308
				// did we get it correctly?
309
				if (systemMetadata == null) {
310
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
311
				}
312

    
313
				// does the request have the most current system metadata?
314
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
315
					String msg = "The requested system metadata version number "
316
							+ serialVersion
317
							+ " differs from the current version at "
318
							+ systemMetadata.getSerialVersion().longValue()
319
							+ ". Please get the latest copy in order to modify it.";
320
					throw new VersionMismatch("4886", msg);
321

    
322
				}
323

    
324
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
325
				throw new NotFound("4884", "No record found for: " + pid.getValue());
326

    
327
			}
328
			  
329
			// check permissions
330
			// TODO: is this necessary?
331
			/*List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
332
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
333
			if (!isAllowed) {
334
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
335
			}*/
336
			  
337
			// delete the replica from the given node
338
			// CSJ: use CN.delete() to truly delete a replica, semantically
339
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
340
			//D1Client.getMN(nodeId).delete(session, pid);
341
			  
342
			// reflect that change in the system metadata
343
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
344
			for (Replica r: systemMetadata.getReplicaList()) {
345
				  if (r.getReplicaMemberNode().equals(nodeId)) {
346
					  updatedReplicas.remove(r);
347
					  break;
348
				  }
349
			}
350
			systemMetadata.setReplicaList(updatedReplicas);
351

    
352
			// update the metadata
353
			try {
354
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
355
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
356
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
357
			} catch (RuntimeException e) {
358
				throw new ServiceFailure("4882", e.getMessage());
359
			}
360

    
361
		} catch (RuntimeException e) {
362
			throw new ServiceFailure("4882", e.getMessage());
363
		} finally {
364
			lock.unlock();
365
			logMetacat.debug("Unlocked identifier " + pid.getValue());
366
		}
367

    
368
		return true;	  
369
	  
370
  }
371
  
372
  /**
373
   * Deletes an object from the Coordinating Node
374
   * 
375
   * @param session - the Session object containing the credentials for the Subject
376
   * @param pid - The object identifier to be deleted
377
   * 
378
   * @return pid - the identifier of the object used for the deletion
379
   * 
380
   * @throws InvalidToken
381
   * @throws ServiceFailure
382
   * @throws NotAuthorized
383
   * @throws NotFound
384
   * @throws NotImplemented
385
   * @throws InvalidRequest
386
   */
387
  @Override
388
  public Identifier delete(Session session, Identifier pid) 
389
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
390
      
391
      String localId = null;      // The corresponding docid for this pid
392
	  Lock lock = null;           // The lock to be used for this identifier
393
      CNode cn = null;            // a reference to the CN to get the node list    
394
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
395
      List<Node> nodeList = null; // the list of nodes in this CN environment
396

    
397
      // check for a valid session
398
      if (session == null) {
399
        	throw new InvalidToken("4963", "No session has been provided");
400
        	
401
      }
402

    
403
      // do we have a valid pid?
404
      if (pid == null || pid.getValue().trim().equals("")) {
405
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
406
          
407
      }
408
      
409
      String serviceFailureCode = "4962";
410
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
411
      if(sid != null) {
412
          pid = sid;
413
      }
414

    
415
	  // check that it is CN/admin
416
	  boolean allowed = isAdminAuthorized(session);
417
	  
418
	  // additional check if it is the authoritative node if it is not the admin
419
      if(!allowed) {
420
          allowed = isAuthoritativeMNodeAdmin(session, pid);
421
          
422
      }
423
	  
424
	  if (!allowed) {
425
		  String msg = "The subject " + session.getSubject().getValue() + 
426
			  " is not allowed to call delete() on a Coordinating Node.";
427
		  logMetacat.info(msg);
428
		  throw new NotAuthorized("4960", msg);
429
		  
430
	  }
431
	  
432
	  // Don't defer to superclass implementation without a locally registered identifier
433
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
434
      // Check for the existing identifier
435
      try {
436
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
437
          super.delete(session, pid);
438
          
439
      } catch (McdbDocNotFoundException e) {
440
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
441
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
442
    	  
443
          try {
444
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
445
  			  lock.lock();
446
  			  logMetacat.debug("Locked identifier " + pid.getValue());
447

    
448
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
449
			  if ( sysMeta != null ) {
450
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
451
				sysMeta.setArchived(true);
452
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
453
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
454
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
455
	            //since this is cn, we don't need worry about the mn solr index.
456
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
457
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
458
	            String username = session.getSubject().getValue();//just for logging purpose
459
                //since data objects were not registered in the identifier table, we use pid as the docid
460
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
461
				
462
			  } else {
463
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
464
					  ". Couldn't obtain the system metadata record.");
465
				  
466
			  }
467
			  
468
		  } catch (RuntimeException re) {
469
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
470
				  ". The error message was: " + re.getMessage());
471
			  
472
		  } finally {
473
			  lock.unlock();
474
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
475

    
476
		  }
477

    
478
          // NOTE: cannot log the delete without localId
479
//          EventLog.getInstance().log(request.getRemoteAddr(), 
480
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
481
//                  pid.getValue(), Event.DELETE.xmlValue());
482

    
483
      } catch (SQLException e) {
484
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
485
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
486
      }
487

    
488
      // get the node list
489
      try {
490
          cn = D1Client.getCN();
491
          nodeList = cn.listNodes().getNodeList();
492
          
493
      } catch (Exception e) { // handle BaseException and other I/O issues
494
          
495
          // swallow errors since the call is not critical
496
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
497
              " due to communication issues with the CN: " + e.getMessage());
498
          
499
      }
500

    
501
	  // notify the replicas
502
	  if (systemMetadata.getReplicaList() != null) {
503
		  for (Replica replica: systemMetadata.getReplicaList()) {
504
			  NodeReference replicaNode = replica.getReplicaMemberNode();
505
			  try {
506
                  if (nodeList != null) {
507
                      // find the node type
508
                      for (Node node : nodeList) {
509
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
510
                              nodeType = node.getType();
511
                              break;
512
              
513
                          }
514
                      }
515
                  }
516
                  
517
                  // only send call MN.delete() to avoid an infinite loop with the CN
518
                  if (nodeType != null && nodeType == NodeType.MN) {
519
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
520
                  }
521
                  
522
			  } catch (Exception e) {
523
				  // all we can really do is log errors and carry on with life
524
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
525
					  " from replica MN: " + replicaNode.getValue(), e);
526
			}
527
			  
528
		  }
529
	  }
530
	  
531
	  return pid;
532
      
533
  }
534
  
535
  /**
536
   * Archives an object from the Coordinating Node
537
   * 
538
   * @param session - the Session object containing the credentials for the Subject
539
   * @param pid - The object identifier to be deleted
540
   * 
541
   * @return pid - the identifier of the object used for the deletion
542
   * 
543
   * @throws InvalidToken
544
   * @throws ServiceFailure
545
   * @throws NotAuthorized
546
   * @throws NotFound
547
   * @throws NotImplemented
548
   * @throws InvalidRequest
549
   */
550
  @Override
551
  public Identifier archive(Session session, Identifier pid) 
552
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
553

    
554
      String localId = null; // The corresponding docid for this pid
555
	  Lock lock = null;      // The lock to be used for this identifier
556
      CNode cn = null;            // a reference to the CN to get the node list    
557
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
558
      List<Node> nodeList = null; // the list of nodes in this CN environment
559
      
560

    
561
      // check for a valid session
562
      if (session == null) {
563
        	throw new InvalidToken("4973", "No session has been provided");
564
        	
565
      }
566

    
567
      // do we have a valid pid?
568
      if (pid == null || pid.getValue().trim().equals("")) {
569
          throw new ServiceFailure("4972", "The provided identifier was invalid.");
570
          
571
      }
572

    
573
	  // check that it is CN/admin
574
	  boolean allowed = isAdminAuthorized(session);
575
	  
576
	  String serviceFailureCode = "4972";
577
	  Identifier sid = getPIDForSID(pid, serviceFailureCode);
578
	  if(sid != null) {
579
	        pid = sid;
580
	  }
581
	  
582
	  //check if it is the authoritative member node
583
	  if(!allowed) {
584
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
585
	  }
586
	  
587
	  if (!allowed) {
588
		  String msg = "The subject " + session.getSubject().getValue() + 
589
				  " is not allowed to call archive() on a Coordinating Node.";
590
		  logMetacat.info(msg);
591
		  throw new NotAuthorized("4970", msg);
592
	  }
593
	  
594
      // Check for the existing identifier
595
      try {
596
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
597
          super.archive(session, pid);
598
          
599
      } catch (McdbDocNotFoundException e) {
600
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
601
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
602
    	  
603
          try {
604
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
605
  			  lock.lock();
606
  			  logMetacat.debug("Locked identifier " + pid.getValue());
607

    
608
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
609
			  if ( sysMeta != null ) {
610
				sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
611
				sysMeta.setArchived(true);
612
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
613
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
614
			    // notify the replicas
615
				notifyReplicaNodes(sysMeta);
616
				  
617
			  } else {
618
				  throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
619
					  ". Couldn't obtain the system metadata record.");
620
				  
621
			  }
622
			  
623
		  } catch (RuntimeException re) {
624
			  throw new ServiceFailure("4972", "Couldn't archive " + pid.getValue() + 
625
				  ". The error message was: " + re.getMessage());
626
			  
627
		  } finally {
628
			  lock.unlock();
629
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
630

    
631
		  }
632

    
633
          // NOTE: cannot log the archive without localId
634
//          EventLog.getInstance().log(request.getRemoteAddr(), 
635
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
636
//                  pid.getValue(), Event.DELETE.xmlValue());
637

    
638
      } catch (SQLException e) {
639
          throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
640
                  ". The local id of the object with the identifier can't be identified since "+e.getMessage());
641
      }
642

    
643
	  return pid;
644
      
645
  }
646
  
647
  /**
648
   * Set the obsoletedBy attribute in System Metadata
649
   * @param session
650
   * @param pid
651
   * @param obsoletedByPid
652
   * @param serialVersion
653
   * @return
654
   * @throws NotImplemented
655
   * @throws NotFound
656
   * @throws NotAuthorized
657
   * @throws ServiceFailure
658
   * @throws InvalidRequest
659
   * @throws InvalidToken
660
   * @throws VersionMismatch
661
   */
662
  @Override
663
  public boolean setObsoletedBy(Session session, Identifier pid,
664
			Identifier obsoletedByPid, long serialVersion)
665
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
666
			InvalidRequest, InvalidToken, VersionMismatch {
667
      
668
      // do we have a valid pid?
669
      if (pid == null || pid.getValue().trim().equals("")) {
670
          throw new InvalidRequest("4942", "The provided identifier was invalid.");
671
          
672
      }
673
      
674
      /*String serviceFailureCode = "4941";
675
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
676
      if(sid != null) {
677
          pid = sid;
678
      }*/
679
      
680
      // do we have a valid pid?
681
      if (obsoletedByPid == null || obsoletedByPid.getValue().trim().equals("")) {
682
          throw new InvalidRequest("4942", "The provided obsoletedByPid was invalid.");
683
          
684
      }
685
      
686
      try {
687
          if(IdentifierManager.getInstance().systemMetadataSIDExists(obsoletedByPid)) {
688
              throw new InvalidRequest("4942", "The provided obsoletedByPid "+obsoletedByPid.getValue()+" is an existing SID. However, it must NOT be an SID.");
689
          }
690
      } catch (SQLException ee) {
691
          throw new ServiceFailure("4941", "Couldn't determine if the obsoletedByPid "+obsoletedByPid.getValue()+" is an SID or not. The id shouldn't be an SID.");
692
      }
693
      
694

    
695
		// The lock to be used for this identifier
696
		Lock lock = null;
697

    
698
		// get the subject
699
		Subject subject = session.getSubject();
700

    
701
		// are we allowed to do this?
702
		if (!isAuthorized(session, pid, Permission.WRITE)) {
703
			throw new NotAuthorized("4881", Permission.WRITE
704
					+ " not allowed by " + subject.getValue() + " on "
705
					+ pid.getValue());
706

    
707
		}
708

    
709

    
710
		SystemMetadata systemMetadata = null;
711
		try {
712
			lock = HazelcastService.getInstance().getLock(pid.getValue());
713
			lock.lock();
714
			logMetacat.debug("Locked identifier " + pid.getValue());
715

    
716
			try {
717
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
718
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
719
				}
720

    
721
				// did we get it correctly?
722
				if (systemMetadata == null) {
723
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
724
				}
725

    
726
				// does the request have the most current system metadata?
727
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
728
					String msg = "The requested system metadata version number "
729
							+ serialVersion
730
							+ " differs from the current version at "
731
							+ systemMetadata.getSerialVersion().longValue()
732
							+ ". Please get the latest copy in order to modify it.";
733
					throw new VersionMismatch("4886", msg);
734

    
735
				}
736

    
737
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
738
				throw new NotFound("4884", "No record found for: " + pid.getValue());
739

    
740
			}
741

    
742
			// set the new policy
743
			systemMetadata.setObsoletedBy(obsoletedByPid);
744

    
745
			// update the metadata
746
			try {
747
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
748
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
749
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
750
			} catch (RuntimeException e) {
751
				throw new ServiceFailure("4882", e.getMessage());
752
			}
753

    
754
		} catch (RuntimeException e) {
755
			throw new ServiceFailure("4882", e.getMessage());
756
		} finally {
757
			lock.unlock();
758
			logMetacat.debug("Unlocked identifier " + pid.getValue());
759
		}
760

    
761
		return true;
762
	}
763
  
764
  
765
  /**
766
   * Set the replication status for an object given the object identifier
767
   * 
768
   * @param session - the Session object containing the credentials for the Subject
769
   * @param pid - the object identifier for the given object
770
   * @param status - the replication status to be applied
771
   * 
772
   * @return true or false
773
   * 
774
   * @throws NotImplemented
775
   * @throws NotAuthorized
776
   * @throws ServiceFailure
777
   * @throws InvalidRequest
778
   * @throws InvalidToken
779
   * @throws NotFound
780
   * 
781
   */
782
  @Override
783
  public boolean setReplicationStatus(Session session, Identifier pid,
784
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
785
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
786
      InvalidRequest, NotFound {
787
	  
788
	  // cannot be called by public
789
	  if (session == null) {
790
		  throw new NotAuthorized("4720", "Session cannot be null");
791
	  } 
792
	  
793
	  /*else {
794
	      if(!isCNAdmin(session)) {
795
              throw new NotAuthorized("4720", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for setting the replication status of the object "+pid.getValue());
796
        }
797
	  }*/
798
	  
799
	// do we have a valid pid?
800
      if (pid == null || pid.getValue().trim().equals("")) {
801
          throw new InvalidRequest("4730", "The provided identifier was invalid.");
802
          
803
      }
804
      
805
      /*String serviceFailureCode = "4700";
806
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
807
      if(sid != null) {
808
          pid = sid;
809
      }*/
810
      
811
      // The lock to be used for this identifier
812
      Lock lock = null;
813
      
814
      boolean allowed = false;
815
      int replicaEntryIndex = -1;
816
      List<Replica> replicas = null;
817
      // get the subject
818
      Subject subject = session.getSubject();
819
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
820
          " is " + status.toString());
821
      
822
      SystemMetadata systemMetadata = null;
823

    
824
      try {
825
          lock = HazelcastService.getInstance().getLock(pid.getValue());
826
          lock.lock();
827
          logMetacat.debug("Locked identifier " + pid.getValue());
828

    
829
          try {      
830
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
831

    
832
              // did we get it correctly?
833
              if ( systemMetadata == null ) {
834
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
835
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
836
                  
837
              }
838
              replicas = systemMetadata.getReplicaList();
839
              int count = 0;
840
              
841
              // was there a failure? log it
842
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
843
                 String msg = "The replication request of the object identified by " + 
844
                     pid.getValue() + " failed.  The error message was " +
845
                     failure.getMessage() + ".";
846
                 logMetacat.error(msg);
847
              }
848
              
849
              if (replicas.size() > 0 && replicas != null) {
850
                  // find the target replica index in the replica list
851
                  for (Replica replica : replicas) {
852
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
853
                      String targetNodeStr = targetNode.getValue();
854
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
855
                  
856
                      if (replicaNodeStr.equals(targetNodeStr)) {
857
                          replicaEntryIndex = count;
858
                          logMetacat.debug("replica entry index is: "
859
                                  + replicaEntryIndex);
860
                          break;
861
                      }
862
                      count++;
863
                  
864
                  }
865
              }
866
              // are we allowed to do this? only CNs and target MNs are allowed
867
              CNode cn = D1Client.getCN();
868
              List<Node> nodes = cn.listNodes().getNodeList();
869
              
870
              // find the node in the node list
871
              for ( Node node : nodes ) {
872
                  
873
                  NodeReference nodeReference = node.getIdentifier();
874
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
875
                      nodeReference.getValue());
876
                  
877
                  // allow target MN certs
878
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
879
                      node.getType().equals(NodeType.MN)) {
880
                      List<Subject> nodeSubjects = node.getSubjectList();
881
                      
882
                      // check if the session subject is in the node subject list
883
                      for (Subject nodeSubject : nodeSubjects) {
884
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
885
                                  nodeSubject.getValue() + " and " + subject.getValue());
886
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
887
                              
888
                              // lastly limit to COMPLETED, INVALIDATED,
889
                              // and FAILED status updates from MNs only
890
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
891
                                   status.equals(ReplicationStatus.INVALIDATED) ||
892
                                   status.equals(ReplicationStatus.FAILED)) {
893
                                  allowed = true;
894
                                  break;
895
                                  
896
                              }                              
897
                          }
898
                      }                 
899
                  }
900
              }
901

    
902
              if ( !allowed ) {
903
                  //check for CN admin access
904
                  //allowed = isAuthorized(session, pid, Permission.WRITE);
905
                  allowed = isCNAdmin(session);
906
                  
907
              }              
908
              
909
              if ( !allowed ) {
910
                  String msg = "The subject identified by "
911
                          + subject.getValue()
912
                          + " is not a CN or MN, and does not have permission to set the replication status for "
913
                          + "the replica identified by "
914
                          + targetNode.getValue() + ".";
915
                  logMetacat.info(msg);
916
                  throw new NotAuthorized("4720", msg);
917
                  
918
              }
919

    
920
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
921
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
922
                " : " + e.getMessage());
923
            
924
          }
925
          
926
          Replica targetReplica = new Replica();
927
          // set the status for the replica
928
          if ( replicaEntryIndex != -1 ) {
929
              targetReplica = replicas.get(replicaEntryIndex);
930
              
931
              // don't allow status to change from COMPLETED to anything other
932
              // than INVALIDATED: prevents overwrites from race conditions
933
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
934
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
935
            	  throw new InvalidRequest("4730", "Status state change from " +
936
            			  targetReplica.getReplicationStatus() + " to " +
937
            			  status.toString() + "is prohibited for identifier " +
938
            			  pid.getValue() + " and target node " + 
939
            			  targetReplica.getReplicaMemberNode().getValue());
940
              }
941
              
942
              if(targetReplica.getReplicationStatus().equals(status)) {
943
                  //There is no change in the status, we do nothing.
944
                  return true;
945
              }
946
              
947
              targetReplica.setReplicationStatus(status);
948
              
949
              logMetacat.debug("Set the replication status for " + 
950
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
951
                  targetReplica.getReplicationStatus() + " for identifier " +
952
                  pid.getValue());
953
              
954
          } else {
955
              // this is a new entry, create it
956
              targetReplica.setReplicaMemberNode(targetNode);
957
              targetReplica.setReplicationStatus(status);
958
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
959
              replicas.add(targetReplica);
960
              
961
          }
962
          
963
          systemMetadata.setReplicaList(replicas);
964
                
965
          // update the metadata
966
          try {
967
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
968
              // Based on CN behavior discussion 9/16/15, we no longer want to 
969
              // update the modified date for changes to the replica list
970
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
971
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
972

    
973
              if ( !status.equals(ReplicationStatus.QUEUED) && 
974
            	   !status.equals(ReplicationStatus.REQUESTED)) {
975
                  
976
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
977
                          "\tNODE:\t" + targetNode.getValue() + 
978
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
979
                
980
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
981
                          "\tPID:\t"  + pid.getValue() + 
982
                          "\tNODE:\t" + targetNode.getValue() + 
983
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
984
              }
985

    
986
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
987
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
988
                      " on target node " + targetNode + ". The exception was: " +
989
                      failure.getMessage());
990
              }
991
              
992
			  // update the replica nodes about the completed replica when complete
993
              if (status.equals(ReplicationStatus.COMPLETED)) {
994
				notifyReplicaNodes(systemMetadata);
995
			}
996
          
997
          } catch (RuntimeException e) {
998
              throw new ServiceFailure("4700", e.getMessage());
999
          
1000
          }
1001
          
1002
    } catch (RuntimeException e) {
1003
        String msg = "There was a RuntimeException getting the lock for " +
1004
            pid.getValue();
1005
        logMetacat.info(msg);
1006
        
1007
    } finally {
1008
        lock.unlock();
1009
        logMetacat.debug("Unlocked identifier " + pid.getValue());
1010
        
1011
    }
1012
      
1013
      return true;
1014
  }
1015
  
1016
/**
1017
   * Return the checksum of the object given the identifier 
1018
   * 
1019
   * @param session - the Session object containing the credentials for the Subject
1020
   * @param pid - the object identifier for the given object
1021
   * 
1022
   * @return checksum - the checksum of the object
1023
   * 
1024
   * @throws InvalidToken
1025
   * @throws ServiceFailure
1026
   * @throws NotAuthorized
1027
   * @throws NotFound
1028
   * @throws NotImplemented
1029
   */
1030
  @Override
1031
  public Checksum getChecksum(Session session, Identifier pid)
1032
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
1033
    NotImplemented {
1034
    
1035
	boolean isAuthorized = false;
1036
	try {
1037
		isAuthorized = isAuthorized(session, pid, Permission.READ);
1038
	} catch (InvalidRequest e) {
1039
		throw new ServiceFailure("1410", e.getDescription());
1040
	}  
1041
    if (!isAuthorized) {
1042
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
1043
    }
1044
    
1045
    SystemMetadata systemMetadata = null;
1046
    Checksum checksum = null;
1047
    
1048
    try {
1049
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
1050

    
1051
        if (systemMetadata == null ) {
1052
            String error ="";
1053
            String localId = null;
1054
            try {
1055
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1056
              
1057
             } catch (Exception e) {
1058
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1059
            }
1060
            
1061
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1062
                error = DELETEDMESSAGE;
1063
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1064
                error = DELETEDMESSAGE;
1065
            }
1066
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1067
        }
1068
        checksum = systemMetadata.getChecksum();
1069
        
1070
    } catch (RuntimeException e) {
1071
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
1072
            pid.getValue() + ". The error message was: " + e.getMessage());
1073
      
1074
    }
1075
    
1076
    return checksum;
1077
  }
1078

    
1079
  /**
1080
   * Resolve the location of a given object
1081
   * 
1082
   * @param session - the Session object containing the credentials for the Subject
1083
   * @param pid - the object identifier for the given object
1084
   * 
1085
   * @return objectLocationList - the list of nodes known to contain the object
1086
   * 
1087
   * @throws InvalidToken
1088
   * @throws ServiceFailure
1089
   * @throws NotAuthorized
1090
   * @throws NotFound
1091
   * @throws NotImplemented
1092
   */
1093
  @Override
1094
  public ObjectLocationList resolve(Session session, Identifier pid)
1095
    throws InvalidToken, ServiceFailure, NotAuthorized,
1096
    NotFound, NotImplemented {
1097

    
1098
    throw new NotImplemented("4131", "resolve not implemented");
1099

    
1100
  }
1101

    
1102
  /**
1103
   * Metacat does not implement this method at the CN level
1104
   */
1105
  @Override
1106
  public ObjectList search(Session session, String queryType, String query)
1107
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1108
    NotImplemented {
1109

    
1110
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1111
	  
1112
//    ObjectList objectList = null;
1113
//    try {
1114
//        objectList = 
1115
//          IdentifierManager.getInstance().querySystemMetadata(
1116
//              null, //startTime, 
1117
//              null, //endTime,
1118
//              null, //objectFormat, 
1119
//              false, //replicaStatus, 
1120
//              0, //start, 
1121
//              1000 //count
1122
//              );
1123
//        
1124
//    } catch (Exception e) {
1125
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1126
//    }
1127
//
1128
//      return objectList;
1129
		  
1130
  }
1131
  
1132
  /**
1133
   * Returns the object format registered in the DataONE Object Format 
1134
   * Vocabulary for the given format identifier
1135
   * 
1136
   * @param fmtid - the identifier of the format requested
1137
   * 
1138
   * @return objectFormat - the object format requested
1139
   * 
1140
   * @throws ServiceFailure
1141
   * @throws NotFound
1142
   * @throws InsufficientResources
1143
   * @throws NotImplemented
1144
   */
1145
  @Override
1146
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1147
    throws ServiceFailure, NotFound, NotImplemented {
1148
     
1149
      return ObjectFormatService.getInstance().getFormat(fmtid);
1150
      
1151
  }
1152

    
1153
    @Override
1154
    public ObjectFormatIdentifier addFormat(Session session, ObjectFormatIdentifier formatId, ObjectFormat format)
1155
            throws ServiceFailure, NotFound, NotImplemented, NotAuthorized, InvalidToken {
1156

    
1157
        logMetacat.debug("CNodeService.addFormat() called.\n" + 
1158
                "format ID: " + format.getFormatId() + "\n" + 
1159
                "format name: " + format.getFormatName() + "\n" + 
1160
                "format type: " + format.getFormatType() );
1161
        
1162
        // FIXME remove:
1163
        if (true)
1164
            throw new NotImplemented("0000", "Implementation underway... Will need testing too...");
1165
        
1166
        if (!isAdminAuthorized(session))
1167
            throw new NotAuthorized("0000", "Not authorized to call addFormat()");
1168

    
1169
        String separator = ".";
1170
        try {
1171
            separator = PropertyService.getProperty("document.accNumSeparator");
1172
        } catch (PropertyNotFoundException e) {
1173
            logMetacat.warn("Unable to find property \"document.accNumSeparator\"\n" + e.getMessage());
1174
        }
1175

    
1176
        // find pids of last and next ObjectFormatList
1177
        String OBJECT_FORMAT_DOCID = ObjectFormatService.OBJECT_FORMAT_DOCID;
1178
        int lastRev = -1;
1179
        try {
1180
            lastRev = DBUtil.getLatestRevisionInDocumentTable(OBJECT_FORMAT_DOCID);
1181
        } catch (SQLException e) {
1182
            throw new ServiceFailure("0000", "Unable to locate last revision of the object format list.\n" + e.getMessage());
1183
        }
1184
        int nextRev = lastRev + 1;
1185
        String lastDocID = OBJECT_FORMAT_DOCID + separator + lastRev;
1186
        String nextDocID = OBJECT_FORMAT_DOCID + separator + nextRev;
1187
        
1188
        Identifier lastPid = new Identifier();
1189
        lastPid.setValue(lastDocID);
1190
        Identifier nextPid = new Identifier();
1191
        nextPid.setValue(nextDocID);
1192
        
1193
        logMetacat.debug("Last ObjectFormatList document ID: " + lastDocID + "\n" 
1194
                + "Next ObjectFormatList document ID: " + nextDocID);
1195
        
1196
        // add new format to the current ObjectFormatList
1197
        ObjectFormatList objectFormatList = ObjectFormatService.getInstance().listFormats();
1198
        List<ObjectFormat> innerList = objectFormatList.getObjectFormatList();
1199
        innerList.add(format);
1200

    
1201
        // get existing (last) sysmeta and make a copy
1202
        SystemMetadata lastSysmeta = getSystemMetadata(session, lastPid);
1203
        SystemMetadata nextSysmeta = new SystemMetadata();
1204
        try {
1205
            BeanUtils.copyProperties(nextSysmeta, lastSysmeta);
1206
        } catch (IllegalAccessException | InvocationTargetException e) {
1207
            throw new ServiceFailure("0000", "Unable to create system metadata for updated object format list.\n" + e.getMessage());
1208
        }
1209
        
1210
        // create the new object format list, and update the old sysmeta with obsoletedBy
1211
        createNewObjectFormatList(session, lastPid, nextPid, objectFormatList, nextSysmeta);
1212
        updateOldObjectFormatList(session, lastPid, nextPid, lastSysmeta);
1213
        
1214
        // TODO add to ObjectFormatService local cache?
1215
        
1216
        return formatId;
1217
    }
1218

    
1219
    /**
1220
     * Creates the object for the next / updated version of the ObjectFormatList.
1221
     * 
1222
     * @param session
1223
     * @param lastPid
1224
     * @param nextPid
1225
     * @param objectFormatList
1226
     * @param lastSysmeta
1227
     */
1228
    private void createNewObjectFormatList(Session session, Identifier lastPid, Identifier nextPid,
1229
            ObjectFormatList objectFormatList, SystemMetadata lastSysmeta) 
1230
                    throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented {
1231
        
1232
        PipedInputStream is = new PipedInputStream();
1233
        PipedOutputStream os = null;
1234
        
1235
        try {
1236
            os = new PipedOutputStream(is);
1237
            TypeMarshaller.marshalTypeToOutputStream(objectFormatList, os);
1238
        } catch (JiBXException | IOException e) {
1239
            throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + e.getMessage());
1240
        } finally {
1241
            try {
1242
                os.flush();
1243
                os.close();
1244
            } catch (IOException ioe) {
1245
                throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + ioe.getMessage());
1246
            }
1247
        }
1248
        
1249
        BigInteger docSize = lastSysmeta.getSize();
1250
        try {
1251
            docSize = BigInteger.valueOf(is.available());
1252
        } catch (IOException e) {
1253
            logMetacat.warn("Unable to set an accurate size for the new object format list.", e);
1254
        }
1255
        
1256
        lastSysmeta.setIdentifier(nextPid);
1257
        lastSysmeta.setObsoletes(lastPid);
1258
        lastSysmeta.setSize(docSize); 
1259
        lastSysmeta.setSubmitter(session.getSubject());
1260
        lastSysmeta.setDateUploaded(new Date());
1261
        
1262
        // create new object format list
1263
        try {
1264
            create(session, nextPid, is, lastSysmeta);
1265
        } catch (IdentifierNotUnique | UnsupportedType | InsufficientResources
1266
                | InvalidSystemMetadata | InvalidRequest e) {
1267
            throw new ServiceFailure("0000", "Unable to create() new object format list" + e.getMessage());
1268
        }
1269
    }
1270
  
1271
    /**
1272
     * Updates the SystemMetadata for the old version of the ObjectFormatList
1273
     * by setting the obsoletedBy value to the pid of the new version of the 
1274
     * ObjectFormatList.
1275
     * 
1276
     * @param session
1277
     * @param lastPid
1278
     * @param obsoletedByPid
1279
     * @param lastSysmeta
1280
     * @throws ServiceFailure
1281
     */
1282
    private void updateOldObjectFormatList(Session session, Identifier lastPid, Identifier obsoletedByPid, SystemMetadata lastSysmeta) 
1283
            throws ServiceFailure {
1284
        
1285
        lastSysmeta.setObsoletedBy(obsoletedByPid);
1286
        
1287
        try {
1288
            this.updateSystemMetadata(session, lastPid, lastSysmeta);
1289
        } catch (NotImplemented | NotAuthorized | ServiceFailure | InvalidRequest
1290
                | InvalidSystemMetadata | InvalidToken e) {
1291
            throw new ServiceFailure("0000", "Unable to update metadata of old object format list.\n" + e.getMessage());
1292
        }
1293
    }
1294
  /**
1295
   * Returns a list of all object formats registered in the DataONE Object 
1296
   * Format Vocabulary
1297
    * 
1298
   * @return objectFormatList - The list of object formats registered in 
1299
   *                            the DataONE Object Format Vocabulary
1300
   * 
1301
   * @throws ServiceFailure
1302
   * @throws NotImplemented
1303
   * @throws InsufficientResources
1304
   */
1305
  @Override
1306
  public ObjectFormatList listFormats() 
1307
    throws ServiceFailure, NotImplemented {
1308

    
1309
    return ObjectFormatService.getInstance().listFormats();
1310
  }
1311

    
1312
  /**
1313
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1314
    * 
1315
   * @return nodeList - List of nodes from the registry
1316
   * 
1317
   * @throws ServiceFailure
1318
   * @throws NotImplemented
1319
   */
1320
  @Override
1321
  public NodeList listNodes() 
1322
    throws NotImplemented, ServiceFailure {
1323

    
1324
    throw new NotImplemented("4800", "listNodes not implemented");
1325
  }
1326

    
1327
  /**
1328
   * Provides a mechanism for adding system metadata independently of its 
1329
   * associated object, such as when adding system metadata for data objects.
1330
    * 
1331
   * @param session - the Session object containing the credentials for the Subject
1332
   * @param pid - The identifier of the object to register the system metadata against
1333
   * @param sysmeta - The system metadata to be registered
1334
   * 
1335
   * @return true if the registration succeeds
1336
   * 
1337
   * @throws NotImplemented
1338
   * @throws NotAuthorized
1339
   * @throws ServiceFailure
1340
   * @throws InvalidRequest
1341
   * @throws InvalidSystemMetadata
1342
   */
1343
  @Override
1344
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1345
      SystemMetadata sysmeta) 
1346
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1347
      InvalidSystemMetadata {
1348

    
1349
      // The lock to be used for this identifier
1350
      Lock lock = null;
1351

    
1352
      // TODO: control who can call this?
1353
      if (session == null) {
1354
          //TODO: many of the thrown exceptions do not use the correct error codes
1355
          //check these against the docs and correct them
1356
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1357
                  "  If you are not logged in, please do so and retry the request.");
1358
      } else {
1359
          //only CN is allwoed
1360
          if(!isCNAdmin(session)) {
1361
                throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for registering the system metadata of the object "+pid.getValue());
1362
          }
1363
      }
1364
      // the identifier can't be an SID
1365
      try {
1366
          if(IdentifierManager.getInstance().systemMetadataSIDExists(pid)) {
1367
              throw new InvalidRequest("4863", "The provided identifier "+pid.getValue()+" is a series id which is not allowed.");
1368
          }
1369
      } catch (SQLException sqle) {
1370
          throw new ServiceFailure("4862", "Couldn't determine if the pid "+pid.getValue()+" is a series id since "+sqle.getMessage());
1371
      }
1372
      
1373
      // verify that guid == SystemMetadata.getIdentifier()
1374
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1375
          "|" + sysmeta.getIdentifier().getValue());
1376
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1377
          throw new InvalidRequest("4863", 
1378
              "The identifier in method call (" + pid.getValue() + 
1379
              ") does not match identifier in system metadata (" +
1380
              sysmeta.getIdentifier().getValue() + ").");
1381
      }
1382
      
1383
      //check if the sid is legitimate in the system metadata
1384
      //checkSidInModifyingSystemMetadata(sysmeta, "4864", "4862");
1385
      Identifier sid = sysmeta.getSeriesId();
1386
      if(sid != null) {
1387
          if (!isValidIdentifier(sid)) {
1388
              throw new InvalidRequest("4863", "The series id in the system metadata is invalid in the request.");
1389
          }
1390
      }
1391

    
1392
      try {
1393
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1394
          lock.lock();
1395
          logMetacat.debug("Locked identifier " + pid.getValue());
1396
          logMetacat.debug("Checking if identifier exists...");
1397
          // Check that the identifier does not already exist
1398
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1399
              throw new InvalidRequest("4863", 
1400
                  "The identifier is already in use by an existing object.");
1401
          
1402
          }
1403
          
1404
          // insert the system metadata into the object store
1405
          logMetacat.debug("Starting to insert SystemMetadata...");
1406
          try {
1407
              sysmeta.setSerialVersion(BigInteger.ONE);
1408
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1409
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1410
              
1411
          } catch (RuntimeException e) {
1412
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1413
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1414
                  e.getClass() + ": " + e.getMessage());
1415
              
1416
          }
1417
          
1418
      } catch (RuntimeException e) {
1419
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1420
                  e.getClass() + ": " + e.getMessage());
1421
          
1422
      }  finally {
1423
          lock.unlock();
1424
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1425
          
1426
      }
1427

    
1428
      
1429
      logMetacat.debug("Returning from registerSystemMetadata");
1430
      
1431
      try {
1432
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1433
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1434
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1435
    	          localId, "registerSystemMetadata");
1436
      } catch (McdbDocNotFoundException e) {
1437
    	  // do nothing, no localId to log with
1438
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1439
      } catch (SQLException ee) {
1440
          // do nothing, no localId to log with
1441
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1442
      }
1443
      
1444
      
1445
      return pid;
1446
  }
1447
  
1448
  /**
1449
   * Given an optional scope and format, reserves and returns an identifier 
1450
   * within that scope and format that is unique and will not be 
1451
   * used by any other sessions. 
1452
    * 
1453
   * @param session - the Session object containing the credentials for the Subject
1454
   * @param pid - The identifier of the object to register the system metadata against
1455
   * @param scope - An optional string to be used to qualify the scope of 
1456
   *                the identifier namespace, which is applied differently 
1457
   *                depending on the format requested. If scope is not 
1458
   *                supplied, a default scope will be used.
1459
   * @param format - The optional name of the identifier format to be used, 
1460
   *                  drawn from a DataONE-specific vocabulary of identifier 
1461
   *                 format names, including several common syntaxes such 
1462
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1463
   *                 format is not supplied by the caller, the CN service 
1464
   *                 will use a default identifier format, which may change 
1465
   *                 over time.
1466
   * 
1467
   * @return true if the registration succeeds
1468
   * 
1469
   * @throws InvalidToken
1470
   * @throws ServiceFailure
1471
   * @throws NotAuthorized
1472
   * @throws IdentifierNotUnique
1473
   * @throws NotImplemented
1474
   */
1475
  @Override
1476
  public Identifier reserveIdentifier(Session session, Identifier pid)
1477
  throws InvalidToken, ServiceFailure,
1478
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1479

    
1480
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1481
  }
1482
  
1483
  @Override
1484
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1485
  throws InvalidToken, ServiceFailure,
1486
        NotAuthorized, NotImplemented, InvalidRequest {
1487
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1488
  }
1489
  
1490
  /**
1491
    * Checks whether the pid is reserved by the subject in the session param
1492
    * If the reservation is held on the pid by the subject, we return true.
1493
    * 
1494
   * @param session - the Session object containing the Subject
1495
   * @param pid - The identifier to check
1496
   * 
1497
   * @return true if the reservation exists for the subject/pid
1498
   * 
1499
   * @throws InvalidToken
1500
   * @throws ServiceFailure
1501
   * @throws NotFound - when the pid is not found (in use or in reservation)
1502
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1503
   * @throws IdentifierNotUnique - when the pid is in use
1504
   * @throws NotImplemented
1505
   */
1506

    
1507
  @Override
1508
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1509
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1510
      NotImplemented, InvalidRequest {
1511
  
1512
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1513
  }
1514

    
1515
  /**
1516
   * Changes ownership (RightsHolder) of the specified object to the 
1517
   * subject specified by userId
1518
    * 
1519
   * @param session - the Session object containing the credentials for the Subject
1520
   * @param pid - Identifier of the object to be modified
1521
   * @param userId - The subject that will be taking ownership of the specified object.
1522
   *
1523
   * @return pid - the identifier of the modified object
1524
   * 
1525
   * @throws ServiceFailure
1526
   * @throws InvalidToken
1527
   * @throws NotFound
1528
   * @throws NotAuthorized
1529
   * @throws NotImplemented
1530
   * @throws InvalidRequest
1531
   */  
1532
  @Override
1533
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1534
      long serialVersion)
1535
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1536
      NotImplemented, InvalidRequest, VersionMismatch {
1537
      
1538
      // The lock to be used for this identifier
1539
      Lock lock = null;
1540

    
1541
      // get the subject
1542
      Subject subject = session.getSubject();
1543
      
1544
      String serviceFailureCode = "4490";
1545
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1546
      if(sid != null) {
1547
          pid = sid;
1548
      }
1549
      
1550
      // are we allowed to do this?
1551
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1552
          throw new NotAuthorized("4440", "not allowed by "
1553
                  + subject.getValue() + " on " + pid.getValue());
1554
          
1555
      }
1556
      
1557
      SystemMetadata systemMetadata = null;
1558
      try {
1559
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1560
          lock.lock();
1561
          logMetacat.debug("Locked identifier " + pid.getValue());
1562

    
1563
          try {
1564
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1565
              
1566
              // does the request have the most current system metadata?
1567
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1568
                 String msg = "The requested system metadata version number " + 
1569
                     serialVersion + " differs from the current version at " +
1570
                     systemMetadata.getSerialVersion().longValue() +
1571
                     ". Please get the latest copy in order to modify it.";
1572
                 throw new VersionMismatch("4443", msg);
1573
              }
1574
              
1575
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1576
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1577
              
1578
          }
1579
              
1580
          // set the new rights holder
1581
          systemMetadata.setRightsHolder(userId);
1582
          
1583
          // update the metadata
1584
          try {
1585
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1586
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1587
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1588
              notifyReplicaNodes(systemMetadata);
1589
              
1590
          } catch (RuntimeException e) {
1591
              throw new ServiceFailure("4490", e.getMessage());
1592
          
1593
          }
1594
          
1595
      } catch (RuntimeException e) {
1596
          throw new ServiceFailure("4490", e.getMessage());
1597
          
1598
      } finally {
1599
          lock.unlock();
1600
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1601
      
1602
      }
1603
      
1604
      return pid;
1605
  }
1606

    
1607
  /**
1608
   * Verify that a replication task is authorized by comparing the target node's
1609
   * Subject (from the X.509 certificate-derived Session) with the list of 
1610
   * subjects in the known, pending replication tasks map.
1611
   * 
1612
   * @param originatingNodeSession - Session information that contains the 
1613
   *                                 identity of the calling user
1614
   * @param targetNodeSubject - Subject identifying the target node
1615
   * @param pid - the identifier of the object to be replicated
1616
   * @param replicatePermission - the execute permission to be granted
1617
   * 
1618
   * @throws ServiceFailure
1619
   * @throws NotImplemented
1620
   * @throws InvalidToken
1621
   * @throws NotAuthorized
1622
   * @throws InvalidRequest
1623
   * @throws NotFound
1624
   */
1625
  @Override
1626
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1627
    Subject targetNodeSubject, Identifier pid) 
1628
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1629
    NotFound, InvalidRequest {
1630
    
1631
    boolean isAllowed = false;
1632
    SystemMetadata sysmeta = null;
1633
    NodeReference targetNode = null;
1634
    
1635
    try {
1636
      // get the target node reference from the nodes list
1637
      CNode cn = D1Client.getCN();
1638
      List<Node> nodes = cn.listNodes().getNodeList();
1639
      
1640
      if ( nodes != null ) {
1641
        for (Node node : nodes) {
1642
            
1643
        	if (node.getSubjectList() != null) {
1644
        		
1645
	            for (Subject nodeSubject : node.getSubjectList()) {
1646
	            	
1647
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1648
	                    targetNode = node.getIdentifier();
1649
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1650
	                    break;
1651
	                }
1652
	            }
1653
        	}
1654
            
1655
            if ( targetNode != null) { break; }
1656
        }
1657
        
1658
      } else {
1659
          String msg = "Couldn't get the node list from the CN";
1660
          logMetacat.debug(msg);
1661
          throw new ServiceFailure("4872", msg);
1662
          
1663
      }
1664
      
1665
      // can't find a node listed with the given subject
1666
      if ( targetNode == null ) {
1667
          String msg = "There is no Member Node registered with a node subject " +
1668
              "matching " + targetNodeSubject.getValue();
1669
          logMetacat.info(msg);
1670
          throw new NotAuthorized("4871", msg);
1671
          
1672
      }
1673
      
1674
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1675
      
1676
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1677

    
1678
      if ( sysmeta != null ) {
1679
          
1680
          List<Replica> replicaList = sysmeta.getReplicaList();
1681
          
1682
          if ( replicaList != null ) {
1683
              
1684
              // find the replica with the status set to 'requested'
1685
              for (Replica replica : replicaList) {
1686
                  ReplicationStatus status = replica.getReplicationStatus();
1687
                  NodeReference listedNode = replica.getReplicaMemberNode();
1688
                  if ( listedNode != null && targetNode != null ) {
1689
                      logMetacat.debug("Comparing " + listedNode.getValue()
1690
                              + " to " + targetNode.getValue());
1691
                      
1692
                      if (listedNode.getValue().equals(targetNode.getValue())
1693
                              && status.equals(ReplicationStatus.REQUESTED)) {
1694
                          isAllowed = true;
1695
                          break;
1696

    
1697
                      }
1698
                  }
1699
              }
1700
          }
1701
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1702
              "to replicate: " + isAllowed + " for " + pid.getValue());
1703

    
1704
          
1705
      } else {
1706
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1707
          " is null.");
1708
          String error ="";
1709
          String localId = null;
1710
          try {
1711
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1712
            
1713
           } catch (Exception e) {
1714
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1715
          }
1716
          
1717
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1718
              error = DELETEDMESSAGE;
1719
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1720
              error = DELETEDMESSAGE;
1721
          }
1722
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1723
          
1724
      }
1725

    
1726
    } catch (RuntimeException e) {
1727
    	  ServiceFailure sf = new ServiceFailure("4872", 
1728
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1729
                e.getMessage());
1730
    	  sf.initCause(e);
1731
        throw sf;
1732
        
1733
    }
1734
      
1735
    return isAllowed;
1736
    
1737
  }
1738

    
1739
  /**
1740
   * Adds a new object to the Node, where the object is a science metadata object.
1741
   * 
1742
   * @param session - the Session object containing the credentials for the Subject
1743
   * @param pid - The object identifier to be created
1744
   * @param object - the object bytes
1745
   * @param sysmeta - the system metadata that describes the object  
1746
   * 
1747
   * @return pid - the object identifier created
1748
   * 
1749
   * @throws InvalidToken
1750
   * @throws ServiceFailure
1751
   * @throws NotAuthorized
1752
   * @throws IdentifierNotUnique
1753
   * @throws UnsupportedType
1754
   * @throws InsufficientResources
1755
   * @throws InvalidSystemMetadata
1756
   * @throws NotImplemented
1757
   * @throws InvalidRequest
1758
   */
1759
  public Identifier create(Session session, Identifier pid, InputStream object,
1760
    SystemMetadata sysmeta) 
1761
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1762
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1763
    NotImplemented, InvalidRequest {
1764
       
1765
   // verify the pid is valid format
1766
      if (!isValidIdentifier(pid)) {
1767
          throw new InvalidRequest("4891", "The provided identifier is invalid.");
1768
      }
1769
      // The lock to be used for this identifier
1770
      Lock lock = null;
1771

    
1772
      try {
1773
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1774
          lock.lock();
1775
          // are we allowed?
1776
          boolean isAllowed = false;
1777
          isAllowed = isAdminAuthorized(session);
1778
          
1779
          // additional check if it is the authoritative node if it is not the admin
1780
          if(!isAllowed) {
1781
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1782
          }
1783

    
1784
          // proceed if we're called by a CN
1785
          if ( isAllowed ) {
1786
              //check if the series id is legitimate. It uses the same rules of the method registerSystemMetadata
1787
              //checkSidInModifyingSystemMetadata(sysmeta, "4896", "4893");
1788
              Identifier sid = sysmeta.getSeriesId();
1789
              if(sid != null) {
1790
                  if (!isValidIdentifier(sid)) {
1791
                      throw new InvalidRequest("4891", "The series id in the system metadata is invalid in the request.");
1792
                  }
1793
              }
1794
              // create the coordinating node version of the document      
1795
              logMetacat.debug("Locked identifier " + pid.getValue());
1796
              sysmeta.setSerialVersion(BigInteger.ONE);
1797
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1798
              //sysmeta.setArchived(false); // this is a create op, not update
1799
              
1800
              // the CN should have set the origin and authoritative member node fields
1801
              try {
1802
                  sysmeta.getOriginMemberNode().getValue();
1803
                  sysmeta.getAuthoritativeMemberNode().getValue();
1804
                  
1805
              } catch (NullPointerException npe) {
1806
                  throw new InvalidSystemMetadata("4896", 
1807
                      "Both the origin and authoritative member node identifiers need to be set.");
1808
                  
1809
              }
1810
              pid = super.create(session, pid, object, sysmeta);
1811

    
1812
          } else {
1813
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1814
                  " isn't allowed to call create() on a Coordinating Node.";
1815
              logMetacat.info(msg);
1816
              throw new NotAuthorized("1100", msg);
1817
          }
1818
          
1819
      } catch (RuntimeException e) {
1820
          // Convert Hazelcast runtime exceptions to service failures
1821
          String msg = "There was a problem creating the object identified by " +
1822
              pid.getValue() + ". There error message was: " + e.getMessage();
1823
          throw new ServiceFailure("4893", msg);
1824
          
1825
      } finally {
1826
    	  if (lock != null) {
1827
	          lock.unlock();
1828
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1829
    	  }
1830
      }
1831
      
1832
      return pid;
1833

    
1834
  }
1835

    
1836
  /**
1837
   * Set access for a given object using the object identifier and a Subject
1838
   * under a given Session.
1839
   * This method only applies the objects whose authoritative mn is a v1 node.
1840
   * @param session - the Session object containing the credentials for the Subject
1841
   * @param pid - the object identifier for the given object to apply the policy
1842
   * @param policy - the access policy to be applied
1843
   * 
1844
   * @return true if the application of the policy succeeds
1845
   * @throws InvalidToken
1846
   * @throws ServiceFailure
1847
   * @throws NotFound
1848
   * @throws NotAuthorized
1849
   * @throws NotImplemented
1850
   * @throws InvalidRequest
1851
   */
1852
  public boolean setAccessPolicy(Session session, Identifier pid, 
1853
      AccessPolicy accessPolicy, long serialVersion) 
1854
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1855
      NotImplemented, InvalidRequest, VersionMismatch {
1856
      
1857
   // do we have a valid pid?
1858
      if (pid == null || pid.getValue().trim().equals("")) {
1859
          throw new InvalidRequest("4402", "The provided identifier was invalid.");
1860
          
1861
      }
1862
      
1863
      String serviceFailureCode = "4430";
1864
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1865
      if(sid != null) {
1866
          pid = sid;
1867
      }
1868
      // The lock to be used for this identifier
1869
      Lock lock = null;
1870
      SystemMetadata systemMetadata = null;
1871
      
1872
      boolean success = false;
1873
      
1874
      // get the subject
1875
      Subject subject = session.getSubject();
1876
      
1877
      // are we allowed to do this?
1878
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1879
          throw new NotAuthorized("4420", "not allowed by "
1880
                  + subject.getValue() + " on " + pid.getValue());
1881
      }
1882
      
1883
      try {
1884
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1885
          lock.lock();
1886
          logMetacat.debug("Locked identifier " + pid.getValue());
1887

    
1888
          try {
1889
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1890

    
1891
              if ( systemMetadata == null ) {
1892
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1893
                  
1894
              }
1895
              // does the request have the most current system metadata?
1896
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1897
                 String msg = "The requested system metadata version number " + 
1898
                     serialVersion + " differs from the current version at " +
1899
                     systemMetadata.getSerialVersion().longValue() +
1900
                     ". Please get the latest copy in order to modify it.";
1901
                 throw new VersionMismatch("4402", msg);
1902
                 
1903
              }
1904
              
1905
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
1906
              String version = checker.getVersion("MNStorage");
1907
              if(version == null) {
1908
                  throw new ServiceFailure("4430", "Couldn't determine the version of the MNStorge for the "+pid.getValue());
1909
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
1910
                  //we don't apply this method to an object whose authoritative node is v2
1911
                  throw new NotAuthorized("4420", V2V1MISSMATCH);
1912
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1913
                  //we don't understand this version (it is not v1 or v2)
1914
                  throw new InvalidRequest("4402", "The version of the MNStorage is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
1915
              }
1916
              
1917
          } catch (RuntimeException e) {
1918
              // convert Hazelcast RuntimeException to NotFound
1919
              throw new NotFound("4400", "No record found for: " + pid);
1920
            
1921
          }
1922
              
1923
          // set the access policy
1924
          systemMetadata.setAccessPolicy(accessPolicy);
1925
          
1926
          // update the system metadata
1927
          try {
1928
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1929
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1930
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1931
              notifyReplicaNodes(systemMetadata);
1932
              
1933
          } catch (RuntimeException e) {
1934
              // convert Hazelcast RuntimeException to ServiceFailure
1935
              throw new ServiceFailure("4430", e.getMessage());
1936
            
1937
          }
1938
          
1939
      } catch (RuntimeException e) {
1940
          throw new ServiceFailure("4430", e.getMessage());
1941
          
1942
      } finally {
1943
          lock.unlock();
1944
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1945
        
1946
      }
1947

    
1948
    
1949
    // TODO: how do we know if the map was persisted?
1950
    success = true;
1951
    
1952
    return success;
1953
  }
1954

    
1955
  /**
1956
   * Full replacement of replication metadata in the system metadata for the 
1957
   * specified object, changes date system metadata modified
1958
   * 
1959
   * @param session - the Session object containing the credentials for the Subject
1960
   * @param pid - the object identifier for the given object to apply the policy
1961
   * @param replica - the replica to be updated
1962
   * @return
1963
   * @throws NotImplemented
1964
   * @throws NotAuthorized
1965
   * @throws ServiceFailure
1966
   * @throws InvalidRequest
1967
   * @throws NotFound
1968
   * @throws VersionMismatch
1969
   */
1970
  @Override
1971
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1972
      Replica replica, long serialVersion) 
1973
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1974
      NotFound, VersionMismatch {
1975
      
1976
      // The lock to be used for this identifier
1977
      Lock lock = null;
1978
      
1979
      // get the subject
1980
      Subject subject = session.getSubject();
1981
      
1982
      // are we allowed to do this?
1983
      if(session == null) {
1984
          throw new NotAuthorized("4851", "Session cannot be null. It is not authorized for updating the replication metadata of the object "+pid.getValue());
1985
      } else {
1986
          if(!isCNAdmin(session)) {
1987
              throw new NotAuthorized("4851", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for updating the replication metadata of the object "+pid.getValue());
1988
        }
1989
      }
1990
      /*try {
1991

    
1992
          // what is the controlling permission?
1993
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1994
              throw new NotAuthorized("4851", "not allowed by "
1995
                      + subject.getValue() + " on " + pid.getValue());
1996
          }
1997

    
1998
        
1999
      } catch (InvalidToken e) {
2000
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
2001
                  " on " + pid.getValue());  
2002
          
2003
      }*/
2004

    
2005
      SystemMetadata systemMetadata = null;
2006
      try {
2007
          lock = HazelcastService.getInstance().getLock(pid.getValue());
2008
          lock.lock();
2009
          logMetacat.debug("Locked identifier " + pid.getValue());
2010

    
2011
          try {      
2012
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2013

    
2014
              // does the request have the most current system metadata?
2015
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
2016
                 String msg = "The requested system metadata version number " + 
2017
                     serialVersion + " differs from the current version at " +
2018
                     systemMetadata.getSerialVersion().longValue() +
2019
                     ". Please get the latest copy in order to modify it.";
2020
                 throw new VersionMismatch("4855", msg);
2021
              }
2022
              
2023
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
2024
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
2025
                  " : " + e.getMessage());
2026
            
2027
          }
2028
              
2029
          // set the status for the replica
2030
          List<Replica> replicas = systemMetadata.getReplicaList();
2031
          NodeReference replicaNode = replica.getReplicaMemberNode();
2032
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
2033
          int index = 0;
2034
          for (Replica listedReplica: replicas) {
2035
              
2036
              // remove the replica that we are replacing
2037
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
2038
                      // don't allow status to change from COMPLETED to anything other
2039
                      // than INVALIDATED: prevents overwrites from race conditions
2040
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
2041
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
2042
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
2043
                	  throw new InvalidRequest("4853", "Status state change from " +
2044
                			  listedReplica.getReplicationStatus() + " to " +
2045
                			  replicaStatus.toString() + "is prohibited for identifier " +
2046
                			  pid.getValue() + " and target node " + 
2047
                			  listedReplica.getReplicaMemberNode().getValue());
2048

    
2049
            	  }
2050
                  replicas.remove(index);
2051
                  break;
2052
                  
2053
              }
2054
              index++;
2055
          }
2056
          
2057
          // add the new replica item
2058
          replicas.add(replica);
2059
          systemMetadata.setReplicaList(replicas);
2060
          
2061
          // update the metadata
2062
          try {
2063
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
2064
              // Based on CN behavior discussion 9/16/15, we no longer want to 
2065
              // update the modified date for changes to the replica list
2066
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
2067
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
2068
              
2069
              // inform replica nodes of the change if the status is complete
2070
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
2071
            	  notifyReplicaNodes(systemMetadata);
2072
            	  
2073
              }
2074
          } catch (RuntimeException e) {
2075
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2076
              throw new ServiceFailure("4852", e.getMessage());
2077
          
2078
          }
2079
          
2080
      } catch (RuntimeException e) {
2081
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2082
          throw new ServiceFailure("4852", e.getMessage());
2083
      
2084
      } finally {
2085
          lock.unlock();
2086
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2087
          
2088
      }
2089
    
2090
      return true;
2091
      
2092
  }
2093
  
2094
  /**
2095
   * 
2096
   */
2097
  @Override
2098
  public ObjectList listObjects(Session session, Date startTime, 
2099
      Date endTime, ObjectFormatIdentifier formatid, NodeReference nodeId,Identifier identifier,
2100
      Integer start, Integer count)
2101
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
2102
      ServiceFailure {
2103

    
2104
      return super.listObjects(session, startTime, endTime, formatid, identifier, nodeId, start, count);
2105
  }
2106

    
2107
  
2108
 	/**
2109
 	 * Returns a list of checksum algorithms that are supported by DataONE.
2110
 	 * @return cal  the list of checksum algorithms
2111
 	 * 
2112
 	 * @throws ServiceFailure
2113
 	 * @throws NotImplemented
2114
 	 */
2115
  @Override
2116
  public ChecksumAlgorithmList listChecksumAlgorithms()
2117
			throws ServiceFailure, NotImplemented {
2118
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
2119
		cal.addAlgorithm("MD5");
2120
		cal.addAlgorithm("SHA-1");
2121
		return cal;
2122
		
2123
	}
2124

    
2125
  /**
2126
   * Notify replica Member Nodes of system metadata changes for a given pid
2127
   * 
2128
   * @param currentSystemMetadata - the up to date system metadata
2129
   */
2130
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
2131
      
2132
      Session session = null;
2133
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
2134
      //MNode mn = null;
2135
      NodeReference replicaNodeRef = null;
2136
      CNode cn = null;
2137
      NodeType nodeType = null;
2138
      List<Node> nodeList = null;
2139
      
2140
      try {
2141
          cn = D1Client.getCN();
2142
          nodeList = cn.listNodes().getNodeList();
2143
          
2144
      } catch (Exception e) { // handle BaseException and other I/O issues
2145
          
2146
          // swallow errors since the call is not critical
2147
          logMetacat.error("Can't inform MNs of system metadata changes due " +
2148
              "to communication issues with the CN: " + e.getMessage());
2149
          
2150
      }
2151
      
2152
      if ( replicaList != null ) {
2153
          
2154
          // iterate through the replicas and inform  MN replica nodes
2155
          for (Replica replica : replicaList) {
2156
              String replicationVersion = null;
2157
              replicaNodeRef = replica.getReplicaMemberNode();
2158
              try {
2159
                  if (nodeList != null) {
2160
                      // find the node type
2161
                      for (Node node : nodeList) {
2162
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
2163
                              nodeType = node.getType();
2164
                              D1NodeVersionChecker checker = new D1NodeVersionChecker(replicaNodeRef);
2165
                              replicationVersion = checker.getVersion("MNRead");
2166
                              break;
2167
              
2168
                          }
2169
                      }
2170
                  }
2171
              
2172
                  // notify only MNs
2173
                  if (replicationVersion != null && nodeType != null && nodeType == NodeType.MN) {
2174
                      if(replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
2175
                          //connect to a v2 mn
2176
                          MNode mn = D1Client.getMN(replicaNodeRef);
2177
                          mn.systemMetadataChanged(session, 
2178
                              currentSystemMetadata.getIdentifier(), 
2179
                              currentSystemMetadata.getSerialVersion().longValue(),
2180
                              currentSystemMetadata.getDateSysMetadataModified());
2181
                      } else if (replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
2182
                          //connect to a v1 mn
2183
                          org.dataone.client.v1.MNode mn = org.dataone.client.v1.itk.D1Client.getMN(replicaNodeRef);
2184
                          mn.systemMetadataChanged(session, 
2185
                                  currentSystemMetadata.getIdentifier(), 
2186
                                  currentSystemMetadata.getSerialVersion().longValue(),
2187
                                  currentSystemMetadata.getDateSysMetadataModified());
2188
                      }
2189
                      
2190
                  }
2191
              
2192
              } catch (Exception e) { // handle BaseException and other I/O issues
2193
              
2194
                  // swallow errors since the call is not critical
2195
                  logMetacat.error("Can't inform "
2196
                          + replicaNodeRef.getValue()
2197
                          + " of system metadata changes due "
2198
                          + "to communication issues with the CN: "
2199
                          + e.getMessage());
2200
              
2201
              }
2202
          }
2203
      }
2204
  }
2205
  
2206
  /**
2207
   * Update the system metadata of the specified pid.
2208
   * Note: the serial version and the replica list in the new system metadata will be ignored and the old values will be kept.
2209
   */
2210
  @Override
2211
  public boolean updateSystemMetadata(Session session, Identifier pid,
2212
          SystemMetadata sysmeta) throws NotImplemented, NotAuthorized,
2213
          ServiceFailure, InvalidRequest, InvalidSystemMetadata, InvalidToken {
2214
   if(sysmeta == null) {
2215
       throw  new InvalidRequest("4863", "The system metadata object should NOT be null in the updateSystemMetadata request.");
2216
   }
2217
   if(pid == null || pid.getValue() == null) {
2218
       throw new InvalidRequest("4863", "Please specify the id in the updateSystemMetadata request ") ;
2219
   }
2220

    
2221
   if (session == null) {
2222
       //TODO: many of the thrown exceptions do not use the correct error codes
2223
       //check these against the docs and correct them
2224
       throw new NotAuthorized("4861", "No Session - could not authorize for updating system metadata." +
2225
               "  If you are not logged in, please do so and retry the request.");
2226
   } else {
2227
         //only CN is allwoed
2228
         if(!isCNAdmin(session)) {
2229
               throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not authorized for updating the system metadata of the object "+pid.getValue());
2230
         }
2231
   }
2232

    
2233
    //update the system metadata locally
2234
    boolean success = false;
2235
    try {
2236
        HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
2237
        SystemMetadata currentSysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2238
       
2239
        if(currentSysmeta == null) {
2240
            throw  new InvalidRequest("4863", "We can't find the current system metadata on the member node for the id "+pid.getValue());
2241
        }
2242
        // CN will ignore the comming serial version and replica list fields from the mn node. 
2243
        BigInteger currentSerialVersion = currentSysmeta.getSerialVersion();
2244
        sysmeta.setSerialVersion(currentSerialVersion);
2245
        List<Replica> replicas = currentSysmeta.getReplicaList();
2246
        sysmeta.setReplicaList(replicas);
2247
        boolean needUpdateModificationDate = false;//cn doesn't need to change the modification date.
2248
        success = updateSystemMetadata(session, pid, sysmeta, needUpdateModificationDate, currentSysmeta);
2249
    } finally {
2250
        HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
2251
    }
2252
    return success;
2253
  }
2254
  
2255
    @Override
2256
    public boolean synchronize(Session session, Identifier pid) throws NotAuthorized, InvalidRequest, NotImplemented{
2257
        throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2258

    
2259
    }
2260

    
2261
	@Override
2262
	public QueryEngineDescription getQueryEngineDescription(Session session,
2263
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
2264
			NotImplemented, NotFound {
2265
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2266

    
2267
	}
2268
	
2269
	@Override
2270
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
2271
			ServiceFailure, NotAuthorized, NotImplemented {
2272
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2273

    
2274
	}
2275
	
2276
	@Override
2277
	public InputStream query(Session session, String queryEngine, String query)
2278
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
2279
			NotImplemented, NotFound {
2280
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2281

    
2282
	}
2283
	
2284
	@Override
2285
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
2286
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
2287
	}
2288

    
2289
}
(1-1/8)