Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.ArrayList;
29
import java.util.Calendar;
30
import java.util.Date;
31
import java.util.List;
32
import java.util.concurrent.locks.Lock;
33

    
34
import javax.servlet.http.HttpServletRequest;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.client.CNode;
38
import org.dataone.client.D1Client;
39
import org.dataone.client.MNode;
40
import org.dataone.service.cn.v1.CNAuthorization;
41
import org.dataone.service.cn.v1.CNCore;
42
import org.dataone.service.cn.v1.CNRead;
43
import org.dataone.service.cn.v1.CNReplication;
44
import org.dataone.service.exceptions.BaseException;
45
import org.dataone.service.exceptions.IdentifierNotUnique;
46
import org.dataone.service.exceptions.InsufficientResources;
47
import org.dataone.service.exceptions.InvalidRequest;
48
import org.dataone.service.exceptions.InvalidSystemMetadata;
49
import org.dataone.service.exceptions.InvalidToken;
50
import org.dataone.service.exceptions.NotAuthorized;
51
import org.dataone.service.exceptions.NotFound;
52
import org.dataone.service.exceptions.NotImplemented;
53
import org.dataone.service.exceptions.ServiceFailure;
54
import org.dataone.service.exceptions.UnsupportedType;
55
import org.dataone.service.exceptions.VersionMismatch;
56
import org.dataone.service.types.v1.AccessPolicy;
57
import org.dataone.service.types.v1.Checksum;
58
import org.dataone.service.types.v1.ChecksumAlgorithmList;
59
import org.dataone.service.types.v1.Identifier;
60
import org.dataone.service.types.v1.Node;
61
import org.dataone.service.types.v1.NodeList;
62
import org.dataone.service.types.v1.NodeReference;
63
import org.dataone.service.types.v1.NodeType;
64
import org.dataone.service.types.v1.ObjectFormat;
65
import org.dataone.service.types.v1.ObjectFormatIdentifier;
66
import org.dataone.service.types.v1.ObjectFormatList;
67
import org.dataone.service.types.v1.ObjectList;
68
import org.dataone.service.types.v1.ObjectLocationList;
69
import org.dataone.service.types.v1.Permission;
70
import org.dataone.service.types.v1.Replica;
71
import org.dataone.service.types.v1.ReplicationPolicy;
72
import org.dataone.service.types.v1.ReplicationStatus;
73
import org.dataone.service.types.v1.Session;
74
import org.dataone.service.types.v1.Subject;
75
import org.dataone.service.types.v1.SystemMetadata;
76
import org.dataone.service.types.v1.util.ServiceMethodRestrictionUtil;
77

    
78
import edu.ucsb.nceas.metacat.EventLog;
79
import edu.ucsb.nceas.metacat.IdentifierManager;
80
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
81

    
82
/**
83
 * Represents Metacat's implementation of the DataONE Coordinating Node 
84
 * service API. Methods implement the various CN* interfaces, and methods common
85
 * to both Member Node and Coordinating Node interfaces are found in the
86
 * D1NodeService super class.
87
 *
88
 */
89
public class CNodeService extends D1NodeService implements CNAuthorization,
90
    CNCore, CNRead, CNReplication {
91

    
92
  /* the logger instance */
93
  private Logger logMetacat = null;
94

    
95
  /**
96
   * singleton accessor
97
   */
98
  public static CNodeService getInstance(HttpServletRequest request) { 
99
    return new CNodeService(request);
100
  }
101
  
102
  /**
103
   * Constructor, private for singleton access
104
   */
105
  private CNodeService(HttpServletRequest request) {
106
    super(request);
107
    logMetacat = Logger.getLogger(CNodeService.class);
108
        
109
  }
110
    
111
  /**
112
   * Set the replication policy for an object given the object identifier
113
   * 
114
   * @param session - the Session object containing the credentials for the Subject
115
   * @param pid - the object identifier for the given object
116
   * @param policy - the replication policy to be applied
117
   * 
118
   * @return true or false
119
   * 
120
   * @throws NotImplemented
121
   * @throws NotAuthorized
122
   * @throws ServiceFailure
123
   * @throws InvalidRequest
124
   * @throws VersionMismatch
125
   * 
126
   */
127
  @Override
128
  public boolean setReplicationPolicy(Session session, Identifier pid,
129
      ReplicationPolicy policy, long serialVersion) 
130
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
131
      InvalidRequest, InvalidToken, VersionMismatch {
132
      
133
      // The lock to be used for this identifier
134
      Lock lock = null;
135
      
136
      // get the subject
137
      Subject subject = session.getSubject();
138
      
139
      // are we allowed to do this?
140
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
141
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
142
                  + " not allowed by " + subject.getValue() + " on "
143
                  + pid.getValue());
144
          
145
      }
146
      
147
      SystemMetadata systemMetadata = null;
148
      try {
149
          lock = HazelcastService.getInstance().getLock(pid.getValue());
150
          lock.lock();
151
          logMetacat.debug("Locked identifier " + pid.getValue());
152

    
153
          try {
154
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
155
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
156
                  
157
              }
158
              
159
              // did we get it correctly?
160
              if ( systemMetadata == null ) {
161
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
162
                  
163
              }
164

    
165
              // does the request have the most current system metadata?
166
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
167
                 String msg = "The requested system metadata version number " + 
168
                     serialVersion + " differs from the current version at " +
169
                     systemMetadata.getSerialVersion().longValue() +
170
                     ". Please get the latest copy in order to modify it.";
171
                 throw new VersionMismatch("4886", msg);
172
                 
173
              }
174
              
175
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
176
              throw new NotFound("4884", "No record found for: " + pid.getValue());
177
            
178
          }
179
          
180
          // set the new policy
181
          systemMetadata.setReplicationPolicy(policy);
182
          
183
          // update the metadata
184
          try {
185
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
186
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
187
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
188
              notifyReplicaNodes(systemMetadata);
189
              
190
          } catch (RuntimeException e) {
191
              throw new ServiceFailure("4882", e.getMessage());
192
          
193
          }
194
          
195
      } catch (RuntimeException e) {
196
          throw new ServiceFailure("4882", e.getMessage());
197
          
198
      } finally {
199
          lock.unlock();
200
          logMetacat.debug("Unlocked identifier " + pid.getValue());
201
          
202
      }
