Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.PipedInputStream;
29
import java.io.PipedOutputStream;
30
import java.lang.reflect.InvocationTargetException;
31
import java.math.BigInteger;
32
import java.sql.SQLException;
33
import java.util.ArrayList;
34
import java.util.Calendar;
35
import java.util.Date;
36
import java.util.List;
37
import java.util.concurrent.locks.Lock;
38

    
39
import javax.servlet.http.HttpServletRequest;
40

    
41
import org.apache.commons.beanutils.BeanUtils;
42
import org.apache.log4j.Logger;
43
import org.dataone.client.v2.CNode;
44
import org.dataone.client.v2.MNode;
45
import org.dataone.client.v2.itk.D1Client;
46
import org.dataone.service.cn.v2.CNAuthorization;
47
import org.dataone.service.cn.v2.CNCore;
48
import org.dataone.service.cn.v2.CNRead;
49
import org.dataone.service.cn.v2.CNReplication;
50
import org.dataone.service.cn.v2.CNView;
51
import org.dataone.service.exceptions.BaseException;
52
import org.dataone.service.exceptions.IdentifierNotUnique;
53
import org.dataone.service.exceptions.InsufficientResources;
54
import org.dataone.service.exceptions.InvalidRequest;
55
import org.dataone.service.exceptions.InvalidSystemMetadata;
56
import org.dataone.service.exceptions.InvalidToken;
57
import org.dataone.service.exceptions.NotAuthorized;
58
import org.dataone.service.exceptions.NotFound;
59
import org.dataone.service.exceptions.NotImplemented;
60
import org.dataone.service.exceptions.ServiceFailure;
61
import org.dataone.service.exceptions.UnsupportedType;
62
import org.dataone.service.exceptions.VersionMismatch;
63
import org.dataone.service.types.v1.AccessPolicy;
64
import org.dataone.service.types.v1.Checksum;
65
import org.dataone.service.types.v1.ChecksumAlgorithmList;
66
import org.dataone.service.types.v1.Event;
67
import org.dataone.service.types.v1.Identifier;
68
import org.dataone.service.types.v1.NodeReference;
69
import org.dataone.service.types.v1.NodeType;
70
import org.dataone.service.types.v1.ObjectFormatIdentifier;
71
import org.dataone.service.types.v1.ObjectList;
72
import org.dataone.service.types.v1.ObjectLocationList;
73
import org.dataone.service.types.v1.Permission;
74
import org.dataone.service.types.v1.Replica;
75
import org.dataone.service.types.v1.ReplicationPolicy;
76
import org.dataone.service.types.v1.ReplicationStatus;
77
import org.dataone.service.types.v1.Session;
78
import org.dataone.service.types.v1.Subject;
79
import org.dataone.service.types.v1_1.QueryEngineDescription;
80
import org.dataone.service.types.v1_1.QueryEngineList;
81
import org.dataone.service.types.v2.Node;
82
import org.dataone.service.types.v2.NodeList;
83
import org.dataone.service.types.v2.ObjectFormat;
84
import org.dataone.service.types.v2.ObjectFormatList;
85
import org.dataone.service.types.v2.SystemMetadata;
86
import org.dataone.service.types.v2.util.ServiceMethodRestrictionUtil;
87
import org.dataone.service.util.TypeMarshaller;
88
import org.jibx.runtime.JiBXException;
89

    
90
import edu.ucsb.nceas.metacat.DBUtil;
91
import edu.ucsb.nceas.metacat.EventLog;
92
import edu.ucsb.nceas.metacat.IdentifierManager;
93
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
94
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
95
import edu.ucsb.nceas.metacat.properties.PropertyService;
96
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
97

    
98
/**
99
 * Represents Metacat's implementation of the DataONE Coordinating Node 
100
 * service API. Methods implement the various CN* interfaces, and methods common
101
 * to both Member Node and Coordinating Node interfaces are found in the
102
 * D1NodeService super class.
103
 *
104
 */
105
public class CNodeService extends D1NodeService implements CNAuthorization,
106
    CNCore, CNRead, CNReplication, CNView {
107

    
108
  /* the logger instance */
109
  private Logger logMetacat = null;
110

    
111
  /**
112
   * singleton accessor
113
   */
114
  public static CNodeService getInstance(HttpServletRequest request) { 
115
    return new CNodeService(request);
116
  }
117
  
118
  /**
119
   * Constructor, private for singleton access
120
   */
121
  private CNodeService(HttpServletRequest request) {
122
    super(request);
123
    logMetacat = Logger.getLogger(CNodeService.class);
124
        
125
  }
126
    
127
  /**
128
   * Set the replication policy for an object given the object identifier
129
   * 
130
   * @param session - the Session object containing the credentials for the Subject
131
   * @param pid - the object identifier for the given object
132
   * @param policy - the replication policy to be applied
133
   * 
134
   * @return true or false
135
   * 
136
   * @throws NotImplemented
137
   * @throws NotAuthorized
138
   * @throws ServiceFailure
139
   * @throws InvalidRequest
140
   * @throws VersionMismatch
141
   * 
142
   */
143
  @Override
144
  public boolean setReplicationPolicy(Session session, Identifier pid,
145
      ReplicationPolicy policy, long serialVersion) 
146
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
147
      InvalidRequest, InvalidToken, VersionMismatch {
148
      
149
      // do we have a valid pid?
150
      if (pid == null || pid.getValue().trim().equals("")) {
151
          throw new InvalidRequest("4883", "The provided identifier was invalid.");
152
          
153
      }
154
      
155
      /*String serviceFailureCode = "4882";
156
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
157
      if(sid != null) {
158
          pid = sid;
159
      }*/
160
      // The lock to be used for this identifier
161
      Lock lock = null;
162
      
163
      // get the subject
164
      Subject subject = session.getSubject();
165
      
166
      // are we allowed to do this?
167
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
168
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
169
                  + " not allowed by " + subject.getValue() + " on "
170
                  + pid.getValue());
171
          
172
      }
173
      
174
      SystemMetadata systemMetadata = null;
175
      try {
176
          lock = HazelcastService.getInstance().getLock(pid.getValue());
177
          lock.lock();
178
          logMetacat.debug("Locked identifier " + pid.getValue());
179

    
180
          try {
181
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
182
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
183
                  
184
              }
185
              
186
              // did we get it correctly?
187
              if ( systemMetadata == null ) {
188
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
189
                  
190
              }
191

    
192
              // does the request have the most current system metadata?
193
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
194
                 String msg = "The requested system metadata version number " + 
195
                     serialVersion + " differs from the current version at " +
196
                     systemMetadata.getSerialVersion().longValue() +
197
                     ". Please get the latest copy in order to modify it.";
198
                 throw new VersionMismatch("4886", msg);
199
                 
200
              }
201
              
202
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
203
              throw new NotFound("4884", "No record found for: " + pid.getValue());
204
            
205
          }
206
          
207
          // set the new policy
208
          systemMetadata.setReplicationPolicy(policy);
209
          
210
          // update the metadata
211
          try {
212
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
213
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
214
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
215
              notifyReplicaNodes(systemMetadata);
216
              
217
          } catch (RuntimeException e) {
218
              throw new ServiceFailure("4882", e.getMessage());
219
          
220
          }
221
          
222
      } catch (RuntimeException e) {
223
          throw new ServiceFailure("4882", e.getMessage());
224
          
225
      } finally {
226
          lock.unlock();
227
          logMetacat.debug("Unlocked identifier " + pid.getValue());
228
          
229
      }
230
    
231
      return true;
232
  }
233

    
234
  /**
235
   * Deletes the replica from the given Member Node
236
   * NOTE: MN.delete() may be an "archive" operation. TBD.
237
   * @param session
238
   * @param pid
239
   * @param nodeId
240
   * @param serialVersion
241
   * @return
242
   * @throws InvalidToken
243
   * @throws ServiceFailure
244
   * @throws NotAuthorized
245
   * @throws NotFound
246
   * @throws NotImplemented
247
   * @throws VersionMismatch
248
   */
249
  @Override
250
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
251
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
252
	  
253
	  	// The lock to be used for this identifier
254
		Lock lock = null;
255

    
256
		// get the subject
257
		Subject subject = session.getSubject();
258

    
259
		// are we allowed to do this?
260
		boolean isAuthorized = false;
261
		try {
262
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
263
		} catch (InvalidRequest e) {
264
			throw new ServiceFailure("4882", e.getDescription());
265
		}
266
		if (!isAuthorized) {
267
			throw new NotAuthorized("4881", Permission.WRITE
268
					+ " not allowed by " + subject.getValue() + " on "
269
					+ pid.getValue());
270

    
271
		}
272

    
273
		SystemMetadata systemMetadata = null;
