Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.sql.SQLException;
29
import java.util.ArrayList;
30
import java.util.Calendar;
31
import java.util.Date;
32
import java.util.List;
33
import java.util.concurrent.locks.Lock;
34

    
35
import javax.servlet.http.HttpServletRequest;
36

    
37
import org.apache.log4j.Logger;
38
import org.dataone.client.v2.CNode;
39
import org.dataone.client.v2.itk.D1Client;
40
import org.dataone.client.v2.MNode;
41
import org.dataone.service.cn.v2.CNAuthorization;
42
import org.dataone.service.cn.v2.CNCore;
43
import org.dataone.service.cn.v2.CNRead;
44
import org.dataone.service.cn.v2.CNReplication;
45
import org.dataone.service.cn.v2.CNView;
46
import org.dataone.service.exceptions.BaseException;
47
import org.dataone.service.exceptions.IdentifierNotUnique;
48
import org.dataone.service.exceptions.InsufficientResources;
49
import org.dataone.service.exceptions.InvalidRequest;
50
import org.dataone.service.exceptions.InvalidSystemMetadata;
51
import org.dataone.service.exceptions.InvalidToken;
52
import org.dataone.service.exceptions.NotAuthorized;
53
import org.dataone.service.exceptions.NotFound;
54
import org.dataone.service.exceptions.NotImplemented;
55
import org.dataone.service.exceptions.ServiceFailure;
56
import org.dataone.service.exceptions.UnsupportedType;
57
import org.dataone.service.exceptions.VersionMismatch;
58
import org.dataone.service.types.v1.AccessPolicy;
59
import org.dataone.service.types.v1.Checksum;
60
import org.dataone.service.types.v1.ChecksumAlgorithmList;
61
import org.dataone.service.types.v1.DescribeResponse;
62
import org.dataone.service.types.v1.Event;
63
import org.dataone.service.types.v1.Identifier;
64
import org.dataone.service.types.v2.Log;
65
import org.dataone.service.types.v2.Node;
66
import org.dataone.service.types.v2.NodeList;
67
import org.dataone.service.types.v1.NodeReference;
68
import org.dataone.service.types.v1.NodeType;
69
import org.dataone.service.types.v2.ObjectFormat;
70
import org.dataone.service.types.v1.ObjectFormatIdentifier;
71
import org.dataone.service.types.v2.ObjectFormatList;
72
import org.dataone.service.types.v1.ObjectList;
73
import org.dataone.service.types.v1.ObjectLocationList;
74
import org.dataone.service.types.v1.Permission;
75
import org.dataone.service.types.v1.Replica;
76
import org.dataone.service.types.v1.ReplicationPolicy;
77
import org.dataone.service.types.v1.ReplicationStatus;
78
import org.dataone.service.types.v1.Session;
79
import org.dataone.service.types.v1.Subject;
80
import org.dataone.service.types.v2.SystemMetadata;
81
import org.dataone.service.types.v2.util.ServiceMethodRestrictionUtil;
82
import org.dataone.service.types.v1_1.QueryEngineDescription;
83
import org.dataone.service.types.v1_1.QueryEngineList;
84

    
85
import edu.ucsb.nceas.metacat.EventLog;
86
import edu.ucsb.nceas.metacat.IdentifierManager;
87
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
88
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
89
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
90

    
91
/**
92
 * Represents Metacat's implementation of the DataONE Coordinating Node 
93
 * service API. Methods implement the various CN* interfaces, and methods common
94
 * to both Member Node and Coordinating Node interfaces are found in the
95
 * D1NodeService super class.
96
 *
97
 */
98
public class CNodeService extends D1NodeService implements CNAuthorization,
99
    CNCore, CNRead, CNReplication, CNView {
100

    
101
  /* the logger instance */
102
  private Logger logMetacat = null;
103

    
104
  /**
105
   * singleton accessor
106
   */
107
  public static CNodeService getInstance(HttpServletRequest request) { 
108
    return new CNodeService(request);
109
  }
110
  
111
  /**
112
   * Constructor, private for singleton access
113
   */
114
  private CNodeService(HttpServletRequest request) {
115
    super(request);
116
    logMetacat = Logger.getLogger(CNodeService.class);
117
        
118
  }
119
    
120
  /**
121
   * Set the replication policy for an object given the object identifier
122
   * 
123
   * @param session - the Session object containing the credentials for the Subject
124
   * @param pid - the object identifier for the given object
125
   * @param policy - the replication policy to be applied
126
   * 
127
   * @return true or false
128
   * 
129
   * @throws NotImplemented
130
   * @throws NotAuthorized
131
   * @throws ServiceFailure
132
   * @throws InvalidRequest
133
   * @throws VersionMismatch
134
   * 
135
   */
136
  @Override
137
  public boolean setReplicationPolicy(Session session, Identifier pid,
138
      ReplicationPolicy policy, long serialVersion) 
139
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
140
      InvalidRequest, InvalidToken, VersionMismatch {
141
      
142
      // do we have a valid pid?
143
      if (pid == null || pid.getValue().trim().equals("")) {
144
          throw new InvalidRequest("4883", "The provided identifier was invalid.");
145
          
146
      }
147
      
148
      /*String serviceFailureCode = "4882";
149
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
150
      if(sid != null) {
151
          pid = sid;
152
      }*/
153
      // The lock to be used for this identifier
154
      Lock lock = null;
155
      
156
      // get the subject
157
      Subject subject = session.getSubject();
158
      
159
      // are we allowed to do this?
160
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
161
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
162
                  + " not allowed by " + subject.getValue() + " on "
163
                  + pid.getValue());
164
          
165
      }
166
      
167
      SystemMetadata systemMetadata = null;
168
      try {
169
          lock = HazelcastService.getInstance().getLock(pid.getValue());
170
          lock.lock();
171
          logMetacat.debug("Locked identifier " + pid.getValue());
172

    
173
          try {
174
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
175
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
176
                  
177
              }
178
              
179
              // did we get it correctly?
180
              if ( systemMetadata == null ) {
181
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
182
                  
183
              }
184

    
185
              // does the request have the most current system metadata?
186
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
187
                 String msg = "The requested system metadata version number " + 
188
                     serialVersion + " differs from the current version at " +
189
                     systemMetadata.getSerialVersion().longValue() +
190
                     ". Please get the latest copy in order to modify it.";
191
                 throw new VersionMismatch("4886", msg);
192
                 
193
              }
194
              
195
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
196
              throw new NotFound("4884", "No record found for: " + pid.getValue());
197
            
198
          }
199
          
200
          // set the new policy
201
          systemMetadata.setReplicationPolicy(policy);
202
          
203
          // update the metadata
204
          try {
205
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
206
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
207
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
208
              notifyReplicaNodes(systemMetadata);
209
              
210
          } catch (RuntimeException e) {
211
              throw new ServiceFailure("4882", e.getMessage());
212
          
213
          }
214
          
215
      } catch (RuntimeException e) {
216
          throw new ServiceFailure("4882", e.getMessage());
217
          
218
      } finally {
219
          lock.unlock();
220
          logMetacat.debug("Unlocked identifier " + pid.getValue());
221
          
222
      }
223
    
224
      return true;
225
  }
226

    
227
  /**
228
   * Deletes the replica from the given Member Node
229
   * NOTE: MN.delete() may be an "archive" operation. TBD.
230
   * @param session
231
   * @param pid
232
   * @param nodeId
233
   * @param serialVersion
234
   * @return
235
   * @throws InvalidToken
236
   * @throws ServiceFailure
237
   * @throws NotAuthorized
238
   * @throws NotFound
239
   * @throws NotImplemented
240
   * @throws VersionMismatch
241
   */
242
  @Override
243
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
244
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
245
	  
246
	  	// The lock to be used for this identifier
