Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.PipedInputStream;
29
import java.io.PipedOutputStream;
30
import java.lang.reflect.InvocationTargetException;
31
import java.math.BigInteger;
32
import java.sql.SQLException;
33
import java.util.ArrayList;
34
import java.util.Calendar;
35
import java.util.Date;
36
import java.util.List;
37
import java.util.concurrent.locks.Lock;
38

    
39
import javax.servlet.http.HttpServletRequest;
40

    
41
import org.apache.commons.beanutils.BeanUtils;
42
import org.apache.log4j.Logger;
43
import org.dataone.client.v2.CNode;
44
import org.dataone.client.v2.MNode;
45
import org.dataone.client.v2.itk.D1Client;
46
import org.dataone.service.cn.v2.CNAuthorization;
47
import org.dataone.service.cn.v2.CNCore;
48
import org.dataone.service.cn.v2.CNRead;
49
import org.dataone.service.cn.v2.CNReplication;
50
import org.dataone.service.cn.v2.CNView;
51
import org.dataone.service.exceptions.BaseException;
52
import org.dataone.service.exceptions.IdentifierNotUnique;
53
import org.dataone.service.exceptions.InsufficientResources;
54
import org.dataone.service.exceptions.InvalidRequest;
55
import org.dataone.service.exceptions.InvalidSystemMetadata;
56
import org.dataone.service.exceptions.InvalidToken;
57
import org.dataone.service.exceptions.NotAuthorized;
58
import org.dataone.service.exceptions.NotFound;
59
import org.dataone.service.exceptions.NotImplemented;
60
import org.dataone.service.exceptions.ServiceFailure;
61
import org.dataone.service.exceptions.UnsupportedType;
62
import org.dataone.service.exceptions.VersionMismatch;
63
import org.dataone.service.types.v1.AccessPolicy;
64
import org.dataone.service.types.v1.Checksum;
65
import org.dataone.service.types.v1.ChecksumAlgorithmList;
66
import org.dataone.service.types.v1.Event;
67
import org.dataone.service.types.v1.Identifier;
68
import org.dataone.service.types.v1.NodeReference;
69
import org.dataone.service.types.v1.NodeType;
70
import org.dataone.service.types.v1.ObjectFormatIdentifier;
71
import org.dataone.service.types.v1.ObjectList;
72
import org.dataone.service.types.v1.ObjectLocationList;
73
import org.dataone.service.types.v1.Permission;
74
import org.dataone.service.types.v1.Replica;
75
import org.dataone.service.types.v1.ReplicationPolicy;
76
import org.dataone.service.types.v1.ReplicationStatus;
77
import org.dataone.service.types.v1.Session;
78
import org.dataone.service.types.v1.Subject;
79
import org.dataone.service.types.v1_1.QueryEngineDescription;
80
import org.dataone.service.types.v1_1.QueryEngineList;
81
import org.dataone.service.types.v2.Node;
82
import org.dataone.service.types.v2.NodeList;
83
import org.dataone.service.types.v2.ObjectFormat;
84
import org.dataone.service.types.v2.ObjectFormatList;
85
import org.dataone.service.types.v2.SystemMetadata;
86
import org.dataone.service.types.v2.util.ServiceMethodRestrictionUtil;
87
import org.dataone.service.util.TypeMarshaller;
88
import org.jibx.runtime.JiBXException;
89

    
90
import edu.ucsb.nceas.metacat.DBUtil;
91
import edu.ucsb.nceas.metacat.EventLog;
92
import edu.ucsb.nceas.metacat.IdentifierManager;
93
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
94
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
95
import edu.ucsb.nceas.metacat.properties.PropertyService;
96
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
97

    
98
/**
99
 * Represents Metacat's implementation of the DataONE Coordinating Node 
100
 * service API. Methods implement the various CN* interfaces, and methods common
101
 * to both Member Node and Coordinating Node interfaces are found in the
102
 * D1NodeService super class.
103
 *
104
 */
105
public class CNodeService extends D1NodeService implements CNAuthorization,
106
    CNCore, CNRead, CNReplication, CNView {
107

    
108
  /* the logger instance */
109
  private Logger logMetacat = null;
110

    
111
  /**
112
   * singleton accessor
113
   */
114
  public static CNodeService getInstance(HttpServletRequest request) { 
115
    return new CNodeService(request);
116
  }
117
  
118
  /**
119
   * Constructor, private for singleton access
120
   */
121
  private CNodeService(HttpServletRequest request) {
122
    super(request);
123
    logMetacat = Logger.getLogger(CNodeService.class);
124
        
125
  }
126
    
127
  /**
128
   * Set the replication policy for an object given the object identifier
129
   * 
130
   * @param session - the Session object containing the credentials for the Subject
131
   * @param pid - the object identifier for the given object
132
   * @param policy - the replication policy to be applied
133
   * 
134
   * @return true or false
135
   * 
136
   * @throws NotImplemented
137
   * @throws NotAuthorized
138
   * @throws ServiceFailure
139
   * @throws InvalidRequest
140
   * @throws VersionMismatch
141
   * 
142
   */
143
  @Override
144
  public boolean setReplicationPolicy(Session session, Identifier pid,
145
      ReplicationPolicy policy, long serialVersion) 
146
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
147
      InvalidRequest, InvalidToken, VersionMismatch {
148
      
149
      // do we have a valid pid?
150
      if (pid == null || pid.getValue().trim().equals("")) {
151
          throw new InvalidRequest("4883", "The provided identifier was invalid.");
152
          
153
      }
154
      
155
      /*String serviceFailureCode = "4882";
156
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
157
      if(sid != null) {
158
          pid = sid;
159
      }*/
160
      // The lock to be used for this identifier
161
      Lock lock = null;
162
      
163
      // get the subject
164
      Subject subject = session.getSubject();
165
      
166
      // are we allowed to do this?
167
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
168
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
169
                  + " not allowed by " + subject.getValue() + " on "
170
                  + pid.getValue());
171
          
172
      }
173
      
174
      SystemMetadata systemMetadata = null;
175
      try {
176
          lock = HazelcastService.getInstance().getLock(pid.getValue());
177
          lock.lock();
178
          logMetacat.debug("Locked identifier " + pid.getValue());
179

    
180
          try {
181
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
182
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
183
                  
184
              }
185
              
186
              // did we get it correctly?
187
              if ( systemMetadata == null ) {
188
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
189
                  
190
              }
191

    
192
              // does the request have the most current system metadata?
193
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
194
                 String msg = "The requested system metadata version number " + 
195
                     serialVersion + " differs from the current version at " +
196
                     systemMetadata.getSerialVersion().longValue() +
197
                     ". Please get the latest copy in order to modify it.";
198
                 throw new VersionMismatch("4886", msg);
199
                 
200
              }
201
              
202
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
203
              throw new NotFound("4884", "No record found for: " + pid.getValue());
204
            
205
          }
206
          
207
          // set the new policy
208
          systemMetadata.setReplicationPolicy(policy);
209
          
210
          // update the metadata
211
          try {
212
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
213
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
214
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
215
              notifyReplicaNodes(systemMetadata);
216
              
217
          } catch (RuntimeException e) {
218
              throw new ServiceFailure("4882", e.getMessage());
219
          
220
          }
221
          
222
      } catch (RuntimeException e) {
223
          throw new ServiceFailure("4882", e.getMessage());
224
          
225
      } finally {
226
          lock.unlock();
227
          logMetacat.debug("Unlocked identifier " + pid.getValue());
228
          
229
      }
230
    
231
      return true;
232
  }
233

    
234
  /**
235
   * Deletes the replica from the given Member Node
236
   * NOTE: MN.delete() may be an "archive" operation. TBD.
237
   * @param session
238
   * @param pid
239
   * @param nodeId
240
   * @param serialVersion
241
   * @return
242
   * @throws InvalidToken
243
   * @throws ServiceFailure
244
   * @throws NotAuthorized
245
   * @throws NotFound
246
   * @throws NotImplemented
247
   * @throws VersionMismatch
248
   */
249
  @Override
250
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
251
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
252
	  
253
	  	// The lock to be used for this identifier
254
		Lock lock = null;
255

    
256
		// get the subject
257
		Subject subject = session.getSubject();
258

    
259
		// are we allowed to do this?
260
		/*boolean isAuthorized = false;
261
		try {
262
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
263
		} catch (InvalidRequest e) {
264
			throw new ServiceFailure("4882", e.getDescription());
265
		}
266
		if (!isAuthorized) {
267
			throw new NotAuthorized("4881", Permission.WRITE
268
					+ " not allowed by " + subject.getValue() + " on "
269
					+ pid.getValue());
270

    
271
		}*/
272
		if(session == null) {
273
		    throw new NotAuthorized("4882", "Session cannot be null. It is not authorized for deleting the replication metadata of the object "+pid.getValue());
274
		} else {
275
		    if(!isCNAdmin(session)) {
276
		        throw new NotAuthorized("4882", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for deleting the replication metadata of the object "+pid.getValue());
277
		    }
278
		}
279

    
280
		SystemMetadata systemMetadata = null;
281
		try {
282
			lock = HazelcastService.getInstance().getLock(pid.getValue());
283
			lock.lock();
284
			logMetacat.debug("Locked identifier " + pid.getValue());
285

    
286
			try {
287
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
288
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
289
				}
290

    
291
				// did we get it correctly?
292
				if (systemMetadata == null) {
293
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
294
				}
295

    
296
				// does the request have the most current system metadata?
297
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
298
					String msg = "The requested system metadata version number "
299
							+ serialVersion
300
							+ " differs from the current version at "
301
							+ systemMetadata.getSerialVersion().longValue()
302
							+ ". Please get the latest copy in order to modify it.";
303
					throw new VersionMismatch("4886", msg);
304

    
305
				}
306

    
307
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
308
				throw new NotFound("4884", "No record found for: " + pid.getValue());
309

    
310
			}
