Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.ArrayList;
29
import java.util.Calendar;
30
import java.util.Date;
31
import java.util.List;
32
import java.util.concurrent.locks.Lock;
33

    
34
import javax.servlet.http.HttpServletRequest;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.client.CNode;
38
import org.dataone.client.D1Client;
39
import org.dataone.client.MNode;
40
import org.dataone.service.cn.v1.CNAuthorization;
41
import org.dataone.service.cn.v1.CNCore;
42
import org.dataone.service.cn.v1.CNRead;
43
import org.dataone.service.cn.v1.CNReplication;
44
import org.dataone.service.exceptions.BaseException;
45
import org.dataone.service.exceptions.IdentifierNotUnique;
46
import org.dataone.service.exceptions.InsufficientResources;
47
import org.dataone.service.exceptions.InvalidRequest;
48
import org.dataone.service.exceptions.InvalidSystemMetadata;
49
import org.dataone.service.exceptions.InvalidToken;
50
import org.dataone.service.exceptions.NotAuthorized;
51
import org.dataone.service.exceptions.NotFound;
52
import org.dataone.service.exceptions.NotImplemented;
53
import org.dataone.service.exceptions.ServiceFailure;
54
import org.dataone.service.exceptions.UnsupportedType;
55
import org.dataone.service.exceptions.VersionMismatch;
56
import org.dataone.service.types.v1.AccessPolicy;
57
import org.dataone.service.types.v1.Checksum;
58
import org.dataone.service.types.v1.ChecksumAlgorithmList;
59
import org.dataone.service.types.v1.DescribeResponse;
60
import org.dataone.service.types.v1.Event;
61
import org.dataone.service.types.v1.Identifier;
62
import org.dataone.service.types.v1.Log;
63
import org.dataone.service.types.v1.Node;
64
import org.dataone.service.types.v1.NodeList;
65
import org.dataone.service.types.v1.NodeReference;
66
import org.dataone.service.types.v1.NodeType;
67
import org.dataone.service.types.v1.ObjectFormat;
68
import org.dataone.service.types.v1.ObjectFormatIdentifier;
69
import org.dataone.service.types.v1.ObjectFormatList;
70
import org.dataone.service.types.v1.ObjectList;
71
import org.dataone.service.types.v1.ObjectLocationList;
72
import org.dataone.service.types.v1.Permission;
73
import org.dataone.service.types.v1.Replica;
74
import org.dataone.service.types.v1.ReplicationPolicy;
75
import org.dataone.service.types.v1.ReplicationStatus;
76
import org.dataone.service.types.v1.Session;
77
import org.dataone.service.types.v1.Subject;
78
import org.dataone.service.types.v1.SystemMetadata;
79
import org.dataone.service.types.v1.util.ServiceMethodRestrictionUtil;
80
import org.dataone.service.types.v1_1.QueryEngineDescription;
81
import org.dataone.service.types.v1_1.QueryEngineList;
82

    
83
import edu.ucsb.nceas.metacat.EventLog;
84
import edu.ucsb.nceas.metacat.IdentifierManager;
85
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
86

    
87
/**
88
 * Represents Metacat's implementation of the DataONE Coordinating Node 
89
 * service API. Methods implement the various CN* interfaces, and methods common
90
 * to both Member Node and Coordinating Node interfaces are found in the
91
 * D1NodeService super class.
92
 *
93
 */
94
public class CNodeService extends D1NodeService implements CNAuthorization,
95
    CNCore, CNRead, CNReplication {
96

    
97
  /* the logger instance */
98
  private Logger logMetacat = null;
99

    
100
  /**
101
   * singleton accessor
102
   */
103
  public static CNodeService getInstance(HttpServletRequest request) { 
104
    return new CNodeService(request);
105
  }
106
  
107
  /**
108
   * Constructor, private for singleton access
109
   */
110
  private CNodeService(HttpServletRequest request) {
111
    super(request);
112
    logMetacat = Logger.getLogger(CNodeService.class);
113
        
114
  }
115
    
116
  /**
117
   * Set the replication policy for an object given the object identifier
118
   * 
119
   * @param session - the Session object containing the credentials for the Subject
120
   * @param pid - the object identifier for the given object
121
   * @param policy - the replication policy to be applied
122
   * 
123
   * @return true or false
124
   * 
125
   * @throws NotImplemented
126
   * @throws NotAuthorized
127
   * @throws ServiceFailure
128
   * @throws InvalidRequest
129
   * @throws VersionMismatch
130
   * 
131
   */
132
  @Override
133
  public boolean setReplicationPolicy(Session session, Identifier pid,
134
      ReplicationPolicy policy, long serialVersion) 
135
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
136
      InvalidRequest, InvalidToken, VersionMismatch {
137
      
138
      // The lock to be used for this identifier
139
      Lock lock = null;
140
      
141
      // get the subject
142
      Subject subject = session.getSubject();
143
      
144
      // are we allowed to do this?
145
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
146
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
147
                  + " not allowed by " + subject.getValue() + " on "
148
                  + pid.getValue());
149
          
150
      }
151
      
152
      SystemMetadata systemMetadata = null;
153
      try {
154
          lock = HazelcastService.getInstance().getLock(pid.getValue());
155
          lock.lock();
156
          logMetacat.debug("Locked identifier " + pid.getValue());
157

    
158
          try {
159
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
160
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
161
                  
162
              }
163
              
164
              // did we get it correctly?
165
              if ( systemMetadata == null ) {
166
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
167
                  
168
              }
169

    
170
              // does the request have the most current system metadata?
171
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
172
                 String msg = "The requested system metadata version number " + 
173
                     serialVersion + " differs from the current version at " +
174
                     systemMetadata.getSerialVersion().longValue() +
175
                     ". Please get the latest copy in order to modify it.";
176
                 throw new VersionMismatch("4886", msg);
177
                 
178
              }
179
              
180
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
181
              throw new NotFound("4884", "No record found for: " + pid.getValue());
182
            
183
          }
184
          
185
          // set the new policy
186
          systemMetadata.setReplicationPolicy(policy);
187
          
188
          // update the metadata
189
          try {
190
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
191
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
192
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
193
              notifyReplicaNodes(systemMetadata);
194
              
195
          } catch (RuntimeException e) {
196
              throw new ServiceFailure("4882", e.getMessage());
197
          
198
          }
199
          
200
      } catch (RuntimeException e) {
201
          throw new ServiceFailure("4882", e.getMessage());
202
          
203
      } finally {
204
          lock.unlock();
205
          logMetacat.debug("Unlocked identifier " + pid.getValue());
206
          
207
      }
208
    
209
      return true;
210
  }
211

    
212
  /**
213
   * Deletes the replica from the given Member Node
214
   * NOTE: MN.delete() may be an "archive" operation. TBD.
215
   * @param session
216
   * @param pid
217
   * @param nodeId
218
   * @param serialVersion
219
   * @return
220
   * @throws InvalidToken
221
   * @throws ServiceFailure
222
   * @throws NotAuthorized
223
   * @throws NotFound
224
   * @throws NotImplemented
225
   * @throws VersionMismatch
226
   */
227
  @Override
228
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
229
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
230
	  
231
	  	// The lock to be used for this identifier
232
		Lock lock = null;
233

    
234
		// get the subject
235
		Subject subject = session.getSubject();
236

    
237
		// are we allowed to do this?
238
		boolean isAuthorized = false;
239
		try {
240
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
241
		} catch (InvalidRequest e) {
242
			throw new ServiceFailure("4882", e.getDescription());
243
		}