247
		Lock lock = null;
248

    
249
		// get the subject
250
		Subject subject = session.getSubject();
251

    
252
		// are we allowed to do this?
253
		boolean isAuthorized = false;
254
		try {
255
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
256
		} catch (InvalidRequest e) {
257
			throw new ServiceFailure("4882", e.getDescription());
258
		}
259
		if (!isAuthorized) {
260
			throw new NotAuthorized("4881", Permission.WRITE
261
					+ " not allowed by " + subject.getValue() + " on "
262
					+ pid.getValue());
263

    
264
		}
265

    
266
		SystemMetadata systemMetadata = null;
267
		try {
268
			lock = HazelcastService.getInstance().getLock(pid.getValue());
269
			lock.lock();
270
			logMetacat.debug("Locked identifier " + pid.getValue());
271

    
272
			try {
273
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
274
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
275
				}
276

    
277
				// did we get it correctly?
278
				if (systemMetadata == null) {
279
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
280
				}
281

    
282
				// does the request have the most current system metadata?
283
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
284
					String msg = "The requested system metadata version number "
285
							+ serialVersion
286
							+ " differs from the current version at "
287
							+ systemMetadata.getSerialVersion().longValue()
288
							+ ". Please get the latest copy in order to modify it.";
289
					throw new VersionMismatch("4886", msg);
290

    
291
				}
292

    
293
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
294
				throw new NotFound("4884", "No record found for: " + pid.getValue());
295

    
296
			}
297
			  
298
			// check permissions
299
			// TODO: is this necessary?
300
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
301
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
302
			if (!isAllowed) {
303
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
304
			}
305
			  
306
			// delete the replica from the given node
307
			// CSJ: use CN.delete() to truly delete a replica, semantically
308
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
309
			//D1Client.getMN(nodeId).delete(session, pid);
310
			  
311
			// reflect that change in the system metadata
312
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
313
			for (Replica r: systemMetadata.getReplicaList()) {
314
				  if (r.getReplicaMemberNode().equals(nodeId)) {
315
					  updatedReplicas.remove(r);
316
					  break;
317
				  }
318
			}
319
			systemMetadata.setReplicaList(updatedReplicas);
320

    
321
			// update the metadata
322
			try {
323
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
324
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
325
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
326
			} catch (RuntimeException e) {
327
				throw new ServiceFailure("4882", e.getMessage());
328
			}
329

    
330
		} catch (RuntimeException e) {
331
			throw new ServiceFailure("4882", e.getMessage());
332
		} finally {
333
			lock.unlock();
334
			logMetacat.debug("Unlocked identifier " + pid.getValue());
335
		}
336

    
337
		return true;	  
338
	  
339
  }
340
  
341
  /**
342
   * Deletes an object from the Coordinating Node
343
   * 
344
   * @param session - the Session object containing the credentials for the Subject
345
   * @param pid - The object identifier to be deleted
346
   * 
347
   * @return pid - the identifier of the object used for the deletion
348
   * 
349
   * @throws InvalidToken
350
   * @throws ServiceFailure
351
   * @throws NotAuthorized
352
   * @throws NotFound
353
   * @throws NotImplemented
354
   * @throws InvalidRequest
355
   */
356
  @Override
357
  public Identifier delete(Session session, Identifier pid) 
358
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
359
      
360
      String localId = null;      // The corresponding docid for this pid
361
	  Lock lock = null;           // The lock to be used for this identifier
362
      CNode cn = null;            // a reference to the CN to get the node list    
363
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
364
      List<Node> nodeList = null; // the list of nodes in this CN environment
365

    
366
      // check for a valid session
367
      if (session == null) {
368
        	throw new InvalidToken("4963", "No session has been provided");
369
        	
370
      }
371

    
372
      // do we have a valid pid?
373
      if (pid == null || pid.getValue().trim().equals("")) {
374
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
375
          
376
      }
377
      
378
      String serviceFailureCode = "4962";
379
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
380
      if(sid != null) {
381
          pid = sid;
382
      }
383

    
384
	  // check that it is CN/admin
385
	  boolean allowed = isAdminAuthorized(session);
386
	  
387
	  // additional check if it is the authoritative node if it is not the admin
388
      if(!allowed) {
389
          allowed = isAuthoritativeMNodeAdmin(session, pid);
390
          
391
      }
392
	  
393
	  if (!allowed) {
394
		  String msg = "The subject " + session.getSubject().getValue() + 
395
			  " is not allowed to call delete() on a Coordinating Node.";
396
		  logMetacat.info(msg);
397
		  throw new NotAuthorized("4960", msg);
398
		  
399
	  }
400
	  
401
	  // Don't defer to superclass implementation without a locally registered identifier
402
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
403
      // Check for the existing identifier
404
      try {
405
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
406
          super.delete(session, pid);
407
          
408
      } catch (McdbDocNotFoundException e) {
409
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
410
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
411
    	  
412
          try {
413
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
414
  			  lock.lock();
415
  			  logMetacat.debug("Locked identifier " + pid.getValue());
416

    
417
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
418
			  if ( sysMeta != null ) {
419
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
420
				sysMeta.setArchived(true);
421
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
422
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
423
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
424
	            //since this is cn, we don't need worry about the mn solr index.
425
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
426
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
427
	            String username = session.getSubject().getValue();//just for logging purpose
428
                //since data objects were not registered in the identifier table, we use pid as the docid
429
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
430
				
431
			  } else {
432
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
433
					  ". Couldn't obtain the system metadata record.");
434
				  
435
			  }
436
			  
437
		  } catch (RuntimeException re) {
438
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
439
				  ". The error message was: " + re.getMessage());
440
			  
441
		  } finally {
442
			  lock.unlock();
443
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
444

    
445
		  }
446

    
447
          // NOTE: cannot log the delete without localId
448
//          EventLog.getInstance().log(request.getRemoteAddr(), 
449
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
450
//                  pid.getValue(), Event.DELETE.xmlValue());
451

    
452
      } catch (SQLException e) {
453
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
454
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
455
      }
456

    
457
      // get the node list
458
      try {
459
          cn = D1Client.getCN();
460
          nodeList = cn.listNodes().getNodeList();
461
          
462
      } catch (Exception e) { // handle BaseException and other I/O issues
463
          
464
          // swallow errors since the call is not critical
465
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
466
              " due to communication issues with the CN: " + e.getMessage());
467
          
468
      }
469

    
470
	  // notify the replicas
471
	  if (systemMetadata.getReplicaList() != null) {
472
		  for (Replica replica: systemMetadata.getReplicaList()) {
473
			  NodeReference replicaNode = replica.getReplicaMemberNode();
474
			  try {
475
                  if (nodeList != null) {
476
                      // find the node type
477
                      for (Node node : nodeList) {
478
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
479
                              nodeType = node.getType();
480
                              break;
481
              
482
                          }
483
                      }
484
                  }
485
                  
486
                  // only send call MN.delete() to avoid an infinite loop with the CN
487
                  if (nodeType != null && nodeType == NodeType.MN) {
488
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
489
                  }
490
                  
491
			  } catch (Exception e) {
492
				  // all we can really do is log errors and carry on with life
493
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
494
					  " from replica MN: " + replicaNode.getValue(), e);
495
			}
496
			  
497
		  }
498
	  }
499
	  
500
	  return pid;
501
      
502
  }
503
  
504
  /**
505
   * Archives an object from the Coordinating Node
506
   * 
507
   * @param session - the Session object containing the credentials for the Subject
508
   * @param pid - The object identifier to be deleted
509
   * 
510
   * @return pid - the identifier of the object used for the deletion
511
   * 
512
   * @throws InvalidToken
513
   * @throws ServiceFailure
514
   * @throws NotAuthorized
515
   * @throws NotFound
516
   * @throws NotImplemented
517
   * @throws InvalidRequest
518
   */
