Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.ArrayList;
29
import java.util.Calendar;
30
import java.util.Date;
31
import java.util.List;
32
import java.util.concurrent.locks.Lock;
33

    
34
import javax.servlet.http.HttpServletRequest;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.client.CNode;
38
import org.dataone.client.D1Client;
39
import org.dataone.client.MNode;
40
import org.dataone.service.cn.v1.CNAuthorization;
41
import org.dataone.service.cn.v1.CNCore;
42
import org.dataone.service.cn.v1.CNRead;
43
import org.dataone.service.cn.v1.CNReplication;
44
import org.dataone.service.exceptions.BaseException;
45
import org.dataone.service.exceptions.IdentifierNotUnique;
46
import org.dataone.service.exceptions.InsufficientResources;
47
import org.dataone.service.exceptions.InvalidRequest;
48
import org.dataone.service.exceptions.InvalidSystemMetadata;
49
import org.dataone.service.exceptions.InvalidToken;
50
import org.dataone.service.exceptions.NotAuthorized;
51
import org.dataone.service.exceptions.NotFound;
52
import org.dataone.service.exceptions.NotImplemented;
53
import org.dataone.service.exceptions.ServiceFailure;
54
import org.dataone.service.exceptions.UnsupportedType;
55
import org.dataone.service.exceptions.VersionMismatch;
56
import org.dataone.service.types.v1.AccessPolicy;
57
import org.dataone.service.types.v1.Checksum;
58
import org.dataone.service.types.v1.ChecksumAlgorithmList;
59
import org.dataone.service.types.v1.Identifier;
60
import org.dataone.service.types.v1.Node;
61
import org.dataone.service.types.v1.NodeList;
62
import org.dataone.service.types.v1.NodeReference;
63
import org.dataone.service.types.v1.NodeType;
64
import org.dataone.service.types.v1.ObjectFormat;
65
import org.dataone.service.types.v1.ObjectFormatIdentifier;
66
import org.dataone.service.types.v1.ObjectFormatList;
67
import org.dataone.service.types.v1.ObjectList;
68
import org.dataone.service.types.v1.ObjectLocationList;
69
import org.dataone.service.types.v1.Permission;
70
import org.dataone.service.types.v1.Replica;
71
import org.dataone.service.types.v1.ReplicationPolicy;
72
import org.dataone.service.types.v1.ReplicationStatus;
73
import org.dataone.service.types.v1.Session;
74
import org.dataone.service.types.v1.Subject;
75
import org.dataone.service.types.v1.SystemMetadata;
76
import org.dataone.service.types.v1.util.ServiceMethodRestrictionUtil;
77

    
78
import edu.ucsb.nceas.metacat.EventLog;
79
import edu.ucsb.nceas.metacat.IdentifierManager;
80
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
81

    
82
/**
83
 * Represents Metacat's implementation of the DataONE Coordinating Node 
84
 * service API. Methods implement the various CN* interfaces, and methods common
85
 * to both Member Node and Coordinating Node interfaces are found in the
86
 * D1NodeService super class.
87
 *
88
 */
89
public class CNodeService extends D1NodeService implements CNAuthorization,
90
    CNCore, CNRead, CNReplication {
91

    
92
  /* the logger instance */
93
  private Logger logMetacat = null;
94

    
95
  /**
96
   * singleton accessor
97
   */
98
  public static CNodeService getInstance(HttpServletRequest request) { 
99
    return new CNodeService(request);
100
  }
101
  
102
  /**
103
   * Constructor, private for singleton access
104
   */
105
  private CNodeService(HttpServletRequest request) {
106
    super(request);
107
    logMetacat = Logger.getLogger(CNodeService.class);
108
        
109
  }
110
    
111
  /**
112
   * Set the replication policy for an object given the object identifier
113
   * 
114
   * @param session - the Session object containing the credentials for the Subject
115
   * @param pid - the object identifier for the given object
116
   * @param policy - the replication policy to be applied
117
   * 
118
   * @return true or false
119
   * 
120
   * @throws NotImplemented
121
   * @throws NotAuthorized
122
   * @throws ServiceFailure
123
   * @throws InvalidRequest
124
   * @throws VersionMismatch
125
   * 
126
   */
127
  @Override
128
  public boolean setReplicationPolicy(Session session, Identifier pid,
129
      ReplicationPolicy policy, long serialVersion) 
130
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
131
      InvalidRequest, InvalidToken, VersionMismatch {
132
      
133
      // The lock to be used for this identifier
134
      Lock lock = null;
135
      
136
      // get the subject
137
      Subject subject = session.getSubject();
138
      
139
      // are we allowed to do this?
140
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
141
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
142
                  + " not allowed by " + subject.getValue() + " on "
143
                  + pid.getValue());
144
          
145
      }
146
      
147
      SystemMetadata systemMetadata = null;
148
      try {
149
          lock = HazelcastService.getInstance().getLock(pid.getValue());
150
          lock.lock();
151
          logMetacat.debug("Locked identifier " + pid.getValue());
152

    
153
          try {
154
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
155
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
156
                  
157
              }
158
              
159
              // did we get it correctly?
160
              if ( systemMetadata == null ) {
161
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
162
                  
163
              }
164

    
165
              // does the request have the most current system metadata?
166
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
167
                 String msg = "The requested system metadata version number " + 
168
                     serialVersion + " differs from the current version at " +
169
                     systemMetadata.getSerialVersion().longValue() +
170
                     ". Please get the latest copy in order to modify it.";
171
                 throw new VersionMismatch("4886", msg);
172
                 
173
              }
174
              
175
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
176
              throw new NotFound("4884", "No record found for: " + pid.getValue());
177
            
178
          }
179
          
180
          // set the new policy
181
          systemMetadata.setReplicationPolicy(policy);
182
          
183
          // update the metadata
184
          try {
185
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
186
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
187
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
188
              notifyReplicaNodes(systemMetadata);
189
              
190
          } catch (RuntimeException e) {
191
              throw new ServiceFailure("4882", e.getMessage());
192
          
193
          }
194
          
195
      } catch (RuntimeException e) {
196
          throw new ServiceFailure("4882", e.getMessage());
197
          
198
      } finally {
199
          lock.unlock();
200
          logMetacat.debug("Unlocked identifier " + pid.getValue());
201
          
202
      }
203
    
204
      return true;