203
    
204
      return true;
205
  }
206

    
207
  /**
208
   * Deletes the replica from the given Member Node
209
   * NOTE: MN.delete() may be an "archive" operation. TBD.
210
   * @param session
211
   * @param pid
212
   * @param nodeId
213
   * @param serialVersion
214
   * @return
215
   * @throws InvalidToken
216
   * @throws ServiceFailure
217
   * @throws NotAuthorized
218
   * @throws NotFound
219
   * @throws NotImplemented
220
   * @throws VersionMismatch
221
   */
222
  @Override
223
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
224
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
225
	  
226
	  	// The lock to be used for this identifier
227
		Lock lock = null;
228

    
229
		// get the subject
230
		Subject subject = session.getSubject();
231

    
232
		// are we allowed to do this?
233
		boolean isAuthorized = false;
234
		try {
235
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
236
		} catch (InvalidRequest e) {
237
			throw new ServiceFailure("4882", e.getDescription());
238
		}
239
		if (!isAuthorized) {
240
			throw new NotAuthorized("4881", Permission.WRITE
241
					+ " not allowed by " + subject.getValue() + " on "
242
					+ pid.getValue());
243

    
244
		}
245

    
246
		SystemMetadata systemMetadata = null;
247
		try {
248
			lock = HazelcastService.getInstance().getLock(pid.getValue());
249
			lock.lock();
250
			logMetacat.debug("Locked identifier " + pid.getValue());
251

    
252
			try {
253
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
254
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
255
				}
256

    
257
				// did we get it correctly?
258
				if (systemMetadata == null) {
259
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
260
				}
261

    
262
				// does the request have the most current system metadata?
263
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
264
					String msg = "The requested system metadata version number "
265
							+ serialVersion
266
							+ " differs from the current version at "
267
							+ systemMetadata.getSerialVersion().longValue()
268
							+ ". Please get the latest copy in order to modify it.";
269
					throw new VersionMismatch("4886", msg);
270

    
271
				}
272

    
273
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
274
				throw new NotFound("4884", "No record found for: " + pid.getValue());
275

    
276
			}
277
			  
278
			// check permissions
279
			// TODO: is this necessary?
280
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
281
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
282
			if (isAllowed) {
283
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
284
			}
285
			  
286
			// delete the replica from the given node
287
			D1Client.getMN(nodeId).delete(session, pid);
288
			  
289
			// reflect that change in the system metadata
290
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
291
			for (Replica r: systemMetadata.getReplicaList()) {
292
				  if (r.getReplicaMemberNode().equals(nodeId)) {
293
					  updatedReplicas.remove(r);
294
					  break;
295
				  }
296
			}
297
			systemMetadata.setReplicaList(updatedReplicas);
298

    
299
			// update the metadata
300
			try {
301
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
302
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
303
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
304
			} catch (RuntimeException e) {
305
				throw new ServiceFailure("4882", e.getMessage());
306
			}
307

    
308
		} catch (RuntimeException e) {
309
			throw new ServiceFailure("4882", e.getMessage());
310
		} finally {
311
			lock.unlock();
312
			logMetacat.debug("Unlocked identifier " + pid.getValue());
313
		}
314

    
315
		return true;	  
316
	  
317
  }
318
  
319
  /**
320
   * Set the obsoletedBy attribute in System Metadata
321
   * @param session
322
   * @param pid
323
   * @param obsoletedByPid
324
   * @param serialVersion
325
   * @return
326
   * @throws NotImplemented
327
   * @throws NotFound
328
   * @throws NotAuthorized
329
   * @throws ServiceFailure
330
   * @throws InvalidRequest
331
   * @throws InvalidToken
332
   * @throws VersionMismatch
333
   */
334
  @Override
335
  public boolean setObsoletedBy(Session session, Identifier pid,
336
			Identifier obsoletedByPid, long serialVersion)
337
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
338
			InvalidRequest, InvalidToken, VersionMismatch {
339

    
340
		// The lock to be used for this identifier
341
		Lock lock = null;
342

    
343
		// get the subject
344
		Subject subject = session.getSubject();
345

    
346
		// are we allowed to do this?
347
		if (!isAuthorized(session, pid, Permission.WRITE)) {
348
			throw new NotAuthorized("4881", Permission.WRITE
349
					+ " not allowed by " + subject.getValue() + " on "
350
					+ pid.getValue());
351

    
352
		}
353

    
354

    
355
		SystemMetadata systemMetadata = null;
356
		try {
357
			lock = HazelcastService.getInstance().getLock(pid.getValue());
358
			lock.lock();
359
			logMetacat.debug("Locked identifier " + pid.getValue());
360

    
361
			try {
362
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
363
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
364
				}
365

    
366
				// did we get it correctly?
367
				if (systemMetadata == null) {
368
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
369
				}
370

    
371
				// does the request have the most current system metadata?
372
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
373
					String msg = "The requested system metadata version number "
374
							+ serialVersion
375
							+ " differs from the current version at "
376
							+ systemMetadata.getSerialVersion().longValue()
377
							+ ". Please get the latest copy in order to modify it.";
378
					throw new VersionMismatch("4886", msg);
379

    
380
				}
381

    
382
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
383
				throw new NotFound("4884", "No record found for: " + pid.getValue());
384

    
385
			}
386

    
387
			// set the new policy
388
			systemMetadata.setObsoletedBy(obsoletedByPid);
389

    
390
			// update the metadata
391
			try {
392
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
393
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
394
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
395
			} catch (RuntimeException e) {
396
				throw new ServiceFailure("4882", e.getMessage());
397
			}
398

    
399
		} catch (RuntimeException e) {
400
			throw new ServiceFailure("4882", e.getMessage());
401
		} finally {
402
			lock.unlock();
403
			logMetacat.debug("Unlocked identifier " + pid.getValue());
404
		}
405

    
406
		return true;
407
	}
408
  
409
  
410
  /**
411
   * Set the replication status for an object given the object identifier
412
   * 
413
   * @param session - the Session object containing the credentials for the Subject
414
   * @param pid - the object identifier for the given object
415
   * @param status - the replication status to be applied
416
   * 
417
   * @return true or false
418
   * 
419
   * @throws NotImplemented
420
   * @throws NotAuthorized
421
   * @throws ServiceFailure
422
   * @throws InvalidRequest
423
   * @throws InvalidToken
424
   * @throws NotFound
425
   * 
426
   */