311
			  
312
			// check permissions
313
			// TODO: is this necessary?
314
			/*List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
315
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
316
			if (!isAllowed) {
317
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
318
			}*/
319
			  
320
			// delete the replica from the given node
321
			// CSJ: use CN.delete() to truly delete a replica, semantically
322
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
323
			//D1Client.getMN(nodeId).delete(session, pid);
324
			  
325
			// reflect that change in the system metadata
326
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
327
			for (Replica r: systemMetadata.getReplicaList()) {
328
				  if (r.getReplicaMemberNode().equals(nodeId)) {
329
					  updatedReplicas.remove(r);
330
					  break;
331
				  }
332
			}
333
			systemMetadata.setReplicaList(updatedReplicas);
334

    
335
			// update the metadata
336
			try {
337
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
338
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
339
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
340
			} catch (RuntimeException e) {
341
				throw new ServiceFailure("4882", e.getMessage());
342
			}
343

    
344
		} catch (RuntimeException e) {
345
			throw new ServiceFailure("4882", e.getMessage());
346
		} finally {
347
			lock.unlock();
348
			logMetacat.debug("Unlocked identifier " + pid.getValue());
349
		}
350

    
351
		return true;	  
352
	  
353
  }
354
  
355
  /**
356
   * Deletes an object from the Coordinating Node
357
   * 
358
   * @param session - the Session object containing the credentials for the Subject
359
   * @param pid - The object identifier to be deleted
360
   * 
361
   * @return pid - the identifier of the object used for the deletion
362
   * 
363
   * @throws InvalidToken
364
   * @throws ServiceFailure
365
   * @throws NotAuthorized
366
   * @throws NotFound
367
   * @throws NotImplemented
368
   * @throws InvalidRequest
369
   */
370
  @Override
371
  public Identifier delete(Session session, Identifier pid) 
372
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
373
      
374
      String localId = null;      // The corresponding docid for this pid
375
	  Lock lock = null;           // The lock to be used for this identifier
376
      CNode cn = null;            // a reference to the CN to get the node list    
377
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
378
      List<Node> nodeList = null; // the list of nodes in this CN environment
379

    
380
      // check for a valid session
381
      if (session == null) {
382
        	throw new InvalidToken("4963", "No session has been provided");
383
        	
384
      }
385

    
386
      // do we have a valid pid?
387
      if (pid == null || pid.getValue().trim().equals("")) {
388
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
389
          
390
      }
391
      
392
      String serviceFailureCode = "4962";
393
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
394
      if(sid != null) {
395
          pid = sid;
396
      }
397

    
398
	  // check that it is CN/admin
399
	  boolean allowed = isAdminAuthorized(session);
400
	  
401
	  // additional check if it is the authoritative node if it is not the admin
402
      if(!allowed) {
403
          allowed = isAuthoritativeMNodeAdmin(session, pid);
404
          
405
      }
406
	  
407
	  if (!allowed) {
408
		  String msg = "The subject " + session.getSubject().getValue() + 
409
			  " is not allowed to call delete() on a Coordinating Node.";
410
		  logMetacat.info(msg);
411
		  throw new NotAuthorized("4960", msg);
412
		  
413
	  }
414
	  
415
	  // Don't defer to superclass implementation without a locally registered identifier
416
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
417
      // Check for the existing identifier
418
      try {
419
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
420
          super.delete(session, pid);
421
          
422
      } catch (McdbDocNotFoundException e) {
423
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
424
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
425
    	  
426
          try {
427
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
428
  			  lock.lock();
429
  			  logMetacat.debug("Locked identifier " + pid.getValue());
430

    
431
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
432
			  if ( sysMeta != null ) {
433
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
434
				sysMeta.setArchived(true);
435
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
436
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
437
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
438
	            //since this is cn, we don't need worry about the mn solr index.
439
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
440
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
441
	            String username = session.getSubject().getValue();//just for logging purpose
442
                //since data objects were not registered in the identifier table, we use pid as the docid
443
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
444
				
445
			  } else {
446
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
447
					  ". Couldn't obtain the system metadata record.");
448
				  
449
			  }
450
			  
451
		  } catch (RuntimeException re) {
452
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
453
				  ". The error message was: " + re.getMessage());
454
			  
455
		  } finally {
456
			  lock.unlock();
457
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
458

    
459
		  }
460

    
461
          // NOTE: cannot log the delete without localId
462
//          EventLog.getInstance().log(request.getRemoteAddr(), 
463
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
464
//                  pid.getValue(), Event.DELETE.xmlValue());
465

    
466
      } catch (SQLException e) {
467
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
468
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
469
      }
470

    
471
      // get the node list
472
      try {
473
          cn = D1Client.getCN();
474
          nodeList = cn.listNodes().getNodeList();
475
          
476
      } catch (Exception e) { // handle BaseException and other I/O issues
477
          
478
          // swallow errors since the call is not critical
479
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
480
              " due to communication issues with the CN: " + e.getMessage());
481
          
482
      }
483

    
484
	  // notify the replicas
485
	  if (systemMetadata.getReplicaList() != null) {
486
		  for (Replica replica: systemMetadata.getReplicaList()) {
487
			  NodeReference replicaNode = replica.getReplicaMemberNode();
488
			  try {
489
                  if (nodeList != null) {
490
                      // find the node type
491
                      for (Node node : nodeList) {
492
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
493
                              nodeType = node.getType();
494
                              break;
495
              
496
                          }
497
                      }
498
                  }
499
                  
500
                  // only send call MN.delete() to avoid an infinite loop with the CN
501
                  if (nodeType != null && nodeType == NodeType.MN) {
502
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
503
                  }
504
                  
505
			  } catch (Exception e) {
506
				  // all we can really do is log errors and carry on with life
507
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
508
					  " from replica MN: " + replicaNode.getValue(), e);
509
			}
510
			  
511
		  }
512
	  }
513
	  
514
	  return pid;
515
      
516
  }
517
  
518
  /**
519
   * Archives an object from the Coordinating Node
520
   * 
521
   * @param session - the Session object containing the credentials for the Subject
522
   * @param pid - The object identifier to be deleted
523
   * 
524
   * @return pid - the identifier of the object used for the deletion
525
   * 
526
   * @throws InvalidToken
527
   * @throws ServiceFailure
528
   * @throws NotAuthorized
529
   * @throws NotFound
530
   * @throws NotImplemented
531
   * @throws InvalidRequest
532
   */
533
  @Override
534
  public Identifier archive(Session session, Identifier pid) 
535
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
536

    
537
      String localId = null; // The corresponding docid for this pid
538
	  Lock lock = null;      // The lock to be used for this identifier
539
      CNode cn = null;            // a reference to the CN to get the node list    
540
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
541
      List<Node> nodeList = null; // the list of nodes in this CN environment
542
      
543

    
544
      // check for a valid session
545
      if (session == null) {
546
        	throw new InvalidToken("4973", "No session has been provided");
547
        	
548
      }
549

    
550
      // do we have a valid pid?
551
      if (pid == null || pid.getValue().trim().equals("")) {
552
          throw new ServiceFailure("4972", "The provided identifier was invalid.");
553
          
554
      }
555

    
556
	  // check that it is CN/admin
557
	  boolean allowed = isAdminAuthorized(session);
558
	  
559
	  String serviceFailureCode = "4972";
560
	  Identifier sid = getPIDForSID(pid, serviceFailureCode);
561
	  if(sid != null) {
562
	        pid = sid;
563
	  }
564
	  
565
	  //check if it is the authoritative member node
566
	  if(!allowed) {
567
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
568
	  }
569
	  
570
	  if (!allowed) {
571
		  String msg = "The subject " + session.getSubject().getValue() + 
572
				  " is not allowed to call archive() on a Coordinating Node.";
573
		  logMetacat.info(msg);
574
		  throw new NotAuthorized("4970", msg);
575
	  }
576
	  
577
      // Check for the existing identifier
578
      try {
579
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
580
          super.archive(session, pid);
581
          
582
      } catch (McdbDocNotFoundException e) {
583
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
584
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
585
    	  
586
          try {
587
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
588
  			  lock.lock();
589
  			  logMetacat.debug("Locked identifier " + pid.getValue());
590

    
591
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
592
			  if ( sysMeta != null ) {
593
				sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
594
				sysMeta.setArchived(true);
595
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
596
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
597
			    // notify the replicas
598
				notifyReplicaNodes(sysMeta);
599
				  
600
			  } else {
601
				  throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
602
					  ". Couldn't obtain the system metadata record.");
603
				  
604
			  }
605
			  
606
		  } catch (RuntimeException re) {
607
			  throw new ServiceFailure("4972", "Couldn't archive " + pid.getValue() + 
608
				  ". The error message was: " + re.getMessage());
609
			  
610
		  } finally {
611
			  lock.unlock();
612
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
613

    
614
		  }