205
  }
206

    
207
  /**
208
   * Deletes the replica from the given Member Node
209
   * NOTE: MN.delete() may be an "archive" operation. TBD.
210
   * @param session
211
   * @param pid
212
   * @param nodeId
213
   * @param serialVersion
214
   * @return
215
   * @throws InvalidToken
216
   * @throws ServiceFailure
217
   * @throws NotAuthorized
218
   * @throws NotFound
219
   * @throws NotImplemented
220
   * @throws VersionMismatch
221
   */
222
  @Override
223
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
224
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
225
	  
226
	  	// The lock to be used for this identifier
227
		Lock lock = null;
228

    
229
		// get the subject
230
		Subject subject = session.getSubject();
231

    
232
		// are we allowed to do this?
233
		boolean isAuthorized = false;
234
		try {
235
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
236
		} catch (InvalidRequest e) {
237
			throw new ServiceFailure("4882", e.getDescription());
238
		}
239
		if (!isAuthorized) {
240
			throw new NotAuthorized("4881", Permission.WRITE
241
					+ " not allowed by " + subject.getValue() + " on "
242
					+ pid.getValue());
243

    
244
		}
245

    
246
		SystemMetadata systemMetadata = null;
247
		try {
248
			lock = HazelcastService.getInstance().getLock(pid.getValue());
249
			lock.lock();
250
			logMetacat.debug("Locked identifier " + pid.getValue());
251

    
252
			try {
253
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
254
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
255
				}
256

    
257
				// did we get it correctly?
258
				if (systemMetadata == null) {
259
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
260
				}
261

    
262
				// does the request have the most current system metadata?
263
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
264
					String msg = "The requested system metadata version number "
265
							+ serialVersion
266
							+ " differs from the current version at "
267
							+ systemMetadata.getSerialVersion().longValue()
268
							+ ". Please get the latest copy in order to modify it.";
269
					throw new VersionMismatch("4886", msg);
270

    
271
				}
272

    
273
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
274
				throw new NotFound("4884", "No record found for: " + pid.getValue());
275

    
276
			}
277
			  
278
			// check permissions
279
			// TODO: is this necessary?
280
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
281
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
282
			if (isAllowed) {
283
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
284
			}
285
			  
286
			// delete the replica from the given node
287
			D1Client.getMN(nodeId).delete(session, pid);
288
			  
289
			// reflect that change in the system metadata
290
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
291
			for (Replica r: systemMetadata.getReplicaList()) {
292
				  if (r.getReplicaMemberNode().equals(nodeId)) {
293
					  updatedReplicas.remove(r);
294
					  break;
295
				  }
296
			}
297
			systemMetadata.setReplicaList(updatedReplicas);
298

    
299
			// update the metadata
300
			try {
301
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
302
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
303
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
304
			} catch (RuntimeException e) {
305
				throw new ServiceFailure("4882", e.getMessage());
306
			}
307

    
308
		} catch (RuntimeException e) {
309
			throw new ServiceFailure("4882", e.getMessage());
310
		} finally {
311
			lock.unlock();
312
			logMetacat.debug("Unlocked identifier " + pid.getValue());
313
		}
314

    
315
		return true;	  
316
	  
317
  }
318
  
319
  /**
320
   * Deletes an object from the Coordinating Node, where the object is a 
321
   * a science metadata object.
322
   * 
323
   * @param session - the Session object containing the credentials for the Subject
324
   * @param pid - The object identifier to be deleted
325
   * 
326
   * @return pid - the identifier of the object used for the deletion
327
   * 
328
   * @throws InvalidToken
329
   * @throws ServiceFailure
330
   * @throws NotAuthorized
331
   * @throws NotFound
332
   * @throws NotImplemented
333
   * @throws InvalidRequest
334
   */
335
  @Override
336
  public Identifier delete(Session session, Identifier pid) 
337
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
338

    
339
	  // check that it is CN/admin
340
	  boolean allowed = isAdminAuthorized(session, pid, Permission.CHANGE_PERMISSION);
341
	  
342
	  if (!allowed) {
343
		  String msg = "The subject is not allowed to call delete() on a Coordinating Node.";
344
		  logMetacat.info(msg);
345
		  throw new NotAuthorized("1320", msg);
346
	  }
347
	  
348
	  // defer to superclass implementation
349
      return super.delete(session, pid);
350
  }
351
  
352
  /**
353
   * Set the obsoletedBy attribute in System Metadata
354
   * @param session
355
   * @param pid
356
   * @param obsoletedByPid
357
   * @param serialVersion
358
   * @return
359
   * @throws NotImplemented
360
   * @throws NotFound
361
   * @throws NotAuthorized
362
   * @throws ServiceFailure
363
   * @throws InvalidRequest
364
   * @throws InvalidToken
365
   * @throws VersionMismatch
366
   */
367
  @Override
368
  public boolean setObsoletedBy(Session session, Identifier pid,
369
			Identifier obsoletedByPid, long serialVersion)
370
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
371
			InvalidRequest, InvalidToken, VersionMismatch {
372

    
373
		// The lock to be used for this identifier
374
		Lock lock = null;
375

    
376
		// get the subject
377
		Subject subject = session.getSubject();
378

    
379
		// are we allowed to do this?
380
		if (!isAuthorized(session, pid, Permission.WRITE)) {
381
			throw new NotAuthorized("4881", Permission.WRITE
382
					+ " not allowed by " + subject.getValue() + " on "
383
					+ pid.getValue());
384

    
385
		}
386

    
387

    
388
		SystemMetadata systemMetadata = null;
389
		try {
390
			lock = HazelcastService.getInstance().getLock(pid.getValue());
391
			lock.lock();
392
			logMetacat.debug("Locked identifier " + pid.getValue());
393

    
394
			try {
395
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
396
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
397
				}
398

    
399
				// did we get it correctly?
400
				if (systemMetadata == null) {
401
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
402
				}
403

    
404
				// does the request have the most current system metadata?
405
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
406
					String msg = "The requested system metadata version number "
407
							+ serialVersion
408
							+ " differs from the current version at "
409
							+ systemMetadata.getSerialVersion().longValue()
410
							+ ". Please get the latest copy in order to modify it.";
411
					throw new VersionMismatch("4886", msg);
412

    
413
				}
414

    
415
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
416
				throw new NotFound("4884", "No record found for: " + pid.getValue());
417

    
418
			}