244
		if (!isAuthorized) {
245
			throw new NotAuthorized("4881", Permission.WRITE
246
					+ " not allowed by " + subject.getValue() + " on "
247
					+ pid.getValue());
248

    
249
		}
250

    
251
		SystemMetadata systemMetadata = null;
252
		try {
253
			lock = HazelcastService.getInstance().getLock(pid.getValue());
254
			lock.lock();
255
			logMetacat.debug("Locked identifier " + pid.getValue());
256

    
257
			try {
258
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
259
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
260
				}
261

    
262
				// did we get it correctly?
263
				if (systemMetadata == null) {
264
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
265
				}
266

    
267
				// does the request have the most current system metadata?
268
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
269
					String msg = "The requested system metadata version number "
270
							+ serialVersion
271
							+ " differs from the current version at "
272
							+ systemMetadata.getSerialVersion().longValue()
273
							+ ". Please get the latest copy in order to modify it.";
274
					throw new VersionMismatch("4886", msg);
275

    
276
				}
277

    
278
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
279
				throw new NotFound("4884", "No record found for: " + pid.getValue());
280

    
281
			}
282
			  
283
			// check permissions
284
			// TODO: is this necessary?
285
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
286
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
287
			if (!isAllowed) {
288
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
289
			}
290
			  
291
			// delete the replica from the given node
292
			// CSJ: use CN.delete() to truly delete a replica, semantically
293
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
294
			//D1Client.getMN(nodeId).delete(session, pid);
295
			  
296
			// reflect that change in the system metadata
297
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
298
			for (Replica r: systemMetadata.getReplicaList()) {
299
				  if (r.getReplicaMemberNode().equals(nodeId)) {
300
					  updatedReplicas.remove(r);
301
					  break;
302
				  }
303
			}
304
			systemMetadata.setReplicaList(updatedReplicas);
305

    
306
			// update the metadata
307
			try {
308
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
309
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
310
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
311
			} catch (RuntimeException e) {
312
				throw new ServiceFailure("4882", e.getMessage());
313
			}
314

    
315
		} catch (RuntimeException e) {
316
			throw new ServiceFailure("4882", e.getMessage());
317
		} finally {
318
			lock.unlock();
319
			logMetacat.debug("Unlocked identifier " + pid.getValue());
320
		}
321

    
322
		return true;	  
323
	  
324
  }
325
  
326
  /**
327
   * Deletes an object from the Coordinating Node, where the object is a 
328
   * a science metadata object.
329
   * 
330
   * @param session - the Session object containing the credentials for the Subject
331
   * @param pid - The object identifier to be deleted
332
   * 
333
   * @return pid - the identifier of the object used for the deletion
334
   * 
335
   * @throws InvalidToken
336
   * @throws ServiceFailure
337
   * @throws NotAuthorized
338
   * @throws NotFound
339
   * @throws NotImplemented
340
   * @throws InvalidRequest
341
   */
342
  @Override
343
  public Identifier delete(Session session, Identifier pid) 
344
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
345

    
346
	  // check that it is CN/admin
347
	  boolean allowed = isAdminAuthorized(session);
348
	  
349
	  if (!allowed) {
350
		  String msg = "The subject is not allowed to call delete() on a Coordinating Node.";
351
		  logMetacat.info(msg);
352
		  throw new NotAuthorized("1320", msg);
353
	  }
354
	  
355
	  // defer to superclass implementation
356
	  Identifier retId =  super.delete(session, pid);
357
      
358
	  // notify the replicas
359
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
360
	  if (systemMetadata.getReplicaList() != null) {
361
		  for (Replica replica: systemMetadata.getReplicaList()) {
362
			  NodeReference replicaNode = replica.getReplicaMemberNode();
363
			  try {
364
				  Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
365
			  } catch (Exception e) {
366
				  // all we can really do is log errors and carry on with life
367
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + " from replica MN: " + replicaNode.getValue(), e);
368
			}
369
			  
370
		  }
371
	  }
372
	  
373
	  return retId;
374
      
375
  }
376
  
377
  /**
378
   * Deletes an object from the Coordinating Node, where the object is a 
379
   * a science metadata object.
380
   * 
381
   * @param session - the Session object containing the credentials for the Subject
382
   * @param pid - The object identifier to be deleted
383
   * 
384
   * @return pid - the identifier of the object used for the deletion
385
   * 
386
   * @throws InvalidToken
387
   * @throws ServiceFailure
388
   * @throws NotAuthorized
389
   * @throws NotFound
390
   * @throws NotImplemented
391
   * @throws InvalidRequest
392
   */
393
  @Override
394
  public Identifier archive(Session session, Identifier pid) 
395
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
396

    
397
	  // check that it is CN/admin
398
	  boolean allowed = isAdminAuthorized(session);
399
	  
400
	  if (!allowed) {
401
		  String msg = "The subject is not allowed to call delete() on a Coordinating Node.";
402
		  logMetacat.info(msg);
403
		  throw new NotAuthorized("1320", msg);
404
	  }
405
	  
406
	  // defer to superclass implementation
407
	  Identifier retId =  super.archive(session, pid);
408
      
409
	  // notify the replicas
410
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
411
	  if (systemMetadata.getReplicaList() != null) {
412
		  for (Replica replica: systemMetadata.getReplicaList()) {
413
			  NodeReference replicaNode = replica.getReplicaMemberNode();
414
			  try {
415
				  // TODO: implement in the clients
416
				  //Identifier mnRetId = D1Client.getMN(replicaNode).archive(null, pid);
417
			  } catch (Exception e) {
418
				  // all we can really do is log errors and carry on with life
419
				  logMetacat.error("Error archiving pid: " +  pid.getValue() + " from replica MN: " + replicaNode.getValue(), e);
420
			}
421
			  
422
		  }
423
	  }
424
	  
425
	  return retId;
426
      
427
  }
428
  
429
  /**
430
   * Set the obsoletedBy attribute in System Metadata
431
   * @param session
432
   * @param pid
433
   * @param obsoletedByPid
434
   * @param serialVersion
435
   * @return
436
   * @throws NotImplemented
437
   * @throws NotFound
438
   * @throws NotAuthorized
439
   * @throws ServiceFailure
440
   * @throws InvalidRequest
441
   * @throws InvalidToken
442
   * @throws VersionMismatch
443
   */
444
  @Override
445
  public boolean setObsoletedBy(Session session, Identifier pid,
446
			Identifier obsoletedByPid, long serialVersion)
447
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
448
			InvalidRequest, InvalidToken, VersionMismatch {
449

    
450
		// The lock to be used for this identifier
451
		Lock lock = null;
452

    
453
		// get the subject
454
		Subject subject = session.getSubject();
455

    
456
		// are we allowed to do this?
457
		if (!isAuthorized(session, pid, Permission.WRITE)) {
458
			throw new NotAuthorized("4881", Permission.WRITE
459
					+ " not allowed by " + subject.getValue() + " on "
460
					+ pid.getValue());
461

    
462
		}
463

    
464

    
465
		SystemMetadata systemMetadata = null;
466
		try {
467
			lock = HazelcastService.getInstance().getLock(pid.getValue());
468
			lock.lock();
469
			logMetacat.debug("Locked identifier " + pid.getValue());
470

    
471
			try {
472
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
473
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
474
				}
475

    
476
				// did we get it correctly?
477
				if (systemMetadata == null) {
478
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
479
				}
480

    
481
				// does the request have the most current system metadata?
482
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
483
					String msg = "The requested system metadata version number "
484
							+ serialVersion
485
							+ " differs from the current version at "
486
							+ systemMetadata.getSerialVersion().longValue()
487
							+ ". Please get the latest copy in order to modify it.";
488
					throw new VersionMismatch("4886", msg);
489

    
490
				}
491

    
492
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
493
				throw new NotFound("4884", "No record found for: " + pid.getValue());
494

    
495
			}