615

    
616
          // NOTE: cannot log the archive without localId
617
//          EventLog.getInstance().log(request.getRemoteAddr(), 
618
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
619
//                  pid.getValue(), Event.DELETE.xmlValue());
620

    
621
      } catch (SQLException e) {
622
          throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
623
                  ". The local id of the object with the identifier can't be identified since "+e.getMessage());
624
      }
625

    
626
	  return pid;
627
      
628
  }
629
  
630
  /**
631
   * Set the obsoletedBy attribute in System Metadata
632
   * @param session
633
   * @param pid
634
   * @param obsoletedByPid
635
   * @param serialVersion
636
   * @return
637
   * @throws NotImplemented
638
   * @throws NotFound
639
   * @throws NotAuthorized
640
   * @throws ServiceFailure
641
   * @throws InvalidRequest
642
   * @throws InvalidToken
643
   * @throws VersionMismatch
644
   */
645
  @Override
646
  public boolean setObsoletedBy(Session session, Identifier pid,
647
			Identifier obsoletedByPid, long serialVersion)
648
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
649
			InvalidRequest, InvalidToken, VersionMismatch {
650
      
651
      // do we have a valid pid?
652
      if (pid == null || pid.getValue().trim().equals("")) {
653
          throw new InvalidRequest("4942", "The provided identifier was invalid.");
654
          
655
      }
656
      
657
      /*String serviceFailureCode = "4941";
658
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
659
      if(sid != null) {
660
          pid = sid;
661
      }*/
662
      
663
      // do we have a valid pid?
664
      if (obsoletedByPid == null || obsoletedByPid.getValue().trim().equals("")) {
665
          throw new InvalidRequest("4942", "The provided obsoletedByPid was invalid.");
666
          
667
      }
668
      
669
      try {
670
          if(IdentifierManager.getInstance().systemMetadataSIDExists(obsoletedByPid)) {
671
              throw new InvalidRequest("4942", "The provided obsoletedByPid "+obsoletedByPid.getValue()+" is an existing SID. However, it must NOT be an SID.");
672
          }
673
      } catch (SQLException ee) {
674
          throw new ServiceFailure("4941", "Couldn't determine if the obsoletedByPid "+obsoletedByPid.getValue()+" is an SID or not. The id shouldn't be an SID.");
675
      }
676
      
677

    
678
		// The lock to be used for this identifier
679
		Lock lock = null;
680

    
681
		// get the subject
682
		Subject subject = session.getSubject();
683

    
684
		// are we allowed to do this?
685
		if (!isAuthorized(session, pid, Permission.WRITE)) {
686
			throw new NotAuthorized("4881", Permission.WRITE
687
					+ " not allowed by " + subject.getValue() + " on "
688
					+ pid.getValue());
689

    
690
		}
691

    
692

    
693
		SystemMetadata systemMetadata = null;
694
		try {
695
			lock = HazelcastService.getInstance().getLock(pid.getValue());
696
			lock.lock();
697
			logMetacat.debug("Locked identifier " + pid.getValue());
698

    
699
			try {
700
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
701
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
702
				}
703

    
704
				// did we get it correctly?
705
				if (systemMetadata == null) {
706
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
707
				}
708

    
709
				// does the request have the most current system metadata?
710
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
711
					String msg = "The requested system metadata version number "
712
							+ serialVersion
713
							+ " differs from the current version at "
714
							+ systemMetadata.getSerialVersion().longValue()
715
							+ ". Please get the latest copy in order to modify it.";
716
					throw new VersionMismatch("4886", msg);
717

    
718
				}
719

    
720
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
721
				throw new NotFound("4884", "No record found for: " + pid.getValue());
722

    
723
			}
724

    
725
			// set the new policy
726
			systemMetadata.setObsoletedBy(obsoletedByPid);
727

    
728
			// update the metadata
729
			try {
730
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
731
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
732
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
733
			} catch (RuntimeException e) {
734
				throw new ServiceFailure("4882", e.getMessage());
735
			}
736

    
737
		} catch (RuntimeException e) {
738
			throw new ServiceFailure("4882", e.getMessage());
739
		} finally {
740
			lock.unlock();
741
			logMetacat.debug("Unlocked identifier " + pid.getValue());
742
		}
743

    
744
		return true;
745
	}
746
  
747
  
748
  /**
749
   * Set the replication status for an object given the object identifier
750
   * 
751
   * @param session - the Session object containing the credentials for the Subject
752
   * @param pid - the object identifier for the given object
753
   * @param status - the replication status to be applied
754
   * 
755
   * @return true or false
756
   * 
757
   * @throws NotImplemented
758
   * @throws NotAuthorized
759
   * @throws ServiceFailure
760
   * @throws InvalidRequest
761
   * @throws InvalidToken
762
   * @throws NotFound
763
   * 
764
   */
765
  @Override
766
  public boolean setReplicationStatus(Session session, Identifier pid,
767
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
768
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
769
      InvalidRequest, NotFound {
770
	  
771
	  // cannot be called by public
772
	  if (session == null) {
773
		  throw new NotAuthorized("4720", "Session cannot be null");
774
	  } 
775
	  
776
	  /*else {
777
	      if(!isCNAdmin(session)) {
778
              throw new NotAuthorized("4720", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for setting the replication status of the object "+pid.getValue());
779
        }
780
	  }*/
781
	  
782
	// do we have a valid pid?
783
      if (pid == null || pid.getValue().trim().equals("")) {
784
          throw new InvalidRequest("4730", "The provided identifier was invalid.");
785
          
786
      }
787
      
788
      /*String serviceFailureCode = "4700";
789
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
790
      if(sid != null) {
791
          pid = sid;
792
      }*/
793
      
794
      // The lock to be used for this identifier
795
      Lock lock = null;
796
      
797
      boolean allowed = false;
798
      int replicaEntryIndex = -1;
799
      List<Replica> replicas = null;
800
      // get the subject
801
      Subject subject = session.getSubject();
802
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
803
          " is " + status.toString());
804
      
805
      SystemMetadata systemMetadata = null;
806

    
807
      try {
808
          lock = HazelcastService.getInstance().getLock(pid.getValue());
809
          lock.lock();
810
          logMetacat.debug("Locked identifier " + pid.getValue());
811

    
812
          try {      
813
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
814

    
815
              // did we get it correctly?
816
              if ( systemMetadata == null ) {
817
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
818
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
819
                  
820
              }
821
              replicas = systemMetadata.getReplicaList();
822
              int count = 0;
823
              
824
              // was there a failure? log it
825
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
826
                 String msg = "The replication request of the object identified by " + 
827
                     pid.getValue() + " failed.  The error message was " +
828
                     failure.getMessage() + ".";
829
                 logMetacat.error(msg);
830
              }
831
              
832
              if (replicas.size() > 0 && replicas != null) {
833
                  // find the target replica index in the replica list
834
                  for (Replica replica : replicas) {
835
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
836
                      String targetNodeStr = targetNode.getValue();
837
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
838
                  
839
                      if (replicaNodeStr.equals(targetNodeStr)) {
840
                          replicaEntryIndex = count;
841
                          logMetacat.debug("replica entry index is: "
842
                                  + replicaEntryIndex);
843
                          break;
844
                      }
845
                      count++;
846
                  
847
                  }
848
              }
849
              // are we allowed to do this? only CNs and target MNs are allowed
850
              CNode cn = D1Client.getCN();
851
              List<Node> nodes = cn.listNodes().getNodeList();
852
              
853
              // find the node in the node list
854
              for ( Node node : nodes ) {
855
                  
856
                  NodeReference nodeReference = node.getIdentifier();
857
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
858
                      nodeReference.getValue());
859
                  
860
                  // allow target MN certs
861
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
862
                      node.getType().equals(NodeType.MN)) {
863
                      List<Subject> nodeSubjects = node.getSubjectList();
864
                      
865
                      // check if the session subject is in the node subject list
866
                      for (Subject nodeSubject : nodeSubjects) {
867
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
868
                                  nodeSubject.getValue() + " and " + subject.getValue());
869
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
870
                              
871
                              // lastly limit to COMPLETED, INVALIDATED,
872
                              // and FAILED status updates from MNs only
873
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
874
                                   status.equals(ReplicationStatus.INVALIDATED) ||
875
                                   status.equals(ReplicationStatus.FAILED)) {
876
                                  allowed = true;
877
                                  break;
878
                                  
879
                              }                              
880
                          }
881
                      }                 
882
                  }
883
              }
884

    
885
              if ( !allowed ) {
886
                  //check for CN admin access
887
                  //allowed = isAuthorized(session, pid, Permission.WRITE);
888
                  allowed = isCNAdmin(session);
889
                  
890
              }              
891
              
892
              if ( !allowed ) {
893
                  String msg = "The subject identified by "
894
                          + subject.getValue()
895
                          + " is not a CN or MN, and does not have permission to set the replication status for "
896
                          + "the replica identified by "
897
                          + targetNode.getValue() + ".";
898
                  logMetacat.info(msg);
899
                  throw new NotAuthorized("4720", msg);
900
                  
901
              }
902

    
903
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
904
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
905
                " : " + e.getMessage());
906
            
907
          }
908
          
909
          Replica targetReplica = new Replica();