419

    
420
			// set the new policy
421
			systemMetadata.setObsoletedBy(obsoletedByPid);
422

    
423
			// update the metadata
424
			try {
425
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
426
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
427
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
428
			} catch (RuntimeException e) {
429
				throw new ServiceFailure("4882", e.getMessage());
430
			}
431

    
432
		} catch (RuntimeException e) {
433
			throw new ServiceFailure("4882", e.getMessage());
434
		} finally {
435
			lock.unlock();
436
			logMetacat.debug("Unlocked identifier " + pid.getValue());
437
		}
438

    
439
		return true;
440
	}
441
  
442
  
443
  /**
444
   * Set the replication status for an object given the object identifier
445
   * 
446
   * @param session - the Session object containing the credentials for the Subject
447
   * @param pid - the object identifier for the given object
448
   * @param status - the replication status to be applied
449
   * 
450
   * @return true or false
451
   * 
452
   * @throws NotImplemented
453
   * @throws NotAuthorized
454
   * @throws ServiceFailure
455
   * @throws InvalidRequest
456
   * @throws InvalidToken
457
   * @throws NotFound
458
   * 
459
   */
460
  @Override
461
  public boolean setReplicationStatus(Session session, Identifier pid,
462
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
463
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
464
      InvalidRequest, NotFound {
465
	  
466
	  // cannot be called by public
467
	  if (session == null) {
468
		  throw new NotAuthorized("4720", "Session cannot be null");
469
	  }
470
      
471
      // The lock to be used for this identifier
472
      Lock lock = null;
473
      
474
      boolean allowed = false;
475
      int replicaEntryIndex = -1;
476
      List<Replica> replicas = null;
477
      // get the subject
478
      Subject subject = session.getSubject();
479
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
480
          " is " + status.toString());
481
      
482
      SystemMetadata systemMetadata = null;
483

    
484
      try {
485
          lock = HazelcastService.getInstance().getLock(pid.getValue());
486
          lock.lock();
487
          logMetacat.debug("Locked identifier " + pid.getValue());
488

    
489
          try {      
490
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
491

    
492
              // did we get it correctly?
493
              if ( systemMetadata == null ) {
494
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
495
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
496
                  
497
              }
498
              replicas = systemMetadata.getReplicaList();
499
              int count = 0;
500
              
501
              // was there a failure? log it
502
              if ( failure != null && status == ReplicationStatus.FAILED ) {
503
                 String msg = "The replication request of the object identified by " + 
504
                     pid.getValue() + " failed.  The error message was " +
505
                     failure.getMessage() + ".";
506
              }
507
              
508
              if (replicas.size() > 0 && replicas != null) {
509
                  // find the target replica index in the replica list
510
                  for (Replica replica : replicas) {
511
                      String replicaNodeStr = replica.getReplicaMemberNode()
512
                              .getValue();
513
                      String targetNodeStr = targetNode.getValue();
514
                      logMetacat.debug("Comparing " + replicaNodeStr + " to "
515
                              + targetNodeStr);
516
                  
517
                      if (replicaNodeStr.equals(targetNodeStr)) {
518
                          replicaEntryIndex = count;
519
                          logMetacat.debug("replica entry index is: "
520
                                  + replicaEntryIndex);
521
                          break;
522
                      }
523
                      count++;
524
                  
525
                  }
526
              }
527
              // are we allowed to do this? only CNs and target MNs are allowed
528
              CNode cn = D1Client.getCN();
529
              List<Node> nodes = cn.listNodes().getNodeList();
530
              
531
              // find the node in the node list
532
              for ( Node node : nodes ) {
533
                  
534
                  NodeReference nodeReference = node.getIdentifier();
535
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + nodeReference.getValue());
536
                  
537
                  // allow target MN certs
538
                  if (targetNode.getValue().equals(nodeReference.getValue()) &&
539
                      node.getType() == NodeType.MN) {
540
                      List<Subject> nodeSubjects = node.getSubjectList();
541
                      
542
                      // check if the session subject is in the node subject list
543
                      for (Subject nodeSubject : nodeSubjects) {
544
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
545
                                  nodeSubject.getValue() + " and " + subject.getValue());
546
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
547
                              
548
                              // lastly limit to COMPLETED and FAILED status updates from MNs only
549
                              if ( status == ReplicationStatus.COMPLETED ||
550
                                   status == ReplicationStatus.FAILED) {
551
                                  allowed = true;
552
                                  break;
553
                                  
554
                              }                              
555
                          }
556
                      }                 
557
                  }
558
              }
559

    
560
              if ( !allowed ) {
561
                  //check for CN admin access
562
                  allowed = isAuthorized(session, pid, Permission.WRITE);
563
                  
564
              }              
565
              
566
              if ( !allowed ) {
567
                  String msg = "The subject identified by "
568
                          + subject.getValue()
569
                          + " does not have permission to set the replication status for "
570
                          + "the replica identified by "
571
                          + targetNode.getValue() + ".";
572
                  logMetacat.info(msg);
573
                  throw new NotAuthorized("4720", msg);
574
                  
575
              }
576

    
577
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
578
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
579
                " : " + e.getMessage());
580
            
581
          }
582
          
583
          Replica targetReplica = new Replica();
584
          // set the status for the replica
585
          if ( replicaEntryIndex != -1 ) {
586
              targetReplica = replicas.get(replicaEntryIndex);
587
              targetReplica.setReplicationStatus(status);
588
              logMetacat.debug("Set the replication status for " + 
589
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
590
                  targetReplica.getReplicationStatus());
591
              
592
          } else {
593
              // this is a new entry, create it
594
              targetReplica.setReplicaMemberNode(targetNode);
595
              targetReplica.setReplicationStatus(status);
596
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
597
              replicas.add(targetReplica);
598
              
599
          }
600
          
601
          systemMetadata.setReplicaList(replicas);
602
                
603
          // update the metadata
604
          try {
605
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
606
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
607
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
608
              
609
              if ( status == ReplicationStatus.FAILED && failure != null ) {
610
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
611
                      " on target node " + targetNode + ". The exception was: " +
612
                      failure.getMessage());
613
              }
614
          } catch (RuntimeException e) {
615
              throw new ServiceFailure("4700", e.getMessage());
616
          
617
          }
