Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.sql.SQLException;
29
import java.util.ArrayList;
30
import java.util.Calendar;
31
import java.util.Date;
32
import java.util.List;
33
import java.util.concurrent.locks.Lock;
34

    
35
import javax.servlet.http.HttpServletRequest;
36

    
37
import org.apache.log4j.Logger;
38
import org.dataone.client.v2.CNode;
39
import org.dataone.client.v2.itk.D1Client;
40
import org.dataone.client.v2.MNode;
41
import org.dataone.service.cn.v2.CNAuthorization;
42
import org.dataone.service.cn.v2.CNCore;
43
import org.dataone.service.cn.v2.CNRead;
44
import org.dataone.service.cn.v2.CNReplication;
45
import org.dataone.service.exceptions.BaseException;
46
import org.dataone.service.exceptions.IdentifierNotUnique;
47
import org.dataone.service.exceptions.InsufficientResources;
48
import org.dataone.service.exceptions.InvalidRequest;
49
import org.dataone.service.exceptions.InvalidSystemMetadata;
50
import org.dataone.service.exceptions.InvalidToken;
51
import org.dataone.service.exceptions.NotAuthorized;
52
import org.dataone.service.exceptions.NotFound;
53
import org.dataone.service.exceptions.NotImplemented;
54
import org.dataone.service.exceptions.ServiceFailure;
55
import org.dataone.service.exceptions.UnsupportedType;
56
import org.dataone.service.exceptions.VersionMismatch;
57
import org.dataone.service.types.v1.AccessPolicy;
58
import org.dataone.service.types.v1.Checksum;
59
import org.dataone.service.types.v1.ChecksumAlgorithmList;
60
import org.dataone.service.types.v1.DescribeResponse;
61
import org.dataone.service.types.v1.Event;
62
import org.dataone.service.types.v1.Identifier;
63
import org.dataone.service.types.v2.Log;
64
import org.dataone.service.types.v2.Node;
65
import org.dataone.service.types.v2.NodeList;
66
import org.dataone.service.types.v1.NodeReference;
67
import org.dataone.service.types.v1.NodeType;
68
import org.dataone.service.types.v2.ObjectFormat;
69
import org.dataone.service.types.v1.ObjectFormatIdentifier;
70
import org.dataone.service.types.v2.ObjectFormatList;
71
import org.dataone.service.types.v1.ObjectList;
72
import org.dataone.service.types.v1.ObjectLocationList;
73
import org.dataone.service.types.v1.Permission;
74
import org.dataone.service.types.v1.Replica;
75
import org.dataone.service.types.v1.ReplicationPolicy;
76
import org.dataone.service.types.v1.ReplicationStatus;
77
import org.dataone.service.types.v1.Session;
78
import org.dataone.service.types.v1.Subject;
79
import org.dataone.service.types.v2.SystemMetadata;
80
import org.dataone.service.types.v2.util.ServiceMethodRestrictionUtil;
81
import org.dataone.service.types.v1_1.QueryEngineDescription;
82
import org.dataone.service.types.v1_1.QueryEngineList;
83

    
84
import edu.ucsb.nceas.metacat.EventLog;
85
import edu.ucsb.nceas.metacat.IdentifierManager;
86
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
87
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
88
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
89

    
90
/**
91
 * Represents Metacat's implementation of the DataONE Coordinating Node 
92
 * service API. Methods implement the various CN* interfaces, and methods common
93
 * to both Member Node and Coordinating Node interfaces are found in the
94
 * D1NodeService super class.
95
 *
96
 */
97
public class CNodeService extends D1NodeService implements CNAuthorization,
98
    CNCore, CNRead, CNReplication {
99

    
100
  /* the logger instance */
101
  private Logger logMetacat = null;
102

    
103
  /**
104
   * singleton accessor
105
   */
106
  public static CNodeService getInstance(HttpServletRequest request) { 
107
    return new CNodeService(request);
108
  }
109
  
110
  /**
111
   * Constructor, private for singleton access
112
   */
113
  private CNodeService(HttpServletRequest request) {
114
    super(request);
115
    logMetacat = Logger.getLogger(CNodeService.class);
116
        
117
  }
118
    
119
  /**
120
   * Set the replication policy for an object given the object identifier
121
   * 
122
   * @param session - the Session object containing the credentials for the Subject
123
   * @param pid - the object identifier for the given object
124
   * @param policy - the replication policy to be applied
125
   * 
126
   * @return true or false
127
   * 
128
   * @throws NotImplemented
129
   * @throws NotAuthorized
130
   * @throws ServiceFailure
131
   * @throws InvalidRequest
132
   * @throws VersionMismatch
133
   * 
134
   */
135
  @Override
136
  public boolean setReplicationPolicy(Session session, Identifier pid,
137
      ReplicationPolicy policy, long serialVersion) 
138
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
139
      InvalidRequest, InvalidToken, VersionMismatch {
140
      
141
      // do we have a valid pid?
142
      if (pid == null || pid.getValue().trim().equals("")) {
143
          throw new InvalidRequest("4883", "The provided identifier was invalid.");
144
          
145
      }
146
      
147
      /*String serviceFailureCode = "4882";
148
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
149
      if(sid != null) {
150
          pid = sid;
151
      }*/
152
      // The lock to be used for this identifier
153
      Lock lock = null;
154
      
155
      // get the subject
156
      Subject subject = session.getSubject();
157
      
158
      // are we allowed to do this?
159
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
160
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
161
                  + " not allowed by " + subject.getValue() + " on "
162
                  + pid.getValue());
163
          
164
      }
165
      
166
      SystemMetadata systemMetadata = null;
167
      try {
168
          lock = HazelcastService.getInstance().getLock(pid.getValue());
169
          lock.lock();
170
          logMetacat.debug("Locked identifier " + pid.getValue());
171

    
172
          try {
173
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
174
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
175
                  
176
              }
177
              
178
              // did we get it correctly?
179
              if ( systemMetadata == null ) {
180
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
181
                  
182
              }
183

    
184
              // does the request have the most current system metadata?
185
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
186
                 String msg = "The requested system metadata version number " + 
187
                     serialVersion + " differs from the current version at " +
188
                     systemMetadata.getSerialVersion().longValue() +
189
                     ". Please get the latest copy in order to modify it.";
190
                 throw new VersionMismatch("4886", msg);
191
                 
192
              }
193
              
194
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
195
              throw new NotFound("4884", "No record found for: " + pid.getValue());
196
            
197
          }
198
          
199
          // set the new policy
200
          systemMetadata.setReplicationPolicy(policy);
201
          
202
          // update the metadata
203
          try {
204
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
205
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
206
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
207
              notifyReplicaNodes(systemMetadata);
208
              
209
          } catch (RuntimeException e) {
210
              throw new ServiceFailure("4882", e.getMessage());
211
          
212
          }
213
          
214
      } catch (RuntimeException e) {
215
          throw new ServiceFailure("4882", e.getMessage());
216
          
217
      } finally {
218
          lock.unlock();
219
          logMetacat.debug("Unlocked identifier " + pid.getValue());
220
          
221
      }
222
    
223
      return true;
224
  }
225

    
226
  /**
227
   * Deletes the replica from the given Member Node
228
   * NOTE: MN.delete() may be an "archive" operation. TBD.
229
   * @param session
230
   * @param pid
231
   * @param nodeId
232
   * @param serialVersion
233
   * @return
234
   * @throws InvalidToken
235
   * @throws ServiceFailure
236
   * @throws NotAuthorized
237
   * @throws NotFound
238
   * @throws NotImplemented
239
   * @throws VersionMismatch
240
   */
241
  @Override
242
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
243
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
244
	  
245
	  	// The lock to be used for this identifier
246
		Lock lock = null;
247

    
248
		// get the subject