427
  @Override
428
  public boolean setReplicationStatus(Session session, Identifier pid,
429
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
430
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
431
      InvalidRequest, NotFound {
432
	  
433
	  // cannot be called by public
434
	  if (session == null) {
435
		  throw new NotAuthorized("4720", "Session cannot be null");
436
	  }
437
      
438
      // The lock to be used for this identifier
439
      Lock lock = null;
440
      
441
      boolean allowed = false;
442
      int replicaEntryIndex = -1;
443
      List<Replica> replicas = null;
444
      // get the subject
445
      Subject subject = session.getSubject();
446
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
447
          " is " + status.toString());
448
      
449
      SystemMetadata systemMetadata = null;
450

    
451
      try {
452
          lock = HazelcastService.getInstance().getLock(pid.getValue());
453
          lock.lock();
454
          logMetacat.debug("Locked identifier " + pid.getValue());
455

    
456
          try {      
457
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
458

    
459
              // did we get it correctly?
460
              if ( systemMetadata == null ) {
461
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
462
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
463
                  
464
              }
465
              replicas = systemMetadata.getReplicaList();
466
              int count = 0;
467
              
468
              // was there a failure? log it
469
              if ( failure != null && status == ReplicationStatus.FAILED ) {
470
                 String msg = "The replication request of the object identified by " + 
471
                     pid.getValue() + " failed.  The error message was " +
472
                     failure.getMessage() + ".";
473
              }
474
              
475
              if (replicas.size() > 0 && replicas != null) {
476
                  // find the target replica index in the replica list
477
                  for (Replica replica : replicas) {
478
                      String replicaNodeStr = replica.getReplicaMemberNode()
479
                              .getValue();
480
                      String targetNodeStr = targetNode.getValue();
481
                      logMetacat.debug("Comparing " + replicaNodeStr + " to "
482
                              + targetNodeStr);
483
                  
484
                      if (replicaNodeStr.equals(targetNodeStr)) {
485
                          replicaEntryIndex = count;
486
                          logMetacat.debug("replica entry index is: "
487
                                  + replicaEntryIndex);
488
                          break;
489
                      }
490
                      count++;
491
                  
492
                  }
493
              }
494
              // are we allowed to do this? only CNs and target MNs are allowed
495
              CNode cn = D1Client.getCN();
496
              List<Node> nodes = cn.listNodes().getNodeList();
497
              
498
              // find the node in the node list
499
              for ( Node node : nodes ) {
500
                  
501
                  NodeReference nodeReference = node.getIdentifier();
502
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + nodeReference.getValue());
503
                  
504
                  // allow target MN certs
505
                  if (targetNode.getValue().equals(nodeReference.getValue()) &&
506
                      node.getType() == NodeType.MN) {
507
                      List<Subject> nodeSubjects = node.getSubjectList();
508
                      
509
                      // check if the session subject is in the node subject list
510
                      for (Subject nodeSubject : nodeSubjects) {
511
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
512
                                  nodeSubject.getValue() + " and " + subject.getValue());
513
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
514
                              
515
                              // lastly limit to COMPLETED status updates from MNs only
516
                              if ( status == ReplicationStatus.COMPLETED ) {
517
                                  allowed = true;
518
                                  break;
519
                                  
520
                              }                              
521
                          }
522
                      }                 
523
                  }
524
              }
525

    
526
              if ( !allowed ) {
527
                  //check for CN admin access
528
                  allowed = isAuthorized(session, pid, Permission.WRITE);
529
                  
530
              }              
531
              
532
              if ( !allowed ) {
533
                  String msg = "The subject identified by "
534
                          + subject.getValue()
535
                          + " does not have permission to set the replication status for "
536
                          + "the replica identified by "
537
                          + targetNode.getValue() + ".";
538
                  logMetacat.info(msg);
539
                  throw new NotAuthorized("4720", msg);
540
                  
541
              }
542

    
543
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
544
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
545
                " : " + e.getMessage());
546
            
547
          }
548
          
549
          Replica targetReplica = new Replica();
550
          // set the status for the replica
551
          if ( replicaEntryIndex != -1 ) {
552
              targetReplica = replicas.get(replicaEntryIndex);
553
              targetReplica.setReplicationStatus(status);
554
              logMetacat.debug("Set the replication status for " + 
555
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
556
                  targetReplica.getReplicationStatus());
557
              
558
          } else {
559
              // this is a new entry, create it
560
              targetReplica.setReplicaMemberNode(targetNode);
561
              targetReplica.setReplicationStatus(status);
562
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
563
              replicas.add(targetReplica);
564
              
565
          }
566
          
567
          systemMetadata.setReplicaList(replicas);
568
                
569
          // update the metadata
570
          try {
571
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
572
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
573
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
574
              
575
              if ( status == ReplicationStatus.FAILED && failure != null ) {
576
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
577
                      " on target node " + targetNode + ". The exception was: " +
578
                      failure.getMessage());
579
              }
580
          } catch (RuntimeException e) {
581
              throw new ServiceFailure("4700", e.getMessage());
582
          
583
          }
584
          
585
    } catch (RuntimeException e) {
586
        String msg = "There was a RuntimeException getting the lock for " +
587
            pid.getValue();
588
        logMetacat.info(msg);
589
        
590
    } finally {
591
        lock.unlock();
592
        logMetacat.debug("Unlocked identifier " + pid.getValue());
593
        
594
    }
595
      
596
      return true;
597
  }
598
  
599
  /**
600
   * Return the checksum of the object given the identifier 
601
   * 
602
   * @param session - the Session object containing the credentials for the Subject
603
   * @param pid - the object identifier for the given object
604
   * 
605
   * @return checksum - the checksum of the object
606
   * 
607
   * @throws InvalidToken
608
   * @throws ServiceFailure
609
   * @throws NotAuthorized
610
   * @throws NotFound
611
   * @throws NotImplemented
612
   */
613
  @Override
614
  public Checksum getChecksum(Session session, Identifier pid)