519
  @Override
520
  public Identifier archive(Session session, Identifier pid) 
521
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
522

    
523
      String localId = null; // The corresponding docid for this pid
524
	  Lock lock = null;      // The lock to be used for this identifier
525
      CNode cn = null;            // a reference to the CN to get the node list    
526
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
527
      List<Node> nodeList = null; // the list of nodes in this CN environment
528
      
529

    
530
      // check for a valid session
531
      if (session == null) {
532
        	throw new InvalidToken("4973", "No session has been provided");
533
        	
534
      }
535

    
536
      // do we have a valid pid?
537
      if (pid == null || pid.getValue().trim().equals("")) {
538
          throw new ServiceFailure("4972", "The provided identifier was invalid.");
539
          
540
      }
541

    
542
	  // check that it is CN/admin
543
	  boolean allowed = isAdminAuthorized(session);
544
	  
545
	  String serviceFailureCode = "4972";
546
	  Identifier sid = getPIDForSID(pid, serviceFailureCode);
547
	  if(sid != null) {
548
	        pid = sid;
549
	  }
550
	  
551
	  //check if it is the authoritative member node
552
	  if(!allowed) {
553
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
554
	  }
555
	  
556
	  if (!allowed) {
557
		  String msg = "The subject " + session.getSubject().getValue() + 
558
				  " is not allowed to call archive() on a Coordinating Node.";
559
		  logMetacat.info(msg);
560
		  throw new NotAuthorized("4970", msg);
561
	  }
562
	  
563
      // Check for the existing identifier
564
      try {
565
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
566
          super.archive(session, pid);
567
          
568
      } catch (McdbDocNotFoundException e) {
569
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
570
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
571
    	  
572
          try {
573
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
574
  			  lock.lock();
575
  			  logMetacat.debug("Locked identifier " + pid.getValue());
576

    
577
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
578
			  if ( sysMeta != null ) {
579
				sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
580
				sysMeta.setArchived(true);
581
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
582
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
583
			    // notify the replicas
584
				notifyReplicaNodes(sysMeta);
585
				  
586
			  } else {
587
				  throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
588
					  ". Couldn't obtain the system metadata record.");
589
				  
590
			  }
591
			  
592
		  } catch (RuntimeException re) {
593
			  throw new ServiceFailure("4972", "Couldn't archive " + pid.getValue() + 
594
				  ". The error message was: " + re.getMessage());
595
			  
596
		  } finally {
597
			  lock.unlock();
598
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
599

    
600
		  }
601

    
602
          // NOTE: cannot log the archive without localId
603
//          EventLog.getInstance().log(request.getRemoteAddr(), 
604
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
605
//                  pid.getValue(), Event.DELETE.xmlValue());
606

    
607
      } catch (SQLException e) {
608
          throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
609
                  ". The local id of the object with the identifier can't be identified since "+e.getMessage());
610
      }
611

    
612
	  return pid;
613
      
614
  }
615
  
616
  /**
617
   * Set the obsoletedBy attribute in System Metadata
618
   * @param session
619
   * @param pid
620
   * @param obsoletedByPid
621
   * @param serialVersion
622
   * @return
623
   * @throws NotImplemented
624
   * @throws NotFound
625
   * @throws NotAuthorized
626
   * @throws ServiceFailure
627
   * @throws InvalidRequest
628
   * @throws InvalidToken
629
   * @throws VersionMismatch
630
   */
631
  @Override
632
  public boolean setObsoletedBy(Session session, Identifier pid,
633
			Identifier obsoletedByPid, long serialVersion)
634
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
635
			InvalidRequest, InvalidToken, VersionMismatch {
636
      
637
      // do we have a valid pid?
638
      if (pid == null || pid.getValue().trim().equals("")) {
639
          throw new InvalidRequest("4942", "The provided identifier was invalid.");
640
          
641
      }
642
      
643
      /*String serviceFailureCode = "4941";
644
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
645
      if(sid != null) {
646
          pid = sid;
647
      }*/
648
      
649
      // do we have a valid pid?
650
      if (obsoletedByPid == null || obsoletedByPid.getValue().trim().equals("")) {
651
          throw new InvalidRequest("4942", "The provided obsoletedByPid was invalid.");
652
          
653
      }
654
      
655
      try {
656
          if(IdentifierManager.getInstance().systemMetadataSIDExists(obsoletedByPid)) {
657
              throw new InvalidRequest("4942", "The provided obsoletedByPid "+obsoletedByPid.getValue()+" is an existing SID. However, it must NOT be an SID.");
658
          }
659
      } catch (SQLException ee) {
660
          throw new ServiceFailure("4941", "Couldn't determine if the obsoletedByPid "+obsoletedByPid.getValue()+" is an SID or not. The id shouldn't be an SID.");
661
      }
662
      
663

    
664
		// The lock to be used for this identifier
665
		Lock lock = null;
666

    
667
		// get the subject
668
		Subject subject = session.getSubject();
669

    
670
		// are we allowed to do this?
671
		if (!isAuthorized(session, pid, Permission.WRITE)) {
672
			throw new NotAuthorized("4881", Permission.WRITE
673
					+ " not allowed by " + subject.getValue() + " on "
674
					+ pid.getValue());
675

    
676
		}
677

    
678

    
679
		SystemMetadata systemMetadata = null;
680
		try {
681
			lock = HazelcastService.getInstance().getLock(pid.getValue());
682
			lock.lock();
683
			logMetacat.debug("Locked identifier " + pid.getValue());
684

    
685
			try {
686
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
687
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
688
				}
689

    
690
				// did we get it correctly?
691
				if (systemMetadata == null) {
692
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
693
				}
694

    
695
				// does the request have the most current system metadata?
696
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
697
					String msg = "The requested system metadata version number "
698
							+ serialVersion
699
							+ " differs from the current version at "
700
							+ systemMetadata.getSerialVersion().longValue()
701
							+ ". Please get the latest copy in order to modify it.";
702
					throw new VersionMismatch("4886", msg);
703

    
704
				}
705

    
706
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
707
				throw new NotFound("4884", "No record found for: " + pid.getValue());
708

    
709
			}
710

    
711
			// set the new policy
712
			systemMetadata.setObsoletedBy(obsoletedByPid);
713

    
714
			// update the metadata
715
			try {
716
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
717
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
718
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
719
			} catch (RuntimeException e) {
720
				throw new ServiceFailure("4882", e.getMessage());
721
			}
722

    
723
		} catch (RuntimeException e) {
724
			throw new ServiceFailure("4882", e.getMessage());
725
		} finally {
726
			lock.unlock();
727
			logMetacat.debug("Unlocked identifier " + pid.getValue());
728
		}
729

    
730
		return true;
731
	}
732
  
733
  
734
  /**
735
   * Set the replication status for an object given the object identifier
736
   * 
737
   * @param session - the Session object containing the credentials for the Subject
738
   * @param pid - the object identifier for the given object
739
   * @param status - the replication status to be applied
740
   * 
741
   * @return true or false
742
   * 
743
   * @throws NotImplemented
744
   * @throws NotAuthorized
745
   * @throws ServiceFailure
746
   * @throws InvalidRequest
747
   * @throws InvalidToken
748
   * @throws NotFound
749
   * 
750
   */
751
  @Override