249
		Subject subject = session.getSubject();
250

    
251
		// are we allowed to do this?
252
		boolean isAuthorized = false;
253
		try {
254
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
255
		} catch (InvalidRequest e) {
256
			throw new ServiceFailure("4882", e.getDescription());
257
		}
258
		if (!isAuthorized) {
259
			throw new NotAuthorized("4881", Permission.WRITE
260
					+ " not allowed by " + subject.getValue() + " on "
261
					+ pid.getValue());
262

    
263
		}
264

    
265
		SystemMetadata systemMetadata = null;
266
		try {
267
			lock = HazelcastService.getInstance().getLock(pid.getValue());
268
			lock.lock();
269
			logMetacat.debug("Locked identifier " + pid.getValue());
270

    
271
			try {
272
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
273
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
274
				}
275

    
276
				// did we get it correctly?
277
				if (systemMetadata == null) {
278
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
279
				}
280

    
281
				// does the request have the most current system metadata?
282
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
283
					String msg = "The requested system metadata version number "
284
							+ serialVersion
285
							+ " differs from the current version at "
286
							+ systemMetadata.getSerialVersion().longValue()
287
							+ ". Please get the latest copy in order to modify it.";
288
					throw new VersionMismatch("4886", msg);
289

    
290
				}
291

    
292
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
293
				throw new NotFound("4884", "No record found for: " + pid.getValue());
294

    
295
			}
296
			  
297
			// check permissions
298
			// TODO: is this necessary?
299
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
300
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
301
			if (!isAllowed) {
302
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
303
			}
304
			  
305
			// delete the replica from the given node
306
			// CSJ: use CN.delete() to truly delete a replica, semantically
307
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
308
			//D1Client.getMN(nodeId).delete(session, pid);
309
			  
310
			// reflect that change in the system metadata
311
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
312
			for (Replica r: systemMetadata.getReplicaList()) {
313
				  if (r.getReplicaMemberNode().equals(nodeId)) {
314
					  updatedReplicas.remove(r);
315
					  break;
316
				  }
317
			}
318
			systemMetadata.setReplicaList(updatedReplicas);
319

    
320
			// update the metadata
321
			try {
322
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
323
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
324
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
325
			} catch (RuntimeException e) {
326
				throw new ServiceFailure("4882", e.getMessage());
327
			}
328

    
329
		} catch (RuntimeException e) {
330
			throw new ServiceFailure("4882", e.getMessage());
331
		} finally {
332
			lock.unlock();
333
			logMetacat.debug("Unlocked identifier " + pid.getValue());
334
		}
335

    
336
		return true;	  
337
	  
338
  }
339
  
340
  /**
341
   * Deletes an object from the Coordinating Node
342
   * 
343
   * @param session - the Session object containing the credentials for the Subject
344
   * @param pid - The object identifier to be deleted
345
   * 
346
   * @return pid - the identifier of the object used for the deletion
347
   * 
348
   * @throws InvalidToken
349
   * @throws ServiceFailure
350
   * @throws NotAuthorized
351
   * @throws NotFound
352
   * @throws NotImplemented
353
   * @throws InvalidRequest
354
   */
355
  @Override
356
  public Identifier delete(Session session, Identifier pid) 
357
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
358
      
359
      String localId = null;      // The corresponding docid for this pid
360
	  Lock lock = null;           // The lock to be used for this identifier
361
      CNode cn = null;            // a reference to the CN to get the node list    
362
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
363
      List<Node> nodeList = null; // the list of nodes in this CN environment
364

    
365
      // check for a valid session
366
      if (session == null) {
367
        	throw new InvalidToken("4963", "No session has been provided");
368
        	
369
      }
370

    
371
      // do we have a valid pid?
372
      if (pid == null || pid.getValue().trim().equals("")) {
373
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
374
          
375
      }
376
      
377
      String serviceFailureCode = "4962";
378
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
379
      if(sid != null) {
380
          pid = sid;
381
      }
382

    
383
	  // check that it is CN/admin
384
	  boolean allowed = isAdminAuthorized(session);
385
	  
386
	  // additional check if it is the authoritative node if it is not the admin
387
      if(!allowed) {
388
          allowed = isAuthoritativeMNodeAdmin(session, pid);
389
          
390
      }
391
	  
392
	  if (!allowed) {
393
		  String msg = "The subject " + session.getSubject().getValue() + 
394
			  " is not allowed to call delete() on a Coordinating Node.";
395
		  logMetacat.info(msg);
396
		  throw new NotAuthorized("4960", msg);
397
		  
398
	  }
399
	  
400
	  // Don't defer to superclass implementation without a locally registered identifier
401
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
402
      // Check for the existing identifier
403
      try {
404
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
405
          super.delete(session, pid);
406
          
407
      } catch (McdbDocNotFoundException e) {
408
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
409
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
410
    	  
411
          try {
412
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
413
  			  lock.lock();
414
  			  logMetacat.debug("Locked identifier " + pid.getValue());
415

    
416
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
417
			  if ( sysMeta != null ) {
418
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
419
				sysMeta.setArchived(true);
420
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
421
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
422
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
423
	            //since this is cn, we don't need worry about the mn solr index.
424
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
425
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
426
	            String username = session.getSubject().getValue();//just for logging purpose
427
                //since data objects were not registered in the identifier table, we use pid as the docid
428
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
429
				
430
			  } else {
431
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
432
					  ". Couldn't obtain the system metadata record.");
433
				  
434
			  }
435
			  
436
		  } catch (RuntimeException re) {
437
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
438
				  ". The error message was: " + re.getMessage());
439
			  
440
		  } finally {
441
			  lock.unlock();
442
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
443

    
444
		  }
445

    
446
          // NOTE: cannot log the delete without localId
447
//          EventLog.getInstance().log(request.getRemoteAddr(), 
448
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
449
//                  pid.getValue(), Event.DELETE.xmlValue());
450

    
451
      } catch (SQLException e) {
452
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
453
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
454
      }
455

    
456
      // get the node list
457
      try {
458
          cn = D1Client.getCN();
459
          nodeList = cn.listNodes().getNodeList();
460
          
461
      } catch (Exception e) { // handle BaseException and other I/O issues
462
          
463
          // swallow errors since the call is not critical
464
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
465
              " due to communication issues with the CN: " + e.getMessage());
466
          
467
      }
468

    
469
	  // notify the replicas
470
	  if (systemMetadata.getReplicaList() != null) {
471
		  for (Replica replica: systemMetadata.getReplicaList()) {
472
			  NodeReference replicaNode = replica.getReplicaMemberNode();
473
			  try {
474
                  if (nodeList != null) {
475
                      // find the node type
476
                      for (Node node : nodeList) {
477
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
478
                              nodeType = node.getType();
479
                              break;
480
              
481
                          }
482
                      }
483
                  }
484
                  
485
                  // only send call MN.delete() to avoid an infinite loop with the CN
486
                  if (nodeType != null && nodeType == NodeType.MN) {
487
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
488
                  }
489
                  
490
			  } catch (Exception e) {
491
				  // all we can really do is log errors and carry on with life
492
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
493
					  " from replica MN: " + replicaNode.getValue(), e);
494
			}
495
			  
496
		  }
497
	  }
498
	  
499
	  return pid;
500
      
501
  }
502
  
503
  /**
504
   * Archives an object from the Coordinating Node
505
   * 
506
   * @param session - the Session object containing the credentials for the Subject
507
   * @param pid - The object identifier to be deleted
508
   * 
509
   * @return pid - the identifier of the object used for the deletion
510
   * 
511
   * @throws InvalidToken
512
   * @throws ServiceFailure
513
   * @throws NotAuthorized
514
   * @throws NotFound
515
   * @throws NotImplemented
516
   * @throws InvalidRequest
517
   */