910
          // set the status for the replica
911
          if ( replicaEntryIndex != -1 ) {
912
              targetReplica = replicas.get(replicaEntryIndex);
913
              
914
              // don't allow status to change from COMPLETED to anything other
915
              // than INVALIDATED: prevents overwrites from race conditions
916
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
917
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
918
            	  throw new InvalidRequest("4730", "Status state change from " +
919
            			  targetReplica.getReplicationStatus() + " to " +
920
            			  status.toString() + "is prohibited for identifier " +
921
            			  pid.getValue() + " and target node " + 
922
            			  targetReplica.getReplicaMemberNode().getValue());
923
              }
924
              
925
              if(targetReplica.getReplicationStatus().equals(status)) {
926
                  //There is no change in the status, we do nothing.
927
                  return true;
928
              }
929
              
930
              targetReplica.setReplicationStatus(status);
931
              
932
              logMetacat.debug("Set the replication status for " + 
933
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
934
                  targetReplica.getReplicationStatus() + " for identifier " +
935
                  pid.getValue());
936
              
937
          } else {
938
              // this is a new entry, create it
939
              targetReplica.setReplicaMemberNode(targetNode);
940
              targetReplica.setReplicationStatus(status);
941
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
942
              replicas.add(targetReplica);
943
              
944
          }
945
          
946
          systemMetadata.setReplicaList(replicas);
947
                
948
          // update the metadata
949
          try {
950
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
951
              // Based on CN behavior discussion 9/16/15, we no longer want to 
952
              // update the modified date for changes to the replica list
953
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
954
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
955

    
956
              if ( !status.equals(ReplicationStatus.QUEUED) && 
957
            	   !status.equals(ReplicationStatus.REQUESTED)) {
958
                  
959
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
960
                          "\tNODE:\t" + targetNode.getValue() + 
961
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
962
                
963
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
964
                          "\tPID:\t"  + pid.getValue() + 
965
                          "\tNODE:\t" + targetNode.getValue() + 
966
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
967
              }
968

    
969
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
970
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
971
                      " on target node " + targetNode + ". The exception was: " +
972
                      failure.getMessage());
973
              }
974
              
975
			  // update the replica nodes about the completed replica when complete
976
              if (status.equals(ReplicationStatus.COMPLETED)) {
977
				notifyReplicaNodes(systemMetadata);
978
			}
979
          
980
          } catch (RuntimeException e) {
981
              throw new ServiceFailure("4700", e.getMessage());
982
          
983
          }
984
          
985
    } catch (RuntimeException e) {
986
        String msg = "There was a RuntimeException getting the lock for " +
987
            pid.getValue();
988
        logMetacat.info(msg);
989
        
990
    } finally {
991
        lock.unlock();
992
        logMetacat.debug("Unlocked identifier " + pid.getValue());
993
        
994
    }
995
      
996
      return true;
997
  }
998
  
999
/**
1000
   * Return the checksum of the object given the identifier 
1001
   * 
1002
   * @param session - the Session object containing the credentials for the Subject
1003
   * @param pid - the object identifier for the given object
1004
   * 
1005
   * @return checksum - the checksum of the object
1006
   * 
1007
   * @throws InvalidToken
1008
   * @throws ServiceFailure
1009
   * @throws NotAuthorized
1010
   * @throws NotFound
1011
   * @throws NotImplemented
1012
   */
1013
  @Override
1014
  public Checksum getChecksum(Session session, Identifier pid)
1015
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
1016
    NotImplemented {
1017
    
1018
	boolean isAuthorized = false;
1019
	try {
1020
		isAuthorized = isAuthorized(session, pid, Permission.READ);
1021
	} catch (InvalidRequest e) {
1022
		throw new ServiceFailure("1410", e.getDescription());
1023
	}  
1024
    if (!isAuthorized) {
1025
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
1026
    }
1027
    
1028
    SystemMetadata systemMetadata = null;
1029
    Checksum checksum = null;
1030
    
1031
    try {
1032
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
1033

    
1034
        if (systemMetadata == null ) {
1035
            String error ="";
1036
            String localId = null;
1037
            try {
1038
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1039
              
1040
             } catch (Exception e) {
1041
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1042
            }
1043
            
1044
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1045
                error = DELETEDMESSAGE;
1046
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1047
                error = DELETEDMESSAGE;
1048
            }
1049
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1050
        }
1051
        checksum = systemMetadata.getChecksum();
1052
        
1053
    } catch (RuntimeException e) {
1054
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
1055
            pid.getValue() + ". The error message was: " + e.getMessage());
1056
      
1057
    }
1058
    
1059
    return checksum;
1060
  }
1061

    
1062
  /**
1063
   * Resolve the location of a given object
1064
   * 
1065
   * @param session - the Session object containing the credentials for the Subject
1066
   * @param pid - the object identifier for the given object
1067
   * 
1068
   * @return objectLocationList - the list of nodes known to contain the object
1069
   * 
1070
   * @throws InvalidToken
1071
   * @throws ServiceFailure
1072
   * @throws NotAuthorized
1073
   * @throws NotFound
1074
   * @throws NotImplemented
1075
   */
1076
  @Override
1077
  public ObjectLocationList resolve(Session session, Identifier pid)
1078
    throws InvalidToken, ServiceFailure, NotAuthorized,
1079
    NotFound, NotImplemented {
1080

    
1081
    throw new NotImplemented("4131", "resolve not implemented");
1082

    
1083
  }
1084

    
1085
  /**
1086
   * Metacat does not implement this method at the CN level
1087
   */
1088
  @Override
1089
  public ObjectList search(Session session, String queryType, String query)
1090
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1091
    NotImplemented {
1092

    
1093
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1094
	  
1095
//    ObjectList objectList = null;
1096
//    try {
1097
//        objectList = 
1098
//          IdentifierManager.getInstance().querySystemMetadata(
1099
//              null, //startTime, 
1100
//              null, //endTime,
1101
//              null, //objectFormat, 
1102
//              false, //replicaStatus, 
1103
//              0, //start, 
1104
//              1000 //count
1105
//              );
1106
//        
1107
//    } catch (Exception e) {
1108
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1109
//    }
1110
//
1111
//      return objectList;
1112
		  
1113
  }
1114
  
1115
  /**
1116
   * Returns the object format registered in the DataONE Object Format 
1117
   * Vocabulary for the given format identifier
1118
   * 
1119
   * @param fmtid - the identifier of the format requested
1120
   * 
1121
   * @return objectFormat - the object format requested
1122
   * 
1123
   * @throws ServiceFailure
1124
   * @throws NotFound
1125
   * @throws InsufficientResources
1126
   * @throws NotImplemented
1127
   */
1128
  @Override
1129
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1130
    throws ServiceFailure, NotFound, NotImplemented {
1131
     
1132
      return ObjectFormatService.getInstance().getFormat(fmtid);
1133
      
1134
  }
1135

    
1136
    @Override
1137
    public ObjectFormatIdentifier addFormat(Session session, ObjectFormatIdentifier formatId, ObjectFormat format)
1138
            throws ServiceFailure, NotFound, NotImplemented, NotAuthorized, InvalidToken {
1139

    
1140
        logMetacat.debug("CNodeService.addFormat() called.\n" + 
1141
                "format ID: " + format.getFormatId() + "\n" + 
1142
                "format name: " + format.getFormatName() + "\n" + 
1143
                "format type: " + format.getFormatType() );
1144
        
1145
        // FIXME remove:
1146
        if (true)
1147
            throw new NotImplemented("0000", "Implementation underway... Will need testing too...");
1148
        
1149
        if (!isAdminAuthorized(session))
1150
            throw new NotAuthorized("0000", "Not authorized to call addFormat()");
1151

    
1152
        String separator = ".";
1153
        try {
1154
            separator = PropertyService.getProperty("document.accNumSeparator");
1155
        } catch (PropertyNotFoundException e) {
1156
            logMetacat.warn("Unable to find property \"document.accNumSeparator\"\n" + e.getMessage());
1157
        }
1158

    
1159
        // find pids of last and next ObjectFormatList
1160
        String OBJECT_FORMAT_DOCID = ObjectFormatService.OBJECT_FORMAT_DOCID;
1161
        int lastRev = -1;
1162
        try {
1163
            lastRev = DBUtil.getLatestRevisionInDocumentTable(OBJECT_FORMAT_DOCID);
1164
        } catch (SQLException e) {
1165
            throw new ServiceFailure("0000", "Unable to locate last revision of the object format list.\n" + e.getMessage());
1166
        }
1167
        int nextRev = lastRev + 1;
1168
        String lastDocID = OBJECT_FORMAT_DOCID + separator + lastRev;
1169
        String nextDocID = OBJECT_FORMAT_DOCID + separator + nextRev;
1170
        
1171
        Identifier lastPid = new Identifier();
1172
        lastPid.setValue(lastDocID);
1173
        Identifier nextPid = new Identifier();
1174
        nextPid.setValue(nextDocID);
1175
        
1176
        logMetacat.debug("Last ObjectFormatList document ID: " + lastDocID + "\n" 
1177
                + "Next ObjectFormatList document ID: " + nextDocID);
1178
        
1179
        // add new format to the current ObjectFormatList
1180
        ObjectFormatList objectFormatList = ObjectFormatService.getInstance().listFormats();
1181
        List<ObjectFormat> innerList = objectFormatList.getObjectFormatList();
1182
        innerList.add(format);
1183

    
1184
        // get existing (last) sysmeta and make a copy
1185
        SystemMetadata lastSysmeta = getSystemMetadata(session, lastPid);
1186
        SystemMetadata nextSysmeta = new SystemMetadata();
1187
        try {
1188
            BeanUtils.copyProperties(nextSysmeta, lastSysmeta);
1189
        } catch (IllegalAccessException | InvocationTargetException e) {
1190
            throw new ServiceFailure("0000", "Unable to create system metadata for updated object format list.\n" + e.getMessage());
1191
        }
1192
        
1193
        // create the new object format list, and update the old sysmeta with obsoletedBy
1194
        createNewObjectFormatList(session, lastPid, nextPid, objectFormatList, nextSysmeta);
1195
        updateOldObjectFormatList(session, lastPid, nextPid, lastSysmeta);
1196
        
1197
        // TODO add to ObjectFormatService local cache?
1198
        
1199
        return formatId;
1200
    }