752
  public boolean setReplicationStatus(Session session, Identifier pid,
753
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
754
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
755
      InvalidRequest, NotFound {
756
	  
757
	  // cannot be called by public
758
	  if (session == null) {
759
		  throw new NotAuthorized("4720", "Session cannot be null");
760
	  }
761
	  
762
	// do we have a valid pid?
763
      if (pid == null || pid.getValue().trim().equals("")) {
764
          throw new InvalidRequest("4730", "The provided identifier was invalid.");
765
          
766
      }
767
      
768
      /*String serviceFailureCode = "4700";
769
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
770
      if(sid != null) {
771
          pid = sid;
772
      }*/
773
      
774
      // The lock to be used for this identifier
775
      Lock lock = null;
776
      
777
      boolean allowed = false;
778
      int replicaEntryIndex = -1;
779
      List<Replica> replicas = null;
780
      // get the subject
781
      Subject subject = session.getSubject();
782
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
783
          " is " + status.toString());
784
      
785
      SystemMetadata systemMetadata = null;
786

    
787
      try {
788
          lock = HazelcastService.getInstance().getLock(pid.getValue());
789
          lock.lock();
790
          logMetacat.debug("Locked identifier " + pid.getValue());
791

    
792
          try {      
793
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
794

    
795
              // did we get it correctly?
796
              if ( systemMetadata == null ) {
797
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
798
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
799
                  
800
              }
801
              replicas = systemMetadata.getReplicaList();
802
              int count = 0;
803
              
804
              // was there a failure? log it
805
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
806
                 String msg = "The replication request of the object identified by " + 
807
                     pid.getValue() + " failed.  The error message was " +
808
                     failure.getMessage() + ".";
809
              }
810
              
811
              if (replicas.size() > 0 && replicas != null) {
812
                  // find the target replica index in the replica list
813
                  for (Replica replica : replicas) {
814
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
815
                      String targetNodeStr = targetNode.getValue();
816
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
817
                  
818
                      if (replicaNodeStr.equals(targetNodeStr)) {
819
                          replicaEntryIndex = count;
820
                          logMetacat.debug("replica entry index is: "
821
                                  + replicaEntryIndex);
822
                          break;
823
                      }
824
                      count++;
825
                  
826
                  }
827
              }
828
              // are we allowed to do this? only CNs and target MNs are allowed
829
              CNode cn = D1Client.getCN();
830
              List<Node> nodes = cn.listNodes().getNodeList();
831
              
832
              // find the node in the node list
833
              for ( Node node : nodes ) {
834
                  
835
                  NodeReference nodeReference = node.getIdentifier();
836
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
837
                      nodeReference.getValue());
838
                  
839
                  // allow target MN certs
840
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
841
                      node.getType().equals(NodeType.MN)) {
842
                      List<Subject> nodeSubjects = node.getSubjectList();
843
                      
844
                      // check if the session subject is in the node subject list
845
                      for (Subject nodeSubject : nodeSubjects) {
846
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
847
                                  nodeSubject.getValue() + " and " + subject.getValue());
848
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
849
                              
850
                              // lastly limit to COMPLETED, INVALIDATED,
851
                              // and FAILED status updates from MNs only
852
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
853
                                   status.equals(ReplicationStatus.INVALIDATED) ||
854
                                   status.equals(ReplicationStatus.FAILED)) {
855
                                  allowed = true;
856
                                  break;
857
                                  
858
                              }                              
859
                          }
860
                      }                 
861
                  }
862
              }
863

    
864
              if ( !allowed ) {
865
                  //check for CN admin access
866
                  allowed = isAuthorized(session, pid, Permission.WRITE);
867
                  
868
              }              
869
              
870
              if ( !allowed ) {
871
                  String msg = "The subject identified by "
872
                          + subject.getValue()
873
                          + " does not have permission to set the replication status for "
874
                          + "the replica identified by "
875
                          + targetNode.getValue() + ".";
876
                  logMetacat.info(msg);
877
                  throw new NotAuthorized("4720", msg);
878
                  
879
              }
880

    
881
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
882
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
883
                " : " + e.getMessage());
884
            
885
          }
886
          
887
          Replica targetReplica = new Replica();
888
          // set the status for the replica
889
          if ( replicaEntryIndex != -1 ) {
890
              targetReplica = replicas.get(replicaEntryIndex);
891
              
892
              // don't allow status to change from COMPLETED to anything other
893
              // than INVALIDATED: prevents overwrites from race conditions
894
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
895
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
896
            	  throw new InvalidRequest("4730", "Status state change from " +
897
            			  targetReplica.getReplicationStatus() + " to " +
898
            			  status.toString() + "is prohibited for identifier " +
899
            			  pid.getValue() + " and target node " + 
900
            			  targetReplica.getReplicaMemberNode().getValue());
901
              }
902
              
903
              targetReplica.setReplicationStatus(status);
904
              
905
              logMetacat.debug("Set the replication status for " + 
906
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
907
                  targetReplica.getReplicationStatus() + " for identifier " +
908
                  pid.getValue());
909
              
910
          } else {
911
              // this is a new entry, create it
912
              targetReplica.setReplicaMemberNode(targetNode);
913
              targetReplica.setReplicationStatus(status);
914
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
915
              replicas.add(targetReplica);
916
              
917
          }
918
          
919
          systemMetadata.setReplicaList(replicas);
920
                
921
          // update the metadata
922
          try {
923
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
924
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
925
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
926

    
927
              if ( !status.equals(ReplicationStatus.QUEUED) && 
928
            	   !status.equals(ReplicationStatus.REQUESTED)) {
929
                  
930
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
931
                          "\tNODE:\t" + targetNode.getValue() + 
932
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
933
                
934
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
935
                          "\tPID:\t"  + pid.getValue() + 
936
                          "\tNODE:\t" + targetNode.getValue() + 
937
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
938
              }
939

    
940
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
941
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
942
                      " on target node " + targetNode + ". The exception was: " +
943
                      failure.getMessage());
944
              }
945
              
946
			  // update the replica nodes about the completed replica when complete
947
              if (status.equals(ReplicationStatus.COMPLETED)) {
948
				notifyReplicaNodes(systemMetadata);
949
			}
950
          
951
          } catch (RuntimeException e) {
952
              throw new ServiceFailure("4700", e.getMessage());
953
          
954
          }
955
          
956
    } catch (RuntimeException e) {
957
        String msg = "There was a RuntimeException getting the lock for " +
958
            pid.getValue();
959
        logMetacat.info(msg);
960
        
961
    } finally {
962
        lock.unlock();
963
        logMetacat.debug("Unlocked identifier " + pid.getValue());
964
        
965
    }
966
      
967
      return true;
968
  }
969
  
970
/**
971
   * Return the checksum of the object given the identifier 
972
   * 
973
   * @param session - the Session object containing the credentials for the Subject
974
   * @param pid - the object identifier for the given object
975
   * 
976
   * @return checksum - the checksum of the object
977
   * 
978
   * @throws InvalidToken
979
   * @throws ServiceFailure
980
   * @throws NotAuthorized
981
   * @throws NotFound
982
   * @throws NotImplemented
983
   */
984
  @Override
985
  public Checksum getChecksum(Session session, Identifier pid)
986
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
987
    NotImplemented {
988
    
989
	boolean isAuthorized = false;
990
	try {
991
		isAuthorized = isAuthorized(session, pid, Permission.READ);
992
	} catch (InvalidRequest e) {
993
		throw new ServiceFailure("1410", e.getDescription());
994
	}  
995
    if (!isAuthorized) {
996
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
997
    }
998
    
999
    SystemMetadata systemMetadata = null;
1000
    Checksum checksum = null;
1001
    
1002
    try {
1003
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
1004

    
1005
        if (systemMetadata == null ) {
1006
            String error ="";
1007
            String localId = null;
1008
            try {
1009
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1010
              
1011
             } catch (Exception e) {
1012
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1013
            }
1014
            
1015
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1016
                error = DELETEDMESSAGE;
1017
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1018
                error = DELETEDMESSAGE;
1019
            }
1020
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1021
        }
1022
        checksum = systemMetadata.getChecksum();
1023
        
1024
    } catch (RuntimeException e) {
1025
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
1026
            pid.getValue() + ". The error message was: " + e.getMessage());