518
  @Override
519
  public Identifier archive(Session session, Identifier pid) 
520
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
521

    
522
      String localId = null; // The corresponding docid for this pid
523
	  Lock lock = null;      // The lock to be used for this identifier
524
      CNode cn = null;            // a reference to the CN to get the node list    
525
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
526
      List<Node> nodeList = null; // the list of nodes in this CN environment
527
      
528

    
529
      // check for a valid session
530
      if (session == null) {
531
        	throw new InvalidToken("4973", "No session has been provided");
532
        	
533
      }
534

    
535
      // do we have a valid pid?
536
      if (pid == null || pid.getValue().trim().equals("")) {
537
          throw new ServiceFailure("4972", "The provided identifier was invalid.");
538
          
539
      }
540

    
541
	  // check that it is CN/admin
542
	  boolean allowed = isAdminAuthorized(session);
543
	  
544
	  String serviceFailureCode = "4972";
545
	  Identifier sid = getPIDForSID(pid, serviceFailureCode);
546
	  if(sid != null) {
547
	        pid = sid;
548
	  }
549
	  
550
	  //check if it is the authoritative member node
551
	  if(!allowed) {
552
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
553
	  }
554
	  
555
	  if (!allowed) {
556
		  String msg = "The subject " + session.getSubject().getValue() + 
557
				  " is not allowed to call archive() on a Coordinating Node.";
558
		  logMetacat.info(msg);
559
		  throw new NotAuthorized("4970", msg);
560
	  }
561
	  
562
      // Check for the existing identifier
563
      try {
564
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
565
          super.archive(session, pid);
566
          
567
      } catch (McdbDocNotFoundException e) {
568
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
569
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
570
    	  
571
          try {
572
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
573
  			  lock.lock();
574
  			  logMetacat.debug("Locked identifier " + pid.getValue());
575

    
576
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
577
			  if ( sysMeta != null ) {
578
				sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
579
				sysMeta.setArchived(true);
580
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
581
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
582
			    // notify the replicas
583
				notifyReplicaNodes(sysMeta);
584
				  
585
			  } else {
586
				  throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
587
					  ". Couldn't obtain the system metadata record.");
588
				  
589
			  }
590
			  
591
		  } catch (RuntimeException re) {
592
			  throw new ServiceFailure("4972", "Couldn't archive " + pid.getValue() + 
593
				  ". The error message was: " + re.getMessage());
594
			  
595
		  } finally {
596
			  lock.unlock();
597
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
598

    
599
		  }
600

    
601
          // NOTE: cannot log the archive without localId
602
//          EventLog.getInstance().log(request.getRemoteAddr(), 
603
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
604
//                  pid.getValue(), Event.DELETE.xmlValue());
605

    
606
      } catch (SQLException e) {
607
          throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
608
                  ". The local id of the object with the identifier can't be identified since "+e.getMessage());
609
      }
610

    
611
	  return pid;
612
      
613
  }
614
  
615
  /**
616
   * Set the obsoletedBy attribute in System Metadata
617
   * @param session
618
   * @param pid
619
   * @param obsoletedByPid
620
   * @param serialVersion
621
   * @return
622
   * @throws NotImplemented
623
   * @throws NotFound
624
   * @throws NotAuthorized
625
   * @throws ServiceFailure
626
   * @throws InvalidRequest
627
   * @throws InvalidToken
628
   * @throws VersionMismatch
629
   */
630
  @Override
631
  public boolean setObsoletedBy(Session session, Identifier pid,
632
			Identifier obsoletedByPid, long serialVersion)
633
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
634
			InvalidRequest, InvalidToken, VersionMismatch {
635
      
636
      // do we have a valid pid?
637
      if (pid == null || pid.getValue().trim().equals("")) {
638
          throw new InvalidRequest("4942", "The provided identifier was invalid.");
639
          
640
      }
641
      
642
      /*String serviceFailureCode = "4941";
643
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
644
      if(sid != null) {
645
          pid = sid;
646
      }*/
647
      
648
      // do we have a valid pid?
649
      if (obsoletedByPid == null || obsoletedByPid.getValue().trim().equals("")) {
650
          throw new InvalidRequest("4942", "The provided obsoletedByPid was invalid.");
651
          
652
      }
653
      
654
      try {
655
          if(IdentifierManager.getInstance().systemMetadataSIDExists(obsoletedByPid)) {
656
              throw new InvalidRequest("4942", "The provided obsoletedByPid "+obsoletedByPid.getValue()+" is an existing SID. However, it must NOT be an SID.");
657
          }
658
      } catch (SQLException ee) {
659
          throw new ServiceFailure("4941", "Couldn't determine if the obsoletedByPid "+obsoletedByPid.getValue()+" is an SID or not. The id shouldn't be an SID.");
660
      }
661
      
662

    
663
		// The lock to be used for this identifier
664
		Lock lock = null;
665

    
666
		// get the subject
667
		Subject subject = session.getSubject();
668

    
669
		// are we allowed to do this?
670
		if (!isAuthorized(session, pid, Permission.WRITE)) {
671
			throw new NotAuthorized("4881", Permission.WRITE
672
					+ " not allowed by " + subject.getValue() + " on "
673
					+ pid.getValue());
674

    
675
		}
676

    
677

    
678
		SystemMetadata systemMetadata = null;
679
		try {
680
			lock = HazelcastService.getInstance().getLock(pid.getValue());
681
			lock.lock();
682
			logMetacat.debug("Locked identifier " + pid.getValue());
683

    
684
			try {
685
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
686
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
687
				}
688

    
689
				// did we get it correctly?
690
				if (systemMetadata == null) {
691
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
692
				}
693

    
694
				// does the request have the most current system metadata?
695
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
696
					String msg = "The requested system metadata version number "
697
							+ serialVersion
698
							+ " differs from the current version at "
699
							+ systemMetadata.getSerialVersion().longValue()
700
							+ ". Please get the latest copy in order to modify it.";
701
					throw new VersionMismatch("4886", msg);
702

    
703
				}
704

    
705
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
706
				throw new NotFound("4884", "No record found for: " + pid.getValue());
707

    
708
			}
709

    
710
			// set the new policy
711
			systemMetadata.setObsoletedBy(obsoletedByPid);
712

    
713
			// update the metadata
714
			try {
715
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
716
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
717
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
718
			} catch (RuntimeException e) {
719
				throw new ServiceFailure("4882", e.getMessage());
720
			}
721

    
722
		} catch (RuntimeException e) {
723
			throw new ServiceFailure("4882", e.getMessage());
724
		} finally {
725
			lock.unlock();
726
			logMetacat.debug("Unlocked identifier " + pid.getValue());
727
		}
728

    
729
		return true;
730
	}
731
  
732
  
733
  /**
734
   * Set the replication status for an object given the object identifier
735
   * 
736
   * @param session - the Session object containing the credentials for the Subject
737
   * @param pid - the object identifier for the given object
738
   * @param status - the replication status to be applied
739
   * 
740
   * @return true or false
741
   * 
742
   * @throws NotImplemented
743
   * @throws NotAuthorized
744
   * @throws ServiceFailure
745
   * @throws InvalidRequest
746
   * @throws InvalidToken
747
   * @throws NotFound
748
   * 
749
   */
750
  @Override