615
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
616
    NotImplemented {
617
    
618
	boolean isAuthorized = false;
619
	try {
620
		isAuthorized = isAuthorized(session, pid, Permission.READ);
621
	} catch (InvalidRequest e) {
622
		throw new ServiceFailure("1410", e.getDescription());
623
	}  
624
    if (!isAuthorized) {
625
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
626
    }
627
    
628
    SystemMetadata systemMetadata = null;
629
    Checksum checksum = null;
630
    
631
    try {
632
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
633

    
634
        if (systemMetadata == null ) {
635
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue());
636
        }
637
        checksum = systemMetadata.getChecksum();
638
        
639
    } catch (RuntimeException e) {
640
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
641
            pid.getValue() + ". The error message was: " + e.getMessage());
642
      
643
    }
644
    
645
    return checksum;
646
  }
647

    
648
  /**
649
   * Resolve the location of a given object
650
   * 
651
   * @param session - the Session object containing the credentials for the Subject
652
   * @param pid - the object identifier for the given object
653
   * 
654
   * @return objectLocationList - the list of nodes known to contain the object
655
   * 
656
   * @throws InvalidToken
657
   * @throws ServiceFailure
658
   * @throws NotAuthorized
659
   * @throws NotFound
660
   * @throws NotImplemented
661
   */
662
  @Override
663
  public ObjectLocationList resolve(Session session, Identifier pid)
664
    throws InvalidToken, ServiceFailure, NotAuthorized,
665
    NotFound, NotImplemented {
666

    
667
    throw new NotImplemented("4131", "resolve not implemented");
668

    
669
  }
670

    
671
  /**
672
   * Search the metadata catalog for identifiers that match the criteria
673
   * 
674
   * @param session - the Session object containing the credentials for the Subject
675
   * @param queryType - An identifier for the type of query expression 
676
   *                    provided in the query
677
   * @param query -  The criteria for matching the characteristics of the 
678
   *                 metadata objects of interest
679
   * 
680
   * @return objectList - the list of objects matching the criteria
681
   * 
682
   * @throws InvalidToken
683
   * @throws ServiceFailure
684
   * @throws NotAuthorized
685
   * @throws InvalidRequest
686
   * @throws NotImplemented
687
   */
688
  @Override
689
  public ObjectList search(Session session, String queryType, String query)
690
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
691
    NotImplemented {
692

    
693
    ObjectList objectList = null;
694
    try {
695
        objectList = 
696
          IdentifierManager.getInstance().querySystemMetadata(
697
              null, //startTime, 
698
              null, //endTime,
699
              null, //objectFormat, 
700
              false, //replicaStatus, 
701
              0, //start, 
702
              -1 //count
703
              );
704
        
705
    } catch (Exception e) {
706
      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
707
    }
708

    
709
      return objectList;
710
      
711
    //throw new NotImplemented("4281", "search not implemented");
712
    
713
    // the code block below is from an older implementation
714
    
715
    /*  This block commented out because of the EcoGrid circular dependency.
716
         *  For now, query will not be supported until the circularity can be
717
         *  resolved, probably by moving the ecogrid query syntax transformers
718
         *  directly into the Metacat codebase.  MBJ 2010-02-03
719
         
720
        try {
721
            EcogridQueryParser parser = new EcogridQueryParser(request
722
                    .getReader());
723
            parser.parseXML();
724
            QueryType queryType = parser.getEcogridQuery();
725
            EcogridJavaToMetacatJavaQueryTransformer queryTransformer = 
726
                new EcogridJavaToMetacatJavaQueryTransformer();
727
            QuerySpecification metacatQuery = queryTransformer
728
                    .transform(queryType);
729

    
730
            DBQuery metacat = new DBQuery();
731

    
732
            boolean useXMLIndex = (new Boolean(PropertyService
733
                    .getProperty("database.usexmlindex"))).booleanValue();
734
            String xmlquery = "query"; // we don't care the query in resultset,
735
            // the query can be anything
736
            PrintWriter out = null; // we don't want metacat result, so set out null
737

    
738
            // parameter: queryspecification, user, group, usingIndexOrNot
739
            StringBuffer result = metacat.createResultDocument(xmlquery,
740
                    metacatQuery, out, username, groupNames, useXMLIndex);
741

    
742
            // create result set transfer       
743
            String saxparser = PropertyService.getProperty("xml.saxparser");
744
            MetacatResultsetParser metacatResultsetParser = new MetacatResultsetParser(
745
                    new StringReader(result.toString()), saxparser, queryType
746
                            .getNamespace().get_value());
747
            ResultsetType records = metacatResultsetParser.getEcogridResult();
748

    
749
            System.out
750
                    .println(EcogridResultsetTransformer.toXMLString(records));
751
            response.setContentType("text/xml");
752
            out = response.getWriter();
753
            out.print(EcogridResultsetTransformer.toXMLString(records));
754

    
755
        } catch (Exception e) {
756
            e.printStackTrace();
757
        }*/
758
    
759

    
760
  }
761
  
762
  /**
763
   * Returns the object format registered in the DataONE Object Format 
764
   * Vocabulary for the given format identifier
765
   * 
766
   * @param fmtid - the identifier of the format requested
767
   * 
768
   * @return objectFormat - the object format requested
769
   * 
770
   * @throws ServiceFailure
771
   * @throws NotFound
772
   * @throws InsufficientResources
773
   * @throws NotImplemented
774
   */
775
  @Override
776
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
777
    throws ServiceFailure, NotFound, NotImplemented {
778
     
779
      return ObjectFormatService.getInstance().getFormat(fmtid);
780
      
781
  }
782

    
783
  /**
784
   * Returns a list of all object formats registered in the DataONE Object 
785
   * Format Vocabulary
786
    * 
787
   * @return objectFormatList - The list of object formats registered in 
788
   *                            the DataONE Object Format Vocabulary
789
   * 
790
   * @throws ServiceFailure
791
   * @throws NotImplemented
792
   * @throws InsufficientResources
793
   */
794
  @Override
795
  public ObjectFormatList listFormats() 
796
    throws ServiceFailure, NotImplemented {
797

    
798
    return ObjectFormatService.getInstance().listFormats();
799
  }