496

    
497
			// set the new policy
498
			systemMetadata.setObsoletedBy(obsoletedByPid);
499

    
500
			// update the metadata
501
			try {
502
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
503
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
504
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
505
			} catch (RuntimeException e) {
506
				throw new ServiceFailure("4882", e.getMessage());
507
			}
508

    
509
		} catch (RuntimeException e) {
510
			throw new ServiceFailure("4882", e.getMessage());
511
		} finally {
512
			lock.unlock();
513
			logMetacat.debug("Unlocked identifier " + pid.getValue());
514
		}
515

    
516
		return true;
517
	}
518
  
519
  
520
  /**
521
   * Set the replication status for an object given the object identifier
522
   * 
523
   * @param session - the Session object containing the credentials for the Subject
524
   * @param pid - the object identifier for the given object
525
   * @param status - the replication status to be applied
526
   * 
527
   * @return true or false
528
   * 
529
   * @throws NotImplemented
530
   * @throws NotAuthorized
531
   * @throws ServiceFailure
532
   * @throws InvalidRequest
533
   * @throws InvalidToken
534
   * @throws NotFound
535
   * 
536
   */
537
  @Override
538
  public boolean setReplicationStatus(Session session, Identifier pid,
539
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
540
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
541
      InvalidRequest, NotFound {
542
	  
543
	  // cannot be called by public
544
	  if (session == null) {
545
		  throw new NotAuthorized("4720", "Session cannot be null");
546
	  }
547
      
548
      // The lock to be used for this identifier
549
      Lock lock = null;
550
      
551
      boolean allowed = false;
552
      int replicaEntryIndex = -1;
553
      List<Replica> replicas = null;
554
      // get the subject
555
      Subject subject = session.getSubject();
556
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
557
          " is " + status.toString());
558
      
559
      SystemMetadata systemMetadata = null;
560

    
561
      try {
562
          lock = HazelcastService.getInstance().getLock(pid.getValue());
563
          lock.lock();
564
          logMetacat.debug("Locked identifier " + pid.getValue());
565

    
566
          try {      
567
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
568

    
569
              // did we get it correctly?
570
              if ( systemMetadata == null ) {
571
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
572
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
573
                  
574
              }
575
              replicas = systemMetadata.getReplicaList();
576
              int count = 0;
577
              
578
              // was there a failure? log it
579
              if ( failure != null && status == ReplicationStatus.FAILED ) {
580
                 String msg = "The replication request of the object identified by " + 
581
                     pid.getValue() + " failed.  The error message was " +
582
                     failure.getMessage() + ".";
583
              }
584
              
585
              if (replicas.size() > 0 && replicas != null) {
586
                  // find the target replica index in the replica list
587
                  for (Replica replica : replicas) {
588
                      String replicaNodeStr = replica.getReplicaMemberNode()
589
                              .getValue();
590
                      String targetNodeStr = targetNode.getValue();
591
                      logMetacat.debug("Comparing " + replicaNodeStr + " to "
592
                              + targetNodeStr);
593
                  
594
                      if (replicaNodeStr.equals(targetNodeStr)) {
595
                          replicaEntryIndex = count;
596
                          logMetacat.debug("replica entry index is: "
597
                                  + replicaEntryIndex);
598
                          break;
599
                      }
600
                      count++;
601
                  
602
                  }
603
              }
604
              // are we allowed to do this? only CNs and target MNs are allowed
605
              CNode cn = D1Client.getCN();
606
              List<Node> nodes = cn.listNodes().getNodeList();
607
              
608
              // find the node in the node list
609
              for ( Node node : nodes ) {
610
                  
611
                  NodeReference nodeReference = node.getIdentifier();
612
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + nodeReference.getValue());
613
                  
614
                  // allow target MN certs
615
                  if (targetNode.getValue().equals(nodeReference.getValue()) &&
616
                      node.getType() == NodeType.MN) {
617
                      List<Subject> nodeSubjects = node.getSubjectList();
618
                      
619
                      // check if the session subject is in the node subject list
620
                      for (Subject nodeSubject : nodeSubjects) {
621
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
622
                                  nodeSubject.getValue() + " and " + subject.getValue());
623
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
624
                              
625
                              // lastly limit to COMPLETED, INVALIDATED,
626
                              // and FAILED status updates from MNs only
627
                              if ( status == ReplicationStatus.COMPLETED ||
628
                                   status == ReplicationStatus.INVALIDATED ||
629
                                   status == ReplicationStatus.FAILED) {
630
                                  allowed = true;
631
                                  break;
632
                                  
633
                              }                              
634
                          }
635
                      }                 
636
                  }
637
              }
638

    
639
              if ( !allowed ) {
640
                  //check for CN admin access
641
                  allowed = isAuthorized(session, pid, Permission.WRITE);
642
                  
643
              }              
644
              
645
              if ( !allowed ) {
646
                  String msg = "The subject identified by "
647
                          + subject.getValue()
648
                          + " does not have permission to set the replication status for "
649
                          + "the replica identified by "
650
                          + targetNode.getValue() + ".";
651
                  logMetacat.info(msg);
652
                  throw new NotAuthorized("4720", msg);
653
                  
654
              }
655

    
656
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
657
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
658
                " : " + e.getMessage());
659
            
660
          }
661
          
662
          Replica targetReplica = new Replica();
663
          // set the status for the replica
664
          if ( replicaEntryIndex != -1 ) {
665
              targetReplica = replicas.get(replicaEntryIndex);
666
              
667
              // don't allow status to change from COMPLETED to anything other
668
              // than INVALIDATED: prevents overwrites from race conditions
669
              if ( targetReplica.getReplicationStatus() == ReplicationStatus.COMPLETED &&
670
            	   status != ReplicationStatus.INVALIDATED) {
671
            	  throw new InvalidRequest("4730", "Status state change from " +
672
            			  targetReplica.getReplicationStatus() + " to " +
673
            			  status.toString() + "is prohibited for identifier " +
674
            			  pid.getValue() + " and target node " + 
675
            			  targetReplica.getReplicaMemberNode().getValue());
676
              }
677
              
678
              targetReplica.setReplicationStatus(status);
679
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
680
              logMetacat.debug("Set the replication status for " + 
681
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
682
                  targetReplica.getReplicationStatus() + " for identifier " +
683
                  pid.getValue());
684
              
685
          } else {
686
              // this is a new entry, create it
687
              targetReplica.setReplicaMemberNode(targetNode);
688
              targetReplica.setReplicationStatus(status);
689
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
690
              replicas.add(targetReplica);
691
              
692
          }
693
          
694
          systemMetadata.setReplicaList(replicas);
695
                
696
          // update the metadata
697
          try {
698
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
699
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
700
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
701

    
702
              if ( status != ReplicationStatus.QUEUED && status != ReplicationStatus.REQUESTED) {
703
                  
704
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
705
                          "\tNODE:\t" + targetNode.getValue() + 
706
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
707
                
708
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
709
                          "\tPID:\t"  + pid.getValue() + 
710
                          "\tNODE:\t" + targetNode.getValue() + 
711
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
712
              }
713

    
714
              if ( status == ReplicationStatus.FAILED && failure != null ) {
715
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
716
                      " on target node " + targetNode + ". The exception was: " +
717
                      failure.getMessage());
718
              }
719
          } catch (RuntimeException e) {
720
              throw new ServiceFailure("4700", e.getMessage());
721
          
722
          }