751
  public boolean setReplicationStatus(Session session, Identifier pid,
752
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
753
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
754
      InvalidRequest, NotFound {
755
	  
756
	  // cannot be called by public
757
	  if (session == null) {
758
		  throw new NotAuthorized("4720", "Session cannot be null");
759
	  }
760
	  
761
	// do we have a valid pid?
762
      if (pid == null || pid.getValue().trim().equals("")) {
763
          throw new InvalidRequest("4730", "The provided identifier was invalid.");
764
          
765
      }
766
      
767
      /*String serviceFailureCode = "4700";
768
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
769
      if(sid != null) {
770
          pid = sid;
771
      }*/
772
      
773
      // The lock to be used for this identifier
774
      Lock lock = null;
775
      
776
      boolean allowed = false;
777
      int replicaEntryIndex = -1;
778
      List<Replica> replicas = null;
779
      // get the subject
780
      Subject subject = session.getSubject();
781
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
782
          " is " + status.toString());
783
      
784
      SystemMetadata systemMetadata = null;
785

    
786
      try {
787
          lock = HazelcastService.getInstance().getLock(pid.getValue());
788
          lock.lock();
789
          logMetacat.debug("Locked identifier " + pid.getValue());
790

    
791
          try {      
792
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
793

    
794
              // did we get it correctly?
795
              if ( systemMetadata == null ) {
796
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
797
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
798
                  
799
              }
800
              replicas = systemMetadata.getReplicaList();
801
              int count = 0;
802
              
803
              // was there a failure? log it
804
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
805
                 String msg = "The replication request of the object identified by " + 
806
                     pid.getValue() + " failed.  The error message was " +
807
                     failure.getMessage() + ".";
808
              }
809
              
810
              if (replicas.size() > 0 && replicas != null) {
811
                  // find the target replica index in the replica list
812
                  for (Replica replica : replicas) {
813
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
814
                      String targetNodeStr = targetNode.getValue();
815
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
816
                  
817
                      if (replicaNodeStr.equals(targetNodeStr)) {
818
                          replicaEntryIndex = count;
819
                          logMetacat.debug("replica entry index is: "
820
                                  + replicaEntryIndex);
821
                          break;
822
                      }
823
                      count++;
824
                  
825
                  }
826
              }
827
              // are we allowed to do this? only CNs and target MNs are allowed
828
              CNode cn = D1Client.getCN();
829
              List<Node> nodes = cn.listNodes().getNodeList();
830
              
831
              // find the node in the node list
832
              for ( Node node : nodes ) {
833
                  
834
                  NodeReference nodeReference = node.getIdentifier();
835
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
836
                      nodeReference.getValue());
837
                  
838
                  // allow target MN certs
839
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
840
                      node.getType().equals(NodeType.MN)) {
841
                      List<Subject> nodeSubjects = node.getSubjectList();
842
                      
843
                      // check if the session subject is in the node subject list
844
                      for (Subject nodeSubject : nodeSubjects) {
845
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
846
                                  nodeSubject.getValue() + " and " + subject.getValue());
847
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
848
                              
849
                              // lastly limit to COMPLETED, INVALIDATED,
850
                              // and FAILED status updates from MNs only
851
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
852
                                   status.equals(ReplicationStatus.INVALIDATED) ||
853
                                   status.equals(ReplicationStatus.FAILED)) {
854
                                  allowed = true;
855
                                  break;
856
                                  
857
                              }                              
858
                          }
859
                      }                 
860
                  }
861
              }
862

    
863
              if ( !allowed ) {
864
                  //check for CN admin access
865
                  allowed = isAuthorized(session, pid, Permission.WRITE);
866
                  
867
              }              
868
              
869
              if ( !allowed ) {
870
                  String msg = "The subject identified by "
871
                          + subject.getValue()
872
                          + " does not have permission to set the replication status for "
873
                          + "the replica identified by "
874
                          + targetNode.getValue() + ".";
875
                  logMetacat.info(msg);
876
                  throw new NotAuthorized("4720", msg);
877
                  
878
              }
879

    
880
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
881
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
882
                " : " + e.getMessage());
883
            
884
          }
885
          
886
          Replica targetReplica = new Replica();
887
          // set the status for the replica
888
          if ( replicaEntryIndex != -1 ) {
889
              targetReplica = replicas.get(replicaEntryIndex);
890
              
891
              // don't allow status to change from COMPLETED to anything other
892
              // than INVALIDATED: prevents overwrites from race conditions
893
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
894
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
895
            	  throw new InvalidRequest("4730", "Status state change from " +
896
            			  targetReplica.getReplicationStatus() + " to " +
897
            			  status.toString() + "is prohibited for identifier " +
898
            			  pid.getValue() + " and target node " + 
899
            			  targetReplica.getReplicaMemberNode().getValue());
900
              }
901
              
902
              targetReplica.setReplicationStatus(status);
903
              
904
              logMetacat.debug("Set the replication status for " + 
905
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
906
                  targetReplica.getReplicationStatus() + " for identifier " +
907
                  pid.getValue());
908
              
909
          } else {
910
              // this is a new entry, create it
911
              targetReplica.setReplicaMemberNode(targetNode);
912
              targetReplica.setReplicationStatus(status);
913
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
914
              replicas.add(targetReplica);
915
              
916
          }
917
          
918
          systemMetadata.setReplicaList(replicas);
919
                
920
          // update the metadata
921
          try {
922
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
923
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
924
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
925

    
926
              if ( !status.equals(ReplicationStatus.QUEUED) && 
927
            	   !status.equals(ReplicationStatus.REQUESTED)) {
928
                  
929
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
930
                          "\tNODE:\t" + targetNode.getValue() + 
931
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
932
                
933
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
934
                          "\tPID:\t"  + pid.getValue() + 
935
                          "\tNODE:\t" + targetNode.getValue() + 
936
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
937
              }
938

    
939
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
940
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
941
                      " on target node " + targetNode + ". The exception was: " +
942
                      failure.getMessage());
943
              }
944
              
945
			  // update the replica nodes about the completed replica when complete
946
              if (status.equals(ReplicationStatus.COMPLETED)) {
947
				notifyReplicaNodes(systemMetadata);
948
			}
949
          
950
          } catch (RuntimeException e) {
951
              throw new ServiceFailure("4700", e.getMessage());
952
          
953
          }
954
          
955
    } catch (RuntimeException e) {
956
        String msg = "There was a RuntimeException getting the lock for " +
957
            pid.getValue();
958
        logMetacat.info(msg);
959
        
960
    } finally {
961
        lock.unlock();
962
        logMetacat.debug("Unlocked identifier " + pid.getValue());
963
        
964
    }
965
      
966
      return true;
967
  }
968
  
969
/**
970
   * Return the checksum of the object given the identifier 
971
   * 
972
   * @param session - the Session object containing the credentials for the Subject
973
   * @param pid - the object identifier for the given object
974
   * 
975
   * @return checksum - the checksum of the object
976
   * 
977
   * @throws InvalidToken
978
   * @throws ServiceFailure
979
   * @throws NotAuthorized
980
   * @throws NotFound
981
   * @throws NotImplemented
982
   */
983
  @Override
984
  public Checksum getChecksum(Session session, Identifier pid)
985
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
986
    NotImplemented {
987
    
988
	boolean isAuthorized = false;
989
	try {
990
		isAuthorized = isAuthorized(session, pid, Permission.READ);
991
	} catch (InvalidRequest e) {
992
		throw new ServiceFailure("1410", e.getDescription());
993
	}  
994
    if (!isAuthorized) {
995
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
996
    }
997
    
998
    SystemMetadata systemMetadata = null;
999
    Checksum checksum = null;
1000
    
1001
    try {
1002
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
1003

    
1004
        if (systemMetadata == null ) {
1005
            String error ="";
1006
            String localId = null;
1007
            try {
1008
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1009
              
1010
             } catch (Exception e) {
1011
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1012
            }
1013
            
1014
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1015
                error = DELETEDMESSAGE;
1016
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1017
                error = DELETEDMESSAGE;
1018
            }
1019
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1020
        }
1021
        checksum = systemMetadata.getChecksum();
1022
        
1023
    } catch (RuntimeException e) {
1024
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
1025
            pid.getValue() + ". The error message was: " + e.getMessage());