274
		try {
275
			lock = HazelcastService.getInstance().getLock(pid.getValue());
276
			lock.lock();
277
			logMetacat.debug("Locked identifier " + pid.getValue());
278

    
279
			try {
280
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
281
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
282
				}
283

    
284
				// did we get it correctly?
285
				if (systemMetadata == null) {
286
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
287
				}
288

    
289
				// does the request have the most current system metadata?
290
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
291
					String msg = "The requested system metadata version number "
292
							+ serialVersion
293
							+ " differs from the current version at "
294
							+ systemMetadata.getSerialVersion().longValue()
295
							+ ". Please get the latest copy in order to modify it.";
296
					throw new VersionMismatch("4886", msg);
297

    
298
				}
299

    
300
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
301
				throw new NotFound("4884", "No record found for: " + pid.getValue());
302

    
303
			}
304
			  
305
			// check permissions
306
			// TODO: is this necessary?
307
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
308
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
309
			if (!isAllowed) {
310
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
311
			}
312
			  
313
			// delete the replica from the given node
314
			// CSJ: use CN.delete() to truly delete a replica, semantically
315
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
316
			//D1Client.getMN(nodeId).delete(session, pid);
317
			  
318
			// reflect that change in the system metadata
319
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
320
			for (Replica r: systemMetadata.getReplicaList()) {
321
				  if (r.getReplicaMemberNode().equals(nodeId)) {
322
					  updatedReplicas.remove(r);
323
					  break;
324
				  }
325
			}
326
			systemMetadata.setReplicaList(updatedReplicas);
327

    
328
			// update the metadata
329
			try {
330
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
331
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
332
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
333
			} catch (RuntimeException e) {
334
				throw new ServiceFailure("4882", e.getMessage());
335
			}
336

    
337
		} catch (RuntimeException e) {
338
			throw new ServiceFailure("4882", e.getMessage());
339
		} finally {
340
			lock.unlock();
341
			logMetacat.debug("Unlocked identifier " + pid.getValue());
342
		}
343

    
344
		return true;	  
345
	  
346
  }
347
  
348
  /**
349
   * Deletes an object from the Coordinating Node
350
   * 
351
   * @param session - the Session object containing the credentials for the Subject
352
   * @param pid - The object identifier to be deleted
353
   * 
354
   * @return pid - the identifier of the object used for the deletion
355
   * 
356
   * @throws InvalidToken
357
   * @throws ServiceFailure
358
   * @throws NotAuthorized
359
   * @throws NotFound
360
   * @throws NotImplemented
361
   * @throws InvalidRequest
362
   */
363
  @Override
364
  public Identifier delete(Session session, Identifier pid) 
365
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
366
      
367
      String localId = null;      // The corresponding docid for this pid
368
	  Lock lock = null;           // The lock to be used for this identifier
369
      CNode cn = null;            // a reference to the CN to get the node list    
370
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
371
      List<Node> nodeList = null; // the list of nodes in this CN environment
372

    
373
      // check for a valid session
374
      if (session == null) {
375
        	throw new InvalidToken("4963", "No session has been provided");
376
        	
377
      }
378

    
379
      // do we have a valid pid?
380
      if (pid == null || pid.getValue().trim().equals("")) {
381
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
382
          
383
      }
384
      
385
      String serviceFailureCode = "4962";
386
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
387
      if(sid != null) {
388
          pid = sid;
389
      }
390

    
391
	  // check that it is CN/admin
392
	  boolean allowed = isAdminAuthorized(session);
393
	  
394
	  // additional check if it is the authoritative node if it is not the admin
395
      if(!allowed) {
396
          allowed = isAuthoritativeMNodeAdmin(session, pid);
397
          
398
      }
399
	  
400
	  if (!allowed) {
401
		  String msg = "The subject " + session.getSubject().getValue() + 
402
			  " is not allowed to call delete() on a Coordinating Node.";
403
		  logMetacat.info(msg);
404
		  throw new NotAuthorized("4960", msg);
405
		  
406
	  }
407
	  
408
	  // Don't defer to superclass implementation without a locally registered identifier
409
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
410
      // Check for the existing identifier
411
      try {
412
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
413
          super.delete(session, pid);
414
          
415
      } catch (McdbDocNotFoundException e) {
416
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
417
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
418
    	  
419
          try {
420
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
421
  			  lock.lock();
422
  			  logMetacat.debug("Locked identifier " + pid.getValue());
423

    
424
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
425
			  if ( sysMeta != null ) {
426
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
427
				sysMeta.setArchived(true);
428
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
429
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
430
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
431
	            //since this is cn, we don't need worry about the mn solr index.
432
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
433
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
434
	            String username = session.getSubject().getValue();//just for logging purpose
435
                //since data objects were not registered in the identifier table, we use pid as the docid
436
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
437
				
438
			  } else {
439
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
440
					  ". Couldn't obtain the system metadata record.");
441
				  
442
			  }
443
			  
444
		  } catch (RuntimeException re) {
445
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
446
				  ". The error message was: " + re.getMessage());
447
			  
448
		  } finally {
449
			  lock.unlock();
450
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
451

    
452
		  }
453

    
454
          // NOTE: cannot log the delete without localId
455
//          EventLog.getInstance().log(request.getRemoteAddr(), 
456
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
457
//                  pid.getValue(), Event.DELETE.xmlValue());
458

    
459
      } catch (SQLException e) {
460
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
461
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
462
      }
463

    
464
      // get the node list
465
      try {
466
          cn = D1Client.getCN();
467
          nodeList = cn.listNodes().getNodeList();
468
          
469
      } catch (Exception e) { // handle BaseException and other I/O issues
470
          
471
          // swallow errors since the call is not critical
472
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
473
              " due to communication issues with the CN: " + e.getMessage());
474
          
475
      }
476

    
477
	  // notify the replicas
478
	  if (systemMetadata.getReplicaList() != null) {
479
		  for (Replica replica: systemMetadata.getReplicaList()) {
480
			  NodeReference replicaNode = replica.getReplicaMemberNode();
481
			  try {
482
                  if (nodeList != null) {
483
                      // find the node type
484
                      for (Node node : nodeList) {
485
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
486
                              nodeType = node.getType();
487
                              break;
488
              
489
                          }
490
                      }
491
                  }
492
                  
493
                  // only send call MN.delete() to avoid an infinite loop with the CN
494
                  if (nodeType != null && nodeType == NodeType.MN) {
495
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
496
                  }
497
                  
498
			  } catch (Exception e) {
499
				  // all we can really do is log errors and carry on with life
500
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
501
					  " from replica MN: " + replicaNode.getValue(), e);
502
			}
503
			  
504
		  }
505
	  }
506
	  
507
	  return pid;
508
      
509
  }
510
  
511
  /**
512
   * Archives an object from the Coordinating Node
513
   * 
514
   * @param session - the Session object containing the credentials for the Subject
515
   * @param pid - The object identifier to be deleted
516
   * 
517
   * @return pid - the identifier of the object used for the deletion
518
   * 
519
   * @throws InvalidToken
520
   * @throws ServiceFailure
521
   * @throws NotAuthorized
522
   * @throws NotFound
523
   * @throws NotImplemented
524
   * @throws InvalidRequest
525
   */
526
  @Override
527
  public Identifier archive(Session session, Identifier pid) 
528
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
529

    
530
      String localId = null; // The corresponding docid for this pid
531
	  Lock lock = null;      // The lock to be used for this identifier
532
      CNode cn = null;            // a reference to the CN to get the node list    
533
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
534
      List<Node> nodeList = null; // the list of nodes in this CN environment
535
      
536

    
537
      // check for a valid session
538
      if (session == null) {
539
        	throw new InvalidToken("4973", "No session has been provided");
540
        	
541
      }
542

    
543
      // do we have a valid pid?
544
      if (pid == null || pid.getValue().trim().equals("")) {
545
          throw new ServiceFailure("4972", "The provided identifier was invalid.");
546
          
547
      }
548

    
549
	  // check that it is CN/admin
550
	  boolean allowed = isAdminAuthorized(session);
551
	  
552
	  String serviceFailureCode = "4972";
553
	  Identifier sid = getPIDForSID(pid, serviceFailureCode);
554
	  if(sid != null) {
555
	        pid = sid;
556
	  }
557
	  
558
	  //check if it is the authoritative member node
559
	  if(!allowed) {
560
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
561
	  }
562
	  
563
	  if (!allowed) {
564
		  String msg = "The subject " + session.getSubject().getValue() + 
565
				  " is not allowed to call archive() on a Coordinating Node.";
566
		  logMetacat.info(msg);
567
		  throw new NotAuthorized("4970", msg);
568
	  }
569
	  
570
      // Check for the existing identifier