723
          
724
    } catch (RuntimeException e) {
725
        String msg = "There was a RuntimeException getting the lock for " +
726
            pid.getValue();
727
        logMetacat.info(msg);
728
        
729
    } finally {
730
        lock.unlock();
731
        logMetacat.debug("Unlocked identifier " + pid.getValue());
732
        
733
    }
734
      
735
      return true;
736
  }
737
  
738
  /**
739
   * Return the checksum of the object given the identifier 
740
   * 
741
   * @param session - the Session object containing the credentials for the Subject
742
   * @param pid - the object identifier for the given object
743
   * 
744
   * @return checksum - the checksum of the object
745
   * 
746
   * @throws InvalidToken
747
   * @throws ServiceFailure
748
   * @throws NotAuthorized
749
   * @throws NotFound
750
   * @throws NotImplemented
751
   */
752
  @Override
753
  public Checksum getChecksum(Session session, Identifier pid)
754
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
755
    NotImplemented {
756
    
757
	boolean isAuthorized = false;
758
	try {
759
		isAuthorized = isAuthorized(session, pid, Permission.READ);
760
	} catch (InvalidRequest e) {
761
		throw new ServiceFailure("1410", e.getDescription());
762
	}  
763
    if (!isAuthorized) {
764
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
765
    }
766
    
767
    SystemMetadata systemMetadata = null;
768
    Checksum checksum = null;
769
    
770
    try {
771
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
772

    
773
        if (systemMetadata == null ) {
774
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue());
775
        }
776
        checksum = systemMetadata.getChecksum();
777
        
778
    } catch (RuntimeException e) {
779
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
780
            pid.getValue() + ". The error message was: " + e.getMessage());
781
      
782
    }
783
    
784
    return checksum;
785
  }
786

    
787
  /**
788
   * Resolve the location of a given object
789
   * 
790
   * @param session - the Session object containing the credentials for the Subject
791
   * @param pid - the object identifier for the given object
792
   * 
793
   * @return objectLocationList - the list of nodes known to contain the object
794
   * 
795
   * @throws InvalidToken
796
   * @throws ServiceFailure
797
   * @throws NotAuthorized
798
   * @throws NotFound
799
   * @throws NotImplemented
800
   */
801
  @Override
802
  public ObjectLocationList resolve(Session session, Identifier pid)
803
    throws InvalidToken, ServiceFailure, NotAuthorized,
804
    NotFound, NotImplemented {
805

    
806
    throw new NotImplemented("4131", "resolve not implemented");
807

    
808
  }
809

    
810
  /**
811
   * Metacat does not implement this method at the CN level
812
   */
813
  @Override
814
  public ObjectList search(Session session, String queryType, String query)
815
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
816
    NotImplemented {
817

    
818
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
819
	  
820
//    ObjectList objectList = null;
821
//    try {
822
//        objectList = 
823
//          IdentifierManager.getInstance().querySystemMetadata(
824
//              null, //startTime, 
825
//              null, //endTime,
826
//              null, //objectFormat, 
827
//              false, //replicaStatus, 
828
//              0, //start, 
829
//              1000 //count
830
//              );
831
//        
832
//    } catch (Exception e) {
833
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
834
//    }
835
//
836
//      return objectList;
837
		  
838
  }
839
  
840
  /**
841
   * Returns the object format registered in the DataONE Object Format 
842
   * Vocabulary for the given format identifier
843
   * 
844
   * @param fmtid - the identifier of the format requested
845
   * 
846
   * @return objectFormat - the object format requested
847
   * 
848
   * @throws ServiceFailure
849
   * @throws NotFound
850
   * @throws InsufficientResources
851
   * @throws NotImplemented
852
   */
853
  @Override
854
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
855
    throws ServiceFailure, NotFound, NotImplemented {
856
     
857
      return ObjectFormatService.getInstance().getFormat(fmtid);
858
      
859
  }
860

    
861
  /**
862
   * Returns a list of all object formats registered in the DataONE Object 
863
   * Format Vocabulary
864
    * 
865
   * @return objectFormatList - The list of object formats registered in 
866
   *                            the DataONE Object Format Vocabulary
867
   * 
868
   * @throws ServiceFailure
869
   * @throws NotImplemented
870
   * @throws InsufficientResources
871
   */
872
  @Override
873
  public ObjectFormatList listFormats() 
874
    throws ServiceFailure, NotImplemented {
875

    
876
    return ObjectFormatService.getInstance().listFormats();
877
  }
878

    
879
  /**
880
   * Returns a list of nodes that have been registered with the DataONE infrastructure
881
    * 
882
   * @return nodeList - List of nodes from the registry
883
   * 
884
   * @throws ServiceFailure
885
   * @throws NotImplemented
886
   */
887
  @Override
888
  public NodeList listNodes() 
889
    throws NotImplemented, ServiceFailure {
890

    
891
    throw new NotImplemented("4800", "listNodes not implemented");
892
  }
893

    
894
  /**
895
   * Provides a mechanism for adding system metadata independently of its 
896
   * associated object, such as when adding system metadata for data objects.
897
    * 
898
   * @param session - the Session object containing the credentials for the Subject
899
   * @param pid - The identifier of the object to register the system metadata against
900
   * @param sysmeta - The system metadata to be registered
901
   * 
902
   * @return true if the registration succeeds
903
   * 
904
   * @throws NotImplemented
905
   * @throws NotAuthorized
906
   * @throws ServiceFailure
907
   * @throws InvalidRequest
908
   * @throws InvalidSystemMetadata
909
   */
910
  @Override
911
  public Identifier registerSystemMetadata(Session session, Identifier pid,
912
      SystemMetadata sysmeta) 
913
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
914
      InvalidSystemMetadata {
915

    
916
      // The lock to be used for this identifier
917
      Lock lock = null;
918

    
919
      // TODO: control who can call this?
920
      if (session == null) {
921
          //TODO: many of the thrown exceptions do not use the correct error codes
922
          //check these against the docs and correct them
923
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
924
                  "  If you are not logged in, please do so and retry the request.");
925
      }
926
      
927
      // verify that guid == SystemMetadata.getIdentifier()
928
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
929
          "|" + sysmeta.getIdentifier().getValue());
930
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
931
          throw new InvalidRequest("4863", 
932
              "The identifier in method call (" + pid.getValue() + 
933
              ") does not match identifier in system metadata (" +
934
              sysmeta.getIdentifier().getValue() + ").");
935
      }
936

    
937
      try {
938
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
939
          lock.lock();
940
          logMetacat.debug("Locked identifier " + pid.getValue());
941
          logMetacat.debug("Checking if identifier exists...");
942
          // Check that the identifier does not already exist
943
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
944
              throw new InvalidRequest("4863", 
945
                  "The identifier is already in use by an existing object.");
946
          
947
          }
948
          
949
          // insert the system metadata into the object store
950
          logMetacat.debug("Starting to insert SystemMetadata...");
951
          try {
952
              sysmeta.setSerialVersion(BigInteger.ONE);
953
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
954
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
955
              
956
          } catch (RuntimeException e) {
957
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
958
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
959
                  e.getClass() + ": " + e.getMessage());
960
              
961
          }
962
          
963
      } catch (RuntimeException e) {
964
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
965
                  e.getClass() + ": " + e.getMessage());
966
          
967
      }  finally {
968
          lock.unlock();
969
          logMetacat.debug("Unlocked identifier " + pid.getValue());
970
          
971
      }
972

    
973
      
974
      logMetacat.debug("Returning from registerSystemMetadata");