1027
      
1028
    }
1029
    
1030
    return checksum;
1031
  }
1032

    
1033
  /**
1034
   * Resolve the location of a given object
1035
   * 
1036
   * @param session - the Session object containing the credentials for the Subject
1037
   * @param pid - the object identifier for the given object
1038
   * 
1039
   * @return objectLocationList - the list of nodes known to contain the object
1040
   * 
1041
   * @throws InvalidToken
1042
   * @throws ServiceFailure
1043
   * @throws NotAuthorized
1044
   * @throws NotFound
1045
   * @throws NotImplemented
1046
   */
1047
  @Override
1048
  public ObjectLocationList resolve(Session session, Identifier pid)
1049
    throws InvalidToken, ServiceFailure, NotAuthorized,
1050
    NotFound, NotImplemented {
1051

    
1052
    throw new NotImplemented("4131", "resolve not implemented");
1053

    
1054
  }
1055

    
1056
  /**
1057
   * Metacat does not implement this method at the CN level
1058
   */
1059
  @Override
1060
  public ObjectList search(Session session, String queryType, String query)
1061
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1062
    NotImplemented {
1063

    
1064
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1065
	  
1066
//    ObjectList objectList = null;
1067
//    try {
1068
//        objectList = 
1069
//          IdentifierManager.getInstance().querySystemMetadata(
1070
//              null, //startTime, 
1071
//              null, //endTime,
1072
//              null, //objectFormat, 
1073
//              false, //replicaStatus, 
1074
//              0, //start, 
1075
//              1000 //count
1076
//              );
1077
//        
1078
//    } catch (Exception e) {
1079
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1080
//    }
1081
//
1082
//      return objectList;
1083
		  
1084
  }
1085
  
1086
  /**
1087
   * Returns the object format registered in the DataONE Object Format 
1088
   * Vocabulary for the given format identifier
1089
   * 
1090
   * @param fmtid - the identifier of the format requested
1091
   * 
1092
   * @return objectFormat - the object format requested
1093
   * 
1094
   * @throws ServiceFailure
1095
   * @throws NotFound
1096
   * @throws InsufficientResources
1097
   * @throws NotImplemented
1098
   */
1099
  @Override
1100
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1101
    throws ServiceFailure, NotFound, NotImplemented {
1102
     
1103
      return ObjectFormatService.getInstance().getFormat(fmtid);
1104
      
1105
  }
1106

    
1107
  /**
1108
   * Returns a list of all object formats registered in the DataONE Object 
1109
   * Format Vocabulary
1110
    * 
1111
   * @return objectFormatList - The list of object formats registered in 
1112
   *                            the DataONE Object Format Vocabulary
1113
   * 
1114
   * @throws ServiceFailure
1115
   * @throws NotImplemented
1116
   * @throws InsufficientResources
1117
   */
1118
  @Override
1119
  public ObjectFormatList listFormats() 
1120
    throws ServiceFailure, NotImplemented {
1121

    
1122
    return ObjectFormatService.getInstance().listFormats();
1123
  }
1124

    
1125
  /**
1126
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1127
    * 
1128
   * @return nodeList - List of nodes from the registry
1129
   * 
1130
   * @throws ServiceFailure
1131
   * @throws NotImplemented
1132
   */
1133
  @Override
1134
  public NodeList listNodes() 
1135
    throws NotImplemented, ServiceFailure {
1136

    
1137
    throw new NotImplemented("4800", "listNodes not implemented");
1138
  }
1139

    
1140
  /**
1141
   * Provides a mechanism for adding system metadata independently of its 
1142
   * associated object, such as when adding system metadata for data objects.
1143
    * 
1144
   * @param session - the Session object containing the credentials for the Subject
1145
   * @param pid - The identifier of the object to register the system metadata against
1146
   * @param sysmeta - The system metadata to be registered
1147
   * 
1148
   * @return true if the registration succeeds
1149
   * 
1150
   * @throws NotImplemented
1151
   * @throws NotAuthorized
1152
   * @throws ServiceFailure
1153
   * @throws InvalidRequest
1154
   * @throws InvalidSystemMetadata
1155
   */
1156
  @Override
1157
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1158
      SystemMetadata sysmeta) 
1159
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1160
      InvalidSystemMetadata {
1161

    
1162
      // The lock to be used for this identifier
1163
      Lock lock = null;
1164

    
1165
      // TODO: control who can call this?
1166
      if (session == null) {
1167
          //TODO: many of the thrown exceptions do not use the correct error codes
1168
          //check these against the docs and correct them
1169
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1170
                  "  If you are not logged in, please do so and retry the request.");
1171
      }
1172
      // the identifier can't be an SID
1173
      try {
1174
          if(IdentifierManager.getInstance().systemMetadataSIDExists(pid)) {
1175
              throw new InvalidRequest("4863", "The provided identifier "+pid.getValue()+" is a series id which is not allowed.");
1176
          }
1177
      } catch (SQLException sqle) {
1178
          throw new ServiceFailure("4862", "Couldn't determine if the pid "+pid.getValue()+" is a series id since "+sqle.getMessage());
1179
      }
1180
      
1181
      // verify that guid == SystemMetadata.getIdentifier()
1182
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1183
          "|" + sysmeta.getIdentifier().getValue());
1184
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1185
          throw new InvalidRequest("4863", 
1186
              "The identifier in method call (" + pid.getValue() + 
1187
              ") does not match identifier in system metadata (" +
1188
              sysmeta.getIdentifier().getValue() + ").");
1189
      }
1190
      
1191
      //check if the sid is legitimate in the system metadata
1192
      checkSidInModifyingSystemMetadata(sysmeta, "4864", "4862");
1193

    
1194
      try {
1195
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1196
          lock.lock();
1197
          logMetacat.debug("Locked identifier " + pid.getValue());
1198
          logMetacat.debug("Checking if identifier exists...");
1199
          // Check that the identifier does not already exist
1200
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1201
              throw new InvalidRequest("4863", 
1202
                  "The identifier is already in use by an existing object.");
1203
          
1204
          }
1205
          
1206
          // insert the system metadata into the object store
1207
          logMetacat.debug("Starting to insert SystemMetadata...");
1208
          try {
1209
              sysmeta.setSerialVersion(BigInteger.ONE);
1210
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1211
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1212
              
1213
          } catch (RuntimeException e) {
1214
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1215
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1216
                  e.getClass() + ": " + e.getMessage());
1217
              
1218
          }
1219
          
1220
      } catch (RuntimeException e) {
1221
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1222
                  e.getClass() + ": " + e.getMessage());
1223
          
1224
      }  finally {
1225
          lock.unlock();
1226
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1227
          
1228
      }
1229

    
1230
      
1231
      logMetacat.debug("Returning from registerSystemMetadata");
1232
      
1233
      try {
1234
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1235
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1236
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1237
    	          localId, "registerSystemMetadata");
1238
      } catch (McdbDocNotFoundException e) {
1239
    	  // do nothing, no localId to log with
1240
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1241
      } catch (SQLException ee) {
1242
          // do nothing, no localId to log with
1243
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1244
      }
1245
      
1246
      
1247
      return pid;
1248
  }
1249
  