800

    
801
  /**
802
   * Returns a list of nodes that have been registered with the DataONE infrastructure
803
    * 
804
   * @return nodeList - List of nodes from the registry
805
   * 
806
   * @throws ServiceFailure
807
   * @throws NotImplemented
808
   */
809
  @Override
810
  public NodeList listNodes() 
811
    throws NotImplemented, ServiceFailure {
812

    
813
    throw new NotImplemented("4800", "listNodes not implemented");
814
  }
815

    
816
  /**
817
   * Provides a mechanism for adding system metadata independently of its 
818
   * associated object, such as when adding system metadata for data objects.
819
    * 
820
   * @param session - the Session object containing the credentials for the Subject
821
   * @param pid - The identifier of the object to register the system metadata against
822
   * @param sysmeta - The system metadata to be registered
823
   * 
824
   * @return true if the registration succeeds
825
   * 
826
   * @throws NotImplemented
827
   * @throws NotAuthorized
828
   * @throws ServiceFailure
829
   * @throws InvalidRequest
830
   * @throws InvalidSystemMetadata
831
   */
832
  @Override
833
  public Identifier registerSystemMetadata(Session session, Identifier pid,
834
      SystemMetadata sysmeta) 
835
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
836
      InvalidSystemMetadata {
837

    
838
      // The lock to be used for this identifier
839
      Lock lock = null;
840

    
841
      // TODO: control who can call this?
842
      if (session == null) {
843
          //TODO: many of the thrown exceptions do not use the correct error codes
844
          //check these against the docs and correct them
845
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
846
                  "  If you are not logged in, please do so and retry the request.");
847
      }
848
      
849
      // verify that guid == SystemMetadata.getIdentifier()
850
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
851
          "|" + sysmeta.getIdentifier().getValue());
852
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
853
          throw new InvalidRequest("4863", 
854
              "The identifier in method call (" + pid.getValue() + 
855
              ") does not match identifier in system metadata (" +
856
              sysmeta.getIdentifier().getValue() + ").");
857
      }
858

    
859
      try {
860
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
861
          lock.lock();
862
          logMetacat.debug("Locked identifier " + pid.getValue());
863
          logMetacat.debug("Checking if identifier exists...");
864
          // Check that the identifier does not already exist
865
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
866
              throw new InvalidRequest("4863", 
867
                  "The identifier is already in use by an existing object.");
868
          
869
          }
870
          
871
          // insert the system metadata into the object store
872
          logMetacat.debug("Starting to insert SystemMetadata...");
873
          try {
874
              sysmeta.setSerialVersion(BigInteger.ONE);
875
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
876
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
877
              
878
          } catch (RuntimeException e) {
879
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
880
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
881
                  e.getClass() + ": " + e.getMessage());
882
              
883
          }
884
          
885
      } catch (RuntimeException e) {
886
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
887
                  e.getClass() + ": " + e.getMessage());
888
          
889
      }  finally {
890
          lock.unlock();
891
          logMetacat.debug("Unlocked identifier " + pid.getValue());
892
          
893
      }
894

    
895
      
896
      logMetacat.debug("Returning from registerSystemMetadata");
897
      EventLog.getInstance().log(request.getRemoteAddr(), 
898
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
899
          pid.getValue(), "registerSystemMetadata");
900
      return pid;
901
  }
902
  
903
  /**
904
   * Given an optional scope and format, reserves and returns an identifier 
905
   * within that scope and format that is unique and will not be 
906
   * used by any other sessions. 
907
    * 
908
   * @param session - the Session object containing the credentials for the Subject
909
   * @param pid - The identifier of the object to register the system metadata against
910
   * @param scope - An optional string to be used to qualify the scope of 
911
   *                the identifier namespace, which is applied differently 
912
   *                depending on the format requested. If scope is not 
913
   *                supplied, a default scope will be used.
914
   * @param format - The optional name of the identifier format to be used, 
915
   *                  drawn from a DataONE-specific vocabulary of identifier 
916
   *                 format names, including several common syntaxes such 
917
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
918
   *                 format is not supplied by the caller, the CN service 
919
   *                 will use a default identifier format, which may change 
920
   *                 over time.
921
   * 
922
   * @return true if the registration succeeds
923
   * 
924
   * @throws InvalidToken
925
   * @throws ServiceFailure
926
   * @throws NotAuthorized
927
   * @throws IdentifierNotUnique
928
   * @throws NotImplemented
929
   */
930
  @Override
931
  public Identifier reserveIdentifier(Session session, Identifier pid)
932
  throws InvalidToken, ServiceFailure,
933
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
934

    
935
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
936
  }
937
  
938
  @Override
939
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
940
  throws InvalidToken, ServiceFailure,
941
        NotAuthorized, NotImplemented, InvalidRequest {
942
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
943
  }
944
  
945
  /**
946
    * Checks whether the pid is reserved by the subject in the session param
947
    * If the reservation is held on the pid by the subject, we return true.
948
    * 
949
   * @param session - the Session object containing the Subject
950
   * @param pid - The identifier to check
951
   * 
952
   * @return true if the reservation exists for the subject/pid
953
   * 
954
   * @throws InvalidToken
955
   * @throws ServiceFailure
956
   * @throws NotFound - when the pid is not found (in use or in reservation)
957
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
958
   * @throws IdentifierNotUnique - when the pid is in use
959
   * @throws NotImplemented
960
   */
961

    
962
  @Override
963
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
964
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
965
      NotImplemented, InvalidRequest {
966
  
967
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
968
  }
969

    
970
  /**
971
   * Changes ownership (RightsHolder) of the specified object to the 
972
   * subject specified by userId
973
    * 
974
   * @param session - the Session object containing the credentials for the Subject
975
   * @param pid - Identifier of the object to be modified
976
   * @param userId - The subject that will be taking ownership of the specified object.
977
   *
978
   * @return pid - the identifier of the modified object
979
   * 
980
   * @throws ServiceFailure
981
   * @throws InvalidToken
982
   * @throws NotFound
983
   * @throws NotAuthorized
984
   * @throws NotImplemented
985
   * @throws InvalidRequest
986
   */  
987
  @Override
988
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
989
      long serialVersion)