975
      EventLog.getInstance().log(request.getRemoteAddr(), 
976
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
977
          pid.getValue(), "registerSystemMetadata");
978
      return pid;
979
  }
980
  
981
  /**
982
   * Given an optional scope and format, reserves and returns an identifier 
983
   * within that scope and format that is unique and will not be 
984
   * used by any other sessions. 
985
    * 
986
   * @param session - the Session object containing the credentials for the Subject
987
   * @param pid - The identifier of the object to register the system metadata against
988
   * @param scope - An optional string to be used to qualify the scope of 
989
   *                the identifier namespace, which is applied differently 
990
   *                depending on the format requested. If scope is not 
991
   *                supplied, a default scope will be used.
992
   * @param format - The optional name of the identifier format to be used, 
993
   *                  drawn from a DataONE-specific vocabulary of identifier 
994
   *                 format names, including several common syntaxes such 
995
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
996
   *                 format is not supplied by the caller, the CN service 
997
   *                 will use a default identifier format, which may change 
998
   *                 over time.
999
   * 
1000
   * @return true if the registration succeeds
1001
   * 
1002
   * @throws InvalidToken
1003
   * @throws ServiceFailure
1004
   * @throws NotAuthorized
1005
   * @throws IdentifierNotUnique
1006
   * @throws NotImplemented
1007
   */
1008
  @Override
1009
  public Identifier reserveIdentifier(Session session, Identifier pid)
1010
  throws InvalidToken, ServiceFailure,
1011
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1012

    
1013
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1014
  }
1015
  
1016
  @Override
1017
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1018
  throws InvalidToken, ServiceFailure,
1019
        NotAuthorized, NotImplemented, InvalidRequest {
1020
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1021
  }
1022
  
1023
  /**
1024
    * Checks whether the pid is reserved by the subject in the session param
1025
    * If the reservation is held on the pid by the subject, we return true.
1026
    * 
1027
   * @param session - the Session object containing the Subject
1028
   * @param pid - The identifier to check
1029
   * 
1030
   * @return true if the reservation exists for the subject/pid
1031
   * 
1032
   * @throws InvalidToken
1033
   * @throws ServiceFailure
1034
   * @throws NotFound - when the pid is not found (in use or in reservation)
1035
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1036
   * @throws IdentifierNotUnique - when the pid is in use
1037
   * @throws NotImplemented
1038
   */
1039

    
1040
  @Override
1041
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1042
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1043
      NotImplemented, InvalidRequest {
1044
  
1045
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1046
  }
1047

    
1048
  /**
1049
   * Changes ownership (RightsHolder) of the specified object to the 
1050
   * subject specified by userId
1051
    * 
1052
   * @param session - the Session object containing the credentials for the Subject
1053
   * @param pid - Identifier of the object to be modified
1054
   * @param userId - The subject that will be taking ownership of the specified object.
1055
   *
1056
   * @return pid - the identifier of the modified object
1057
   * 
1058
   * @throws ServiceFailure
1059
   * @throws InvalidToken
1060
   * @throws NotFound
1061
   * @throws NotAuthorized
1062
   * @throws NotImplemented
1063
   * @throws InvalidRequest
1064
   */  
1065
  @Override
1066
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1067
      long serialVersion)
1068
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1069
      NotImplemented, InvalidRequest, VersionMismatch {
1070
      
1071
      // The lock to be used for this identifier
1072
      Lock lock = null;
1073

    
1074
      // get the subject
1075
      Subject subject = session.getSubject();
1076
      
1077
      // are we allowed to do this?
1078
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1079
          throw new NotAuthorized("4440", "not allowed by "
1080
                  + subject.getValue() + " on " + pid.getValue());
1081
          
1082
      }
1083
      
1084
      SystemMetadata systemMetadata = null;
1085
      try {
1086
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1087
          lock.lock();
1088
          logMetacat.debug("Locked identifier " + pid.getValue());
1089

    
1090
          try {
1091
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1092
              
1093
              // does the request have the most current system metadata?
1094
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1095
                 String msg = "The requested system metadata version number " + 
1096
                     serialVersion + " differs from the current version at " +
1097
                     systemMetadata.getSerialVersion().longValue() +
1098
                     ". Please get the latest copy in order to modify it.";
1099
                 throw new VersionMismatch("4443", msg);
1100
              }
1101
              
1102
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1103
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1104
              
1105
          }
1106
              
1107
          // set the new rights holder
1108
          systemMetadata.setRightsHolder(userId);
1109
          
1110
          // update the metadata
1111
          try {
1112
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1113
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1114
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1115
              notifyReplicaNodes(systemMetadata);
1116
              
1117
          } catch (RuntimeException e) {
1118
              throw new ServiceFailure("4490", e.getMessage());
1119
          
1120
          }
1121
          
1122
      } catch (RuntimeException e) {
1123
          throw new ServiceFailure("4490", e.getMessage());
1124
          
1125
      } finally {
1126
          lock.unlock();
1127
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1128
      
1129
      }
1130
      
1131
      return pid;
1132
  }
1133

    
1134
  /**
1135
   * Verify that a replication task is authorized by comparing the target node's
1136
   * Subject (from the X.509 certificate-derived Session) with the list of 
1137
   * subjects in the known, pending replication tasks map.
1138
   * 
1139
   * @param originatingNodeSession - Session information that contains the 
1140
   *                                 identity of the calling user
1141
   * @param targetNodeSubject - Subject identifying the target node
1142
   * @param pid - the identifier of the object to be replicated
1143
   * @param replicatePermission - the execute permission to be granted
1144
   * 
1145
   * @throws ServiceFailure
1146
   * @throws NotImplemented
1147
   * @throws InvalidToken
1148
   * @throws NotAuthorized
1149
   * @throws InvalidRequest
1150
   * @throws NotFound
1151
   */
1152
  @Override
1153
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1154
    Subject targetNodeSubject, Identifier pid) 
1155
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1156
    NotFound, InvalidRequest {
1157
    
1158
    boolean isAllowed = false;
1159
    SystemMetadata sysmeta = null;
1160
    NodeReference targetNode = null;
1161
    
1162
    try {
1163
      // get the target node reference from the nodes list
1164
      CNode cn = D1Client.getCN();
1165
      List<Node> nodes = cn.listNodes().getNodeList();
1166
      
1167
      if ( nodes != null ) {
1168
        for (Node node : nodes) {
1169
            
1170
        	if (node.getSubjectList() != null) {
1171
        		
1172
	            for (Subject nodeSubject : node.getSubjectList()) {
1173
	            	
1174
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1175
	                    targetNode = node.getIdentifier();
1176
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1177
	                    break;
1178
	                }
1179
	            }
1180
        	}
1181
            
1182
            if ( targetNode != null) { break; }
1183
        }
1184
        
1185
      } else {
1186
          String msg = "Couldn't get the node list from the CN";
1187
          logMetacat.debug(msg);
1188
          throw new ServiceFailure("4872", msg);
1189
          
1190
      }
1191
      
1192
      // can't find a node listed with the given subject
1193
      if ( targetNode == null ) {
1194
          String msg = "There is no Member Node registered with a node subject " +
1195
              "matching " + targetNodeSubject.getValue();
1196
          logMetacat.info(msg);
1197
          throw new NotAuthorized("4871", msg);
1198
          
1199
      }
1200
      
1201
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1202
      
1203
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1204

    
1205
      if ( sysmeta != null ) {
1206
          
1207
          List<Replica> replicaList = sysmeta.getReplicaList();
1208
          
1209
          if ( replicaList != null ) {
1210
              
1211
              // find the replica with the status set to 'requested'
1212
              for (Replica replica : replicaList) {
1213
                  ReplicationStatus status = replica.getReplicationStatus();
1214
                  NodeReference listedNode = replica.getReplicaMemberNode();
1215
                  if ( listedNode != null && targetNode != null ) {
1216
                      logMetacat.debug("Comparing " + listedNode.getValue()
1217
                              + " to " + targetNode.getValue());
1218
                      
1219
                      if (listedNode.getValue().equals(targetNode.getValue())
1220
                              && status.equals(ReplicationStatus.REQUESTED)) {
1221
                          isAllowed = true;
1222
                          break;
1223

    
1224
                      }
1225
                  }
1226
              }
1227
          }
1228
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1229
              "to replicate: " + isAllowed + " for " + pid.getValue());
1230

    
1231
          
1232
      } else {
1233
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1234
          " is null.");          