571
      try {
572
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
573
          super.archive(session, pid);
574
          
575
      } catch (McdbDocNotFoundException e) {
576
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
577
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
578
    	  
579
          try {
580
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
581
  			  lock.lock();
582
  			  logMetacat.debug("Locked identifier " + pid.getValue());
583

    
584
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
585
			  if ( sysMeta != null ) {
586
				sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
587
				sysMeta.setArchived(true);
588
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
589
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
590
			    // notify the replicas
591
				notifyReplicaNodes(sysMeta);
592
				  
593
			  } else {
594
				  throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
595
					  ". Couldn't obtain the system metadata record.");
596
				  
597
			  }
598
			  
599
		  } catch (RuntimeException re) {
600
			  throw new ServiceFailure("4972", "Couldn't archive " + pid.getValue() + 
601
				  ". The error message was: " + re.getMessage());
602
			  
603
		  } finally {
604
			  lock.unlock();
605
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
606

    
607
		  }
608

    
609
          // NOTE: cannot log the archive without localId
610
//          EventLog.getInstance().log(request.getRemoteAddr(), 
611
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
612
//                  pid.getValue(), Event.DELETE.xmlValue());
613

    
614
      } catch (SQLException e) {
615
          throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
616
                  ". The local id of the object with the identifier can't be identified since "+e.getMessage());
617
      }
618

    
619
	  return pid;
620
      
621
  }
622
  
623
  /**
624
   * Set the obsoletedBy attribute in System Metadata
625
   * @param session
626
   * @param pid
627
   * @param obsoletedByPid
628
   * @param serialVersion
629
   * @return
630
   * @throws NotImplemented
631
   * @throws NotFound
632
   * @throws NotAuthorized
633
   * @throws ServiceFailure
634
   * @throws InvalidRequest
635
   * @throws InvalidToken
636
   * @throws VersionMismatch
637
   */
638
  @Override
639
  public boolean setObsoletedBy(Session session, Identifier pid,
640
			Identifier obsoletedByPid, long serialVersion)
641
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
642
			InvalidRequest, InvalidToken, VersionMismatch {
643
      
644
      // do we have a valid pid?
645
      if (pid == null || pid.getValue().trim().equals("")) {
646
          throw new InvalidRequest("4942", "The provided identifier was invalid.");
647
          
648
      }
649
      
650
      /*String serviceFailureCode = "4941";
651
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
652
      if(sid != null) {
653
          pid = sid;
654
      }*/
655
      
656
      // do we have a valid pid?
657
      if (obsoletedByPid == null || obsoletedByPid.getValue().trim().equals("")) {
658
          throw new InvalidRequest("4942", "The provided obsoletedByPid was invalid.");
659
          
660
      }
661
      
662
      try {
663
          if(IdentifierManager.getInstance().systemMetadataSIDExists(obsoletedByPid)) {
664
              throw new InvalidRequest("4942", "The provided obsoletedByPid "+obsoletedByPid.getValue()+" is an existing SID. However, it must NOT be an SID.");
665
          }
666
      } catch (SQLException ee) {
667
          throw new ServiceFailure("4941", "Couldn't determine if the obsoletedByPid "+obsoletedByPid.getValue()+" is an SID or not. The id shouldn't be an SID.");
668
      }
669
      
670

    
671
		// The lock to be used for this identifier
672
		Lock lock = null;
673

    
674
		// get the subject
675
		Subject subject = session.getSubject();
676

    
677
		// are we allowed to do this?
678
		if (!isAuthorized(session, pid, Permission.WRITE)) {
679
			throw new NotAuthorized("4881", Permission.WRITE
680
					+ " not allowed by " + subject.getValue() + " on "
681
					+ pid.getValue());
682

    
683
		}
684

    
685

    
686
		SystemMetadata systemMetadata = null;
687
		try {
688
			lock = HazelcastService.getInstance().getLock(pid.getValue());
689
			lock.lock();
690
			logMetacat.debug("Locked identifier " + pid.getValue());
691

    
692
			try {
693
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
694
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
695
				}
696

    
697
				// did we get it correctly?
698
				if (systemMetadata == null) {
699
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
700
				}
701

    
702
				// does the request have the most current system metadata?
703
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
704
					String msg = "The requested system metadata version number "
705
							+ serialVersion
706
							+ " differs from the current version at "
707
							+ systemMetadata.getSerialVersion().longValue()
708
							+ ". Please get the latest copy in order to modify it.";
709
					throw new VersionMismatch("4886", msg);
710

    
711
				}
712

    
713
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
714
				throw new NotFound("4884", "No record found for: " + pid.getValue());
715

    
716
			}
717

    
718
			// set the new policy
719
			systemMetadata.setObsoletedBy(obsoletedByPid);
720

    
721
			// update the metadata
722
			try {
723
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
724
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
725
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
726
			} catch (RuntimeException e) {
727
				throw new ServiceFailure("4882", e.getMessage());
728
			}
729

    
730
		} catch (RuntimeException e) {
731
			throw new ServiceFailure("4882", e.getMessage());
732
		} finally {
733
			lock.unlock();
734
			logMetacat.debug("Unlocked identifier " + pid.getValue());
735
		}
736

    
737
		return true;
738
	}
739
  
740
  
741
  /**
742
   * Set the replication status for an object given the object identifier
743
   * 
744
   * @param session - the Session object containing the credentials for the Subject
745
   * @param pid - the object identifier for the given object
746
   * @param status - the replication status to be applied
747
   * 
748
   * @return true or false
749
   * 
750
   * @throws NotImplemented
751
   * @throws NotAuthorized
752
   * @throws ServiceFailure
753
   * @throws InvalidRequest
754
   * @throws InvalidToken
755
   * @throws NotFound
756
   * 
757
   */
758
  @Override
759
  public boolean setReplicationStatus(Session session, Identifier pid,
760
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
761
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
762
      InvalidRequest, NotFound {
763
	  
764
	  // cannot be called by public
765
	  if (session == null) {
766
		  throw new NotAuthorized("4720", "Session cannot be null");
767
	  }
768
	  
769
	// do we have a valid pid?
770
      if (pid == null || pid.getValue().trim().equals("")) {
771
          throw new InvalidRequest("4730", "The provided identifier was invalid.");
772
          
773
      }
774
      
775
      /*String serviceFailureCode = "4700";
776
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
777
      if(sid != null) {
778
          pid = sid;
779
      }*/
780
      
781
      // The lock to be used for this identifier
782
      Lock lock = null;
783
      
784
      boolean allowed = false;
785
      int replicaEntryIndex = -1;
786
      List<Replica> replicas = null;
787
      // get the subject
788
      Subject subject = session.getSubject();
789
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
790
          " is " + status.toString());
791
      
792
      SystemMetadata systemMetadata = null;
793

    
794
      try {
795
          lock = HazelcastService.getInstance().getLock(pid.getValue());
796
          lock.lock();
797
          logMetacat.debug("Locked identifier " + pid.getValue());
798

    
799
          try {      
800
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
801

    
802
              // did we get it correctly?
803
              if ( systemMetadata == null ) {
804
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
805
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
806
                  
807
              }
808
              replicas = systemMetadata.getReplicaList();
809
              int count = 0;
810
              
811
              // was there a failure? log it
812
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
813
                 String msg = "The replication request of the object identified by " + 
814
                     pid.getValue() + " failed.  The error message was " +
815
                     failure.getMessage() + ".";
816
              }
817
              
818
              if (replicas.size() > 0 && replicas != null) {
819
                  // find the target replica index in the replica list
820
                  for (Replica replica : replicas) {
821
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
822
                      String targetNodeStr = targetNode.getValue();
823
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
824
                  
825
                      if (replicaNodeStr.equals(targetNodeStr)) {
826
                          replicaEntryIndex = count;
827
                          logMetacat.debug("replica entry index is: "
828
                                  + replicaEntryIndex);
829
                          break;
830
                      }
831
                      count++;
832
                  
833
                  }
834
              }
835
              // are we allowed to do this? only CNs and target MNs are allowed
836
              CNode cn = D1Client.getCN();
837
              List<Node> nodes = cn.listNodes().getNodeList();
838
              
839
              // find the node in the node list
840
              for ( Node node : nodes ) {
841
                  
842
                  NodeReference nodeReference = node.getIdentifier();
843
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
844
                      nodeReference.getValue());
845
                  
846
                  // allow target MN certs
847
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
848
                      node.getType().equals(NodeType.MN)) {
849
                      List<Subject> nodeSubjects = node.getSubjectList();
850
                      
851
                      // check if the session subject is in the node subject list
852
                      for (Subject nodeSubject : nodeSubjects) {
853
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
854
                                  nodeSubject.getValue() + " and " + subject.getValue());
855
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
856
                              
857
                              // lastly limit to COMPLETED, INVALIDATED,
858
                              // and FAILED status updates from MNs only
859
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
860
                                   status.equals(ReplicationStatus.INVALIDATED) ||
861
                                   status.equals(ReplicationStatus.FAILED)) {
862
                                  allowed = true;
863
                                  break;
864
                                  
865
                              }                              
866
                          }
867
                      }                 
868
                  }
869
              }
870

    
871
              if ( !allowed ) {
872
                  //check for CN admin access
873
                  allowed = isAuthorized(session, pid, Permission.WRITE);
874
                  
875
              }              