1250
  /**
1251
   * Given an optional scope and format, reserves and returns an identifier 
1252
   * within that scope and format that is unique and will not be 
1253
   * used by any other sessions. 
1254
    * 
1255
   * @param session - the Session object containing the credentials for the Subject
1256
   * @param pid - The identifier of the object to register the system metadata against
1257
   * @param scope - An optional string to be used to qualify the scope of 
1258
   *                the identifier namespace, which is applied differently 
1259
   *                depending on the format requested. If scope is not 
1260
   *                supplied, a default scope will be used.
1261
   * @param format - The optional name of the identifier format to be used, 
1262
   *                  drawn from a DataONE-specific vocabulary of identifier 
1263
   *                 format names, including several common syntaxes such 
1264
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1265
   *                 format is not supplied by the caller, the CN service 
1266
   *                 will use a default identifier format, which may change 
1267
   *                 over time.
1268
   * 
1269
   * @return true if the registration succeeds
1270
   * 
1271
   * @throws InvalidToken
1272
   * @throws ServiceFailure
1273
   * @throws NotAuthorized
1274
   * @throws IdentifierNotUnique
1275
   * @throws NotImplemented
1276
   */
1277
  @Override
1278
  public Identifier reserveIdentifier(Session session, Identifier pid)
1279
  throws InvalidToken, ServiceFailure,
1280
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1281

    
1282
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1283
  }
1284
  
1285
  @Override
1286
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1287
  throws InvalidToken, ServiceFailure,
1288
        NotAuthorized, NotImplemented, InvalidRequest {
1289
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1290
  }
1291
  
1292
  /**
1293
    * Checks whether the pid is reserved by the subject in the session param
1294
    * If the reservation is held on the pid by the subject, we return true.
1295
    * 
1296
   * @param session - the Session object containing the Subject
1297
   * @param pid - The identifier to check
1298
   * 
1299
   * @return true if the reservation exists for the subject/pid
1300
   * 
1301
   * @throws InvalidToken
1302
   * @throws ServiceFailure
1303
   * @throws NotFound - when the pid is not found (in use or in reservation)
1304
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1305
   * @throws IdentifierNotUnique - when the pid is in use
1306
   * @throws NotImplemented
1307
   */
1308

    
1309
  @Override
1310
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1311
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1312
      NotImplemented, InvalidRequest {
1313
  
1314
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1315
  }
1316

    
1317
  /**
1318
   * Changes ownership (RightsHolder) of the specified object to the 
1319
   * subject specified by userId
1320
    * 
1321
   * @param session - the Session object containing the credentials for the Subject
1322
   * @param pid - Identifier of the object to be modified
1323
   * @param userId - The subject that will be taking ownership of the specified object.
1324
   *
1325
   * @return pid - the identifier of the modified object
1326
   * 
1327
   * @throws ServiceFailure
1328
   * @throws InvalidToken
1329
   * @throws NotFound
1330
   * @throws NotAuthorized
1331
   * @throws NotImplemented
1332
   * @throws InvalidRequest
1333
   */  
1334
  @Override
1335
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1336
      long serialVersion)
1337
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1338
      NotImplemented, InvalidRequest, VersionMismatch {
1339
      
1340
      // The lock to be used for this identifier
1341
      Lock lock = null;
1342

    
1343
      // get the subject
1344
      Subject subject = session.getSubject();
1345
      
1346
      String serviceFailureCode = "4490";
1347
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1348
      if(sid != null) {
1349
          pid = sid;
1350
      }
1351
      
1352
      // are we allowed to do this?
1353
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1354
          throw new NotAuthorized("4440", "not allowed by "
1355
                  + subject.getValue() + " on " + pid.getValue());
1356
          
1357
      }
1358
      
1359
      SystemMetadata systemMetadata = null;
1360
      try {
1361
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1362
          lock.lock();
1363
          logMetacat.debug("Locked identifier " + pid.getValue());
1364

    
1365
          try {
1366
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1367
              
1368
              // does the request have the most current system metadata?
1369
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1370
                 String msg = "The requested system metadata version number " + 
1371
                     serialVersion + " differs from the current version at " +
1372
                     systemMetadata.getSerialVersion().longValue() +
1373
                     ". Please get the latest copy in order to modify it.";
1374
                 throw new VersionMismatch("4443", msg);
1375
              }
1376
              
1377
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1378
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1379
              
1380
          }
1381
              
1382
          // set the new rights holder
1383
          systemMetadata.setRightsHolder(userId);
1384
          
1385
          // update the metadata
1386
          try {
1387
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1388
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1389
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1390
              notifyReplicaNodes(systemMetadata);
1391
              
1392
          } catch (RuntimeException e) {
1393
              throw new ServiceFailure("4490", e.getMessage());
1394
          
1395
          }
1396
          
1397
      } catch (RuntimeException e) {
1398
          throw new ServiceFailure("4490", e.getMessage());
1399
          
1400
      } finally {
1401
          lock.unlock();
1402
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1403
      
1404
      }
1405
      
1406
      return pid;
1407
  }
1408

    
1409
  /**
1410
   * Verify that a replication task is authorized by comparing the target node's
1411
   * Subject (from the X.509 certificate-derived Session) with the list of 
1412
   * subjects in the known, pending replication tasks map.
1413
   * 
1414
   * @param originatingNodeSession - Session information that contains the 
1415
   *                                 identity of the calling user
1416
   * @param targetNodeSubject - Subject identifying the target node
1417
   * @param pid - the identifier of the object to be replicated
1418
   * @param replicatePermission - the execute permission to be granted
1419
   * 
1420
   * @throws ServiceFailure
1421
   * @throws NotImplemented
1422
   * @throws InvalidToken
1423
   * @throws NotAuthorized
1424
   * @throws InvalidRequest
1425
   * @throws NotFound
1426
   */
1427
  @Override
1428
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1429
    Subject targetNodeSubject, Identifier pid) 
1430
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1431
    NotFound, InvalidRequest {
1432
    
1433
    boolean isAllowed = false;
1434
    SystemMetadata sysmeta = null;
1435
    NodeReference targetNode = null;
1436
    
1437
    try {
1438
      // get the target node reference from the nodes list
1439
      CNode cn = D1Client.getCN();
1440
      List<Node> nodes = cn.listNodes().getNodeList();
1441
      
1442
      if ( nodes != null ) {
1443
        for (Node node : nodes) {
1444
            
1445
        	if (node.getSubjectList() != null) {
1446
        		
1447
	            for (Subject nodeSubject : node.getSubjectList()) {
1448
	            	
1449
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1450
	                    targetNode = node.getIdentifier();
1451
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1452
	                    break;
1453
	                }
1454
	            }
1455
        	}
1456
            
1457
            if ( targetNode != null) { break; }
1458
        }
1459
        
1460
      } else {
1461
          String msg = "Couldn't get the node list from the CN";
1462
          logMetacat.debug(msg);
1463
          throw new ServiceFailure("4872", msg);
1464
          
1465
      }
1466
      
1467
      // can't find a node listed with the given subject
1468
      if ( targetNode == null ) {
1469
          String msg = "There is no Member Node registered with a node subject " +
1470
              "matching " + targetNodeSubject.getValue();
1471
          logMetacat.info(msg);
1472
          throw new NotAuthorized("4871", msg);
1473
          
1474
      }
1475
      
1476
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1477
      
1478
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1479

    
1480
      if ( sysmeta != null ) {
1481
          
1482
          List<Replica> replicaList = sysmeta.getReplicaList();
1483
          
1484
          if ( replicaList != null ) {
1485
              
1486
              // find the replica with the status set to 'requested'
1487
              for (Replica replica : replicaList) {
1488
                  ReplicationStatus status = replica.getReplicationStatus();
1489
                  NodeReference listedNode = replica.getReplicaMemberNode();
1490
                  if ( listedNode != null && targetNode != null ) {
1491
                      logMetacat.debug("Comparing " + listedNode.getValue()
1492
                              + " to " + targetNode.getValue());
1493
                      
1494
                      if (listedNode.getValue().equals(targetNode.getValue())
1495
                              && status.equals(ReplicationStatus.REQUESTED)) {
1496
                          isAllowed = true;
1497
                          break;
1498

    
1499
                      }
1500
                  }
1501
              }
1502
          }
1503
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1504
              "to replicate: " + isAllowed + " for " + pid.getValue());
1505

    
1506
          
1507
      } else {
1508
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1509
          " is null.");