1026
      
1027
    }
1028
    
1029
    return checksum;
1030
  }
1031

    
1032
  /**
1033
   * Resolve the location of a given object
1034
   * 
1035
   * @param session - the Session object containing the credentials for the Subject
1036
   * @param pid - the object identifier for the given object
1037
   * 
1038
   * @return objectLocationList - the list of nodes known to contain the object
1039
   * 
1040
   * @throws InvalidToken
1041
   * @throws ServiceFailure
1042
   * @throws NotAuthorized
1043
   * @throws NotFound
1044
   * @throws NotImplemented
1045
   */
1046
  @Override
1047
  public ObjectLocationList resolve(Session session, Identifier pid)
1048
    throws InvalidToken, ServiceFailure, NotAuthorized,
1049
    NotFound, NotImplemented {
1050

    
1051
    throw new NotImplemented("4131", "resolve not implemented");
1052

    
1053
  }
1054

    
1055
  /**
1056
   * Metacat does not implement this method at the CN level
1057
   */
1058
  @Override
1059
  public ObjectList search(Session session, String queryType, String query)
1060
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1061
    NotImplemented {
1062

    
1063
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1064
	  
1065
//    ObjectList objectList = null;
1066
//    try {
1067
//        objectList = 
1068
//          IdentifierManager.getInstance().querySystemMetadata(
1069
//              null, //startTime, 
1070
//              null, //endTime,
1071
//              null, //objectFormat, 
1072
//              false, //replicaStatus, 
1073
//              0, //start, 
1074
//              1000 //count
1075
//              );
1076
//        
1077
//    } catch (Exception e) {
1078
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1079
//    }
1080
//
1081
//      return objectList;
1082
		  
1083
  }
1084
  
1085
  /**
1086
   * Returns the object format registered in the DataONE Object Format 
1087
   * Vocabulary for the given format identifier
1088
   * 
1089
   * @param fmtid - the identifier of the format requested
1090
   * 
1091
   * @return objectFormat - the object format requested
1092
   * 
1093
   * @throws ServiceFailure
1094
   * @throws NotFound
1095
   * @throws InsufficientResources
1096
   * @throws NotImplemented
1097
   */
1098
  @Override
1099
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1100
    throws ServiceFailure, NotFound, NotImplemented {
1101
     
1102
      return ObjectFormatService.getInstance().getFormat(fmtid);
1103
      
1104
  }
1105

    
1106
  /**
1107
   * Returns a list of all object formats registered in the DataONE Object 
1108
   * Format Vocabulary
1109
    * 
1110
   * @return objectFormatList - The list of object formats registered in 
1111
   *                            the DataONE Object Format Vocabulary
1112
   * 
1113
   * @throws ServiceFailure
1114
   * @throws NotImplemented
1115
   * @throws InsufficientResources
1116
   */
1117
  @Override
1118
  public ObjectFormatList listFormats() 
1119
    throws ServiceFailure, NotImplemented {
1120

    
1121
    return ObjectFormatService.getInstance().listFormats();
1122
  }
1123

    
1124
  /**
1125
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1126
    * 
1127
   * @return nodeList - List of nodes from the registry
1128
   * 
1129
   * @throws ServiceFailure
1130
   * @throws NotImplemented
1131
   */
1132
  @Override
1133
  public NodeList listNodes() 
1134
    throws NotImplemented, ServiceFailure {
1135

    
1136
    throw new NotImplemented("4800", "listNodes not implemented");
1137
  }
1138

    
1139
  /**
1140
   * Provides a mechanism for adding system metadata independently of its 
1141
   * associated object, such as when adding system metadata for data objects.
1142
    * 
1143
   * @param session - the Session object containing the credentials for the Subject
1144
   * @param pid - The identifier of the object to register the system metadata against
1145
   * @param sysmeta - The system metadata to be registered
1146
   * 
1147
   * @return true if the registration succeeds
1148
   * 
1149
   * @throws NotImplemented
1150
   * @throws NotAuthorized
1151
   * @throws ServiceFailure
1152
   * @throws InvalidRequest
1153
   * @throws InvalidSystemMetadata
1154
   */
1155
  @Override
1156
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1157
      SystemMetadata sysmeta) 
1158
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1159
      InvalidSystemMetadata {
1160

    
1161
      // The lock to be used for this identifier
1162
      Lock lock = null;
1163

    
1164
      // TODO: control who can call this?
1165
      if (session == null) {
1166
          //TODO: many of the thrown exceptions do not use the correct error codes
1167
          //check these against the docs and correct them
1168
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1169
                  "  If you are not logged in, please do so and retry the request.");
1170
      }
1171
      // the identifier can't be an SID
1172
      try {
1173
          if(IdentifierManager.getInstance().systemMetadataSIDExists(pid)) {
1174
              throw new InvalidRequest("4863", "The provided identifier "+pid.getValue()+" is a series id which is not allowed.");
1175
          }
1176
      } catch (SQLException sqle) {
1177
          throw new ServiceFailure("4862", "Couldn't determine if the pid "+pid.getValue()+" is a series id since "+sqle.getMessage());
1178
      }
1179
      
1180
      // verify that guid == SystemMetadata.getIdentifier()
1181
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1182
          "|" + sysmeta.getIdentifier().getValue());
1183
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1184
          throw new InvalidRequest("4863", 
1185
              "The identifier in method call (" + pid.getValue() + 
1186
              ") does not match identifier in system metadata (" +
1187
              sysmeta.getIdentifier().getValue() + ").");
1188
      }
1189
      
1190
      //check if the sid is legitimate in the system metadata
1191
      checkSidInModifyingSystemMetadata(sysmeta, "4864", "4862");
1192

    
1193
      try {
1194
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1195
          lock.lock();
1196
          logMetacat.debug("Locked identifier " + pid.getValue());
1197
          logMetacat.debug("Checking if identifier exists...");
1198
          // Check that the identifier does not already exist
1199
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1200
              throw new InvalidRequest("4863", 
1201
                  "The identifier is already in use by an existing object.");
1202
          
1203
          }
1204
          
1205
          // insert the system metadata into the object store
1206
          logMetacat.debug("Starting to insert SystemMetadata...");
1207
          try {
1208
              sysmeta.setSerialVersion(BigInteger.ONE);
1209
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1210
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1211
              
1212
          } catch (RuntimeException e) {
1213
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1214
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1215
                  e.getClass() + ": " + e.getMessage());
1216
              
1217
          }
1218
          
1219
      } catch (RuntimeException e) {
1220
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1221
                  e.getClass() + ": " + e.getMessage());
1222
          
1223
      }  finally {
1224
          lock.unlock();
1225
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1226
          
1227
      }
1228

    
1229
      
1230
      logMetacat.debug("Returning from registerSystemMetadata");
1231
      
1232
      try {
1233
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1234
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1235
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1236
    	          localId, "registerSystemMetadata");
1237
      } catch (McdbDocNotFoundException e) {
1238
    	  // do nothing, no localId to log with
1239
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1240
      } catch (SQLException ee) {
1241
          // do nothing, no localId to log with
1242
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1243
      }
1244
      
1245
      
1246
      return pid;
1247
  }
1248
  