1201

    
1202
    /**
1203
     * Creates the object for the next / updated version of the ObjectFormatList.
1204
     * 
1205
     * @param session
1206
     * @param lastPid
1207
     * @param nextPid
1208
     * @param objectFormatList
1209
     * @param lastSysmeta
1210
     */
1211
    private void createNewObjectFormatList(Session session, Identifier lastPid, Identifier nextPid,
1212
            ObjectFormatList objectFormatList, SystemMetadata lastSysmeta) 
1213
                    throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented {
1214
        
1215
        PipedInputStream is = new PipedInputStream();
1216
        PipedOutputStream os = null;
1217
        
1218
        try {
1219
            os = new PipedOutputStream(is);
1220
            TypeMarshaller.marshalTypeToOutputStream(objectFormatList, os);
1221
        } catch (JiBXException | IOException e) {
1222
            throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + e.getMessage());
1223
        } finally {
1224
            try {
1225
                os.flush();
1226
                os.close();
1227
            } catch (IOException ioe) {
1228
                throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + ioe.getMessage());
1229
            }
1230
        }
1231
        
1232
        BigInteger docSize = lastSysmeta.getSize();
1233
        try {
1234
            docSize = BigInteger.valueOf(is.available());
1235
        } catch (IOException e) {
1236
            logMetacat.warn("Unable to set an accurate size for the new object format list.", e);
1237
        }
1238
        
1239
        lastSysmeta.setIdentifier(nextPid);
1240
        lastSysmeta.setObsoletes(lastPid);
1241
        lastSysmeta.setSize(docSize); 
1242
        lastSysmeta.setSubmitter(session.getSubject());
1243
        lastSysmeta.setDateUploaded(new Date());
1244
        
1245
        // create new object format list
1246
        try {
1247
            create(session, nextPid, is, lastSysmeta);
1248
        } catch (IdentifierNotUnique | UnsupportedType | InsufficientResources
1249
                | InvalidSystemMetadata | InvalidRequest e) {
1250
            throw new ServiceFailure("0000", "Unable to create() new object format list" + e.getMessage());
1251
        }
1252
    }
1253
  
1254
    /**
1255
     * Updates the SystemMetadata for the old version of the ObjectFormatList
1256
     * by setting the obsoletedBy value to the pid of the new version of the 
1257
     * ObjectFormatList.
1258
     * 
1259
     * @param session
1260
     * @param lastPid
1261
     * @param obsoletedByPid
1262
     * @param lastSysmeta
1263
     * @throws ServiceFailure
1264
     */
1265
    private void updateOldObjectFormatList(Session session, Identifier lastPid, Identifier obsoletedByPid, SystemMetadata lastSysmeta) 
1266
            throws ServiceFailure {
1267
        
1268
        lastSysmeta.setObsoletedBy(obsoletedByPid);
1269
        
1270
        try {
1271
            this.updateSystemMetadata(session, lastPid, lastSysmeta);
1272
        } catch (NotImplemented | NotAuthorized | ServiceFailure | InvalidRequest
1273
                | InvalidSystemMetadata | InvalidToken e) {
1274
            throw new ServiceFailure("0000", "Unable to update metadata of old object format list.\n" + e.getMessage());
1275
        }
1276
    }
1277
  /**
1278
   * Returns a list of all object formats registered in the DataONE Object 
1279
   * Format Vocabulary
1280
    * 
1281
   * @return objectFormatList - The list of object formats registered in 
1282
   *                            the DataONE Object Format Vocabulary
1283
   * 
1284
   * @throws ServiceFailure
1285
   * @throws NotImplemented
1286
   * @throws InsufficientResources
1287
   */
1288
  @Override
1289
  public ObjectFormatList listFormats() 
1290
    throws ServiceFailure, NotImplemented {
1291

    
1292
    return ObjectFormatService.getInstance().listFormats();
1293
  }
1294

    
1295
  /**
1296
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1297
    * 
1298
   * @return nodeList - List of nodes from the registry
1299
   * 
1300
   * @throws ServiceFailure
1301
   * @throws NotImplemented
1302
   */
1303
  @Override
1304
  public NodeList listNodes() 
1305
    throws NotImplemented, ServiceFailure {
1306

    
1307
    throw new NotImplemented("4800", "listNodes not implemented");
1308
  }
1309

    
1310
  /**
1311
   * Provides a mechanism for adding system metadata independently of its 
1312
   * associated object, such as when adding system metadata for data objects.
1313
    * 
1314
   * @param session - the Session object containing the credentials for the Subject
1315
   * @param pid - The identifier of the object to register the system metadata against
1316
   * @param sysmeta - The system metadata to be registered
1317
   * 
1318
   * @return true if the registration succeeds
1319
   * 
1320
   * @throws NotImplemented
1321
   * @throws NotAuthorized
1322
   * @throws ServiceFailure
1323
   * @throws InvalidRequest
1324
   * @throws InvalidSystemMetadata
1325
   */
1326
  @Override
1327
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1328
      SystemMetadata sysmeta) 
1329
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1330
      InvalidSystemMetadata {
1331

    
1332
      // The lock to be used for this identifier
1333
      Lock lock = null;
1334

    
1335
      // TODO: control who can call this?
1336
      if (session == null) {
1337
          //TODO: many of the thrown exceptions do not use the correct error codes
1338
          //check these against the docs and correct them
1339
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1340
                  "  If you are not logged in, please do so and retry the request.");
1341
      } else {
1342
          //only CN is allwoed
1343
          if(!isCNAdmin(session)) {
1344
                throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for registering the system metadata of the object "+pid.getValue());
1345
          }
1346
      }
1347
      // the identifier can't be an SID
1348
      try {
1349
          if(IdentifierManager.getInstance().systemMetadataSIDExists(pid)) {
1350
              throw new InvalidRequest("4863", "The provided identifier "+pid.getValue()+" is a series id which is not allowed.");
1351
          }
1352
      } catch (SQLException sqle) {
1353
          throw new ServiceFailure("4862", "Couldn't determine if the pid "+pid.getValue()+" is a series id since "+sqle.getMessage());
1354
      }
1355
      
1356
      // verify that guid == SystemMetadata.getIdentifier()
1357
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1358
          "|" + sysmeta.getIdentifier().getValue());
1359
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1360
          throw new InvalidRequest("4863", 
1361
              "The identifier in method call (" + pid.getValue() + 
1362
              ") does not match identifier in system metadata (" +
1363
              sysmeta.getIdentifier().getValue() + ").");
1364
      }
1365
      
1366
      //check if the sid is legitimate in the system metadata
1367
      //checkSidInModifyingSystemMetadata(sysmeta, "4864", "4862");
1368
      Identifier sid = sysmeta.getSeriesId();
1369
      if(sid != null) {
1370
          if (!isValidIdentifier(sid)) {
1371
              throw new InvalidRequest("4863", "The series id in the system metadata is invalid in the request.");
1372
          }
1373
      }
1374

    
1375
      try {
1376
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1377
          lock.lock();
1378
          logMetacat.debug("Locked identifier " + pid.getValue());
1379
          logMetacat.debug("Checking if identifier exists...");
1380
          // Check that the identifier does not already exist
1381
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1382
              throw new InvalidRequest("4863", 
1383
                  "The identifier is already in use by an existing object.");
1384
          
1385
          }
1386
          
1387
          // insert the system metadata into the object store
1388
          logMetacat.debug("Starting to insert SystemMetadata...");
1389
          try {
1390
              sysmeta.setSerialVersion(BigInteger.ONE);
1391
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1392
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1393
              
1394
          } catch (RuntimeException e) {
1395
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1396
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1397
                  e.getClass() + ": " + e.getMessage());
1398
              
1399
          }
1400
          
1401
      } catch (RuntimeException e) {
1402
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1403
                  e.getClass() + ": " + e.getMessage());
1404
          
1405
      }  finally {
1406
          lock.unlock();
1407
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1408
          
1409
      }
1410

    
1411
      
1412
      logMetacat.debug("Returning from registerSystemMetadata");
1413
      
1414
      try {
1415
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1416
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1417
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1418
    	          localId, "registerSystemMetadata");
1419
      } catch (McdbDocNotFoundException e) {
1420
    	  // do nothing, no localId to log with
1421
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1422
      } catch (SQLException ee) {
1423
          // do nothing, no localId to log with
1424
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1425
      }