876
              
877
              if ( !allowed ) {
878
                  String msg = "The subject identified by "
879
                          + subject.getValue()
880
                          + " does not have permission to set the replication status for "
881
                          + "the replica identified by "
882
                          + targetNode.getValue() + ".";
883
                  logMetacat.info(msg);
884
                  throw new NotAuthorized("4720", msg);
885
                  
886
              }
887

    
888
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
889
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
890
                " : " + e.getMessage());
891
            
892
          }
893
          
894
          Replica targetReplica = new Replica();
895
          // set the status for the replica
896
          if ( replicaEntryIndex != -1 ) {
897
              targetReplica = replicas.get(replicaEntryIndex);
898
              
899
              // don't allow status to change from COMPLETED to anything other
900
              // than INVALIDATED: prevents overwrites from race conditions
901
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
902
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
903
            	  throw new InvalidRequest("4730", "Status state change from " +
904
            			  targetReplica.getReplicationStatus() + " to " +
905
            			  status.toString() + "is prohibited for identifier " +
906
            			  pid.getValue() + " and target node " + 
907
            			  targetReplica.getReplicaMemberNode().getValue());
908
              }
909
              
910
              targetReplica.setReplicationStatus(status);
911
              
912
              logMetacat.debug("Set the replication status for " + 
913
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
914
                  targetReplica.getReplicationStatus() + " for identifier " +
915
                  pid.getValue());
916
              
917
          } else {
918
              // this is a new entry, create it
919
              targetReplica.setReplicaMemberNode(targetNode);
920
              targetReplica.setReplicationStatus(status);
921
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
922
              replicas.add(targetReplica);
923
              
924
          }
925
          
926
          systemMetadata.setReplicaList(replicas);
927
                
928
          // update the metadata
929
          try {
930
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
931
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
932
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
933

    
934
              if ( !status.equals(ReplicationStatus.QUEUED) && 
935
            	   !status.equals(ReplicationStatus.REQUESTED)) {
936
                  
937
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
938
                          "\tNODE:\t" + targetNode.getValue() + 
939
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
940
                
941
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
942
                          "\tPID:\t"  + pid.getValue() + 
943
                          "\tNODE:\t" + targetNode.getValue() + 
944
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
945
              }
946

    
947
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
948
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
949
                      " on target node " + targetNode + ". The exception was: " +
950
                      failure.getMessage());
951
              }
952
              
953
			  // update the replica nodes about the completed replica when complete
954
              if (status.equals(ReplicationStatus.COMPLETED)) {
955
				notifyReplicaNodes(systemMetadata);
956
			}
957
          
958
          } catch (RuntimeException e) {
959
              throw new ServiceFailure("4700", e.getMessage());
960
          
961
          }
962
          
963
    } catch (RuntimeException e) {
964
        String msg = "There was a RuntimeException getting the lock for " +
965
            pid.getValue();
966
        logMetacat.info(msg);
967
        
968
    } finally {
969
        lock.unlock();
970
        logMetacat.debug("Unlocked identifier " + pid.getValue());
971
        
972
    }
973
      
974
      return true;
975
  }
976
  
977
/**
978
   * Return the checksum of the object given the identifier 
979
   * 
980
   * @param session - the Session object containing the credentials for the Subject
981
   * @param pid - the object identifier for the given object
982
   * 
983
   * @return checksum - the checksum of the object
984
   * 
985
   * @throws InvalidToken
986
   * @throws ServiceFailure
987
   * @throws NotAuthorized
988
   * @throws NotFound
989
   * @throws NotImplemented
990
   */
991
  @Override
992
  public Checksum getChecksum(Session session, Identifier pid)
993
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
994
    NotImplemented {
995
    
996
	boolean isAuthorized = false;
997
	try {
998
		isAuthorized = isAuthorized(session, pid, Permission.READ);
999
	} catch (InvalidRequest e) {
1000
		throw new ServiceFailure("1410", e.getDescription());
1001
	}  
1002
    if (!isAuthorized) {
1003
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
1004
    }
1005
    
1006
    SystemMetadata systemMetadata = null;
1007
    Checksum checksum = null;
1008
    
1009
    try {
1010
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
1011

    
1012
        if (systemMetadata == null ) {
1013
            String error ="";
1014
            String localId = null;
1015
            try {
1016
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1017
              
1018
             } catch (Exception e) {
1019
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1020
            }
1021
            
1022
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1023
                error = DELETEDMESSAGE;
1024
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1025
                error = DELETEDMESSAGE;
1026
            }
1027
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1028
        }
1029
        checksum = systemMetadata.getChecksum();
1030
        
1031
    } catch (RuntimeException e) {
1032
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
1033
            pid.getValue() + ". The error message was: " + e.getMessage());
1034
      
1035
    }
1036
    
1037
    return checksum;
1038
  }
1039

    
1040
  /**
1041
   * Resolve the location of a given object
1042
   * 
1043
   * @param session - the Session object containing the credentials for the Subject
1044
   * @param pid - the object identifier for the given object
1045
   * 
1046
   * @return objectLocationList - the list of nodes known to contain the object
1047
   * 
1048
   * @throws InvalidToken
1049
   * @throws ServiceFailure
1050
   * @throws NotAuthorized
1051
   * @throws NotFound
1052
   * @throws NotImplemented
1053
   */
1054
  @Override
1055
  public ObjectLocationList resolve(Session session, Identifier pid)
1056
    throws InvalidToken, ServiceFailure, NotAuthorized,
1057
    NotFound, NotImplemented {
1058

    
1059
    throw new NotImplemented("4131", "resolve not implemented");
1060

    
1061
  }
1062

    
1063
  /**
1064
   * Metacat does not implement this method at the CN level
1065
   */
1066
  @Override
1067
  public ObjectList search(Session session, String queryType, String query)
1068
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1069
    NotImplemented {
1070

    
1071
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1072
	  
1073
//    ObjectList objectList = null;
1074
//    try {
1075
//        objectList = 
1076
//          IdentifierManager.getInstance().querySystemMetadata(
1077
//              null, //startTime, 
1078
//              null, //endTime,
1079
//              null, //objectFormat, 
1080
//              false, //replicaStatus, 
1081
//              0, //start, 
1082
//              1000 //count
1083
//              );
1084
//        
1085
//    } catch (Exception e) {
1086
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1087
//    }
1088
//
1089
//      return objectList;
1090
		  
1091
  }
1092
  
1093
  /**
1094
   * Returns the object format registered in the DataONE Object Format 
1095
   * Vocabulary for the given format identifier
1096
   * 
1097
   * @param fmtid - the identifier of the format requested
1098
   * 
1099
   * @return objectFormat - the object format requested
1100
   * 
1101
   * @throws ServiceFailure
1102
   * @throws NotFound
1103
   * @throws InsufficientResources
1104
   * @throws NotImplemented
1105
   */
1106
  @Override
1107
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1108
    throws ServiceFailure, NotFound, NotImplemented {
1109
     
1110
      return ObjectFormatService.getInstance().getFormat(fmtid);
1111
      
1112
  }
1113

    
1114
    @Override
1115
    public ObjectFormatIdentifier addFormat(Session session, ObjectFormatIdentifier formatId, ObjectFormat format)