1249
  /**
1250
   * Given an optional scope and format, reserves and returns an identifier 
1251
   * within that scope and format that is unique and will not be 
1252
   * used by any other sessions. 
1253
    * 
1254
   * @param session - the Session object containing the credentials for the Subject
1255
   * @param pid - The identifier of the object to register the system metadata against
1256
   * @param scope - An optional string to be used to qualify the scope of 
1257
   *                the identifier namespace, which is applied differently 
1258
   *                depending on the format requested. If scope is not 
1259
   *                supplied, a default scope will be used.
1260
   * @param format - The optional name of the identifier format to be used, 
1261
   *                  drawn from a DataONE-specific vocabulary of identifier 
1262
   *                 format names, including several common syntaxes such 
1263
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1264
   *                 format is not supplied by the caller, the CN service 
1265
   *                 will use a default identifier format, which may change 
1266
   *                 over time.
1267
   * 
1268
   * @return true if the registration succeeds
1269
   * 
1270
   * @throws InvalidToken
1271
   * @throws ServiceFailure
1272
   * @throws NotAuthorized
1273
   * @throws IdentifierNotUnique
1274
   * @throws NotImplemented
1275
   */
1276
  @Override
1277
  public Identifier reserveIdentifier(Session session, Identifier pid)
1278
  throws InvalidToken, ServiceFailure,
1279
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1280

    
1281
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1282
  }
1283
  
1284
  @Override
1285
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1286
  throws InvalidToken, ServiceFailure,
1287
        NotAuthorized, NotImplemented, InvalidRequest {
1288
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1289
  }
1290
  
1291
  /**
1292
    * Checks whether the pid is reserved by the subject in the session param
1293
    * If the reservation is held on the pid by the subject, we return true.
1294
    * 
1295
   * @param session - the Session object containing the Subject
1296
   * @param pid - The identifier to check
1297
   * 
1298
   * @return true if the reservation exists for the subject/pid
1299
   * 
1300
   * @throws InvalidToken
1301
   * @throws ServiceFailure
1302
   * @throws NotFound - when the pid is not found (in use or in reservation)
1303
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1304
   * @throws IdentifierNotUnique - when the pid is in use
1305
   * @throws NotImplemented
1306
   */
1307

    
1308
  @Override
1309
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1310
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1311
      NotImplemented, InvalidRequest {
1312
  
1313
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1314
  }
1315

    
1316
  /**
1317
   * Changes ownership (RightsHolder) of the specified object to the 
1318
   * subject specified by userId
1319
    * 
1320
   * @param session - the Session object containing the credentials for the Subject
1321
   * @param pid - Identifier of the object to be modified
1322
   * @param userId - The subject that will be taking ownership of the specified object.
1323
   *
1324
   * @return pid - the identifier of the modified object
1325
   * 
1326
   * @throws ServiceFailure
1327
   * @throws InvalidToken
1328
   * @throws NotFound
1329
   * @throws NotAuthorized
1330
   * @throws NotImplemented
1331
   * @throws InvalidRequest
1332
   */  
1333
  @Override
1334
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1335
      long serialVersion)
1336
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1337
      NotImplemented, InvalidRequest, VersionMismatch {
1338
      
1339
      // The lock to be used for this identifier
1340
      Lock lock = null;
1341

    
1342
      // get the subject
1343
      Subject subject = session.getSubject();
1344
      
1345
      String serviceFailureCode = "4490";
1346
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1347
      if(sid != null) {
1348
          pid = sid;
1349
      }
1350
      
1351
      // are we allowed to do this?
1352
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1353
          throw new NotAuthorized("4440", "not allowed by "
1354
                  + subject.getValue() + " on " + pid.getValue());
1355
          
1356
      }
1357
      
1358
      SystemMetadata systemMetadata = null;
1359
      try {
1360
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1361
          lock.lock();
1362
          logMetacat.debug("Locked identifier " + pid.getValue());
1363

    
1364
          try {
1365
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1366
              
1367
              // does the request have the most current system metadata?
1368
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1369
                 String msg = "The requested system metadata version number " + 
1370
                     serialVersion + " differs from the current version at " +
1371
                     systemMetadata.getSerialVersion().longValue() +
1372
                     ". Please get the latest copy in order to modify it.";
1373
                 throw new VersionMismatch("4443", msg);
1374
              }
1375
              
1376
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1377
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1378
              
1379
          }
1380
              
1381
          // set the new rights holder
1382
          systemMetadata.setRightsHolder(userId);
1383
          
1384
          // update the metadata
1385
          try {
1386
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1387
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1388
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1389
              notifyReplicaNodes(systemMetadata);
1390
              
1391
          } catch (RuntimeException e) {
1392
              throw new ServiceFailure("4490", e.getMessage());
1393
          
1394
          }
1395
          
1396
      } catch (RuntimeException e) {
1397
          throw new ServiceFailure("4490", e.getMessage());
1398
          
1399
      } finally {
1400
          lock.unlock();
1401
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1402
      
1403
      }
1404
      
1405
      return pid;
1406
  }
1407

    
1408
  /**
1409
   * Verify that a replication task is authorized by comparing the target node's
1410
   * Subject (from the X.509 certificate-derived Session) with the list of 
1411
   * subjects in the known, pending replication tasks map.
1412
   * 
1413
   * @param originatingNodeSession - Session information that contains the 
1414
   *                                 identity of the calling user
1415
   * @param targetNodeSubject - Subject identifying the target node
1416
   * @param pid - the identifier of the object to be replicated
1417
   * @param replicatePermission - the execute permission to be granted
1418
   * 
1419
   * @throws ServiceFailure
1420
   * @throws NotImplemented
1421
   * @throws InvalidToken
1422
   * @throws NotAuthorized
1423
   * @throws InvalidRequest
1424
   * @throws NotFound
1425
   */
1426
  @Override
1427
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1428
    Subject targetNodeSubject, Identifier pid) 
1429
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1430
    NotFound, InvalidRequest {
1431
    
1432
    boolean isAllowed = false;
1433
    SystemMetadata sysmeta = null;
1434
    NodeReference targetNode = null;
1435
    
1436
    try {
1437
      // get the target node reference from the nodes list
1438
      CNode cn = D1Client.getCN();
1439
      List<Node> nodes = cn.listNodes().getNodeList();
1440
      
1441
      if ( nodes != null ) {
1442
        for (Node node : nodes) {
1443
            
1444
        	if (node.getSubjectList() != null) {
1445
        		
1446
	            for (Subject nodeSubject : node.getSubjectList()) {
1447
	            	
1448
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1449
	                    targetNode = node.getIdentifier();
1450
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1451
	                    break;
1452
	                }
1453
	            }
1454
        	}
1455
            
1456
            if ( targetNode != null) { break; }
1457
        }
1458
        
1459
      } else {
1460
          String msg = "Couldn't get the node list from the CN";
1461
          logMetacat.debug(msg);
1462
          throw new ServiceFailure("4872", msg);
1463
          
1464
      }
1465
      
1466
      // can't find a node listed with the given subject
1467
      if ( targetNode == null ) {
1468
          String msg = "There is no Member Node registered with a node subject " +
1469
              "matching " + targetNodeSubject.getValue();
1470
          logMetacat.info(msg);
1471
          throw new NotAuthorized("4871", msg);
1472
          
1473
      }
1474
      
1475
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1476
      
1477
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1478

    
1479
      if ( sysmeta != null ) {
1480
          
1481
          List<Replica> replicaList = sysmeta.getReplicaList();
1482
          
1483
          if ( replicaList != null ) {
1484
              
1485
              // find the replica with the status set to 'requested'
1486
              for (Replica replica : replicaList) {
1487
                  ReplicationStatus status = replica.getReplicationStatus();
1488
                  NodeReference listedNode = replica.getReplicaMemberNode();
1489
                  if ( listedNode != null && targetNode != null ) {
1490
                      logMetacat.debug("Comparing " + listedNode.getValue()
1491
                              + " to " + targetNode.getValue());
1492
                      
1493
                      if (listedNode.getValue().equals(targetNode.getValue())
1494
                              && status.equals(ReplicationStatus.REQUESTED)) {
1495
                          isAllowed = true;
1496
                          break;
1497

    
1498
                      }
1499
                  }
1500
              }
1501
          }
1502
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1503
              "to replicate: " + isAllowed + " for " + pid.getValue());
1504

    
1505
          
1506
      } else {
1507
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1508
          " is null.");