990
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
991
      NotImplemented, InvalidRequest, VersionMismatch {
992
      
993
      // The lock to be used for this identifier
994
      Lock lock = null;
995

    
996
      // get the subject
997
      Subject subject = session.getSubject();
998
      
999
      // are we allowed to do this?
1000
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1001
          throw new NotAuthorized("4440", "not allowed by "
1002
                  + subject.getValue() + " on " + pid.getValue());
1003
          
1004
      }
1005
      
1006
      SystemMetadata systemMetadata = null;
1007
      try {
1008
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1009
          logMetacat.debug("Locked identifier " + pid.getValue());
1010

    
1011
          try {
1012
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1013
              
1014
              // does the request have the most current system metadata?
1015
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1016
                 String msg = "The requested system metadata version number " + 
1017
                     serialVersion + " differs from the current version at " +
1018
                     systemMetadata.getSerialVersion().longValue() +
1019
                     ". Please get the latest copy in order to modify it.";
1020
                 throw new VersionMismatch("4443", msg);
1021
              }
1022
              
1023
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1024
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1025
              
1026
          }
1027
              
1028
          // set the new rights holder
1029
          systemMetadata.setRightsHolder(userId);
1030
          
1031
          // update the metadata
1032
          try {
1033
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1034
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1035
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1036
              notifyReplicaNodes(systemMetadata);
1037
              
1038
          } catch (RuntimeException e) {
1039
              throw new ServiceFailure("4490", e.getMessage());
1040
          
1041
          }
1042
          
1043
      } catch (RuntimeException e) {
1044
          throw new ServiceFailure("4490", e.getMessage());
1045
          
1046
      } finally {
1047
          lock.unlock();
1048
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1049
      
1050
      }
1051
      
1052
      return pid;
1053
  }
1054

    
1055
  /**
1056
   * Verify that a replication task is authorized by comparing the target node's
1057
   * Subject (from the X.509 certificate-derived Session) with the list of 
1058
   * subjects in the known, pending replication tasks map.
1059
   * 
1060
   * @param originatingNodeSession - Session information that contains the 
1061
   *                                 identity of the calling user
1062
   * @param targetNodeSubject - Subject identifying the target node
1063
   * @param pid - the identifier of the object to be replicated
1064
   * @param replicatePermission - the execute permission to be granted
1065
   * 
1066
   * @throws ServiceFailure
1067
   * @throws NotImplemented
1068
   * @throws InvalidToken
1069
   * @throws NotAuthorized
1070
   * @throws InvalidRequest
1071
   * @throws NotFound
1072
   */
1073
  @Override
1074
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1075
    Subject targetNodeSubject, Identifier pid) 
1076
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1077
    NotFound, InvalidRequest {
1078
    
1079
    boolean isAllowed = false;
1080
    SystemMetadata sysmeta = null;
1081
    NodeReference targetNode = null;
1082
    
1083
    try {
1084
      // get the target node reference from the nodes list
1085
      CNode cn = D1Client.getCN();
1086
      List<Node> nodes = cn.listNodes().getNodeList();
1087
      
1088
      if ( nodes != null ) {
1089
        for (Node node : nodes) {
1090
            
1091
            for (Subject nodeSubject : node.getSubjectList()) {
1092
                
1093
                if ( nodeSubject.equals(targetNodeSubject) ) {
1094
                    targetNode = node.getIdentifier();
1095
                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1096
                    break;
1097
                }
1098
            }
1099
            
1100
            if ( targetNode != null) { break; }
1101
        }
1102
        
1103
      } else {
1104
          String msg = "Couldn't get the node list from the CN";
1105
          logMetacat.debug(msg);
1106
          throw new ServiceFailure("4872", msg);
1107
          
1108
      }
1109
      
1110
      // can't find a node listed with the given subject
1111
      if ( targetNode == null ) {
1112
          String msg = "There is no Member Node registered with a node subject " +
1113
              "matching " + targetNodeSubject.getValue();
1114
          logMetacat.info(msg);
1115
          throw new NotAuthorized("4871", msg);
1116
          
1117
      }
1118
      
1119
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1120
      
1121
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1122

    
1123
      if ( sysmeta != null ) {
1124
          
1125
          List<Replica> replicaList = sysmeta.getReplicaList();
1126
          
1127
          if ( replicaList != null ) {
1128
              
1129
              // find the replica with the status set to 'requested'
1130
              for (Replica replica : replicaList) {
1131
                  ReplicationStatus status = replica.getReplicationStatus();
1132
                  NodeReference listedNode = replica.getReplicaMemberNode();
1133
                  if ( listedNode != null && targetNode != null ) {
1134
                      logMetacat.debug("Comparing " + listedNode.getValue()
1135
                              + " to " + targetNode.getValue());
1136
                      
1137
                      if (listedNode.getValue().equals(targetNode.getValue())
1138
                              && status.equals(ReplicationStatus.REQUESTED)) {
1139
                          isAllowed = true;
1140
                          break;
1141

    
1142
                      }
1143
                  }
1144
              }
1145
          }
1146
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1147
              "to replicate: " + isAllowed + " for " + pid.getValue());
1148

    
1149
          
1150
      } else {
1151
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1152
          " is null.");          
1153
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue());
1154
          
1155
      }
1156

    
1157
    } catch (RuntimeException e) {
1158
    	  ServiceFailure sf = new ServiceFailure("4872", 
1159
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1160
                e.getCause().getMessage());
1161
    	  sf.initCause(e);
1162
        throw sf;
1163
        
1164
    }
1165
      
1166
    return isAllowed;
1167
    
1168
  }
1169

    
1170
  /**
1171
   * Adds a new object to the Node, where the object is a science metadata object.
1172
   * 
1173
   * @param session - the Session object containing the credentials for the Subject
1174
   * @param pid - The object identifier to be created
1175
   * @param object - the object bytes
1176
   * @param sysmeta - the system metadata that describes the object  
1177
   * 
1178
   * @return pid - the object identifier created
1179
   * 
1180
   * @throws InvalidToken
1181
   * @throws ServiceFailure
1182
   * @throws NotAuthorized
1183
   * @throws IdentifierNotUnique
1184
   * @throws UnsupportedType
1185
   * @throws InsufficientResources
1186
   * @throws InvalidSystemMetadata
1187
   * @throws NotImplemented
1188
   * @throws InvalidRequest
1189
   */