618
          
619
    } catch (RuntimeException e) {
620
        String msg = "There was a RuntimeException getting the lock for " +
621
            pid.getValue();
622
        logMetacat.info(msg);
623
        
624
    } finally {
625
        lock.unlock();
626
        logMetacat.debug("Unlocked identifier " + pid.getValue());
627
        
628
    }
629
      
630
      return true;
631
  }
632
  
633
  /**
634
   * Return the checksum of the object given the identifier 
635
   * 
636
   * @param session - the Session object containing the credentials for the Subject
637
   * @param pid - the object identifier for the given object
638
   * 
639
   * @return checksum - the checksum of the object
640
   * 
641
   * @throws InvalidToken
642
   * @throws ServiceFailure
643
   * @throws NotAuthorized
644
   * @throws NotFound
645
   * @throws NotImplemented
646
   */
647
  @Override
648
  public Checksum getChecksum(Session session, Identifier pid)
649
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
650
    NotImplemented {
651
    
652
	boolean isAuthorized = false;
653
	try {
654
		isAuthorized = isAuthorized(session, pid, Permission.READ);
655
	} catch (InvalidRequest e) {
656
		throw new ServiceFailure("1410", e.getDescription());
657
	}  
658
    if (!isAuthorized) {
659
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
660
    }
661
    
662
    SystemMetadata systemMetadata = null;
663
    Checksum checksum = null;
664
    
665
    try {
666
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
667

    
668
        if (systemMetadata == null ) {
669
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue());
670
        }
671
        checksum = systemMetadata.getChecksum();
672
        
673
    } catch (RuntimeException e) {
674
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
675
            pid.getValue() + ". The error message was: " + e.getMessage());
676
      
677
    }
678
    
679
    return checksum;
680
  }
681

    
682
  /**
683
   * Resolve the location of a given object
684
   * 
685
   * @param session - the Session object containing the credentials for the Subject
686
   * @param pid - the object identifier for the given object
687
   * 
688
   * @return objectLocationList - the list of nodes known to contain the object
689
   * 
690
   * @throws InvalidToken
691
   * @throws ServiceFailure
692
   * @throws NotAuthorized
693
   * @throws NotFound
694
   * @throws NotImplemented
695
   */
696
  @Override
697
  public ObjectLocationList resolve(Session session, Identifier pid)
698
    throws InvalidToken, ServiceFailure, NotAuthorized,
699
    NotFound, NotImplemented {
700

    
701
    throw new NotImplemented("4131", "resolve not implemented");
702

    
703
  }
704

    
705
  /**
706
   * Search the metadata catalog for identifiers that match the criteria
707
   * 
708
   * @param session - the Session object containing the credentials for the Subject
709
   * @param queryType - An identifier for the type of query expression 
710
   *                    provided in the query
711
   * @param query -  The criteria for matching the characteristics of the 
712
   *                 metadata objects of interest
713
   * 
714
   * @return objectList - the list of objects matching the criteria
715
   * 
716
   * @throws InvalidToken
717
   * @throws ServiceFailure
718
   * @throws NotAuthorized
719
   * @throws InvalidRequest
720
   * @throws NotImplemented
721
   */
722
  @Override
723
  public ObjectList search(Session session, String queryType, String query)
724
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
725
    NotImplemented {
726

    
727
    ObjectList objectList = null;
728
    try {
729
        objectList = 
730
          IdentifierManager.getInstance().querySystemMetadata(
731
              null, //startTime, 
732
              null, //endTime,
733
              null, //objectFormat, 
734
              false, //replicaStatus, 
735
              0, //start, 
736
              -1 //count
737
              );
738
        
739
    } catch (Exception e) {
740
      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
741
    }
742

    
743
      return objectList;
744
      
745
    //throw new NotImplemented("4281", "search not implemented");
746
    
747
    // the code block below is from an older implementation
748
    
749
    /*  This block commented out because of the EcoGrid circular dependency.
750
         *  For now, query will not be supported until the circularity can be
751
         *  resolved, probably by moving the ecogrid query syntax transformers
752
         *  directly into the Metacat codebase.  MBJ 2010-02-03
753
         
754
        try {
755
            EcogridQueryParser parser = new EcogridQueryParser(request
756
                    .getReader());
757
            parser.parseXML();
758
            QueryType queryType = parser.getEcogridQuery();
759
            EcogridJavaToMetacatJavaQueryTransformer queryTransformer = 
760
                new EcogridJavaToMetacatJavaQueryTransformer();
761
            QuerySpecification metacatQuery = queryTransformer
762
                    .transform(queryType);
763

    
764
            DBQuery metacat = new DBQuery();
765

    
766
            boolean useXMLIndex = (new Boolean(PropertyService
767
                    .getProperty("database.usexmlindex"))).booleanValue();
768
            String xmlquery = "query"; // we don't care the query in resultset,
769
            // the query can be anything
770
            PrintWriter out = null; // we don't want metacat result, so set out null
771

    
772
            // parameter: queryspecification, user, group, usingIndexOrNot
773
            StringBuffer result = metacat.createResultDocument(xmlquery,
774
                    metacatQuery, out, username, groupNames, useXMLIndex);
775

    
776
            // create result set transfer       
777
            String saxparser = PropertyService.getProperty("xml.saxparser");
778
            MetacatResultsetParser metacatResultsetParser = new MetacatResultsetParser(
779
                    new StringReader(result.toString()), saxparser, queryType
780
                            .getNamespace().get_value());
781
            ResultsetType records = metacatResultsetParser.getEcogridResult();
782

    
783
            System.out
784
                    .println(EcogridResultsetTransformer.toXMLString(records));
785
            response.setContentType("text/xml");
786
            out = response.getWriter();
787
            out.print(EcogridResultsetTransformer.toXMLString(records));
788

    
789
        } catch (Exception e) {
790
            e.printStackTrace();
791
        }*/
792
    
793

    
794
  }
795
  
796
  /**
797
   * Returns the object format registered in the DataONE Object Format 
798
   * Vocabulary for the given format identifier
799
   * 
800
   * @param fmtid - the identifier of the format requested
801
   * 
802
   * @return objectFormat - the object format requested
803
   * 
804
   * @throws ServiceFailure
805
   * @throws NotFound
806
   * @throws InsufficientResources
807
   * @throws NotImplemented
808
   */