1426
      
1427
      
1428
      return pid;
1429
  }
1430
  
1431
  /**
1432
   * Given an optional scope and format, reserves and returns an identifier 
1433
   * within that scope and format that is unique and will not be 
1434
   * used by any other sessions. 
1435
    * 
1436
   * @param session - the Session object containing the credentials for the Subject
1437
   * @param pid - The identifier of the object to register the system metadata against
1438
   * @param scope - An optional string to be used to qualify the scope of 
1439
   *                the identifier namespace, which is applied differently 
1440
   *                depending on the format requested. If scope is not 
1441
   *                supplied, a default scope will be used.
1442
   * @param format - The optional name of the identifier format to be used, 
1443
   *                  drawn from a DataONE-specific vocabulary of identifier 
1444
   *                 format names, including several common syntaxes such 
1445
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1446
   *                 format is not supplied by the caller, the CN service 
1447
   *                 will use a default identifier format, which may change 
1448
   *                 over time.
1449
   * 
1450
   * @return true if the registration succeeds
1451
   * 
1452
   * @throws InvalidToken
1453
   * @throws ServiceFailure
1454
   * @throws NotAuthorized
1455
   * @throws IdentifierNotUnique
1456
   * @throws NotImplemented
1457
   */
1458
  @Override
1459
  public Identifier reserveIdentifier(Session session, Identifier pid)
1460
  throws InvalidToken, ServiceFailure,
1461
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1462

    
1463
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1464
  }
1465
  
1466
  @Override
1467
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1468
  throws InvalidToken, ServiceFailure,
1469
        NotAuthorized, NotImplemented, InvalidRequest {
1470
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1471
  }
1472
  
1473
  /**
1474
    * Checks whether the pid is reserved by the subject in the session param
1475
    * If the reservation is held on the pid by the subject, we return true.
1476
    * 
1477
   * @param session - the Session object containing the Subject
1478
   * @param pid - The identifier to check
1479
   * 
1480
   * @return true if the reservation exists for the subject/pid
1481
   * 
1482
   * @throws InvalidToken
1483
   * @throws ServiceFailure
1484
   * @throws NotFound - when the pid is not found (in use or in reservation)
1485
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1486
   * @throws IdentifierNotUnique - when the pid is in use
1487
   * @throws NotImplemented
1488
   */
1489

    
1490
  @Override
1491
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1492
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1493
      NotImplemented, InvalidRequest {
1494
  
1495
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1496
  }
1497

    
1498
  /**
1499
   * Changes ownership (RightsHolder) of the specified object to the 
1500
   * subject specified by userId
1501
    * 
1502
   * @param session - the Session object containing the credentials for the Subject
1503
   * @param pid - Identifier of the object to be modified
1504
   * @param userId - The subject that will be taking ownership of the specified object.
1505
   *
1506
   * @return pid - the identifier of the modified object
1507
   * 
1508
   * @throws ServiceFailure
1509
   * @throws InvalidToken
1510
   * @throws NotFound
1511
   * @throws NotAuthorized
1512
   * @throws NotImplemented
1513
   * @throws InvalidRequest
1514
   */  
1515
  @Override
1516
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1517
      long serialVersion)
1518
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1519
      NotImplemented, InvalidRequest, VersionMismatch {
1520
      
1521
      // The lock to be used for this identifier
1522
      Lock lock = null;
1523

    
1524
      // get the subject
1525
      Subject subject = session.getSubject();
1526
      
1527
      String serviceFailureCode = "4490";
1528
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1529
      if(sid != null) {
1530
          pid = sid;
1531
      }
1532
      
1533
      // are we allowed to do this?
1534
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1535
          throw new NotAuthorized("4440", "not allowed by "
1536
                  + subject.getValue() + " on " + pid.getValue());
1537
          
1538
      }
1539
      
1540
      SystemMetadata systemMetadata = null;
1541
      try {
1542
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1543
          lock.lock();
1544
          logMetacat.debug("Locked identifier " + pid.getValue());
1545

    
1546
          try {
1547
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1548
              
1549
              // does the request have the most current system metadata?
1550
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1551
                 String msg = "The requested system metadata version number " + 
1552
                     serialVersion + " differs from the current version at " +
1553
                     systemMetadata.getSerialVersion().longValue() +
1554
                     ". Please get the latest copy in order to modify it.";
1555
                 throw new VersionMismatch("4443", msg);
1556
              }
1557
              
1558
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1559
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1560
              
1561
          }
1562
              
1563
          // set the new rights holder
1564
          systemMetadata.setRightsHolder(userId);
1565
          
1566
          // update the metadata
1567
          try {
1568
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1569
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1570
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1571
              notifyReplicaNodes(systemMetadata);
1572
              
1573
          } catch (RuntimeException e) {
1574
              throw new ServiceFailure("4490", e.getMessage());
1575
          
1576
          }
1577
          
1578
      } catch (RuntimeException e) {
1579
          throw new ServiceFailure("4490", e.getMessage());
1580
          
1581
      } finally {
1582
          lock.unlock();
1583
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1584
      
1585
      }
1586
      
1587
      return pid;
1588
  }
1589

    
1590
  /**
1591
   * Verify that a replication task is authorized by comparing the target node's
1592
   * Subject (from the X.509 certificate-derived Session) with the list of 
1593
   * subjects in the known, pending replication tasks map.
1594
   * 
1595
   * @param originatingNodeSession - Session information that contains the 
1596
   *                                 identity of the calling user
1597
   * @param targetNodeSubject - Subject identifying the target node
1598
   * @param pid - the identifier of the object to be replicated
1599
   * @param replicatePermission - the execute permission to be granted
1600
   * 
1601
   * @throws ServiceFailure
1602
   * @throws NotImplemented
1603
   * @throws InvalidToken
1604
   * @throws NotAuthorized
1605
   * @throws InvalidRequest
1606
   * @throws NotFound
1607
   */
1608
  @Override
1609
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1610
    Subject targetNodeSubject, Identifier pid) 
1611
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1612
    NotFound, InvalidRequest {
1613
    
1614
    boolean isAllowed = false;
1615
    SystemMetadata sysmeta = null;
1616
    NodeReference targetNode = null;
1617
    
1618
    try {
1619
      // get the target node reference from the nodes list
1620
      CNode cn = D1Client.getCN();
1621
      List<Node> nodes = cn.listNodes().getNodeList();
1622
      
1623
      if ( nodes != null ) {
1624
        for (Node node : nodes) {
1625
            
1626
        	if (node.getSubjectList() != null) {
1627
        		
1628
	            for (Subject nodeSubject : node.getSubjectList()) {
1629
	            	
1630
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1631
	                    targetNode = node.getIdentifier();
1632
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1633
	                    break;
1634
	                }
1635
	            }
1636
        	}
1637
            
1638
            if ( targetNode != null) { break; }
1639
        }
1640
        
1641
      } else {
1642
          String msg = "Couldn't get the node list from the CN";
1643
          logMetacat.debug(msg);
1644
          throw new ServiceFailure("4872", msg);
1645
          
1646
      }
1647
      
1648
      // can't find a node listed with the given subject
1649
      if ( targetNode == null ) {
1650
          String msg = "There is no Member Node registered with a node subject " +
1651
              "matching " + targetNodeSubject.getValue();
1652
          logMetacat.info(msg);
1653
          throw new NotAuthorized("4871", msg);
1654
          
1655
      }
1656
      
1657
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1658
      
1659
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1660

    
1661
      if ( sysmeta != null ) {
1662
          
1663
          List<Replica> replicaList = sysmeta.getReplicaList();
1664
          
1665
          if ( replicaList != null ) {
1666
              
1667
              // find the replica with the status set to 'requested'
1668
              for (Replica replica : replicaList) {
1669
                  ReplicationStatus status = replica.getReplicationStatus();
1670
                  NodeReference listedNode = replica.getReplicaMemberNode();
1671
                  if ( listedNode != null && targetNode != null ) {
1672
                      logMetacat.debug("Comparing " + listedNode.getValue()
1673
                              + " to " + targetNode.getValue());
1674
                      
1675
                      if (listedNode.getValue().equals(targetNode.getValue())
1676
                              && status.equals(ReplicationStatus.REQUESTED)) {
1677
                          isAllowed = true;
1678
                          break;
1679

    
1680
                      }
1681
                  }
1682
              }
1683
          }
1684
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1685
              "to replicate: " + isAllowed + " for " + pid.getValue());
1686

    
1687
          
1688
      } else {
1689
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1690
          " is null.");
1691
          String error ="";
1692
          String localId = null;
1693
          try {
1694
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1695
            
1696
           } catch (Exception e) {
1697
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1698
          }
1699
          
1700
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1701
              error = DELETEDMESSAGE;
1702
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1703
              error = DELETEDMESSAGE;
1704
          }
1705
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1706
          
1707
      }
1708

    
1709
    } catch (RuntimeException e) {
1710
    	  ServiceFailure sf = new ServiceFailure("4872", 
1711
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1712
                e.getMessage());
1713
    	  sf.initCause(e);
1714
        throw sf;
1715
        
1716
    }
1717
      
1718
    return isAllowed;
1719
    
1720
  }