1509
          String error ="";
1510
          String localId = null;
1511
          try {
1512
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1513
            
1514
           } catch (Exception e) {
1515
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1516
          }
1517
          
1518
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1519
              error = DELETEDMESSAGE;
1520
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1521
              error = DELETEDMESSAGE;
1522
          }
1523
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1524
          
1525
      }
1526

    
1527
    } catch (RuntimeException e) {
1528
    	  ServiceFailure sf = new ServiceFailure("4872", 
1529
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1530
                e.getMessage());
1531
    	  sf.initCause(e);
1532
        throw sf;
1533
        
1534
    }
1535
      
1536
    return isAllowed;
1537
    
1538
  }
1539

    
1540
  /**
1541
   * Adds a new object to the Node, where the object is a science metadata object.
1542
   * 
1543
   * @param session - the Session object containing the credentials for the Subject
1544
   * @param pid - The object identifier to be created
1545
   * @param object - the object bytes
1546
   * @param sysmeta - the system metadata that describes the object  
1547
   * 
1548
   * @return pid - the object identifier created
1549
   * 
1550
   * @throws InvalidToken
1551
   * @throws ServiceFailure
1552
   * @throws NotAuthorized
1553
   * @throws IdentifierNotUnique
1554
   * @throws UnsupportedType
1555
   * @throws InsufficientResources
1556
   * @throws InvalidSystemMetadata
1557
   * @throws NotImplemented
1558
   * @throws InvalidRequest
1559
   */
1560
  public Identifier create(Session session, Identifier pid, InputStream object,
1561
    SystemMetadata sysmeta) 
1562
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1563
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1564
    NotImplemented, InvalidRequest {
1565
       
1566
   // verify the pid is valid format
1567
      if (!isValidIdentifier(pid)) {
1568
          throw new InvalidRequest("4891", "The provided identifier is invalid.");
1569
      }
1570
      // The lock to be used for this identifier
1571
      Lock lock = null;
1572

    
1573
      try {
1574
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1575
          lock.lock();
1576
          // are we allowed?
1577
          boolean isAllowed = false;
1578
          isAllowed = isAdminAuthorized(session);
1579
          
1580
          // additional check if it is the authoritative node if it is not the admin
1581
          if(!isAllowed) {
1582
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1583
          }
1584

    
1585
          // proceed if we're called by a CN
1586
          if ( isAllowed ) {
1587
              //check if the series id is legitimate. It uses the same rules of the method registerSystemMetadata
1588
              checkSidInModifyingSystemMetadata(sysmeta, "4896", "4893");
1589
              // create the coordinating node version of the document      
1590
              logMetacat.debug("Locked identifier " + pid.getValue());
1591
              sysmeta.setSerialVersion(BigInteger.ONE);
1592
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1593
              //sysmeta.setArchived(false); // this is a create op, not update
1594
              
1595
              // the CN should have set the origin and authoritative member node fields
1596
              try {
1597
                  sysmeta.getOriginMemberNode().getValue();
1598
                  sysmeta.getAuthoritativeMemberNode().getValue();
1599
                  
1600
              } catch (NullPointerException npe) {
1601
                  throw new InvalidSystemMetadata("4896", 
1602
                      "Both the origin and authoritative member node identifiers need to be set.");
1603
                  
1604
              }
1605
              pid = super.create(session, pid, object, sysmeta);
1606

    
1607
          } else {
1608
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1609
                  " isn't allowed to call create() on a Coordinating Node.";
1610
              logMetacat.info(msg);
1611
              throw new NotAuthorized("1100", msg);
1612
          }
1613
          
1614
      } catch (RuntimeException e) {
1615
          // Convert Hazelcast runtime exceptions to service failures
1616
          String msg = "There was a problem creating the object identified by " +
1617
              pid.getValue() + ". There error message was: " + e.getMessage();
1618
          throw new ServiceFailure("4893", msg);
1619
          
1620
      } finally {
1621
    	  if (lock != null) {
1622
	          lock.unlock();
1623
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1624
    	  }
1625
      }
1626
      
1627
      return pid;
1628

    
1629
  }
1630

    
1631
  /**
1632
   * Set access for a given object using the object identifier and a Subject
1633
   * under a given Session.
1634
   * 
1635
   * @param session - the Session object containing the credentials for the Subject
1636
   * @param pid - the object identifier for the given object to apply the policy
1637
   * @param policy - the access policy to be applied
1638
   * 
1639
   * @return true if the application of the policy succeeds
1640
   * @throws InvalidToken
1641
   * @throws ServiceFailure
1642
   * @throws NotFound
1643
   * @throws NotAuthorized
1644
   * @throws NotImplemented
1645
   * @throws InvalidRequest
1646
   */
1647
  public boolean setAccessPolicy(Session session, Identifier pid, 
1648
      AccessPolicy accessPolicy, long serialVersion) 
1649
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1650
      NotImplemented, InvalidRequest, VersionMismatch {
1651
      
1652
   // do we have a valid pid?
1653
      if (pid == null || pid.getValue().trim().equals("")) {
1654
          throw new InvalidRequest("4402", "The provided identifier was invalid.");
1655
          
1656
      }
1657
      
1658
      String serviceFailureCode = "4430";
1659
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1660
      if(sid != null) {
1661
          pid = sid;
1662
      }
1663
      // The lock to be used for this identifier
1664
      Lock lock = null;
1665
      SystemMetadata systemMetadata = null;
1666
      
1667
      boolean success = false;
1668
      
1669
      // get the subject
1670
      Subject subject = session.getSubject();
1671
      
1672
      // are we allowed to do this?
1673
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1674
          throw new NotAuthorized("4420", "not allowed by "
1675
                  + subject.getValue() + " on " + pid.getValue());
1676
      }
1677
      
1678
      try {
1679
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1680
          lock.lock();
1681
          logMetacat.debug("Locked identifier " + pid.getValue());
1682

    
1683
          try {
1684
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1685

    
1686
              if ( systemMetadata == null ) {
1687
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1688
                  
1689
              }
1690
              // does the request have the most current system metadata?
1691
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1692
                 String msg = "The requested system metadata version number " + 
1693
                     serialVersion + " differs from the current version at " +
1694
                     systemMetadata.getSerialVersion().longValue() +
1695
                     ". Please get the latest copy in order to modify it.";
1696
                 throw new VersionMismatch("4402", msg);
1697
                 
1698
              }
1699
              
1700
          } catch (RuntimeException e) {
1701
              // convert Hazelcast RuntimeException to NotFound
1702
              throw new NotFound("4400", "No record found for: " + pid);
1703
            
1704
          }
1705
              
1706
          // set the access policy
1707
          systemMetadata.setAccessPolicy(accessPolicy);
1708
          
1709
          // update the system metadata
1710
          try {
1711
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1712
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1713
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1714
              notifyReplicaNodes(systemMetadata);
1715
              
1716
          } catch (RuntimeException e) {
1717
              // convert Hazelcast RuntimeException to ServiceFailure
1718
              throw new ServiceFailure("4430", e.getMessage());
1719
            
1720
          }
1721
          
1722
      } catch (RuntimeException e) {
1723
          throw new ServiceFailure("4430", e.getMessage());
1724
          
1725
      } finally {
1726
          lock.unlock();
1727
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1728
        
1729
      }
1730

    
1731
    
1732
    // TODO: how do we know if the map was persisted?
1733
    success = true;
1734
    
1735
    return success;
1736
  }