1235
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue());
1236
          
1237
      }
1238

    
1239
    } catch (RuntimeException e) {
1240
    	  ServiceFailure sf = new ServiceFailure("4872", 
1241
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1242
                e.getMessage());
1243
    	  sf.initCause(e);
1244
        throw sf;
1245
        
1246
    }
1247
      
1248
    return isAllowed;
1249
    
1250
  }
1251

    
1252
  /**
1253
   * Adds a new object to the Node, where the object is a science metadata object.
1254
   * 
1255
   * @param session - the Session object containing the credentials for the Subject
1256
   * @param pid - The object identifier to be created
1257
   * @param object - the object bytes
1258
   * @param sysmeta - the system metadata that describes the object  
1259
   * 
1260
   * @return pid - the object identifier created
1261
   * 
1262
   * @throws InvalidToken
1263
   * @throws ServiceFailure
1264
   * @throws NotAuthorized
1265
   * @throws IdentifierNotUnique
1266
   * @throws UnsupportedType
1267
   * @throws InsufficientResources
1268
   * @throws InvalidSystemMetadata
1269
   * @throws NotImplemented
1270
   * @throws InvalidRequest
1271
   */
1272
  public Identifier create(Session session, Identifier pid, InputStream object,
1273
    SystemMetadata sysmeta) 
1274
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1275
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1276
    NotImplemented, InvalidRequest {
1277
                  
1278
      // The lock to be used for this identifier
1279
      Lock lock = null;
1280

    
1281
      try {
1282
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1283
          // are we allowed?
1284
          boolean isAllowed = false;
1285
          isAllowed = isAdminAuthorized(session);
1286

    
1287
          // proceed if we're called by a CN
1288
          if ( isAllowed ) {
1289
              // create the coordinating node version of the document      
1290
              lock.lock();
1291
              logMetacat.debug("Locked identifier " + pid.getValue());
1292
              sysmeta.setSerialVersion(BigInteger.ONE);
1293
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1294
              sysmeta.setArchived(false); // this is a create op, not update
1295
              
1296
              // the CN should have set the origin and authoritative member node fields
1297
              try {
1298
                  sysmeta.getOriginMemberNode().getValue();
1299
                  sysmeta.getAuthoritativeMemberNode().getValue();
1300
                  
1301
              } catch (NullPointerException npe) {
1302
                  throw new InvalidSystemMetadata("4896", 
1303
                      "Both the origin and authoritative member node identifiers need to be set.");
1304
                  
1305
              }
1306
              pid = super.create(session, pid, object, sysmeta);
1307

    
1308
          } else {
1309
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1310
                  " isn't allowed to call create() on a Coordinating Node.";
1311
              logMetacat.info(msg);
1312
              throw new NotAuthorized("1100", msg);
1313
          }
1314
          
1315
      } catch (RuntimeException e) {
1316
          // Convert Hazelcast runtime exceptions to service failures
1317
          String msg = "There was a problem creating the object identified by " +
1318
              pid.getValue() + ". There error message was: " + e.getMessage();
1319
          throw new ServiceFailure("4893", msg);
1320
          
1321
      } finally {
1322
    	  if (lock != null) {
1323
	          lock.unlock();
1324
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1325
    	  }
1326
      }
1327
      
1328
      return pid;
1329

    
1330
  }
1331

    
1332
  /**
1333
   * Set access for a given object using the object identifier and a Subject
1334
   * under a given Session.
1335
   * 
1336
   * @param session - the Session object containing the credentials for the Subject
1337
   * @param pid - the object identifier for the given object to apply the policy
1338
   * @param policy - the access policy to be applied
1339
   * 
1340
   * @return true if the application of the policy succeeds
1341
   * @throws InvalidToken
1342
   * @throws ServiceFailure
1343
   * @throws NotFound
1344
   * @throws NotAuthorized
1345
   * @throws NotImplemented
1346
   * @throws InvalidRequest
1347
   */
1348
  public boolean setAccessPolicy(Session session, Identifier pid, 
1349
      AccessPolicy accessPolicy, long serialVersion) 
1350
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1351
      NotImplemented, InvalidRequest, VersionMismatch {
1352
      
1353
      // The lock to be used for this identifier
1354
      Lock lock = null;
1355
      SystemMetadata systemMetadata = null;
1356
      
1357
      boolean success = false;
1358
      
1359
      // get the subject
1360
      Subject subject = session.getSubject();
1361
      
1362
      // are we allowed to do this?
1363
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1364
          throw new NotAuthorized("4420", "not allowed by "
1365
                  + subject.getValue() + " on " + pid.getValue());
1366
      }
1367
      
1368
      try {
1369
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1370
          lock.lock();
1371
          logMetacat.debug("Locked identifier " + pid.getValue());
1372

    
1373
          try {
1374
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1375

    
1376
              if ( systemMetadata == null ) {
1377
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1378
                  
1379
              }
1380
              // does the request have the most current system metadata?
1381
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1382
                 String msg = "The requested system metadata version number " + 
1383
                     serialVersion + " differs from the current version at " +
1384
                     systemMetadata.getSerialVersion().longValue() +
1385
                     ". Please get the latest copy in order to modify it.";
1386
                 throw new VersionMismatch("4402", msg);
1387
                 
1388
              }
1389
              
1390
          } catch (RuntimeException e) {
1391
              // convert Hazelcast RuntimeException to NotFound
1392
              throw new NotFound("4400", "No record found for: " + pid);
1393
            
1394
          }
1395
              
1396
          // set the access policy
1397
          systemMetadata.setAccessPolicy(accessPolicy);
1398
          
1399
          // update the system metadata
1400
          try {
1401
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1402
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1403
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1404
              notifyReplicaNodes(systemMetadata);
1405
              
1406
          } catch (RuntimeException e) {
1407
              // convert Hazelcast RuntimeException to ServiceFailure
1408
              throw new ServiceFailure("4430", e.getMessage());
1409
            
1410
          }
1411
          
1412
      } catch (RuntimeException e) {
1413
          throw new ServiceFailure("4430", e.getMessage());
1414
          
1415
      } finally {
1416
          lock.unlock();
1417
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1418
        
1419
      }
1420

    
1421
    
1422
    // TODO: how do we know if the map was persisted?
1423
    success = true;
1424
    
1425
    return success;
1426
  }
1427

    
1428
  /**
1429
   * Full replacement of replication metadata in the system metadata for the 
1430
   * specified object, changes date system metadata modified
1431
   * 
1432
   * @param session - the Session object containing the credentials for the Subject
1433
   * @param pid - the object identifier for the given object to apply the policy
1434
   * @param replica - the replica to be updated
1435
   * @return
1436
   * @throws NotImplemented
1437
   * @throws NotAuthorized
1438
   * @throws ServiceFailure
1439
   * @throws InvalidRequest
1440
   * @throws NotFound
1441
   * @throws VersionMismatch
1442
   */