1721

    
1722
  /**
1723
   * Adds a new object to the Node, where the object is a science metadata object.
1724
   * 
1725
   * @param session - the Session object containing the credentials for the Subject
1726
   * @param pid - The object identifier to be created
1727
   * @param object - the object bytes
1728
   * @param sysmeta - the system metadata that describes the object  
1729
   * 
1730
   * @return pid - the object identifier created
1731
   * 
1732
   * @throws InvalidToken
1733
   * @throws ServiceFailure
1734
   * @throws NotAuthorized
1735
   * @throws IdentifierNotUnique
1736
   * @throws UnsupportedType
1737
   * @throws InsufficientResources
1738
   * @throws InvalidSystemMetadata
1739
   * @throws NotImplemented
1740
   * @throws InvalidRequest
1741
   */
1742
  public Identifier create(Session session, Identifier pid, InputStream object,
1743
    SystemMetadata sysmeta) 
1744
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1745
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1746
    NotImplemented, InvalidRequest {
1747
       
1748
   // verify the pid is valid format
1749
      if (!isValidIdentifier(pid)) {
1750
          throw new InvalidRequest("4891", "The provided identifier is invalid.");
1751
      }
1752
      // The lock to be used for this identifier
1753
      Lock lock = null;
1754

    
1755
      try {
1756
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1757
          lock.lock();
1758
          // are we allowed?
1759
          boolean isAllowed = false;
1760
          isAllowed = isAdminAuthorized(session);
1761
          
1762
          // additional check if it is the authoritative node if it is not the admin
1763
          if(!isAllowed) {
1764
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1765
          }
1766

    
1767
          // proceed if we're called by a CN
1768
          if ( isAllowed ) {
1769
              //check if the series id is legitimate. It uses the same rules of the method registerSystemMetadata
1770
              //checkSidInModifyingSystemMetadata(sysmeta, "4896", "4893");
1771
              Identifier sid = sysmeta.getSeriesId();
1772
              if(sid != null) {
1773
                  if (!isValidIdentifier(sid)) {
1774
                      throw new InvalidRequest("4891", "The series id in the system metadata is invalid in the request.");
1775
                  }
1776
              }
1777
              // create the coordinating node version of the document      
1778
              logMetacat.debug("Locked identifier " + pid.getValue());
1779
              sysmeta.setSerialVersion(BigInteger.ONE);
1780
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1781
              //sysmeta.setArchived(false); // this is a create op, not update
1782
              
1783
              // the CN should have set the origin and authoritative member node fields
1784
              try {
1785
                  sysmeta.getOriginMemberNode().getValue();
1786
                  sysmeta.getAuthoritativeMemberNode().getValue();
1787
                  
1788
              } catch (NullPointerException npe) {
1789
                  throw new InvalidSystemMetadata("4896", 
1790
                      "Both the origin and authoritative member node identifiers need to be set.");
1791
                  
1792
              }
1793
              pid = super.create(session, pid, object, sysmeta);
1794

    
1795
          } else {
1796
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1797
                  " isn't allowed to call create() on a Coordinating Node.";
1798
              logMetacat.info(msg);
1799
              throw new NotAuthorized("1100", msg);
1800
          }
1801
          
1802
      } catch (RuntimeException e) {
1803
          // Convert Hazelcast runtime exceptions to service failures
1804
          String msg = "There was a problem creating the object identified by " +
1805
              pid.getValue() + ". There error message was: " + e.getMessage();
1806
          throw new ServiceFailure("4893", msg);
1807
          
1808
      } finally {
1809
    	  if (lock != null) {
1810
	          lock.unlock();
1811
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1812
    	  }
1813
      }
1814
      
1815
      return pid;
1816

    
1817
  }
1818

    
1819
  /**
1820
   * Set access for a given object using the object identifier and a Subject
1821
   * under a given Session.
1822
   * 
1823
   * @param session - the Session object containing the credentials for the Subject
1824
   * @param pid - the object identifier for the given object to apply the policy
1825
   * @param policy - the access policy to be applied
1826
   * 
1827
   * @return true if the application of the policy succeeds
1828
   * @throws InvalidToken
1829
   * @throws ServiceFailure
1830
   * @throws NotFound
1831
   * @throws NotAuthorized
1832
   * @throws NotImplemented
1833
   * @throws InvalidRequest
1834
   */
1835
  public boolean setAccessPolicy(Session session, Identifier pid, 
1836
      AccessPolicy accessPolicy, long serialVersion) 
1837
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1838
      NotImplemented, InvalidRequest, VersionMismatch {
1839
      
1840
   // do we have a valid pid?
1841
      if (pid == null || pid.getValue().trim().equals("")) {
1842
          throw new InvalidRequest("4402", "The provided identifier was invalid.");
1843
          
1844
      }
1845
      
1846
      String serviceFailureCode = "4430";
1847
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1848
      if(sid != null) {
1849
          pid = sid;
1850
      }
1851
      // The lock to be used for this identifier
1852
      Lock lock = null;
1853
      SystemMetadata systemMetadata = null;
1854
      
1855
      boolean success = false;
1856
      
1857
      // get the subject
1858
      Subject subject = session.getSubject();
1859
      
1860
      // are we allowed to do this?
1861
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1862
          throw new NotAuthorized("4420", "not allowed by "
1863
                  + subject.getValue() + " on " + pid.getValue());
1864
      }
1865
      
1866
      try {
1867
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1868
          lock.lock();
1869
          logMetacat.debug("Locked identifier " + pid.getValue());
1870

    
1871
          try {
1872
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1873

    
1874
              if ( systemMetadata == null ) {
1875
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1876
                  
1877
              }
1878
              // does the request have the most current system metadata?
1879
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1880
                 String msg = "The requested system metadata version number " + 
1881
                     serialVersion + " differs from the current version at " +
1882
                     systemMetadata.getSerialVersion().longValue() +
1883
                     ". Please get the latest copy in order to modify it.";
1884
                 throw new VersionMismatch("4402", msg);
1885
                 
1886
              }
1887
              
1888
          } catch (RuntimeException e) {
1889
              // convert Hazelcast RuntimeException to NotFound
1890
              throw new NotFound("4400", "No record found for: " + pid);
1891
            
1892
          }
1893
              
1894
          // set the access policy
1895
          systemMetadata.setAccessPolicy(accessPolicy);
1896
          
1897
          // update the system metadata
1898
          try {
1899
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1900
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1901
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1902
              notifyReplicaNodes(systemMetadata);
1903
              
1904
          } catch (RuntimeException e) {
1905
              // convert Hazelcast RuntimeException to ServiceFailure
1906
              throw new ServiceFailure("4430", e.getMessage());
1907
            
1908
          }
1909
          
1910
      } catch (RuntimeException e) {
1911
          throw new ServiceFailure("4430", e.getMessage());
1912
          
1913
      } finally {
1914
          lock.unlock();
1915
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1916
        
1917
      }
1918

    
1919
    
1920
    // TODO: how do we know if the map was persisted?
1921
    success = true;
1922
    
1923
    return success;
1924
  }
1925

    
1926
  /**
1927
   * Full replacement of replication metadata in the system metadata for the 
1928
   * specified object, changes date system metadata modified
1929
   * 
1930
   * @param session - the Session object containing the credentials for the Subject
1931
   * @param pid - the object identifier for the given object to apply the policy
1932
   * @param replica - the replica to be updated
1933
   * @return
1934
   * @throws NotImplemented
1935
   * @throws NotAuthorized
1936
   * @throws ServiceFailure
1937
   * @throws InvalidRequest
1938
   * @throws NotFound
1939
   * @throws VersionMismatch
1940
   */
1941
  @Override
1942
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1943
      Replica replica, long serialVersion) 
1944
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1945
      NotFound, VersionMismatch {
1946
      
1947
      // The lock to be used for this identifier
1948
      Lock lock = null;
1949
      
1950
      // get the subject
1951
      Subject subject = session.getSubject();
1952
      
1953
      // are we allowed to do this?
1954
      if(session == null) {
1955
          throw new NotAuthorized("4851", "Session cannot be null. It is not authorized for updating the replication metadata of the object "+pid.getValue());
1956
      } else {
1957
          if(!isCNAdmin(session)) {
1958
              throw new NotAuthorized("4851", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for updating the replication metadata of the object "+pid.getValue());
1959
        }
1960
      }
1961
      /*try {
1962

    
1963
          // what is the controlling permission?
1964
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1965
              throw new NotAuthorized("4851", "not allowed by "
1966
                      + subject.getValue() + " on " + pid.getValue());
1967
          }
1968

    
1969
        
1970
      } catch (InvalidToken e) {
1971
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1972
                  " on " + pid.getValue());  
1973
          
1974
      }*/
1975

    
1976
      SystemMetadata systemMetadata = null;
1977
      try {
1978
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1979
          lock.lock();
1980
          logMetacat.debug("Locked identifier " + pid.getValue());
1981

    
1982
          try {      
1983
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1984

    
1985
              // does the request have the most current system metadata?
1986
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1987
                 String msg = "The requested system metadata version number " + 
1988
                     serialVersion + " differs from the current version at " +
1989
                     systemMetadata.getSerialVersion().longValue() +
1990
                     ". Please get the latest copy in order to modify it.";
1991
                 throw new VersionMismatch("4855", msg);
1992
              }
1993
              
1994
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1995
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1996
                  " : " + e.getMessage());
1997
            
1998
          }
1999
              
2000
          // set the status for the replica
2001
          List<Replica> replicas = systemMetadata.getReplicaList();
2002
          NodeReference replicaNode = replica.getReplicaMemberNode();
2003
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
2004
          int index = 0;
2005
          for (Replica listedReplica: replicas) {
2006
              
2007
              // remove the replica that we are replacing
2008
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
2009
                      // don't allow status to change from COMPLETED to anything other
2010
                      // than INVALIDATED: prevents overwrites from race conditions
2011
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
2012
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
2013
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
2014
                	  throw new InvalidRequest("4853", "Status state change from " +
2015
                			  listedReplica.getReplicationStatus() + " to " +
2016
                			  replicaStatus.toString() + "is prohibited for identifier " +
2017
                			  pid.getValue() + " and target node " + 
2018
                			  listedReplica.getReplicaMemberNode().getValue());
2019

    
2020
            	  }
2021
                  replicas.remove(index);
2022
                  break;
2023
                  
2024
              }