809
  @Override
810
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
811
    throws ServiceFailure, NotFound, NotImplemented {
812
     
813
      return ObjectFormatService.getInstance().getFormat(fmtid);
814
      
815
  }
816

    
817
  /**
818
   * Returns a list of all object formats registered in the DataONE Object 
819
   * Format Vocabulary
820
    * 
821
   * @return objectFormatList - The list of object formats registered in 
822
   *                            the DataONE Object Format Vocabulary
823
   * 
824
   * @throws ServiceFailure
825
   * @throws NotImplemented
826
   * @throws InsufficientResources
827
   */
828
  @Override
829
  public ObjectFormatList listFormats() 
830
    throws ServiceFailure, NotImplemented {
831

    
832
    return ObjectFormatService.getInstance().listFormats();
833
  }
834

    
835
  /**
836
   * Returns a list of nodes that have been registered with the DataONE infrastructure
837
    * 
838
   * @return nodeList - List of nodes from the registry
839
   * 
840
   * @throws ServiceFailure
841
   * @throws NotImplemented
842
   */
843
  @Override
844
  public NodeList listNodes() 
845
    throws NotImplemented, ServiceFailure {
846

    
847
    throw new NotImplemented("4800", "listNodes not implemented");
848
  }
849

    
850
  /**
851
   * Provides a mechanism for adding system metadata independently of its 
852
   * associated object, such as when adding system metadata for data objects.
853
    * 
854
   * @param session - the Session object containing the credentials for the Subject
855
   * @param pid - The identifier of the object to register the system metadata against
856
   * @param sysmeta - The system metadata to be registered
857
   * 
858
   * @return true if the registration succeeds
859
   * 
860
   * @throws NotImplemented
861
   * @throws NotAuthorized
862
   * @throws ServiceFailure
863
   * @throws InvalidRequest
864
   * @throws InvalidSystemMetadata
865
   */
866
  @Override
867
  public Identifier registerSystemMetadata(Session session, Identifier pid,
868
      SystemMetadata sysmeta) 
869
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
870
      InvalidSystemMetadata {
871

    
872
      // The lock to be used for this identifier
873
      Lock lock = null;
874

    
875
      // TODO: control who can call this?
876
      if (session == null) {
877
          //TODO: many of the thrown exceptions do not use the correct error codes
878
          //check these against the docs and correct them
879
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
880
                  "  If you are not logged in, please do so and retry the request.");
881
      }
882
      
883
      // verify that guid == SystemMetadata.getIdentifier()
884
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
885
          "|" + sysmeta.getIdentifier().getValue());
886
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
887
          throw new InvalidRequest("4863", 
888
              "The identifier in method call (" + pid.getValue() + 
889
              ") does not match identifier in system metadata (" +
890
              sysmeta.getIdentifier().getValue() + ").");
891
      }
892

    
893
      try {
894
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
895
          lock.lock();
896
          logMetacat.debug("Locked identifier " + pid.getValue());
897
          logMetacat.debug("Checking if identifier exists...");
898
          // Check that the identifier does not already exist
899
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
900
              throw new InvalidRequest("4863", 
901
                  "The identifier is already in use by an existing object.");
902
          
903
          }
904
          
905
          // insert the system metadata into the object store
906
          logMetacat.debug("Starting to insert SystemMetadata...");
907
          try {
908
              sysmeta.setSerialVersion(BigInteger.ONE);
909
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
910
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
911
              
912
          } catch (RuntimeException e) {
913
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
914
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
915
                  e.getClass() + ": " + e.getMessage());
916
              
917
          }
918
          
919
      } catch (RuntimeException e) {
920
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
921
                  e.getClass() + ": " + e.getMessage());
922
          
923
      }  finally {
924
          lock.unlock();
925
          logMetacat.debug("Unlocked identifier " + pid.getValue());
926
          
927
      }
928

    
929
      
930
      logMetacat.debug("Returning from registerSystemMetadata");
931
      EventLog.getInstance().log(request.getRemoteAddr(), 
932
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
933
          pid.getValue(), "registerSystemMetadata");
934
      return pid;
935
  }
936
  
937
  /**
938
   * Given an optional scope and format, reserves and returns an identifier 
939
   * within that scope and format that is unique and will not be 
940
   * used by any other sessions. 
941
    * 
942
   * @param session - the Session object containing the credentials for the Subject
943
   * @param pid - The identifier of the object to register the system metadata against
944
   * @param scope - An optional string to be used to qualify the scope of 
945
   *                the identifier namespace, which is applied differently 
946
   *                depending on the format requested. If scope is not 
947
   *                supplied, a default scope will be used.
948
   * @param format - The optional name of the identifier format to be used, 
949
   *                  drawn from a DataONE-specific vocabulary of identifier 
950
   *                 format names, including several common syntaxes such 
951
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
952
   *                 format is not supplied by the caller, the CN service 
953
   *                 will use a default identifier format, which may change 
954
   *                 over time.
955
   * 
956
   * @return true if the registration succeeds
957
   * 
958
   * @throws InvalidToken
959
   * @throws ServiceFailure
960
   * @throws NotAuthorized
961
   * @throws IdentifierNotUnique
962
   * @throws NotImplemented
963
   */
964
  @Override
965
  public Identifier reserveIdentifier(Session session, Identifier pid)
966
  throws InvalidToken, ServiceFailure,
967
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
968

    
969
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
970
  }
971
  
972
  @Override
973
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
974
  throws InvalidToken, ServiceFailure,
975
        NotAuthorized, NotImplemented, InvalidRequest {
976
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
977
  }
978
  
979
  /**
980
    * Checks whether the pid is reserved by the subject in the session param
981
    * If the reservation is held on the pid by the subject, we return true.
982
    * 
983
   * @param session - the Session object containing the Subject
984
   * @param pid - The identifier to check
985
   * 
986
   * @return true if the reservation exists for the subject/pid
987
   * 
988
   * @throws InvalidToken
989
   * @throws ServiceFailure
990
   * @throws NotFound - when the pid is not found (in use or in reservation)
991
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
992
   * @throws IdentifierNotUnique - when the pid is in use
993
   * @throws NotImplemented
994
   */
995

    
996
  @Override
997
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
998
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
999
      NotImplemented, InvalidRequest {
1000
  
1001
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1002
  }