1116
            throws ServiceFailure, NotFound, NotImplemented, NotAuthorized, InvalidToken {
1117

    
1118
        logMetacat.debug("CNodeService.addFormat() called.\n" + 
1119
                "format ID: " + format.getFormatId() + "\n" + 
1120
                "format name: " + format.getFormatName() + "\n" + 
1121
                "format type: " + format.getFormatType() );
1122
        
1123
        // FIXME remove:
1124
        if (true)
1125
            throw new NotImplemented("0000", "Implementation underway... Will need testing too...");
1126
        
1127
        if (!isAdminAuthorized(session))
1128
            throw new NotAuthorized("0000", "Not authorized to call addFormat()");
1129

    
1130
        String separator = ".";
1131
        try {
1132
            separator = PropertyService.getProperty("document.accNumSeparator");
1133
        } catch (PropertyNotFoundException e) {
1134
            logMetacat.warn("Unable to find property \"document.accNumSeparator\"\n" + e.getMessage());
1135
        }
1136

    
1137
        // find pids of last and next ObjectFormatList
1138
        String OBJECT_FORMAT_DOCID = ObjectFormatService.OBJECT_FORMAT_DOCID;
1139
        int lastRev = -1;
1140
        try {
1141
            lastRev = DBUtil.getLatestRevisionInDocumentTable(OBJECT_FORMAT_DOCID);
1142
        } catch (SQLException e) {
1143
            throw new ServiceFailure("0000", "Unable to locate last revision of the object format list.\n" + e.getMessage());
1144
        }
1145
        int nextRev = lastRev + 1;
1146
        String lastDocID = OBJECT_FORMAT_DOCID + separator + lastRev;
1147
        String nextDocID = OBJECT_FORMAT_DOCID + separator + nextRev;
1148
        
1149
        Identifier lastPid = new Identifier();
1150
        lastPid.setValue(lastDocID);
1151
        Identifier nextPid = new Identifier();
1152
        nextPid.setValue(nextDocID);
1153
        
1154
        logMetacat.debug("Last ObjectFormatList document ID: " + lastDocID + "\n" 
1155
                + "Next ObjectFormatList document ID: " + nextDocID);
1156
        
1157
        // add new format to the current ObjectFormatList
1158
        ObjectFormatList objectFormatList = ObjectFormatService.getInstance().listFormats();
1159
        List<ObjectFormat> innerList = objectFormatList.getObjectFormatList();
1160
        innerList.add(format);
1161

    
1162
        // get existing (last) sysmeta and make a copy
1163
        SystemMetadata lastSysmeta = getSystemMetadata(session, lastPid);
1164
        SystemMetadata nextSysmeta = new SystemMetadata();
1165
        try {
1166
            BeanUtils.copyProperties(nextSysmeta, lastSysmeta);
1167
        } catch (IllegalAccessException | InvocationTargetException e) {
1168
            throw new ServiceFailure("0000", "Unable to create system metadata for updated object format list.\n" + e.getMessage());
1169
        }
1170
        
1171
        // create the new object format list, and update the old sysmeta with obsoletedBy
1172
        createNewObjectFormatList(session, lastPid, nextPid, objectFormatList, nextSysmeta);
1173
        updateOldObjectFormatList(session, lastPid, nextPid, lastSysmeta);
1174
        
1175
        // TODO add to ObjectFormatService local cache?
1176
        
1177
        return formatId;
1178
    }
1179

    
1180
    /**
1181
     * Creates the object for the next / updated version of the ObjectFormatList.
1182
     * 
1183
     * @param session
1184
     * @param lastPid
1185
     * @param nextPid
1186
     * @param objectFormatList
1187
     * @param lastSysmeta
1188
     */
1189
    private void createNewObjectFormatList(Session session, Identifier lastPid, Identifier nextPid,
1190
            ObjectFormatList objectFormatList, SystemMetadata lastSysmeta) 
1191
                    throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented {
1192
        
1193
        PipedInputStream is = new PipedInputStream();
1194
        PipedOutputStream os = null;
1195
        
1196
        try {
1197
            os = new PipedOutputStream(is);
1198
            TypeMarshaller.marshalTypeToOutputStream(objectFormatList, os);
1199
        } catch (JiBXException | IOException e) {
1200
            throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + e.getMessage());
1201
        } finally {
1202
            try {
1203
                os.flush();
1204
                os.close();
1205
            } catch (IOException ioe) {
1206
                throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + ioe.getMessage());
1207
            }
1208
        }
1209
        
1210
        BigInteger docSize = lastSysmeta.getSize();
1211
        try {
1212
            docSize = BigInteger.valueOf(is.available());
1213
        } catch (IOException e) {
1214
            logMetacat.warn("Unable to set an accurate size for the new object format list.", e);
1215
        }
1216
        
1217
        lastSysmeta.setIdentifier(nextPid);
1218
        lastSysmeta.setObsoletes(lastPid);
1219
        lastSysmeta.setSize(docSize); 
1220
        lastSysmeta.setSubmitter(session.getSubject());
1221
        lastSysmeta.setDateUploaded(new Date());
1222
        
1223
        // create new object format list
1224
        try {
1225
            create(session, nextPid, is, lastSysmeta);
1226
        } catch (IdentifierNotUnique | UnsupportedType | InsufficientResources
1227
                | InvalidSystemMetadata | InvalidRequest e) {
1228
            throw new ServiceFailure("0000", "Unable to create() new object format list" + e.getMessage());
1229
        }
1230
    }
1231
  
1232
    /**
1233
     * Updates the SystemMetadata for the old version of the ObjectFormatList
1234
     * by setting the obsoletedBy value to the pid of the new version of the 
1235
     * ObjectFormatList.
1236
     * 
1237
     * @param session
1238
     * @param lastPid
1239
     * @param obsoletedByPid
1240
     * @param lastSysmeta
1241
     * @throws ServiceFailure
1242
     */
1243
    private void updateOldObjectFormatList(Session session, Identifier lastPid, Identifier obsoletedByPid, SystemMetadata lastSysmeta) 
1244
            throws ServiceFailure {
1245
        
1246
        lastSysmeta.setObsoletedBy(obsoletedByPid);
1247
        
1248
        try {
1249
            this.updateSystemMetadata(session, lastPid, lastSysmeta);
1250
        } catch (NotImplemented | NotAuthorized | ServiceFailure | InvalidRequest
1251
                | InvalidSystemMetadata | InvalidToken e) {
1252
            throw new ServiceFailure("0000", "Unable to update metadata of old object format list.\n" + e.getMessage());
1253
        }
1254
    }
1255
  /**
1256
   * Returns a list of all object formats registered in the DataONE Object 
1257
   * Format Vocabulary
1258
    * 
1259
   * @return objectFormatList - The list of object formats registered in 
1260
   *                            the DataONE Object Format Vocabulary
1261
   * 
1262
   * @throws ServiceFailure
1263
   * @throws NotImplemented
1264
   * @throws InsufficientResources
1265
   */
1266
  @Override
1267
  public ObjectFormatList listFormats() 
1268
    throws ServiceFailure, NotImplemented {
1269

    
1270
    return ObjectFormatService.getInstance().listFormats();
1271
  }
1272

    
1273
  /**
1274
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1275
    * 
1276
   * @return nodeList - List of nodes from the registry
1277
   * 
1278
   * @throws ServiceFailure
1279
   * @throws NotImplemented
1280
   */
1281
  @Override
1282
  public NodeList listNodes() 
1283
    throws NotImplemented, ServiceFailure {
1284

    
1285
    throw new NotImplemented("4800", "listNodes not implemented");
1286
  }
1287

    
1288
  /**
1289
   * Provides a mechanism for adding system metadata independently of its 
1290
   * associated object, such as when adding system metadata for data objects.
1291
    * 
1292
   * @param session - the Session object containing the credentials for the Subject
1293
   * @param pid - The identifier of the object to register the system metadata against
1294
   * @param sysmeta - The system metadata to be registered
1295
   * 
1296
   * @return true if the registration succeeds
1297
   * 
1298
   * @throws NotImplemented
1299
   * @throws NotAuthorized
1300
   * @throws ServiceFailure
1301
   * @throws InvalidRequest
1302
   * @throws InvalidSystemMetadata
1303
   */
1304
  @Override
1305
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1306
      SystemMetadata sysmeta) 
1307
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1308
      InvalidSystemMetadata {
1309

    
1310
      // The lock to be used for this identifier
1311
      Lock lock = null;
1312

    
1313
      // TODO: control who can call this?
1314
      if (session == null) {
1315
          //TODO: many of the thrown exceptions do not use the correct error codes
1316
          //check these against the docs and correct them
1317
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1318
                  "  If you are not logged in, please do so and retry the request.");
1319
      }
1320
      // the identifier can't be an SID
1321
      try {
1322
          if(IdentifierManager.getInstance().systemMetadataSIDExists(pid)) {
1323
              throw new InvalidRequest("4863", "The provided identifier "+pid.getValue()+" is a series id which is not allowed.");
1324
          }
1325
      } catch (SQLException sqle) {
1326
          throw new ServiceFailure("4862", "Couldn't determine if the pid "+pid.getValue()+" is a series id since "+sqle.getMessage());
1327
      }
1328
      
1329
      // verify that guid == SystemMetadata.getIdentifier()
1330
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1331
          "|" + sysmeta.getIdentifier().getValue());
1332
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1333
          throw new InvalidRequest("4863", 
1334
              "The identifier in method call (" + pid.getValue() + 
1335
              ") does not match identifier in system metadata (" +
1336
              sysmeta.getIdentifier().getValue() + ").");
1337
      }
1338
      
1339
      //check if the sid is legitimate in the system metadata
1340
      //checkSidInModifyingSystemMetadata(sysmeta, "4864", "4862");
1341
      Identifier sid = sysmeta.getSeriesId();
1342
      if(sid != null) {
1343
          if (!isValidIdentifier(sid)) {
1344
              throw new InvalidRequest("4863", "The series id in the system metadata is invalid in the request.");
1345
          }
1346
      }
1347

    
1348
      try {
1349
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1350
          lock.lock();
1351
          logMetacat.debug("Locked identifier " + pid.getValue());
1352
          logMetacat.debug("Checking if identifier exists...");
1353
          // Check that the identifier does not already exist
1354
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1355
              throw new InvalidRequest("4863", 
1356
                  "The identifier is already in use by an existing object.");
1357
          
1358
          }
1359
          
1360
          // insert the system metadata into the object store
1361
          logMetacat.debug("Starting to insert SystemMetadata...");