1190
  public Identifier create(Session session, Identifier pid, InputStream object,
1191
    SystemMetadata sysmeta) 
1192
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1193
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1194
    NotImplemented, InvalidRequest {
1195
                  
1196
      // The lock to be used for this identifier
1197
      Lock lock = null;
1198

    
1199
      try {
1200
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1201
          // are we allowed?
1202
          boolean isAllowed = false;
1203
          CNode cn = D1Client.getCN();
1204
          NodeList nodeList = cn.listNodes();
1205
          
1206
          for (Node node : nodeList.getNodeList()) {
1207
              if ( node.getType().equals(NodeType.CN) ) {
1208
                  
1209
                  List<Subject> subjects = node.getSubjectList();
1210
                  for (Subject subject : subjects) {
1211
                     if (subject.equals(session.getSubject())) {
1212
                         isAllowed = true;
1213
                         break;
1214
                     }
1215
                  }
1216
              } else {
1217
                  
1218
              }
1219
          }
1220

    
1221
          // proceed if we're called by a CN
1222
          if ( isAllowed ) {
1223
              // create the coordinating node version of the document      
1224
              lock.lock();
1225
              logMetacat.debug("Locked identifier " + pid.getValue());
1226
              sysmeta.setSerialVersion(BigInteger.ONE);
1227
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1228
              sysmeta.setArchived(false); // this is a create op, not update
1229
              
1230
              // the CN should have set the origin and authoritative member node fields
1231
              try {
1232
                  sysmeta.getOriginMemberNode().getValue();
1233
                  sysmeta.getAuthoritativeMemberNode().getValue();
1234
                  
1235
              } catch (NullPointerException npe) {
1236
                  throw new InvalidSystemMetadata("4896", 
1237
                      "Both the origin and authoritative member node identifiers need to be set.");
1238
                  
1239
              }
1240
              pid = super.create(session, pid, object, sysmeta);
1241

    
1242
          } else {
1243
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1244
                  " isn't allowed to call create() on a Coordinating Node.";
1245
              logMetacat.info(msg);
1246
              throw new NotAuthorized("1100", msg);
1247
          }
1248
          
1249
      } catch (RuntimeException e) {
1250
          // Convert Hazelcast runtime exceptions to service failures
1251
          String msg = "There was a problem creating the object identified by " +
1252
              pid.getValue() + ". There error message was: " + e.getMessage();
1253
          throw new ServiceFailure("4893", msg);
1254
          
1255
      } finally {
1256
    	  if (lock != null) {
1257
	          lock.unlock();
1258
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1259
    	  }
1260
      }
1261
      
1262
      return pid;
1263

    
1264
  }
1265

    
1266
  /**
1267
   * Set access for a given object using the object identifier and a Subject
1268
   * under a given Session.
1269
   * 
1270
   * @param session - the Session object containing the credentials for the Subject
1271
   * @param pid - the object identifier for the given object to apply the policy
1272
   * @param policy - the access policy to be applied
1273
   * 
1274
   * @return true if the application of the policy succeeds
1275
   * @throws InvalidToken
1276
   * @throws ServiceFailure
1277
   * @throws NotFound
1278
   * @throws NotAuthorized
1279
   * @throws NotImplemented
1280
   * @throws InvalidRequest
1281
   */
1282
  public boolean setAccessPolicy(Session session, Identifier pid, 
1283
      AccessPolicy accessPolicy, long serialVersion) 
1284
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1285
      NotImplemented, InvalidRequest, VersionMismatch {
1286
      
1287
      // The lock to be used for this identifier
1288
      Lock lock = null;
1289
      SystemMetadata systemMetadata = null;
1290
      
1291
      boolean success = false;
1292
      
1293
      // get the subject
1294
      Subject subject = session.getSubject();
1295
      
1296
      // are we allowed to do this?
1297
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1298
          throw new NotAuthorized("4420", "not allowed by "
1299
                  + subject.getValue() + " on " + pid.getValue());
1300
      }
1301
      
1302
      try {
1303
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1304
          lock.lock();
1305
          logMetacat.debug("Locked identifier " + pid.getValue());
1306

    
1307
          try {
1308
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1309

    
1310
              if ( systemMetadata == null ) {
1311
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1312
                  
1313
              }
1314
              // does the request have the most current system metadata?
1315
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1316
                 String msg = "The requested system metadata version number " + 
1317
                     serialVersion + " differs from the current version at " +
1318
                     systemMetadata.getSerialVersion().longValue() +
1319
                     ". Please get the latest copy in order to modify it.";
1320
                 throw new VersionMismatch("4402", msg);
1321
                 
1322
              }
1323
              
1324
          } catch (RuntimeException e) {
1325
              // convert Hazelcast RuntimeException to NotFound
1326
              throw new NotFound("4400", "No record found for: " + pid);
1327
            
1328
          }
1329
              
1330
          // set the access policy
1331
          systemMetadata.setAccessPolicy(accessPolicy);
1332
          
1333
          // update the system metadata
1334
          try {
1335
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1336
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1337
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1338
              notifyReplicaNodes(systemMetadata);
1339
              
1340
          } catch (RuntimeException e) {
1341
              // convert Hazelcast RuntimeException to ServiceFailure
1342
              throw new ServiceFailure("4430", e.getMessage());
1343
            
1344
          }
1345
          
1346
      } catch (RuntimeException e) {
1347
          throw new ServiceFailure("4430", e.getMessage());
1348
          
1349
      } finally {
1350
          lock.unlock();
1351
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1352
        
1353
      }
1354

    
1355
    
1356
    // TODO: how do we know if the map was persisted?
1357
    success = true;
1358
    
1359
    return success;
1360
  }
1361

    
1362
  /**
1363
   * Full replacement of replication metadata in the system metadata for the 
1364
   * specified object, changes date system metadata modified
1365
   * 
1366
   * @param session - the Session object containing the credentials for the Subject
1367
   * @param pid - the object identifier for the given object to apply the policy
1368
   * @param replica - the replica to be updated
1369
   * @return
1370
   * @throws NotImplemented
1371
   * @throws NotAuthorized
1372
   * @throws ServiceFailure
1373
   * @throws InvalidRequest
1374
   * @throws NotFound
1375
   * @throws VersionMismatch
1376
   */