1510
          String error ="";
1511
          String localId = null;
1512
          try {
1513
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1514
            
1515
           } catch (Exception e) {
1516
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1517
          }
1518
          
1519
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1520
              error = DELETEDMESSAGE;
1521
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1522
              error = DELETEDMESSAGE;
1523
          }
1524
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1525
          
1526
      }
1527

    
1528
    } catch (RuntimeException e) {
1529
    	  ServiceFailure sf = new ServiceFailure("4872", 
1530
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1531
                e.getMessage());
1532
    	  sf.initCause(e);
1533
        throw sf;
1534
        
1535
    }
1536
      
1537
    return isAllowed;
1538
    
1539
  }
1540

    
1541
  /**
1542
   * Adds a new object to the Node, where the object is a science metadata object.
1543
   * 
1544
   * @param session - the Session object containing the credentials for the Subject
1545
   * @param pid - The object identifier to be created
1546
   * @param object - the object bytes
1547
   * @param sysmeta - the system metadata that describes the object  
1548
   * 
1549
   * @return pid - the object identifier created
1550
   * 
1551
   * @throws InvalidToken
1552
   * @throws ServiceFailure
1553
   * @throws NotAuthorized
1554
   * @throws IdentifierNotUnique
1555
   * @throws UnsupportedType
1556
   * @throws InsufficientResources
1557
   * @throws InvalidSystemMetadata
1558
   * @throws NotImplemented
1559
   * @throws InvalidRequest
1560
   */
1561
  public Identifier create(Session session, Identifier pid, InputStream object,
1562
    SystemMetadata sysmeta) 
1563
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1564
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1565
    NotImplemented, InvalidRequest {
1566
       
1567
   // verify the pid is valid format
1568
      if (!isValidIdentifier(pid)) {
1569
          throw new InvalidRequest("4891", "The provided identifier is invalid.");
1570
      }
1571
      // The lock to be used for this identifier
1572
      Lock lock = null;
1573

    
1574
      try {
1575
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1576
          lock.lock();
1577
          // are we allowed?
1578
          boolean isAllowed = false;
1579
          isAllowed = isAdminAuthorized(session);
1580
          
1581
          // additional check if it is the authoritative node if it is not the admin
1582
          if(!isAllowed) {
1583
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1584
          }
1585

    
1586
          // proceed if we're called by a CN
1587
          if ( isAllowed ) {
1588
              //check if the series id is legitimate. It uses the same rules of the method registerSystemMetadata
1589
              checkSidInModifyingSystemMetadata(sysmeta, "4896", "4893");
1590
              // create the coordinating node version of the document      
1591
              logMetacat.debug("Locked identifier " + pid.getValue());
1592
              sysmeta.setSerialVersion(BigInteger.ONE);
1593
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1594
              //sysmeta.setArchived(false); // this is a create op, not update
1595
              
1596
              // the CN should have set the origin and authoritative member node fields
1597
              try {
1598
                  sysmeta.getOriginMemberNode().getValue();
1599
                  sysmeta.getAuthoritativeMemberNode().getValue();
1600
                  
1601
              } catch (NullPointerException npe) {
1602
                  throw new InvalidSystemMetadata("4896", 
1603
                      "Both the origin and authoritative member node identifiers need to be set.");
1604
                  
1605
              }
1606
              pid = super.create(session, pid, object, sysmeta);
1607

    
1608
          } else {
1609
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1610
                  " isn't allowed to call create() on a Coordinating Node.";
1611
              logMetacat.info(msg);
1612
              throw new NotAuthorized("1100", msg);
1613
          }
1614
          
1615
      } catch (RuntimeException e) {
1616
          // Convert Hazelcast runtime exceptions to service failures
1617
          String msg = "There was a problem creating the object identified by " +
1618
              pid.getValue() + ". There error message was: " + e.getMessage();
1619
          throw new ServiceFailure("4893", msg);
1620
          
1621
      } finally {
1622
    	  if (lock != null) {
1623
	          lock.unlock();
1624
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1625
    	  }
1626
      }
1627
      
1628
      return pid;
1629

    
1630
  }
1631

    
1632
  /**
1633
   * Set access for a given object using the object identifier and a Subject
1634
   * under a given Session.
1635
   * 
1636
   * @param session - the Session object containing the credentials for the Subject
1637
   * @param pid - the object identifier for the given object to apply the policy
1638
   * @param policy - the access policy to be applied
1639
   * 
1640
   * @return true if the application of the policy succeeds
1641
   * @throws InvalidToken
1642
   * @throws ServiceFailure
1643
   * @throws NotFound
1644
   * @throws NotAuthorized
1645
   * @throws NotImplemented
1646
   * @throws InvalidRequest
1647
   */
1648
  public boolean setAccessPolicy(Session session, Identifier pid, 
1649
      AccessPolicy accessPolicy, long serialVersion) 
1650
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1651
      NotImplemented, InvalidRequest, VersionMismatch {
1652
      
1653
   // do we have a valid pid?
1654
      if (pid == null || pid.getValue().trim().equals("")) {
1655
          throw new InvalidRequest("4402", "The provided identifier was invalid.");
1656
          
1657
      }
1658
      
1659
      String serviceFailureCode = "4430";
1660
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1661
      if(sid != null) {
1662
          pid = sid;
1663
      }
1664
      // The lock to be used for this identifier
1665
      Lock lock = null;
1666
      SystemMetadata systemMetadata = null;
1667
      
1668
      boolean success = false;
1669
      
1670
      // get the subject
1671
      Subject subject = session.getSubject();
1672
      
1673
      // are we allowed to do this?
1674
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1675
          throw new NotAuthorized("4420", "not allowed by "
1676
                  + subject.getValue() + " on " + pid.getValue());
1677
      }
1678
      
1679
      try {
1680
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1681
          lock.lock();
1682
          logMetacat.debug("Locked identifier " + pid.getValue());
1683

    
1684
          try {
1685
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1686

    
1687
              if ( systemMetadata == null ) {
1688
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1689
                  
1690
              }
1691
              // does the request have the most current system metadata?
1692
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1693
                 String msg = "The requested system metadata version number " + 
1694
                     serialVersion + " differs from the current version at " +
1695
                     systemMetadata.getSerialVersion().longValue() +
1696
                     ". Please get the latest copy in order to modify it.";
1697
                 throw new VersionMismatch("4402", msg);
1698
                 
1699
              }
1700
              
1701
          } catch (RuntimeException e) {
1702
              // convert Hazelcast RuntimeException to NotFound
1703
              throw new NotFound("4400", "No record found for: " + pid);
1704
            
1705
          }
1706
              
1707
          // set the access policy
1708
          systemMetadata.setAccessPolicy(accessPolicy);
1709
          
1710
          // update the system metadata
1711
          try {
1712
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1713
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1714
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1715
              notifyReplicaNodes(systemMetadata);
1716
              
1717
          } catch (RuntimeException e) {
1718
              // convert Hazelcast RuntimeException to ServiceFailure
1719
              throw new ServiceFailure("4430", e.getMessage());
1720
            
1721
          }
1722
          
1723
      } catch (RuntimeException e) {
1724
          throw new ServiceFailure("4430", e.getMessage());
1725
          
1726
      } finally {
1727
          lock.unlock();
1728
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1729
        
1730
      }
1731

    
1732
    
1733
    // TODO: how do we know if the map was persisted?
1734
    success = true;
1735
    
1736
    return success;
1737
  }