1362
          try {
1363
              sysmeta.setSerialVersion(BigInteger.ONE);
1364
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1365
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1366
              
1367
          } catch (RuntimeException e) {
1368
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1369
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1370
                  e.getClass() + ": " + e.getMessage());
1371
              
1372
          }
1373
          
1374
      } catch (RuntimeException e) {
1375
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1376
                  e.getClass() + ": " + e.getMessage());
1377
          
1378
      }  finally {
1379
          lock.unlock();
1380
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1381
          
1382
      }
1383

    
1384
      
1385
      logMetacat.debug("Returning from registerSystemMetadata");
1386
      
1387
      try {
1388
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1389
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1390
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1391
    	          localId, "registerSystemMetadata");
1392
      } catch (McdbDocNotFoundException e) {
1393
    	  // do nothing, no localId to log with
1394
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1395
      } catch (SQLException ee) {
1396
          // do nothing, no localId to log with
1397
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1398
      }
1399
      
1400
      
1401
      return pid;
1402
  }
1403
  
1404
  /**
1405
   * Given an optional scope and format, reserves and returns an identifier 
1406
   * within that scope and format that is unique and will not be 
1407
   * used by any other sessions. 
1408
    * 
1409
   * @param session - the Session object containing the credentials for the Subject
1410
   * @param pid - The identifier of the object to register the system metadata against
1411
   * @param scope - An optional string to be used to qualify the scope of 
1412
   *                the identifier namespace, which is applied differently 
1413
   *                depending on the format requested. If scope is not 
1414
   *                supplied, a default scope will be used.
1415
   * @param format - The optional name of the identifier format to be used, 
1416
   *                  drawn from a DataONE-specific vocabulary of identifier 
1417
   *                 format names, including several common syntaxes such 
1418
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1419
   *                 format is not supplied by the caller, the CN service 
1420
   *                 will use a default identifier format, which may change 
1421
   *                 over time.
1422
   * 
1423
   * @return true if the registration succeeds
1424
   * 
1425
   * @throws InvalidToken
1426
   * @throws ServiceFailure
1427
   * @throws NotAuthorized
1428
   * @throws IdentifierNotUnique
1429
   * @throws NotImplemented
1430
   */
1431
  @Override
1432
  public Identifier reserveIdentifier(Session session, Identifier pid)
1433
  throws InvalidToken, ServiceFailure,
1434
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1435

    
1436
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1437
  }
1438
  
1439
  @Override
1440
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1441
  throws InvalidToken, ServiceFailure,
1442
        NotAuthorized, NotImplemented, InvalidRequest {
1443
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1444
  }
1445
  
1446
  /**
1447
    * Checks whether the pid is reserved by the subject in the session param
1448
    * If the reservation is held on the pid by the subject, we return true.
1449
    * 
1450
   * @param session - the Session object containing the Subject
1451
   * @param pid - The identifier to check
1452
   * 
1453
   * @return true if the reservation exists for the subject/pid
1454
   * 
1455
   * @throws InvalidToken
1456
   * @throws ServiceFailure
1457
   * @throws NotFound - when the pid is not found (in use or in reservation)
1458
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1459
   * @throws IdentifierNotUnique - when the pid is in use
1460
   * @throws NotImplemented
1461
   */
1462

    
1463
  @Override
1464
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1465
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1466
      NotImplemented, InvalidRequest {
1467
  
1468
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1469
  }
1470

    
1471
  /**
1472
   * Changes ownership (RightsHolder) of the specified object to the 
1473
   * subject specified by userId
1474
    * 
1475
   * @param session - the Session object containing the credentials for the Subject
1476
   * @param pid - Identifier of the object to be modified
1477
   * @param userId - The subject that will be taking ownership of the specified object.
1478
   *
1479
   * @return pid - the identifier of the modified object
1480
   * 
1481
   * @throws ServiceFailure
1482
   * @throws InvalidToken
1483
   * @throws NotFound
1484
   * @throws NotAuthorized
1485
   * @throws NotImplemented
1486
   * @throws InvalidRequest
1487
   */  
1488
  @Override
1489
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1490
      long serialVersion)
1491
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1492
      NotImplemented, InvalidRequest, VersionMismatch {
1493
      
1494
      // The lock to be used for this identifier
1495
      Lock lock = null;
1496

    
1497
      // get the subject
1498
      Subject subject = session.getSubject();
1499
      
1500
      String serviceFailureCode = "4490";
1501
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1502
      if(sid != null) {
1503
          pid = sid;
1504
      }
1505
      
1506
      // are we allowed to do this?
1507
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1508
          throw new NotAuthorized("4440", "not allowed by "
1509
                  + subject.getValue() + " on " + pid.getValue());
1510
          
1511
      }
1512
      
1513
      SystemMetadata systemMetadata = null;
1514
      try {
1515
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1516
          lock.lock();
1517
          logMetacat.debug("Locked identifier " + pid.getValue());
1518

    
1519
          try {
1520
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1521
              
1522
              // does the request have the most current system metadata?
1523
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1524
                 String msg = "The requested system metadata version number " + 
1525
                     serialVersion + " differs from the current version at " +
1526
                     systemMetadata.getSerialVersion().longValue() +
1527
                     ". Please get the latest copy in order to modify it.";
1528
                 throw new VersionMismatch("4443", msg);
1529
              }
1530
              
1531
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1532
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1533
              
1534
          }
1535
              
1536
          // set the new rights holder
1537
          systemMetadata.setRightsHolder(userId);
1538
          
1539
          // update the metadata
1540
          try {
1541
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1542
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1543
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1544
              notifyReplicaNodes(systemMetadata);
1545
              
1546
          } catch (RuntimeException e) {
1547
              throw new ServiceFailure("4490", e.getMessage());
1548
          
1549
          }
1550
          
1551
      } catch (RuntimeException e) {
1552
          throw new ServiceFailure("4490", e.getMessage());
1553
          
1554
      } finally {
1555
          lock.unlock();
1556
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1557
      
1558
      }
1559
      
1560
      return pid;
1561
  }
1562

    
1563
  /**
1564
   * Verify that a replication task is authorized by comparing the target node's
1565
   * Subject (from the X.509 certificate-derived Session) with the list of 
1566
   * subjects in the known, pending replication tasks map.
1567
   * 
1568
   * @param originatingNodeSession - Session information that contains the 
1569
   *                                 identity of the calling user
1570
   * @param targetNodeSubject - Subject identifying the target node
1571
   * @param pid - the identifier of the object to be replicated
1572
   * @param replicatePermission - the execute permission to be granted
1573
   * 
1574
   * @throws ServiceFailure
1575
   * @throws NotImplemented
1576
   * @throws InvalidToken
1577
   * @throws NotAuthorized
1578
   * @throws InvalidRequest
1579
   * @throws NotFound
1580
   */
1581
  @Override
1582
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1583
    Subject targetNodeSubject, Identifier pid) 
1584
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1585
    NotFound, InvalidRequest {
1586
    
1587
    boolean isAllowed = false;
1588
    SystemMetadata sysmeta = null;
1589
    NodeReference targetNode = null;
1590
    
1591
    try {
1592
      // get the target node reference from the nodes list
1593
      CNode cn = D1Client.getCN();
1594
      List<Node> nodes = cn.listNodes().getNodeList();
1595
      
1596
      if ( nodes != null ) {
1597
        for (Node node : nodes) {
1598
            
1599
        	if (node.getSubjectList() != null) {
1600
        		
1601
	            for (Subject nodeSubject : node.getSubjectList()) {
1602
	            	
1603
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1604
	                    targetNode = node.getIdentifier();
1605
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1606
	                    break;
1607
	                }
1608
	            }
1609
        	}
1610
            
1611
            if ( targetNode != null) { break; }
1612
        }
1613
        
1614
      } else {
1615
          String msg = "Couldn't get the node list from the CN";
1616
          logMetacat.debug(msg);
1617
          throw new ServiceFailure("4872", msg);
1618
          
1619
      }
1620
      
1621
      // can't find a node listed with the given subject
1622
      if ( targetNode == null ) {
1623
          String msg = "There is no Member Node registered with a node subject " +
1624
              "matching " + targetNodeSubject.getValue();
1625
          logMetacat.info(msg);
1626
          throw new NotAuthorized("4871", msg);
1627
          
1628
      }
1629
      
1630
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1631
      
1632
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1633

    
1634
      if ( sysmeta != null ) {
1635
          
1636
          List<Replica> replicaList = sysmeta.getReplicaList();
1637
          
1638
          if ( replicaList != null ) {
1639
              
1640
              // find the replica with the status set to 'requested'
1641
              for (Replica replica : replicaList) {
1642
                  ReplicationStatus status = replica.getReplicationStatus();
1643
                  NodeReference listedNode = replica.getReplicaMemberNode();
1644
                  if ( listedNode != null && targetNode != null ) {
1645
                      logMetacat.debug("Comparing " + listedNode.getValue()
1646
                              + " to " + targetNode.getValue());
1647
                      
1648
                      if (listedNode.getValue().equals(targetNode.getValue())
1649
                              && status.equals(ReplicationStatus.REQUESTED)) {
1650
                          isAllowed = true;
1651
                          break;
1652

    
1653
                      }
1654
                  }
1655
              }
1656
          }
1657
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1658
              "to replicate: " + isAllowed + " for " + pid.getValue());
1659

    
1660
          
1661
      } else {
1662
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1663
          " is null.");