1377
  @Override
1378
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1379
      Replica replica, long serialVersion) 
1380
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1381
      NotFound, VersionMismatch {
1382
      
1383
      // The lock to be used for this identifier
1384
      Lock lock = null;
1385
      
1386
      // get the subject
1387
      Subject subject = session.getSubject();
1388
      
1389
      // are we allowed to do this?
1390
      try {
1391

    
1392
          // what is the controlling permission?
1393
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1394
              throw new NotAuthorized("4851", "not allowed by "
1395
                      + subject.getValue() + " on " + pid.getValue());
1396
          }
1397

    
1398
        
1399
      } catch (InvalidToken e) {
1400
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1401
                  " on " + pid.getValue());  
1402
          
1403
      }
1404

    
1405
      SystemMetadata systemMetadata = null;
1406
      try {
1407
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1408
          lock.lock();
1409
          logMetacat.debug("Locked identifier " + pid.getValue());
1410

    
1411
          try {      
1412
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1413

    
1414
              // does the request have the most current system metadata?
1415
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1416
                 String msg = "The requested system metadata version number " + 
1417
                     serialVersion + " differs from the current version at " +
1418
                     systemMetadata.getSerialVersion().longValue() +
1419
                     ". Please get the latest copy in order to modify it.";
1420
                 throw new VersionMismatch("4855", msg);
1421
              }
1422
              
1423
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1424
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1425
                  " : " + e.getMessage());
1426
            
1427
          }
1428
              
1429
          // set the status for the replica
1430
          List<Replica> replicas = systemMetadata.getReplicaList();
1431
          NodeReference replicaNode = replica.getReplicaMemberNode();
1432
          int index = 0;
1433
          for (Replica listedReplica: replicas) {
1434
              
1435
              // remove the replica that we are replacing
1436
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1437
                  replicas.remove(index);
1438
                  break;
1439
                  
1440
              }
1441
              index++;
1442
          }
1443
          
1444
          // add the new replica item
1445
          replicas.add(replica);
1446
          systemMetadata.setReplicaList(replicas);
1447
          
1448
          // update the metadata
1449
          try {
1450
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1451
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1452
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1453
            
1454
          } catch (RuntimeException e) {
1455
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1456
              throw new ServiceFailure("4852", e.getMessage());
1457
          
1458
          }
1459
          
1460
      } catch (RuntimeException e) {
1461
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1462
          throw new ServiceFailure("4852", e.getMessage());
1463
      
1464
      } finally {
1465
          lock.unlock();
1466
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1467
          
1468
      }
1469
    
1470
      return true;
1471
      
1472
  }
1473
  
1474
  /**
1475
   * 
1476
   */
1477
  @Override
1478
  public ObjectList listObjects(Session session, Date startTime, 
1479
      Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1480
      Integer start, Integer count)
1481
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1482
      ServiceFailure {
1483
      
1484
      ObjectList objectList = null;
1485
        try {
1486
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1487
        } catch (Exception e) {
1488
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1489
        }
1490

    
1491
        return objectList;
1492
  }
1493

    
1494
  
1495
 	/**
1496
 	 * Returns a list of checksum algorithms that are supported by DataONE.
1497
 	 * @return cal  the list of checksum algorithms
1498
 	 * 
1499
 	 * @throws ServiceFailure
1500
 	 * @throws NotImplemented
1501
 	 */
1502
  @Override
1503
  public ChecksumAlgorithmList listChecksumAlgorithms()
1504
			throws ServiceFailure, NotImplemented {
1505
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1506
		cal.addAlgorithm("MD5");
1507
		cal.addAlgorithm("SHA-1");
1508
		return cal;
1509
		
1510
	}
1511

    
1512
  /**
1513
   * Notify replica Member Nodes of system metadata changes for a given pid
1514
   * 
1515
   * @param currentSystemMetadata - the up to date system metadata
1516
   */
1517
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
1518
      
1519
      Session session = null;
1520
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
1521
      MNode mn = null;
1522
      NodeReference replicaNodeRef = null;
1523
      CNode cn = null;
1524
      NodeType nodeType = null;
1525
      List<Node> nodeList = null;
1526
      
1527
      try {
1528
          cn = D1Client.getCN();
1529
          nodeList = cn.listNodes().getNodeList();
1530
          
1531
      } catch (Exception e) { // handle BaseException and other I/O issues
1532
          
1533
          // swallow errors since the call is not critical
1534
          logMetacat.error("Can't inform MNs of system metadata changes due " +
1535
              "to communication issues with the CN: " + e.getMessage());
1536
          
1537
      }
1538
      
1539
      if ( replicaList != null ) {
1540
          
1541
          // iterate through the replicas and inform  MN replica nodes
1542
          for (Replica replica : replicaList) {
1543
              
1544
              replicaNodeRef = replica.getReplicaMemberNode();
1545
              try {
1546
                  if (nodeList != null) {
1547
                      // find the node type
1548
                      for (Node node : nodeList) {
1549
                          if (node.getIdentifier().getValue() == 
1550
                              replicaNodeRef.getValue()) {
1551
                              nodeType = node.getType();
1552
                              break;
1553
              
1554
                          }
1555
                      }
1556
                  }
1557
              
1558
                  // notify only MNs
1559
                  if (nodeType != null && nodeType == NodeType.MN) {
1560
                      mn = D1Client.getMN(replicaNodeRef);
1561
                      mn.systemMetadataChanged(session, 
1562
                          currentSystemMetadata.getIdentifier(), 
1563
                          currentSystemMetadata.getSerialVersion().longValue(),
1564
                          currentSystemMetadata.getDateSysMetadataModified());
1565
                  }
1566
              
1567
              } catch (Exception e) { // handle BaseException and other I/O issues
1568
              
1569
                  // swallow errors since the call is not critical
1570
                  logMetacat.error("Can't inform "
1571
                          + replicaNodeRef.getValue()
1572
                          + " of system metadata changes due "
1573
                          + "to communication issues with the CN: "
1574
                          + e.getMessage());
1575
              
1576
              }
1577
          }
1578
      }
1579
  }
1580
}
(1-1/5)