1738

    
1739
  /**
1740
   * Full replacement of replication metadata in the system metadata for the 
1741
   * specified object, changes date system metadata modified
1742
   * 
1743
   * @param session - the Session object containing the credentials for the Subject
1744
   * @param pid - the object identifier for the given object to apply the policy
1745
   * @param replica - the replica to be updated
1746
   * @return
1747
   * @throws NotImplemented
1748
   * @throws NotAuthorized
1749
   * @throws ServiceFailure
1750
   * @throws InvalidRequest
1751
   * @throws NotFound
1752
   * @throws VersionMismatch
1753
   */
1754
  @Override
1755
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1756
      Replica replica, long serialVersion) 
1757
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1758
      NotFound, VersionMismatch {
1759
      
1760
      // The lock to be used for this identifier
1761
      Lock lock = null;
1762
      
1763
      // get the subject
1764
      Subject subject = session.getSubject();
1765
      
1766
      // are we allowed to do this?
1767
      try {
1768

    
1769
          // what is the controlling permission?
1770
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1771
              throw new NotAuthorized("4851", "not allowed by "
1772
                      + subject.getValue() + " on " + pid.getValue());
1773
          }
1774

    
1775
        
1776
      } catch (InvalidToken e) {
1777
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1778
                  " on " + pid.getValue());  
1779
          
1780
      }
1781

    
1782
      SystemMetadata systemMetadata = null;
1783
      try {
1784
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1785
          lock.lock();
1786
          logMetacat.debug("Locked identifier " + pid.getValue());
1787

    
1788
          try {      
1789
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1790

    
1791
              // does the request have the most current system metadata?
1792
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1793
                 String msg = "The requested system metadata version number " + 
1794
                     serialVersion + " differs from the current version at " +
1795
                     systemMetadata.getSerialVersion().longValue() +
1796
                     ". Please get the latest copy in order to modify it.";
1797
                 throw new VersionMismatch("4855", msg);
1798
              }
1799
              
1800
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1801
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1802
                  " : " + e.getMessage());
1803
            
1804
          }
1805
              
1806
          // set the status for the replica
1807
          List<Replica> replicas = systemMetadata.getReplicaList();
1808
          NodeReference replicaNode = replica.getReplicaMemberNode();
1809
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
1810
          int index = 0;
1811
          for (Replica listedReplica: replicas) {
1812
              
1813
              // remove the replica that we are replacing
1814
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1815
                      // don't allow status to change from COMPLETED to anything other
1816
                      // than INVALIDATED: prevents overwrites from race conditions
1817
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
1818
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
1819
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
1820
                	  throw new InvalidRequest("4853", "Status state change from " +
1821
                			  listedReplica.getReplicationStatus() + " to " +
1822
                			  replicaStatus.toString() + "is prohibited for identifier " +
1823
                			  pid.getValue() + " and target node " + 
1824
                			  listedReplica.getReplicaMemberNode().getValue());
1825

    
1826
            	  }
1827
                  replicas.remove(index);
1828
                  break;
1829
                  
1830
              }
1831
              index++;
1832
          }
1833
          
1834
          // add the new replica item
1835
          replicas.add(replica);
1836
          systemMetadata.setReplicaList(replicas);
1837
          
1838
          // update the metadata
1839
          try {
1840
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1841
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1842
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1843
              
1844
              // inform replica nodes of the change if the status is complete
1845
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
1846
            	  notifyReplicaNodes(systemMetadata);
1847
            	  
1848
              }
1849
          } catch (RuntimeException e) {
1850
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1851
              throw new ServiceFailure("4852", e.getMessage());
1852
          
1853
          }
1854
          
1855
      } catch (RuntimeException e) {
1856
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1857
          throw new ServiceFailure("4852", e.getMessage());
1858
      
1859
      } finally {
1860
          lock.unlock();
1861
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1862
          
1863
      }
1864
    
1865
      return true;
1866
      
1867
  }
1868
  
1869
  /**
1870
   * 
1871
   */
1872
  @Override
1873
  public ObjectList listObjects(Session session, Date startTime, 
1874
      Date endTime, ObjectFormatIdentifier formatid, Identifier identifier, Boolean replicaStatus,
1875
      Integer start, Integer count)
1876
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1877
      ServiceFailure {
1878
      
1879
      return super.listObjects(session, startTime, endTime, formatid, identifier, replicaStatus, start, count);
1880
  }
1881

    
1882
  
1883
 	/**
1884
 	 * Returns a list of checksum algorithms that are supported by DataONE.
1885
 	 * @return cal  the list of checksum algorithms
1886
 	 * 
1887
 	 * @throws ServiceFailure
1888
 	 * @throws NotImplemented
1889
 	 */
1890
  @Override
1891
  public ChecksumAlgorithmList listChecksumAlgorithms()
1892
			throws ServiceFailure, NotImplemented {
1893
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1894
		cal.addAlgorithm("MD5");
1895
		cal.addAlgorithm("SHA-1");
1896
		return cal;
1897
		
1898
	}
1899

    
1900
  /**
1901
   * Notify replica Member Nodes of system metadata changes for a given pid
1902
   * 
1903
   * @param currentSystemMetadata - the up to date system metadata
1904
   */
1905
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
1906
      
1907
      Session session = null;
1908
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
1909
      MNode mn = null;
1910
      NodeReference replicaNodeRef = null;
1911
      CNode cn = null;
1912
      NodeType nodeType = null;
1913
      List<Node> nodeList = null;
1914
      
1915
      try {
1916
          cn = D1Client.getCN();
1917
          nodeList = cn.listNodes().getNodeList();
1918
          
1919
      } catch (Exception e) { // handle BaseException and other I/O issues
1920
          
1921
          // swallow errors since the call is not critical
1922
          logMetacat.error("Can't inform MNs of system metadata changes due " +
1923
              "to communication issues with the CN: " + e.getMessage());
1924
          
1925
      }
1926
      
1927
      if ( replicaList != null ) {
1928
          
1929
          // iterate through the replicas and inform  MN replica nodes
1930
          for (Replica replica : replicaList) {
1931
              
1932
              replicaNodeRef = replica.getReplicaMemberNode();
1933
              try {
1934
                  if (nodeList != null) {
1935
                      // find the node type
1936
                      for (Node node : nodeList) {
1937
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
1938
                              nodeType = node.getType();
1939
                              break;
1940
              
1941
                          }
1942
                      }
1943
                  }
1944
              
1945
                  // notify only MNs
1946
                  if (nodeType != null && nodeType == NodeType.MN) {
1947
                      mn = D1Client.getMN(replicaNodeRef);
1948
                      mn.systemMetadataChanged(session, 
1949
                          currentSystemMetadata.getIdentifier(), 
1950
                          currentSystemMetadata.getSerialVersion().longValue(),
1951
                          currentSystemMetadata.getDateSysMetadataModified());
1952
                  }
1953
              
1954
              } catch (Exception e) { // handle BaseException and other I/O issues
1955
              
1956
                  // swallow errors since the call is not critical
1957
                  logMetacat.error("Can't inform "
1958
                          + replicaNodeRef.getValue()
1959
                          + " of system metadata changes due "
1960
                          + "to communication issues with the CN: "
1961
                          + e.getMessage());
1962
              
1963
              }
1964
          }
1965
      }
1966
  }
1967

    
1968
	@Override
1969
	public QueryEngineDescription getQueryEngineDescription(Session session,
1970
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
1971
			NotImplemented, NotFound {
1972
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1973

    
1974
	}
1975
	
1976
	@Override
1977
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
1978
			ServiceFailure, NotAuthorized, NotImplemented {
1979
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1980

    
1981
	}
1982
	
1983
	@Override
1984
	public InputStream query(Session session, String queryEngine, String query)
1985
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1986
			NotImplemented, NotFound {
1987
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1988

    
1989
	}
1990
	
1991
	@Override
1992
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
1993
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
1994
	}
1995
	
1996
	
1997
    
1998
}
(1-1/7)