1664
          String error ="";
1665
          String localId = null;
1666
          try {
1667
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1668
            
1669
           } catch (Exception e) {
1670
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1671
          }
1672
          
1673
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1674
              error = DELETEDMESSAGE;
1675
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1676
              error = DELETEDMESSAGE;
1677
          }
1678
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1679
          
1680
      }
1681

    
1682
    } catch (RuntimeException e) {
1683
    	  ServiceFailure sf = new ServiceFailure("4872", 
1684
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1685
                e.getMessage());
1686
    	  sf.initCause(e);
1687
        throw sf;
1688
        
1689
    }
1690
      
1691
    return isAllowed;
1692
    
1693
  }
1694

    
1695
  /**
1696
   * Adds a new object to the Node, where the object is a science metadata object.
1697
   * 
1698
   * @param session - the Session object containing the credentials for the Subject
1699
   * @param pid - The object identifier to be created
1700
   * @param object - the object bytes
1701
   * @param sysmeta - the system metadata that describes the object  
1702
   * 
1703
   * @return pid - the object identifier created
1704
   * 
1705
   * @throws InvalidToken
1706
   * @throws ServiceFailure
1707
   * @throws NotAuthorized
1708
   * @throws IdentifierNotUnique
1709
   * @throws UnsupportedType
1710
   * @throws InsufficientResources
1711
   * @throws InvalidSystemMetadata
1712
   * @throws NotImplemented
1713
   * @throws InvalidRequest
1714
   */
1715
  public Identifier create(Session session, Identifier pid, InputStream object,
1716
    SystemMetadata sysmeta) 
1717
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1718
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1719
    NotImplemented, InvalidRequest {
1720
       
1721
   // verify the pid is valid format
1722
      if (!isValidIdentifier(pid)) {
1723
          throw new InvalidRequest("4891", "The provided identifier is invalid.");
1724
      }
1725
      // The lock to be used for this identifier
1726
      Lock lock = null;
1727

    
1728
      try {
1729
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1730
          lock.lock();
1731
          // are we allowed?
1732
          boolean isAllowed = false;
1733
          isAllowed = isAdminAuthorized(session);
1734
          
1735
          // additional check if it is the authoritative node if it is not the admin
1736
          if(!isAllowed) {
1737
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1738
          }
1739

    
1740
          // proceed if we're called by a CN
1741
          if ( isAllowed ) {
1742
              //check if the series id is legitimate. It uses the same rules of the method registerSystemMetadata
1743
              //checkSidInModifyingSystemMetadata(sysmeta, "4896", "4893");
1744
              Identifier sid = sysmeta.getSeriesId();
1745
              if(sid != null) {
1746
                  if (!isValidIdentifier(sid)) {
1747
                      throw new InvalidRequest("4891", "The series id in the system metadata is invalid in the request.");
1748
                  }
1749
              }
1750
              // create the coordinating node version of the document      
1751
              logMetacat.debug("Locked identifier " + pid.getValue());
1752
              sysmeta.setSerialVersion(BigInteger.ONE);
1753
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1754
              //sysmeta.setArchived(false); // this is a create op, not update
1755
              
1756
              // the CN should have set the origin and authoritative member node fields
1757
              try {
1758
                  sysmeta.getOriginMemberNode().getValue();
1759
                  sysmeta.getAuthoritativeMemberNode().getValue();
1760
                  
1761
              } catch (NullPointerException npe) {
1762
                  throw new InvalidSystemMetadata("4896", 
1763
                      "Both the origin and authoritative member node identifiers need to be set.");
1764
                  
1765
              }
1766
              pid = super.create(session, pid, object, sysmeta);
1767

    
1768
          } else {
1769
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1770
                  " isn't allowed to call create() on a Coordinating Node.";
1771
              logMetacat.info(msg);
1772
              throw new NotAuthorized("1100", msg);
1773
          }
1774
          
1775
      } catch (RuntimeException e) {
1776
          // Convert Hazelcast runtime exceptions to service failures
1777
          String msg = "There was a problem creating the object identified by " +
1778
              pid.getValue() + ". There error message was: " + e.getMessage();
1779
          throw new ServiceFailure("4893", msg);
1780
          
1781
      } finally {
1782
    	  if (lock != null) {
1783
	          lock.unlock();
1784
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1785
    	  }
1786
      }
1787
      
1788
      return pid;
1789

    
1790
  }
1791

    
1792
  /**
1793
   * Set access for a given object using the object identifier and a Subject
1794
   * under a given Session.
1795
   * 
1796
   * @param session - the Session object containing the credentials for the Subject
1797
   * @param pid - the object identifier for the given object to apply the policy
1798
   * @param policy - the access policy to be applied
1799
   * 
1800
   * @return true if the application of the policy succeeds
1801
   * @throws InvalidToken
1802
   * @throws ServiceFailure
1803
   * @throws NotFound
1804
   * @throws NotAuthorized
1805
   * @throws NotImplemented
1806
   * @throws InvalidRequest
1807
   */
1808
  public boolean setAccessPolicy(Session session, Identifier pid, 
1809
      AccessPolicy accessPolicy, long serialVersion) 
1810
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1811
      NotImplemented, InvalidRequest, VersionMismatch {
1812
      
1813
   // do we have a valid pid?
1814
      if (pid == null || pid.getValue().trim().equals("")) {
1815
          throw new InvalidRequest("4402", "The provided identifier was invalid.");
1816
          
1817
      }
1818
      
1819
      String serviceFailureCode = "4430";
1820
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1821
      if(sid != null) {
1822
          pid = sid;
1823
      }
1824
      // The lock to be used for this identifier
1825
      Lock lock = null;
1826
      SystemMetadata systemMetadata = null;
1827
      
1828
      boolean success = false;
1829
      
1830
      // get the subject
1831
      Subject subject = session.getSubject();
1832
      
1833
      // are we allowed to do this?
1834
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1835
          throw new NotAuthorized("4420", "not allowed by "
1836
                  + subject.getValue() + " on " + pid.getValue());
1837
      }
1838
      
1839
      try {
1840
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1841
          lock.lock();
1842
          logMetacat.debug("Locked identifier " + pid.getValue());
1843

    
1844
          try {
1845
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1846

    
1847
              if ( systemMetadata == null ) {
1848
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1849
                  
1850
              }
1851
              // does the request have the most current system metadata?
1852
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1853
                 String msg = "The requested system metadata version number " + 
1854
                     serialVersion + " differs from the current version at " +
1855
                     systemMetadata.getSerialVersion().longValue() +
1856
                     ". Please get the latest copy in order to modify it.";
1857
                 throw new VersionMismatch("4402", msg);
1858
                 
1859
              }
1860
              
1861
          } catch (RuntimeException e) {
1862
              // convert Hazelcast RuntimeException to NotFound
1863
              throw new NotFound("4400", "No record found for: " + pid);
1864
            
1865
          }
1866
              
1867
          // set the access policy
1868
          systemMetadata.setAccessPolicy(accessPolicy);
1869
          
1870
          // update the system metadata
1871
          try {
1872
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1873
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1874
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1875
              notifyReplicaNodes(systemMetadata);
1876
              
1877
          } catch (RuntimeException e) {
1878
              // convert Hazelcast RuntimeException to ServiceFailure
1879
              throw new ServiceFailure("4430", e.getMessage());
1880
            
1881
          }
1882
          
1883
      } catch (RuntimeException e) {
1884
          throw new ServiceFailure("4430", e.getMessage());
1885
          
1886
      } finally {
1887
          lock.unlock();
1888
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1889
        
1890
      }
1891

    
1892
    
1893
    // TODO: how do we know if the map was persisted?
1894
    success = true;
1895
    
1896
    return success;
1897
  }
1898

    
1899
  /**
1900
   * Full replacement of replication metadata in the system metadata for the 
1901
   * specified object, changes date system metadata modified
1902
   * 
1903
   * @param session - the Session object containing the credentials for the Subject
1904
   * @param pid - the object identifier for the given object to apply the policy
1905
   * @param replica - the replica to be updated
1906
   * @return
1907
   * @throws NotImplemented
1908
   * @throws NotAuthorized
1909
   * @throws ServiceFailure
1910
   * @throws InvalidRequest
1911
   * @throws NotFound
1912
   * @throws VersionMismatch
1913
   */
1914
  @Override
1915
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1916
      Replica replica, long serialVersion) 