1003

    
1004
  /**
1005
   * Changes ownership (RightsHolder) of the specified object to the 
1006
   * subject specified by userId
1007
    * 
1008
   * @param session - the Session object containing the credentials for the Subject
1009
   * @param pid - Identifier of the object to be modified
1010
   * @param userId - The subject that will be taking ownership of the specified object.
1011
   *
1012
   * @return pid - the identifier of the modified object
1013
   * 
1014
   * @throws ServiceFailure
1015
   * @throws InvalidToken
1016
   * @throws NotFound
1017
   * @throws NotAuthorized
1018
   * @throws NotImplemented
1019
   * @throws InvalidRequest
1020
   */  
1021
  @Override
1022
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1023
      long serialVersion)
1024
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1025
      NotImplemented, InvalidRequest, VersionMismatch {
1026
      
1027
      // The lock to be used for this identifier
1028
      Lock lock = null;
1029

    
1030
      // get the subject
1031
      Subject subject = session.getSubject();
1032
      
1033
      // are we allowed to do this?
1034
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1035
          throw new NotAuthorized("4440", "not allowed by "
1036
                  + subject.getValue() + " on " + pid.getValue());
1037
          
1038
      }
1039
      
1040
      SystemMetadata systemMetadata = null;
1041
      try {
1042
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1043
          logMetacat.debug("Locked identifier " + pid.getValue());
1044

    
1045
          try {
1046
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1047
              
1048
              // does the request have the most current system metadata?
1049
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1050
                 String msg = "The requested system metadata version number " + 
1051
                     serialVersion + " differs from the current version at " +
1052
                     systemMetadata.getSerialVersion().longValue() +
1053
                     ". Please get the latest copy in order to modify it.";
1054
                 throw new VersionMismatch("4443", msg);
1055
              }
1056
              
1057
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1058
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1059
              
1060
          }
1061
              
1062
          // set the new rights holder
1063
          systemMetadata.setRightsHolder(userId);
1064
          
1065
          // update the metadata
1066
          try {
1067
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1068
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1069
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1070
              notifyReplicaNodes(systemMetadata);
1071
              
1072
          } catch (RuntimeException e) {
1073
              throw new ServiceFailure("4490", e.getMessage());
1074
          
1075
          }
1076
          
1077
      } catch (RuntimeException e) {
1078
          throw new ServiceFailure("4490", e.getMessage());
1079
          
1080
      } finally {
1081
          lock.unlock();
1082
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1083
      
1084
      }
1085
      
1086
      return pid;
1087
  }
1088

    
1089
  /**
1090
   * Verify that a replication task is authorized by comparing the target node's
1091
   * Subject (from the X.509 certificate-derived Session) with the list of 
1092
   * subjects in the known, pending replication tasks map.
1093
   * 
1094
   * @param originatingNodeSession - Session information that contains the 
1095
   *                                 identity of the calling user
1096
   * @param targetNodeSubject - Subject identifying the target node
1097
   * @param pid - the identifier of the object to be replicated
1098
   * @param replicatePermission - the execute permission to be granted
1099
   * 
1100
   * @throws ServiceFailure
1101
   * @throws NotImplemented
1102
   * @throws InvalidToken
1103
   * @throws NotAuthorized
1104
   * @throws InvalidRequest
1105
   * @throws NotFound
1106
   */
1107
  @Override
1108
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1109
    Subject targetNodeSubject, Identifier pid) 
1110
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1111
    NotFound, InvalidRequest {
1112
    
1113
    boolean isAllowed = false;
1114
    SystemMetadata sysmeta = null;
1115
    NodeReference targetNode = null;
1116
    
1117
    try {
1118
      // get the target node reference from the nodes list
1119
      CNode cn = D1Client.getCN();
1120
      List<Node> nodes = cn.listNodes().getNodeList();
1121
      
1122
      if ( nodes != null ) {
1123
        for (Node node : nodes) {
1124
            
1125
            for (Subject nodeSubject : node.getSubjectList()) {
1126
                
1127
                if ( nodeSubject.equals(targetNodeSubject) ) {
1128
                    targetNode = node.getIdentifier();
1129
                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1130
                    break;
1131
                }
1132
            }
1133
            
1134
            if ( targetNode != null) { break; }
1135
        }
1136
        
1137
      } else {
1138
          String msg = "Couldn't get the node list from the CN";
1139
          logMetacat.debug(msg);
1140
          throw new ServiceFailure("4872", msg);
1141
          
1142
      }
1143
      
1144
      // can't find a node listed with the given subject
1145
      if ( targetNode == null ) {
1146
          String msg = "There is no Member Node registered with a node subject " +
1147
              "matching " + targetNodeSubject.getValue();
1148
          logMetacat.info(msg);
1149
          throw new NotAuthorized("4871", msg);
1150
          
1151
      }
1152
      
1153
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1154
      
1155
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1156

    
1157
      if ( sysmeta != null ) {
1158
          
1159
          List<Replica> replicaList = sysmeta.getReplicaList();
1160
          
1161
          if ( replicaList != null ) {
1162
              
1163
              // find the replica with the status set to 'requested'
1164
              for (Replica replica : replicaList) {
1165
                  ReplicationStatus status = replica.getReplicationStatus();
1166
                  NodeReference listedNode = replica.getReplicaMemberNode();
1167
                  if ( listedNode != null && targetNode != null ) {
1168
                      logMetacat.debug("Comparing " + listedNode.getValue()
1169
                              + " to " + targetNode.getValue());
1170
                      
1171
                      if (listedNode.getValue().equals(targetNode.getValue())
1172
                              && status.equals(ReplicationStatus.REQUESTED)) {
1173
                          isAllowed = true;
1174
                          break;
1175

    
1176
                      }
1177
                  }
1178
              }
1179
          }
1180
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1181
              "to replicate: " + isAllowed + " for " + pid.getValue());
1182

    
1183
          
1184
      } else {
1185
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1186
          " is null.");          
1187
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue());
1188
          
1189
      }
1190

    
1191
    } catch (RuntimeException e) {
1192
    	  ServiceFailure sf = new ServiceFailure("4872", 
1193
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1194
                e.getCause().getMessage());
1195
    	  sf.initCause(e);
1196
        throw sf;
1197
        
1198
    }
1199
      
1200
    return isAllowed;
1201
    
1202
  }