1443
  @Override
1444
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1445
      Replica replica, long serialVersion) 
1446
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1447
      NotFound, VersionMismatch {
1448
      
1449
      // The lock to be used for this identifier
1450
      Lock lock = null;
1451
      
1452
      // get the subject
1453
      Subject subject = session.getSubject();
1454
      
1455
      // are we allowed to do this?
1456
      try {
1457

    
1458
          // what is the controlling permission?
1459
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1460
              throw new NotAuthorized("4851", "not allowed by "
1461
                      + subject.getValue() + " on " + pid.getValue());
1462
          }
1463

    
1464
        
1465
      } catch (InvalidToken e) {
1466
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1467
                  " on " + pid.getValue());  
1468
          
1469
      }
1470

    
1471
      SystemMetadata systemMetadata = null;
1472
      try {
1473
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1474
          lock.lock();
1475
          logMetacat.debug("Locked identifier " + pid.getValue());
1476

    
1477
          try {      
1478
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1479

    
1480
              // does the request have the most current system metadata?
1481
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1482
                 String msg = "The requested system metadata version number " + 
1483
                     serialVersion + " differs from the current version at " +
1484
                     systemMetadata.getSerialVersion().longValue() +
1485
                     ". Please get the latest copy in order to modify it.";
1486
                 throw new VersionMismatch("4855", msg);
1487
              }
1488
              
1489
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1490
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1491
                  " : " + e.getMessage());
1492
            
1493
          }
1494
              
1495
          // set the status for the replica
1496
          List<Replica> replicas = systemMetadata.getReplicaList();
1497
          NodeReference replicaNode = replica.getReplicaMemberNode();
1498
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
1499
          int index = 0;
1500
          for (Replica listedReplica: replicas) {
1501
              
1502
              // remove the replica that we are replacing
1503
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1504
                      // don't allow status to change from COMPLETED to anything other
1505
                      // than INVALIDATED: prevents overwrites from race conditions
1506
                	  if ( listedReplica.getReplicationStatus() == ReplicationStatus.COMPLETED &&
1507
            		    replicaStatus != ReplicationStatus.INVALIDATED ) {
1508
                	  throw new InvalidRequest("4853", "Status state change from " +
1509
                			  listedReplica.getReplicationStatus() + " to " +
1510
                			  replicaStatus.toString() + "is prohibited for identifier " +
1511
                			  pid.getValue() + " and target node " + 
1512
                			  listedReplica.getReplicaMemberNode().getValue());
1513

    
1514
            	  }
1515
                  replicas.remove(index);
1516
                  break;
1517
                  
1518
              }
1519
              index++;
1520
          }
1521
          
1522
          // add the new replica item
1523
          replicas.add(replica);
1524
          systemMetadata.setReplicaList(replicas);
1525
          
1526
          // update the metadata
1527
          try {
1528
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1529
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1530
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1531
            
1532
          } catch (RuntimeException e) {
1533
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1534
              throw new ServiceFailure("4852", e.getMessage());
1535
          
1536
          }
1537
          
1538
      } catch (RuntimeException e) {
1539
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1540
          throw new ServiceFailure("4852", e.getMessage());
1541
      
1542
      } finally {
1543
          lock.unlock();
1544
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1545
          
1546
      }
1547
    
1548
      return true;
1549
      
1550
  }
1551
  
1552
  /**
1553
   * 
1554
   */
1555
  @Override
1556
  public ObjectList listObjects(Session session, Date startTime, 
1557
      Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1558
      Integer start, Integer count)
1559
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1560
      ServiceFailure {
1561
      
1562
      ObjectList objectList = null;
1563
        try {
1564
        	if (count == null || count > MAXIMUM_DB_RECORD_COUNT) {
1565
            	count = MAXIMUM_DB_RECORD_COUNT;
1566
            }
1567
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1568
        } catch (Exception e) {
1569
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1570
        }
1571

    
1572
        return objectList;
1573
  }
1574

    
1575
  
1576
 	/**
1577
 	 * Returns a list of checksum algorithms that are supported by DataONE.
1578
 	 * @return cal  the list of checksum algorithms
1579
 	 * 
1580
 	 * @throws ServiceFailure
1581
 	 * @throws NotImplemented
1582
 	 */
1583
  @Override
1584
  public ChecksumAlgorithmList listChecksumAlgorithms()
1585
			throws ServiceFailure, NotImplemented {
1586
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1587
		cal.addAlgorithm("MD5");
1588
		cal.addAlgorithm("SHA-1");
1589
		return cal;
1590
		
1591
	}
1592

    
1593
  /**
1594
   * Notify replica Member Nodes of system metadata changes for a given pid
1595
   * 
1596
   * @param currentSystemMetadata - the up to date system metadata
1597
   */
1598
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
1599
      
1600
      Session session = null;
1601
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
1602
      MNode mn = null;
1603
      NodeReference replicaNodeRef = null;
1604
      CNode cn = null;
1605
      NodeType nodeType = null;
1606
      List<Node> nodeList = null;
1607
      
1608
      try {
1609
          cn = D1Client.getCN();
1610
          nodeList = cn.listNodes().getNodeList();
1611
          
1612
      } catch (Exception e) { // handle BaseException and other I/O issues
1613
          
1614
          // swallow errors since the call is not critical
1615
          logMetacat.error("Can't inform MNs of system metadata changes due " +
1616
              "to communication issues with the CN: " + e.getMessage());
1617
          
1618
      }
1619
      
1620
      if ( replicaList != null ) {
1621
          
1622
          // iterate through the replicas and inform  MN replica nodes
1623
          for (Replica replica : replicaList) {
1624
              
1625
              replicaNodeRef = replica.getReplicaMemberNode();
1626
              try {
1627
                  if (nodeList != null) {
1628
                      // find the node type
1629
                      for (Node node : nodeList) {
1630
                          if (node.getIdentifier().getValue() == 
1631
                              replicaNodeRef.getValue()) {
1632
                              nodeType = node.getType();
1633
                              break;
1634
              
1635
                          }
1636
                      }
1637
                  }
1638
              
1639
                  // notify only MNs
1640
                  if (nodeType != null && nodeType == NodeType.MN) {
1641
                      mn = D1Client.getMN(replicaNodeRef);
1642
                      mn.systemMetadataChanged(session, 
1643
                          currentSystemMetadata.getIdentifier(), 
1644
                          currentSystemMetadata.getSerialVersion().longValue(),
1645
                          currentSystemMetadata.getDateSysMetadataModified());
1646
                  }
1647
              
1648
              } catch (Exception e) { // handle BaseException and other I/O issues
1649
              
1650
                  // swallow errors since the call is not critical
1651
                  logMetacat.error("Can't inform "
1652
                          + replicaNodeRef.getValue()
1653
                          + " of system metadata changes due "
1654
                          + "to communication issues with the CN: "
1655
                          + e.getMessage());
1656
              
1657
              }
1658
          }
1659
      }
1660
  }
1661

    
1662
	@Override
1663
	public boolean isAuthorized(Identifier pid, Permission permission)
1664
			throws ServiceFailure, InvalidToken, NotFound, NotAuthorized,
1665
			NotImplemented, InvalidRequest {
1666
		
1667
		return isAuthorized(null, pid, permission);
1668
	}
1669
	
1670
	@Override
1671
	public boolean setAccessPolicy(Identifier pid, AccessPolicy accessPolicy, long serialVersion)
1672
			throws InvalidToken, NotFound, NotImplemented, NotAuthorized,