2025
              index++;
2026
          }
2027
          
2028
          // add the new replica item
2029
          replicas.add(replica);
2030
          systemMetadata.setReplicaList(replicas);
2031
          
2032
          // update the metadata
2033
          try {
2034
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
2035
              // Based on CN behavior discussion 9/16/15, we no longer want to 
2036
              // update the modified date for changes to the replica list
2037
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
2038
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
2039
              
2040
              // inform replica nodes of the change if the status is complete
2041
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
2042
            	  notifyReplicaNodes(systemMetadata);
2043
            	  
2044
              }
2045
          } catch (RuntimeException e) {
2046
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2047
              throw new ServiceFailure("4852", e.getMessage());
2048
          
2049
          }
2050
          
2051
      } catch (RuntimeException e) {
2052
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2053
          throw new ServiceFailure("4852", e.getMessage());
2054
      
2055
      } finally {
2056
          lock.unlock();
2057
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2058
          
2059
      }
2060
    
2061
      return true;
2062
      
2063
  }
2064
  
2065
  /**
2066
   * 
2067
   */
2068
  @Override
2069
  public ObjectList listObjects(Session session, Date startTime, 
2070
      Date endTime, ObjectFormatIdentifier formatid, NodeReference nodeId,Identifier identifier,
2071
      Integer start, Integer count)
2072
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
2073
      ServiceFailure {
2074

    
2075
      return super.listObjects(session, startTime, endTime, formatid, identifier, nodeId, start, count);
2076
  }
2077

    
2078
  
2079
 	/**
2080
 	 * Returns a list of checksum algorithms that are supported by DataONE.
2081
 	 * @return cal  the list of checksum algorithms
2082
 	 * 
2083
 	 * @throws ServiceFailure
2084
 	 * @throws NotImplemented
2085
 	 */
2086
  @Override
2087
  public ChecksumAlgorithmList listChecksumAlgorithms()
2088
			throws ServiceFailure, NotImplemented {
2089
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
2090
		cal.addAlgorithm("MD5");
2091
		cal.addAlgorithm("SHA-1");
2092
		return cal;
2093
		
2094
	}
2095

    
2096
  /**
2097
   * Notify replica Member Nodes of system metadata changes for a given pid
2098
   * 
2099
   * @param currentSystemMetadata - the up to date system metadata
2100
   */
2101
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
2102
      
2103
      Session session = null;
2104
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
2105
      //MNode mn = null;
2106
      NodeReference replicaNodeRef = null;
2107
      CNode cn = null;
2108
      NodeType nodeType = null;
2109
      List<Node> nodeList = null;
2110
      
2111
      try {
2112
          cn = D1Client.getCN();
2113
          nodeList = cn.listNodes().getNodeList();
2114
          
2115
      } catch (Exception e) { // handle BaseException and other I/O issues
2116
          
2117
          // swallow errors since the call is not critical
2118
          logMetacat.error("Can't inform MNs of system metadata changes due " +
2119
              "to communication issues with the CN: " + e.getMessage());
2120
          
2121
      }
2122
      
2123
      if ( replicaList != null ) {
2124
          
2125
          // iterate through the replicas and inform  MN replica nodes
2126
          for (Replica replica : replicaList) {
2127
              String replicationVersion = null;
2128
              replicaNodeRef = replica.getReplicaMemberNode();
2129
              try {
2130
                  if (nodeList != null) {
2131
                      // find the node type
2132
                      for (Node node : nodeList) {
2133
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
2134
                              nodeType = node.getType();
2135
                              D1NodeVersionChecker checker = new D1NodeVersionChecker(replicaNodeRef);
2136
                              replicationVersion = checker.getVersion("MNRead");
2137
                              break;
2138
              
2139
                          }
2140
                      }
2141
                  }
2142
              
2143
                  // notify only MNs
2144
                  if (replicationVersion != null && nodeType != null && nodeType == NodeType.MN) {
2145
                      if(replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
2146
                          //connect to a v2 mn
2147
                          MNode mn = D1Client.getMN(replicaNodeRef);
2148
                          mn.systemMetadataChanged(session, 
2149
                              currentSystemMetadata.getIdentifier(), 
2150
                              currentSystemMetadata.getSerialVersion().longValue(),
2151
                              currentSystemMetadata.getDateSysMetadataModified());
2152
                      } else if (replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
2153
                          //connect to a v1 mn
2154
                          org.dataone.client.v1.MNode mn = org.dataone.client.v1.itk.D1Client.getMN(replicaNodeRef);
2155
                          mn.systemMetadataChanged(session, 
2156
                                  currentSystemMetadata.getIdentifier(), 
2157
                                  currentSystemMetadata.getSerialVersion().longValue(),
2158
                                  currentSystemMetadata.getDateSysMetadataModified());
2159
                      }
2160
                      
2161
                  }
2162
              
2163
              } catch (Exception e) { // handle BaseException and other I/O issues
2164
              
2165
                  // swallow errors since the call is not critical
2166
                  logMetacat.error("Can't inform "
2167
                          + replicaNodeRef.getValue()
2168
                          + " of system metadata changes due "
2169
                          + "to communication issues with the CN: "
2170
                          + e.getMessage());
2171
              
2172
              }
2173
          }
2174
      }
2175
  }
2176
  
2177
  /**
2178
   * Update the system metadata of the specified pid.
2179
   */
2180
  @Override
2181
  public boolean updateSystemMetadata(Session session, Identifier pid,
2182
          SystemMetadata sysmeta) throws NotImplemented, NotAuthorized,
2183
          ServiceFailure, InvalidRequest, InvalidSystemMetadata, InvalidToken {
2184
   if(sysmeta == null) {
2185
       throw  new InvalidRequest("4863", "The system metadata object should NOT be null in the updateSystemMetadata request.");
2186
   }
2187
   if(pid == null || pid.getValue() == null) {
2188
       throw new InvalidRequest("4863", "Please specify the id in the updateSystemMetadata request ") ;
2189
   }
2190

    
2191
   if (session == null) {
2192
       //TODO: many of the thrown exceptions do not use the correct error codes
2193
       //check these against the docs and correct them
2194
       throw new NotAuthorized("4861", "No Session - could not authorize for updating system metadata." +
2195
               "  If you are not logged in, please do so and retry the request.");
2196
   } else {
2197
         //only CN is allwoed
2198
         if(!isCNAdmin(session)) {
2199
               throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not authorized for updating the system metadata of the object "+pid.getValue());
2200
         }
2201
   }
2202

    
2203
    //update the system metadata locally
2204
    boolean success = false;
2205
    try {
2206
        HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
2207
        SystemMetadata currentSysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2208
       
2209
        if(currentSysmeta == null) {
2210
            throw  new InvalidRequest("4863", "We can't find the current system metadata on the member node for the id "+pid.getValue());
2211
        }
2212
        // CN will ignore the comming serial version and replica list fields from the mn node. 
2213
        BigInteger currentSerialVersion = currentSysmeta.getSerialVersion();
2214
        sysmeta.setSerialVersion(currentSerialVersion);
2215
        List<Replica> replicas = currentSysmeta.getReplicaList();
2216
        sysmeta.setReplicaList(replicas);
2217
        boolean needUpdateModificationDate = false;//cn doesn't need to change the modification date.
2218
        success = updateSystemMetadata(session, pid, sysmeta, needUpdateModificationDate, currentSysmeta);
2219
    } finally {
2220
        HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
2221
    }
2222
    return success;
2223
  }
2224
  
2225
    @Override
2226
    public boolean synchronize(Session session, Identifier pid) throws NotAuthorized, InvalidRequest, NotImplemented{
2227
        throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2228

    
2229
    }
2230

    
2231
	@Override
2232
	public QueryEngineDescription getQueryEngineDescription(Session session,
2233
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
2234
			NotImplemented, NotFound {
2235
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2236

    
2237
	}
2238
	
2239
	@Override
2240
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
2241
			ServiceFailure, NotAuthorized, NotImplemented {
2242
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2243

    
2244
	}
2245
	
2246
	@Override
2247
	public InputStream query(Session session, String queryEngine, String query)
2248
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
2249
			NotImplemented, NotFound {
2250
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2251

    
2252
	}
2253
	
2254
	@Override
2255
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
2256
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
2257
	}
2258

    
2259
}
(1-1/8)