1203

    
1204
  /**
1205
   * Adds a new object to the Node, where the object is a science metadata object.
1206
   * 
1207
   * @param session - the Session object containing the credentials for the Subject
1208
   * @param pid - The object identifier to be created
1209
   * @param object - the object bytes
1210
   * @param sysmeta - the system metadata that describes the object  
1211
   * 
1212
   * @return pid - the object identifier created
1213
   * 
1214
   * @throws InvalidToken
1215
   * @throws ServiceFailure
1216
   * @throws NotAuthorized
1217
   * @throws IdentifierNotUnique
1218
   * @throws UnsupportedType
1219
   * @throws InsufficientResources
1220
   * @throws InvalidSystemMetadata
1221
   * @throws NotImplemented
1222
   * @throws InvalidRequest
1223
   */
1224
  public Identifier create(Session session, Identifier pid, InputStream object,
1225
    SystemMetadata sysmeta) 
1226
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1227
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1228
    NotImplemented, InvalidRequest {
1229
                  
1230
      // The lock to be used for this identifier
1231
      Lock lock = null;
1232

    
1233
      try {
1234
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1235
          // are we allowed?
1236
          boolean isAllowed = false;
1237
          isAllowed = isAdminAuthorized(session, pid, Permission.WRITE);
1238

    
1239
          // proceed if we're called by a CN
1240
          if ( isAllowed ) {
1241
              // create the coordinating node version of the document      
1242
              lock.lock();
1243
              logMetacat.debug("Locked identifier " + pid.getValue());
1244
              sysmeta.setSerialVersion(BigInteger.ONE);
1245
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1246
              sysmeta.setArchived(false); // this is a create op, not update
1247
              
1248
              // the CN should have set the origin and authoritative member node fields
1249
              try {
1250
                  sysmeta.getOriginMemberNode().getValue();
1251
                  sysmeta.getAuthoritativeMemberNode().getValue();
1252
                  
1253
              } catch (NullPointerException npe) {
1254
                  throw new InvalidSystemMetadata("4896", 
1255
                      "Both the origin and authoritative member node identifiers need to be set.");
1256
                  
1257
              }
1258
              pid = super.create(session, pid, object, sysmeta);
1259

    
1260
          } else {
1261
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1262
                  " isn't allowed to call create() on a Coordinating Node.";
1263
              logMetacat.info(msg);
1264
              throw new NotAuthorized("1100", msg);
1265
          }
1266
          
1267
      } catch (RuntimeException e) {
1268
          // Convert Hazelcast runtime exceptions to service failures
1269
          String msg = "There was a problem creating the object identified by " +
1270
              pid.getValue() + ". There error message was: " + e.getMessage();
1271
          throw new ServiceFailure("4893", msg);
1272
          
1273
      } finally {
1274
    	  if (lock != null) {
1275
	          lock.unlock();
1276
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1277
    	  }
1278
      }
1279
      
1280
      return pid;
1281

    
1282
  }
1283

    
1284
  /**
1285
   * Set access for a given object using the object identifier and a Subject
1286
   * under a given Session.
1287
   * 
1288
   * @param session - the Session object containing the credentials for the Subject
1289
   * @param pid - the object identifier for the given object to apply the policy
1290
   * @param policy - the access policy to be applied
1291
   * 
1292
   * @return true if the application of the policy succeeds
1293
   * @throws InvalidToken
1294
   * @throws ServiceFailure
1295
   * @throws NotFound
1296
   * @throws NotAuthorized
1297
   * @throws NotImplemented
1298
   * @throws InvalidRequest
1299
   */
1300
  public boolean setAccessPolicy(Session session, Identifier pid, 
1301
      AccessPolicy accessPolicy, long serialVersion) 
1302
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1303
      NotImplemented, InvalidRequest, VersionMismatch {
1304
      
1305
      // The lock to be used for this identifier
1306
      Lock lock = null;
1307
      SystemMetadata systemMetadata = null;
1308
      
1309
      boolean success = false;
1310
      
1311
      // get the subject
1312
      Subject subject = session.getSubject();
1313
      
1314
      // are we allowed to do this?
1315
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1316
          throw new NotAuthorized("4420", "not allowed by "
1317
                  + subject.getValue() + " on " + pid.getValue());
1318
      }
1319
      
1320
      try {
1321
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1322
          lock.lock();
1323
          logMetacat.debug("Locked identifier " + pid.getValue());
1324

    
1325
          try {
1326
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1327

    
1328
              if ( systemMetadata == null ) {
1329
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1330
                  
1331
              }
1332
              // does the request have the most current system metadata?
1333
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1334
                 String msg = "The requested system metadata version number " + 
1335
                     serialVersion + " differs from the current version at " +
1336
                     systemMetadata.getSerialVersion().longValue() +
1337
                     ". Please get the latest copy in order to modify it.";
1338
                 throw new VersionMismatch("4402", msg);
1339
                 
1340
              }
1341
              
1342
          } catch (RuntimeException e) {
1343
              // convert Hazelcast RuntimeException to NotFound
1344
              throw new NotFound("4400", "No record found for: " + pid);
1345
            
1346
          }
1347
              
1348
          // set the access policy
1349
          systemMetadata.setAccessPolicy(accessPolicy);
1350
          
1351
          // update the system metadata
1352
          try {
1353
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1354
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1355
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1356
              notifyReplicaNodes(systemMetadata);
1357
              
1358
          } catch (RuntimeException e) {
1359
              // convert Hazelcast RuntimeException to ServiceFailure
1360
              throw new ServiceFailure("4430", e.getMessage());
1361
            
1362
          }
1363
          
1364
      } catch (RuntimeException e) {
1365
          throw new ServiceFailure("4430", e.getMessage());
1366
          
1367
      } finally {
1368
          lock.unlock();
1369
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1370
        
1371
      }
1372

    
1373
    
1374
    // TODO: how do we know if the map was persisted?
1375
    success = true;
1376
    
1377
    return success;
1378
  }
1379

    
1380
  /**
1381
   * Full replacement of replication metadata in the system metadata for the 
1382
   * specified object, changes date system metadata modified
1383
   * 
1384
   * @param session - the Session object containing the credentials for the Subject
1385
   * @param pid - the object identifier for the given object to apply the policy
1386
   * @param replica - the replica to be updated
1387
   * @return
1388
   * @throws NotImplemented
1389
   * @throws NotAuthorized
1390
   * @throws ServiceFailure
1391
   * @throws InvalidRequest
1392
   * @throws NotFound
1393
   * @throws VersionMismatch
1394
   */