1673
			ServiceFailure, InvalidRequest, VersionMismatch {
1674
		
1675
		return setAccessPolicy(null, pid, accessPolicy, serialVersion);
1676
	}
1677
	
1678
	@Override
1679
	public Identifier setRightsHolder(Identifier pid, Subject userId, long serialVersion)
1680
			throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1681
			NotImplemented, InvalidRequest, VersionMismatch {
1682
		
1683
		return setRightsHolder(null, pid, userId, serialVersion);
1684
	}
1685
	
1686
	@Override
1687
	public Identifier create(Identifier pid, InputStream object, SystemMetadata sysmeta)
1688
			throws InvalidToken, ServiceFailure, NotAuthorized,
1689
			IdentifierNotUnique, UnsupportedType, InsufficientResources,
1690
			InvalidSystemMetadata, NotImplemented, InvalidRequest {
1691

    
1692
		return create(null, pid, object, sysmeta);
1693
	}
1694
	
1695
	@Override
1696
	public Identifier delete(Identifier pid) throws InvalidToken, ServiceFailure,
1697
			NotAuthorized, NotFound, NotImplemented {
1698

    
1699
		return delete(null, pid);
1700
	}
1701
	
1702
	@Override
1703
	public Identifier generateIdentifier(String scheme, String fragment)
1704
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1705
			InvalidRequest {
1706

    
1707
		return generateIdentifier(null, scheme, fragment);
1708
	}
1709
	
1710
	@Override
1711
	public Log getLogRecords(Date fromDate, Date toDate, Event event, String pidFilter,
1712
			Integer start, Integer count) throws InvalidToken, InvalidRequest,
1713
			ServiceFailure, NotAuthorized, NotImplemented, InsufficientResources {
1714

    
1715
		return getLogRecords(null, fromDate, toDate, event, pidFilter, start, count);
1716
	}
1717
	
1718
	@Override
1719
	public boolean hasReservation(Subject subject, Identifier pid)
1720
			throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1721
			NotImplemented, InvalidRequest, IdentifierNotUnique {
1722

    
1723
		return hasReservation(null, subject, pid);
1724
	}
1725
	
1726
	@Override
1727
	public Identifier registerSystemMetadata(Identifier pid, SystemMetadata sysmeta)
1728
			throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1729
			InvalidSystemMetadata, InvalidToken {
1730

    
1731
		return registerSystemMetadata(null, pid, sysmeta);
1732
	}
1733
	
1734
	@Override
1735
	public Identifier reserveIdentifier(Identifier pid) throws InvalidToken,
1736
			ServiceFailure, NotAuthorized, IdentifierNotUnique, NotImplemented,
1737
			InvalidRequest {
1738

    
1739
		return reserveIdentifier(null, pid);
1740
	}
1741
	
1742
	@Override
1743
	public boolean setObsoletedBy(Identifier pid, Identifier obsoletedByPid, long serialVersion)
1744
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
1745
			InvalidRequest, InvalidToken, VersionMismatch {
1746

    
1747
		return setObsoletedBy(null, pid, obsoletedByPid, serialVersion);
1748
	}
1749
	
1750
	@Override
1751
	public DescribeResponse describe(Identifier pid) throws InvalidToken,
1752
			NotAuthorized, NotImplemented, ServiceFailure, NotFound {
1753

    
1754
		return describe(null, pid);
1755
	}
1756
	
1757
	@Override
1758
	public InputStream get(Identifier pid) throws InvalidToken, ServiceFailure,
1759
			NotAuthorized, NotFound, NotImplemented {
1760

    
1761
		return get(null, pid);
1762
	}
1763
	
1764
	@Override
1765
	public Checksum getChecksum(Identifier pid) throws InvalidToken,
1766
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1767

    
1768
		return getChecksum(null, pid);
1769
	}
1770
	
1771
	@Override
1772
	public SystemMetadata getSystemMetadata(Identifier pid) throws InvalidToken,
1773
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1774

    
1775
		return getSystemMetadata(null, pid);
1776
	}
1777
	
1778
	@Override
1779
	public ObjectList listObjects(Date startTime, Date endTime,
1780
			ObjectFormatIdentifier formatid, Boolean replicaStatus, Integer start, Integer count)
1781
			throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1782
			ServiceFailure {
1783

    
1784
		return listObjects(null, startTime, endTime, formatid, replicaStatus, start, count);
1785
	}
1786
	
1787
	@Override
1788
	public ObjectLocationList resolve(Identifier pid) throws InvalidToken,
1789
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1790

    
1791
		return resolve(null, pid);
1792
	}
1793
	
1794
	@Override
1795
	public ObjectList search(String queryType, String query) throws InvalidToken,
1796
			ServiceFailure, NotAuthorized, InvalidRequest, NotImplemented {
1797

    
1798
		return search(null, queryType, query);
1799
	}
1800
	
1801
	@Override
1802
	public boolean deleteReplicationMetadata(Identifier pid, NodeReference nodeId,
1803
			long serialVersion) throws InvalidToken, InvalidRequest, ServiceFailure,
1804
			NotAuthorized, NotFound, NotImplemented, VersionMismatch {
1805

    
1806
		return deleteReplicationMetadata(null, pid, nodeId, serialVersion);
1807
	}
1808
	
1809
	@Override
1810
	public boolean isNodeAuthorized(Subject targetNodeSubject, Identifier pid)
1811
			throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure,
1812
			NotFound, InvalidRequest {
1813

    
1814
		return isNodeAuthorized(null, targetNodeSubject, pid);
1815
	}
1816
	
1817
	@Override
1818
	public boolean setReplicationPolicy(Identifier pid, ReplicationPolicy policy,
1819
			long serialVersion) throws NotImplemented, NotFound, NotAuthorized,
1820
			ServiceFailure, InvalidRequest, InvalidToken, VersionMismatch {
1821

    
1822
		return setReplicationPolicy(null, pid, policy, serialVersion);
1823
	}
1824
	
1825
	@Override
1826
	public boolean setReplicationStatus(Identifier pid, NodeReference targetNode,
1827
			ReplicationStatus status, BaseException failure) throws ServiceFailure,
1828
			NotImplemented, InvalidToken, NotAuthorized, InvalidRequest, NotFound {
1829

    
1830
		return setReplicationStatus(null, pid, targetNode, status, failure);
1831
	}
1832
	
1833
	@Override
1834
	public boolean updateReplicationMetadata(Identifier pid, Replica replica,
1835
			long serialVersion) throws NotImplemented, NotAuthorized, ServiceFailure,
1836
			NotFound, InvalidRequest, InvalidToken, VersionMismatch {
1837

    
1838
		return updateReplicationMetadata(null, pid, replica, serialVersion);
1839
	}
1840

    
1841
  @Override
1842
  public QueryEngineDescription getQueryEngineDescription(String arg0)
1843
          throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1844
          NotFound {
1845
      throw new NotImplemented("4410", "getQueryEngineDescription not implemented");
1846
      
1847
  }
1848

    
1849
  @Override
1850
  public QueryEngineList listQueryEngines() throws InvalidToken, ServiceFailure,
1851
          NotAuthorized, NotImplemented {
1852
      throw new NotImplemented("4420", "listQueryEngines not implemented");
1853
      
1854
  }
1855

    
1856
  @Override
1857
  public InputStream query(String arg0, String arg1) throws InvalidToken,
1858
          ServiceFailure, NotAuthorized, InvalidRequest, NotImplemented, NotFound {
1859
      throw new NotImplemented("4324", "query not implemented");
1860
      
1861
  }
1862
}
(1-1/6)