1917
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1918
      NotFound, VersionMismatch {
1919
      
1920
      // The lock to be used for this identifier
1921
      Lock lock = null;
1922
      
1923
      // get the subject
1924
      Subject subject = session.getSubject();
1925
      
1926
      // are we allowed to do this?
1927
      try {
1928

    
1929
          // what is the controlling permission?
1930
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1931
              throw new NotAuthorized("4851", "not allowed by "
1932
                      + subject.getValue() + " on " + pid.getValue());
1933
          }
1934

    
1935
        
1936
      } catch (InvalidToken e) {
1937
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1938
                  " on " + pid.getValue());  
1939
          
1940
      }
1941

    
1942
      SystemMetadata systemMetadata = null;
1943
      try {
1944
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1945
          lock.lock();
1946
          logMetacat.debug("Locked identifier " + pid.getValue());
1947

    
1948
          try {      
1949
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1950

    
1951
              // does the request have the most current system metadata?
1952
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1953
                 String msg = "The requested system metadata version number " + 
1954
                     serialVersion + " differs from the current version at " +
1955
                     systemMetadata.getSerialVersion().longValue() +
1956
                     ". Please get the latest copy in order to modify it.";
1957
                 throw new VersionMismatch("4855", msg);
1958
              }
1959
              
1960
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1961
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1962
                  " : " + e.getMessage());
1963
            
1964
          }
1965
              
1966
          // set the status for the replica
1967
          List<Replica> replicas = systemMetadata.getReplicaList();
1968
          NodeReference replicaNode = replica.getReplicaMemberNode();
1969
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
1970
          int index = 0;
1971
          for (Replica listedReplica: replicas) {
1972
              
1973
              // remove the replica that we are replacing
1974
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1975
                      // don't allow status to change from COMPLETED to anything other
1976
                      // than INVALIDATED: prevents overwrites from race conditions
1977
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
1978
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
1979
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
1980
                	  throw new InvalidRequest("4853", "Status state change from " +
1981
                			  listedReplica.getReplicationStatus() + " to " +
1982
                			  replicaStatus.toString() + "is prohibited for identifier " +
1983
                			  pid.getValue() + " and target node " + 
1984
                			  listedReplica.getReplicaMemberNode().getValue());
1985

    
1986
            	  }
1987
                  replicas.remove(index);
1988
                  break;
1989
                  
1990
              }
1991
              index++;
1992
          }
1993
          
1994
          // add the new replica item
1995
          replicas.add(replica);
1996
          systemMetadata.setReplicaList(replicas);
1997
          
1998
          // update the metadata
1999
          try {
2000
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
2001
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
2002
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
2003
              
2004
              // inform replica nodes of the change if the status is complete
2005
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
2006
            	  notifyReplicaNodes(systemMetadata);
2007
            	  
2008
              }
2009
          } catch (RuntimeException e) {
2010
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2011
              throw new ServiceFailure("4852", e.getMessage());
2012
          
2013
          }
2014
          
2015
      } catch (RuntimeException e) {
2016
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2017
          throw new ServiceFailure("4852", e.getMessage());
2018
      
2019
      } finally {
2020
          lock.unlock();
2021
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2022
          
2023
      }
2024
    
2025
      return true;
2026
      
2027
  }
2028
  
2029
  /**
2030
   * 
2031
   */
2032
  @Override
2033
  public ObjectList listObjects(Session session, Date startTime, 
2034
      Date endTime, ObjectFormatIdentifier formatid, NodeReference nodeId,Identifier identifier,
2035
      Integer start, Integer count)
2036
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
2037
      ServiceFailure {
2038
      boolean replicaStatus= true;
2039
      return super.listObjects(session, startTime, endTime, formatid, identifier, replicaStatus, start, count);
2040
  }
2041

    
2042
  
2043
 	/**
2044
 	 * Returns a list of checksum algorithms that are supported by DataONE.
2045
 	 * @return cal  the list of checksum algorithms
2046
 	 * 
2047
 	 * @throws ServiceFailure
2048
 	 * @throws NotImplemented
2049
 	 */
2050
  @Override
2051
  public ChecksumAlgorithmList listChecksumAlgorithms()
2052
			throws ServiceFailure, NotImplemented {
2053
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
2054
		cal.addAlgorithm("MD5");
2055
		cal.addAlgorithm("SHA-1");
2056
		return cal;
2057
		
2058
	}
2059

    
2060
  /**
2061
   * Notify replica Member Nodes of system metadata changes for a given pid
2062
   * 
2063
   * @param currentSystemMetadata - the up to date system metadata
2064
   */
2065
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
2066
      
2067
      Session session = null;
2068
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
2069
      MNode mn = null;
2070
      NodeReference replicaNodeRef = null;
2071
      CNode cn = null;
2072
      NodeType nodeType = null;
2073
      List<Node> nodeList = null;
2074
      
2075
      try {
2076
          cn = D1Client.getCN();
2077
          nodeList = cn.listNodes().getNodeList();
2078
          
2079
      } catch (Exception e) { // handle BaseException and other I/O issues
2080
          
2081
          // swallow errors since the call is not critical
2082
          logMetacat.error("Can't inform MNs of system metadata changes due " +
2083
              "to communication issues with the CN: " + e.getMessage());
2084
          
2085
      }
2086
      
2087
      if ( replicaList != null ) {
2088
          
2089
          // iterate through the replicas and inform  MN replica nodes
2090
          for (Replica replica : replicaList) {
2091
              
2092
              replicaNodeRef = replica.getReplicaMemberNode();
2093
              try {
2094
                  if (nodeList != null) {
2095
                      // find the node type
2096
                      for (Node node : nodeList) {
2097
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
2098
                              nodeType = node.getType();
2099
                              break;
2100
              
2101
                          }
2102
                      }
2103
                  }
2104
              
2105
                  // notify only MNs
2106
                  if (nodeType != null && nodeType == NodeType.MN) {
2107
                      mn = D1Client.getMN(replicaNodeRef);
2108
                      mn.systemMetadataChanged(session, 
2109
                          currentSystemMetadata.getIdentifier(), 
2110
                          currentSystemMetadata.getSerialVersion().longValue(),
2111
                          currentSystemMetadata.getDateSysMetadataModified());
2112
                  }
2113
              
2114
              } catch (Exception e) { // handle BaseException and other I/O issues
2115
              
2116
                  // swallow errors since the call is not critical
2117
                  logMetacat.error("Can't inform "
2118
                          + replicaNodeRef.getValue()
2119
                          + " of system metadata changes due "
2120
                          + "to communication issues with the CN: "
2121
                          + e.getMessage());
2122
              
2123
              }
2124
          }
2125
      }
2126
  }
2127
  
2128
  /**
2129
   * Update the system metadata of the specified pid.
2130
   */
2131
  @Override
2132
  public boolean updateSystemMetadata(Session session, Identifier pid,
2133
          SystemMetadata sysmeta) throws NotImplemented, NotAuthorized,
2134
          ServiceFailure, InvalidRequest, InvalidSystemMetadata, InvalidToken {
2135
   if(sysmeta == null) {
2136
       throw  new InvalidRequest("4863", "The system metadata object should NOT be null in the updateSystemMetadata request.");
2137
   }
2138
   if(pid == null || pid.getValue() == null) {
2139
       throw new InvalidRequest("4863", "Please specify the id in the updateSystemMetadata request ") ;
2140
   }
2141

    
2142
   if (session == null) {
2143
       //TODO: many of the thrown exceptions do not use the correct error codes
2144
       //check these against the docs and correct them
2145
       throw new NotAuthorized("4861", "No Session - could not authorize for updating system metadata." +
2146
               "  If you are not logged in, please do so and retry the request.");
2147
   } else {
2148
         //only CN is allwoed
2149
         if(!isCNAdmin(session)) {
2150
               throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not authorized for updating the system metadata of the object "+pid.getValue());
2151
         }
2152
   }
2153
    //update the system metadata locally  
2154
    boolean success = super.updateSystemMetadata(session, pid, sysmeta);
2155
    return success;
2156
  }
2157
  
2158
    @Override
2159
    public boolean synchronize(Session session, Identifier pid) throws NotAuthorized, InvalidRequest, NotImplemented{
2160
        throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2161

    
2162
    }
2163

    
2164
	@Override
2165
	public QueryEngineDescription getQueryEngineDescription(Session session,
2166
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
2167
			NotImplemented, NotFound {
2168
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2169

    
2170
	}
2171
	
2172
	@Override
2173
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
2174
			ServiceFailure, NotAuthorized, NotImplemented {
2175
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2176

    
2177
	}
2178
	
2179
	@Override
2180
	public InputStream query(Session session, String queryEngine, String query)
2181
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
2182
			NotImplemented, NotFound {
2183
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2184

    
2185
	}
2186
	
2187
	@Override
2188
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
2189
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
2190
	}
2191

    
2192
}
(1-1/7)