1395
  @Override
1396
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1397
      Replica replica, long serialVersion) 
1398
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1399
      NotFound, VersionMismatch {
1400
      
1401
      // The lock to be used for this identifier
1402
      Lock lock = null;
1403
      
1404
      // get the subject
1405
      Subject subject = session.getSubject();
1406
      
1407
      // are we allowed to do this?
1408
      try {
1409

    
1410
          // what is the controlling permission?
1411
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1412
              throw new NotAuthorized("4851", "not allowed by "
1413
                      + subject.getValue() + " on " + pid.getValue());
1414
          }
1415

    
1416
        
1417
      } catch (InvalidToken e) {
1418
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1419
                  " on " + pid.getValue());  
1420
          
1421
      }
1422

    
1423
      SystemMetadata systemMetadata = null;
1424
      try {
1425
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1426
          lock.lock();
1427
          logMetacat.debug("Locked identifier " + pid.getValue());
1428

    
1429
          try {      
1430
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1431

    
1432
              // does the request have the most current system metadata?
1433
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1434
                 String msg = "The requested system metadata version number " + 
1435
                     serialVersion + " differs from the current version at " +
1436
                     systemMetadata.getSerialVersion().longValue() +
1437
                     ". Please get the latest copy in order to modify it.";
1438
                 throw new VersionMismatch("4855", msg);
1439
              }
1440
              
1441
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1442
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1443
                  " : " + e.getMessage());
1444
            
1445
          }
1446
              
1447
          // set the status for the replica
1448
          List<Replica> replicas = systemMetadata.getReplicaList();
1449
          NodeReference replicaNode = replica.getReplicaMemberNode();
1450
          int index = 0;
1451
          for (Replica listedReplica: replicas) {
1452
              
1453
              // remove the replica that we are replacing
1454
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1455
                  replicas.remove(index);
1456
                  break;
1457
                  
1458
              }
1459
              index++;
1460
          }
1461
          
1462
          // add the new replica item
1463
          replicas.add(replica);
1464
          systemMetadata.setReplicaList(replicas);
1465
          
1466
          // update the metadata
1467
          try {
1468
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1469
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1470
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1471
            
1472
          } catch (RuntimeException e) {
1473
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1474
              throw new ServiceFailure("4852", e.getMessage());
1475
          
1476
          }
1477
          
1478
      } catch (RuntimeException e) {
1479
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1480
          throw new ServiceFailure("4852", e.getMessage());
1481
      
1482
      } finally {
1483
          lock.unlock();
1484
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1485
          
1486
      }
1487
    
1488
      return true;
1489
      
1490
  }
1491
  
1492
  /**
1493
   * 
1494
   */
1495
  @Override
1496
  public ObjectList listObjects(Session session, Date startTime, 
1497
      Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1498
      Integer start, Integer count)
1499
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1500
      ServiceFailure {
1501
      
1502
      ObjectList objectList = null;
1503
        try {
1504
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1505
        } catch (Exception e) {
1506
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1507
        }
1508

    
1509
        return objectList;
1510
  }
1511

    
1512
  
1513
 	/**
1514
 	 * Returns a list of checksum algorithms that are supported by DataONE.
1515
 	 * @return cal  the list of checksum algorithms
1516
 	 * 
1517
 	 * @throws ServiceFailure
1518
 	 * @throws NotImplemented
1519
 	 */
1520
  @Override
1521
  public ChecksumAlgorithmList listChecksumAlgorithms()
1522
			throws ServiceFailure, NotImplemented {
1523
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1524
		cal.addAlgorithm("MD5");
1525
		cal.addAlgorithm("SHA-1");
1526
		return cal;
1527
		
1528
	}
1529

    
1530
  /**
1531
   * Notify replica Member Nodes of system metadata changes for a given pid
1532
   * 
1533
   * @param currentSystemMetadata - the up to date system metadata
1534
   */
1535
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
1536
      
1537
      Session session = null;
1538
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
1539
      MNode mn = null;
1540
      NodeReference replicaNodeRef = null;
1541
      CNode cn = null;
1542
      NodeType nodeType = null;
1543
      List<Node> nodeList = null;
1544
      
1545
      try {
1546
          cn = D1Client.getCN();
1547
          nodeList = cn.listNodes().getNodeList();
1548
          
1549
      } catch (Exception e) { // handle BaseException and other I/O issues
1550
          
1551
          // swallow errors since the call is not critical
1552
          logMetacat.error("Can't inform MNs of system metadata changes due " +
1553
              "to communication issues with the CN: " + e.getMessage());
1554
          
1555
      }
1556
      
1557
      if ( replicaList != null ) {
1558
          
1559
          // iterate through the replicas and inform  MN replica nodes
1560
          for (Replica replica : replicaList) {
1561
              
1562
              replicaNodeRef = replica.getReplicaMemberNode();
1563
              try {
1564
                  if (nodeList != null) {
1565
                      // find the node type
1566
                      for (Node node : nodeList) {
1567
                          if (node.getIdentifier().getValue() == 
1568
                              replicaNodeRef.getValue()) {
1569
                              nodeType = node.getType();
1570
                              break;
1571
              
1572
                          }
1573
                      }
1574
                  }
1575
              
1576
                  // notify only MNs
1577
                  if (nodeType != null && nodeType == NodeType.MN) {
1578
                      mn = D1Client.getMN(replicaNodeRef);
1579
                      mn.systemMetadataChanged(session, 
1580
                          currentSystemMetadata.getIdentifier(), 
1581
                          currentSystemMetadata.getSerialVersion().longValue(),
1582
                          currentSystemMetadata.getDateSysMetadataModified());
1583
                  }
1584
              
1585
              } catch (Exception e) { // handle BaseException and other I/O issues
1586
              
1587
                  // swallow errors since the call is not critical
1588
                  logMetacat.error("Can't inform "
1589
                          + replicaNodeRef.getValue()
1590
                          + " of system metadata changes due "
1591
                          + "to communication issues with the CN: "
1592
                          + e.getMessage());
1593
              
1594
              }
1595
          }
1596
      }
1597
  }
1598
}
(1-1/5)