1737

    
1738
  /**
1739
   * Full replacement of replication metadata in the system metadata for the 
1740
   * specified object, changes date system metadata modified
1741
   * 
1742
   * @param session - the Session object containing the credentials for the Subject
1743
   * @param pid - the object identifier for the given object to apply the policy
1744
   * @param replica - the replica to be updated
1745
   * @return
1746
   * @throws NotImplemented
1747
   * @throws NotAuthorized
1748
   * @throws ServiceFailure
1749
   * @throws InvalidRequest
1750
   * @throws NotFound
1751
   * @throws VersionMismatch
1752
   */
1753
  @Override
1754
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1755
      Replica replica, long serialVersion) 
1756
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1757
      NotFound, VersionMismatch {
1758
      
1759
      // The lock to be used for this identifier
1760
      Lock lock = null;
1761
      
1762
      // get the subject
1763
      Subject subject = session.getSubject();
1764
      
1765
      // are we allowed to do this?
1766
      try {
1767

    
1768
          // what is the controlling permission?
1769
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1770
              throw new NotAuthorized("4851", "not allowed by "
1771
                      + subject.getValue() + " on " + pid.getValue());
1772
          }
1773

    
1774
        
1775
      } catch (InvalidToken e) {
1776
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1777
                  " on " + pid.getValue());  
1778
          
1779
      }
1780

    
1781
      SystemMetadata systemMetadata = null;
1782
      try {
1783
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1784
          lock.lock();
1785
          logMetacat.debug("Locked identifier " + pid.getValue());
1786

    
1787
          try {      
1788
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1789

    
1790
              // does the request have the most current system metadata?
1791
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1792
                 String msg = "The requested system metadata version number " + 
1793
                     serialVersion + " differs from the current version at " +
1794
                     systemMetadata.getSerialVersion().longValue() +
1795
                     ". Please get the latest copy in order to modify it.";
1796
                 throw new VersionMismatch("4855", msg);
1797
              }
1798
              
1799
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1800
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1801
                  " : " + e.getMessage());
1802
            
1803
          }
1804
              
1805
          // set the status for the replica
1806
          List<Replica> replicas = systemMetadata.getReplicaList();
1807
          NodeReference replicaNode = replica.getReplicaMemberNode();
1808
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
1809
          int index = 0;
1810
          for (Replica listedReplica: replicas) {
1811
              
1812
              // remove the replica that we are replacing
1813
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1814
                      // don't allow status to change from COMPLETED to anything other
1815
                      // than INVALIDATED: prevents overwrites from race conditions
1816
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
1817
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
1818
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
1819
                	  throw new InvalidRequest("4853", "Status state change from " +
1820
                			  listedReplica.getReplicationStatus() + " to " +
1821
                			  replicaStatus.toString() + "is prohibited for identifier " +
1822
                			  pid.getValue() + " and target node " + 
1823
                			  listedReplica.getReplicaMemberNode().getValue());
1824

    
1825
            	  }
1826
                  replicas.remove(index);
1827
                  break;
1828
                  
1829
              }
1830
              index++;
1831
          }
1832
          
1833
          // add the new replica item
1834
          replicas.add(replica);
1835
          systemMetadata.setReplicaList(replicas);
1836
          
1837
          // update the metadata
1838
          try {
1839
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1840
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1841
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1842
              
1843
              // inform replica nodes of the change if the status is complete
1844
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
1845
            	  notifyReplicaNodes(systemMetadata);
1846
            	  
1847
              }
1848
          } catch (RuntimeException e) {
1849
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1850
              throw new ServiceFailure("4852", e.getMessage());
1851
          
1852
          }
1853
          
1854
      } catch (RuntimeException e) {
1855
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1856
          throw new ServiceFailure("4852", e.getMessage());
1857
      
1858
      } finally {
1859
          lock.unlock();
1860
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1861
          
1862
      }
1863
    
1864
      return true;
1865
      
1866
  }
1867
  
1868
  /**
1869
   * 
1870
   */
1871
  @Override
1872
  public ObjectList listObjects(Session session, Date startTime, 
1873
      Date endTime, ObjectFormatIdentifier formatid, Identifier identifier, Boolean replicaStatus,
1874
      Integer start, Integer count)
1875
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1876
      ServiceFailure {
1877
      
1878
      return super.listObjects(session, startTime, endTime, formatid, identifier, replicaStatus, start, count);
1879
  }
1880

    
1881
  
1882
 	/**
1883
 	 * Returns a list of checksum algorithms that are supported by DataONE.
1884
 	 * @return cal  the list of checksum algorithms
1885
 	 * 
1886
 	 * @throws ServiceFailure
1887
 	 * @throws NotImplemented
1888
 	 */
1889
  @Override
1890
  public ChecksumAlgorithmList listChecksumAlgorithms()
1891
			throws ServiceFailure, NotImplemented {
1892
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1893
		cal.addAlgorithm("MD5");
1894
		cal.addAlgorithm("SHA-1");
1895
		return cal;
1896
		
1897
	}
1898

    
1899
  /**
1900
   * Notify replica Member Nodes of system metadata changes for a given pid
1901
   * 
1902
   * @param currentSystemMetadata - the up to date system metadata
1903
   */
1904
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
1905
      
1906
      Session session = null;
1907
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
1908
      MNode mn = null;
1909
      NodeReference replicaNodeRef = null;
1910
      CNode cn = null;
1911
      NodeType nodeType = null;
1912
      List<Node> nodeList = null;
1913
      
1914
      try {
1915
          cn = D1Client.getCN();
1916
          nodeList = cn.listNodes().getNodeList();
1917
          
1918
      } catch (Exception e) { // handle BaseException and other I/O issues
1919
          
1920
          // swallow errors since the call is not critical
1921
          logMetacat.error("Can't inform MNs of system metadata changes due " +
1922
              "to communication issues with the CN: " + e.getMessage());
1923
          
1924
      }
1925
      
1926
      if ( replicaList != null ) {
1927
          
1928
          // iterate through the replicas and inform  MN replica nodes
1929
          for (Replica replica : replicaList) {
1930
              
1931
              replicaNodeRef = replica.getReplicaMemberNode();
1932
              try {
1933
                  if (nodeList != null) {
1934
                      // find the node type
1935
                      for (Node node : nodeList) {
1936
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
1937
                              nodeType = node.getType();
1938
                              break;
1939
              
1940
                          }
1941
                      }
1942
                  }
1943
              
1944
                  // notify only MNs
1945
                  if (nodeType != null && nodeType == NodeType.MN) {
1946
                      mn = D1Client.getMN(replicaNodeRef);
1947
                      mn.systemMetadataChanged(session, 
1948
                          currentSystemMetadata.getIdentifier(), 
1949
                          currentSystemMetadata.getSerialVersion().longValue(),
1950
                          currentSystemMetadata.getDateSysMetadataModified());
1951
                  }
1952
              
1953
              } catch (Exception e) { // handle BaseException and other I/O issues
1954
              
1955
                  // swallow errors since the call is not critical
1956
                  logMetacat.error("Can't inform "
1957
                          + replicaNodeRef.getValue()
1958
                          + " of system metadata changes due "
1959
                          + "to communication issues with the CN: "
1960
                          + e.getMessage());
1961
              
1962
              }
1963
          }
1964
      }
1965
  }
1966

    
1967
	@Override
1968
	public QueryEngineDescription getQueryEngineDescription(Session session,
1969
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
1970
			NotImplemented, NotFound {
1971
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1972

    
1973
	}
1974
	
1975
	@Override
1976
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
1977
			ServiceFailure, NotAuthorized, NotImplemented {
1978
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1979

    
1980
	}
1981
	
1982
	@Override
1983
	public InputStream query(Session session, String queryEngine, String query)
1984
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1985
			NotImplemented, NotFound {
1986
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1987

    
1988
	}
1989
	
1990
	@Override
1991
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
1992
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
1993
	}
1994
	
1995
	
1996
    
1997
}
(1-1/7)