Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.ArrayList;
29
import java.util.Calendar;
30
import java.util.Date;
31
import java.util.List;
32
import java.util.concurrent.locks.Lock;
33

    
34
import javax.servlet.http.HttpServletRequest;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.client.CNode;
38
import org.dataone.client.D1Client;
39
import org.dataone.client.MNode;
40
import org.dataone.service.cn.v1.CNAuthorization;
41
import org.dataone.service.cn.v1.CNCore;
42
import org.dataone.service.cn.v1.CNRead;
43
import org.dataone.service.cn.v1.CNReplication;
44
import org.dataone.service.exceptions.BaseException;
45
import org.dataone.service.exceptions.IdentifierNotUnique;
46
import org.dataone.service.exceptions.InsufficientResources;
47
import org.dataone.service.exceptions.InvalidRequest;
48
import org.dataone.service.exceptions.InvalidSystemMetadata;
49
import org.dataone.service.exceptions.InvalidToken;
50
import org.dataone.service.exceptions.NotAuthorized;
51
import org.dataone.service.exceptions.NotFound;
52
import org.dataone.service.exceptions.NotImplemented;
53
import org.dataone.service.exceptions.ServiceFailure;
54
import org.dataone.service.exceptions.UnsupportedType;
55
import org.dataone.service.exceptions.VersionMismatch;
56
import org.dataone.service.types.v1.AccessPolicy;
57
import org.dataone.service.types.v1.Checksum;
58
import org.dataone.service.types.v1.ChecksumAlgorithmList;
59
import org.dataone.service.types.v1.DescribeResponse;
60
import org.dataone.service.types.v1.Event;
61
import org.dataone.service.types.v1.Identifier;
62
import org.dataone.service.types.v1.Log;
63
import org.dataone.service.types.v1.Node;
64
import org.dataone.service.types.v1.NodeList;
65
import org.dataone.service.types.v1.NodeReference;
66
import org.dataone.service.types.v1.NodeType;
67
import org.dataone.service.types.v1.ObjectFormat;
68
import org.dataone.service.types.v1.ObjectFormatIdentifier;
69
import org.dataone.service.types.v1.ObjectFormatList;
70
import org.dataone.service.types.v1.ObjectList;
71
import org.dataone.service.types.v1.ObjectLocationList;
72
import org.dataone.service.types.v1.Permission;
73
import org.dataone.service.types.v1.Replica;
74
import org.dataone.service.types.v1.ReplicationPolicy;
75
import org.dataone.service.types.v1.ReplicationStatus;
76
import org.dataone.service.types.v1.Session;
77
import org.dataone.service.types.v1.Subject;
78
import org.dataone.service.types.v1.SystemMetadata;
79
import org.dataone.service.types.v1.util.ServiceMethodRestrictionUtil;
80
import org.dataone.service.types.v1_1.QueryEngineDescription;
81
import org.dataone.service.types.v1_1.QueryEngineList;
82

    
83
import edu.ucsb.nceas.metacat.EventLog;
84
import edu.ucsb.nceas.metacat.IdentifierManager;
85
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
86

    
87
/**
88
 * Represents Metacat's implementation of the DataONE Coordinating Node 
89
 * service API. Methods implement the various CN* interfaces, and methods common
90
 * to both Member Node and Coordinating Node interfaces are found in the
91
 * D1NodeService super class.
92
 *
93
 */
94
public class CNodeService extends D1NodeService implements CNAuthorization,
95
    CNCore, CNRead, CNReplication {
96

    
97
  /* the logger instance */
98
  private Logger logMetacat = null;
99

    
100
  /**
101
   * singleton accessor
102
   */
103
  public static CNodeService getInstance(HttpServletRequest request) { 
104
    return new CNodeService(request);
105
  }
106
  
107
  /**
108
   * Constructor, private for singleton access
109
   */
110
  private CNodeService(HttpServletRequest request) {
111
    super(request);
112
    logMetacat = Logger.getLogger(CNodeService.class);
113
        
114
  }
115
    
116
  /**
117
   * Set the replication policy for an object given the object identifier
118
   * 
119
   * @param session - the Session object containing the credentials for the Subject
120
   * @param pid - the object identifier for the given object
121
   * @param policy - the replication policy to be applied
122
   * 
123
   * @return true or false
124
   * 
125
   * @throws NotImplemented
126
   * @throws NotAuthorized
127
   * @throws ServiceFailure
128
   * @throws InvalidRequest
129
   * @throws VersionMismatch
130
   * 
131
   */
132
  @Override
133
  public boolean setReplicationPolicy(Session session, Identifier pid,
134
      ReplicationPolicy policy, long serialVersion) 
135
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
136
      InvalidRequest, InvalidToken, VersionMismatch {
137
      
138
      // The lock to be used for this identifier
139
      Lock lock = null;
140
      
141
      // get the subject
142
      Subject subject = session.getSubject();
143
      
144
      // are we allowed to do this?
145
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
146
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
147
                  + " not allowed by " + subject.getValue() + " on "
148
                  + pid.getValue());
149
          
150
      }
151
      
152
      SystemMetadata systemMetadata = null;
153
      try {
154
          lock = HazelcastService.getInstance().getLock(pid.getValue());
155
          lock.lock();
156
          logMetacat.debug("Locked identifier " + pid.getValue());
157

    
158
          try {
159
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
160
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
161
                  
162
              }
163
              
164
              // did we get it correctly?
165
              if ( systemMetadata == null ) {
166
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
167
                  
168
              }
169

    
170
              // does the request have the most current system metadata?
171
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
172
                 String msg = "The requested system metadata version number " + 
173
                     serialVersion + " differs from the current version at " +
174
                     systemMetadata.getSerialVersion().longValue() +
175
                     ". Please get the latest copy in order to modify it.";
176
                 throw new VersionMismatch("4886", msg);
177
                 
178
              }
179
              
180
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
181
              throw new NotFound("4884", "No record found for: " + pid.getValue());
182
            
183
          }
184
          
185
          // set the new policy
186
          systemMetadata.setReplicationPolicy(policy);
187
          
188
          // update the metadata
189
          try {
190
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
191
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
192
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
193
              notifyReplicaNodes(systemMetadata);
194
              
195
          } catch (RuntimeException e) {
196
              throw new ServiceFailure("4882", e.getMessage());
197
          
198
          }
199
          
200
      } catch (RuntimeException e) {
201
          throw new ServiceFailure("4882", e.getMessage());
202
          
203
      } finally {
204
          lock.unlock();
205
          logMetacat.debug("Unlocked identifier " + pid.getValue());
206
          
207
      }
208
    
209
      return true;
210
  }
211

    
212
  /**
213
   * Deletes the replica from the given Member Node
214
   * NOTE: MN.delete() may be an "archive" operation. TBD.
215
   * @param session
216
   * @param pid
217
   * @param nodeId
218
   * @param serialVersion
219
   * @return
220
   * @throws InvalidToken
221
   * @throws ServiceFailure
222
   * @throws NotAuthorized
223
   * @throws NotFound
224
   * @throws NotImplemented
225
   * @throws VersionMismatch
226
   */
227
  @Override
228
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
229
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
230
	  
231
	  	// The lock to be used for this identifier
232
		Lock lock = null;
233

    
234
		// get the subject
235
		Subject subject = session.getSubject();
236

    
237
		// are we allowed to do this?
238
		boolean isAuthorized = false;
239
		try {
240
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
241
		} catch (InvalidRequest e) {
242
			throw new ServiceFailure("4882", e.getDescription());
243
		}
244
		if (!isAuthorized) {
245
			throw new NotAuthorized("4881", Permission.WRITE
246
					+ " not allowed by " + subject.getValue() + " on "
247
					+ pid.getValue());
248

    
249
		}
250

    
251
		SystemMetadata systemMetadata = null;
252
		try {
253
			lock = HazelcastService.getInstance().getLock(pid.getValue());
254
			lock.lock();
255
			logMetacat.debug("Locked identifier " + pid.getValue());
256

    
257
			try {
258
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
259
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
260
				}
261

    
262
				// did we get it correctly?
263
				if (systemMetadata == null) {
264
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
265
				}
266

    
267
				// does the request have the most current system metadata?
268
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
269
					String msg = "The requested system metadata version number "
270
							+ serialVersion
271
							+ " differs from the current version at "
272
							+ systemMetadata.getSerialVersion().longValue()
273
							+ ". Please get the latest copy in order to modify it.";
274
					throw new VersionMismatch("4886", msg);
275

    
276
				}
277

    
278
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
279
				throw new NotFound("4884", "No record found for: " + pid.getValue());
280

    
281
			}
282
			  
283
			// check permissions
284
			// TODO: is this necessary?
285
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
286
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
287
			if (!isAllowed) {
288
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
289
			}
290
			  
291
			// delete the replica from the given node
292
			// CSJ: use CN.delete() to truly delete a replica, semantically
293
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
294
			//D1Client.getMN(nodeId).delete(session, pid);
295
			  
296
			// reflect that change in the system metadata
297
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
298
			for (Replica r: systemMetadata.getReplicaList()) {
299
				  if (r.getReplicaMemberNode().equals(nodeId)) {
300
					  updatedReplicas.remove(r);
301
					  break;
302
				  }
303
			}
304
			systemMetadata.setReplicaList(updatedReplicas);
305

    
306
			// update the metadata
307
			try {
308
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
309
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
310
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
311
			} catch (RuntimeException e) {
312
				throw new ServiceFailure("4882", e.getMessage());
313
			}
314

    
315
		} catch (RuntimeException e) {
316
			throw new ServiceFailure("4882", e.getMessage());
317
		} finally {
318
			lock.unlock();
319
			logMetacat.debug("Unlocked identifier " + pid.getValue());
320
		}
321

    
322
		return true;	  
323
	  
324
  }
325
  
326
  /**
327
   * Deletes an object from the Coordinating Node, where the object is a 
328
   * a science metadata object.
329
   * 
330
   * @param session - the Session object containing the credentials for the Subject
331
   * @param pid - The object identifier to be deleted
332
   * 
333
   * @return pid - the identifier of the object used for the deletion
334
   * 
335
   * @throws InvalidToken
336
   * @throws ServiceFailure
337
   * @throws NotAuthorized
338
   * @throws NotFound
339
   * @throws NotImplemented
340
   * @throws InvalidRequest
341
   */
342
  @Override
343
  public Identifier delete(Session session, Identifier pid) 
344
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
345

    
346
	  // check that it is CN/admin
347
	  boolean allowed = isAdminAuthorized(session);
348
	  
349
	  if (!allowed) {
350
		  String msg = "The subject is not allowed to call delete() on a Coordinating Node.";
351
		  logMetacat.info(msg);
352
		  throw new NotAuthorized("1320", msg);
353
	  }
354
	  
355
	  // defer to superclass implementation
356
	  Identifier retId =  super.delete(session, pid);
357
      
358
	  // notify the replicas
359
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
360
	  if (systemMetadata.getReplicaList() != null) {
361
		  for (Replica replica: systemMetadata.getReplicaList()) {
362
			  NodeReference replicaNode = replica.getReplicaMemberNode();
363
			  try {
364
				  Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
365
			  } catch (Exception e) {
366
				  // all we can really do is log errors and carry on with life
367
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + " from replica MN: " + replicaNode.getValue(), e);
368
			}
369
			  
370
		  }
371
	  }
372
	  
373
	  return retId;
374
      
375
  }
376
  
377
  /**
378
   * Deletes an object from the Coordinating Node, where the object is a 
379
   * a science metadata object.
380
   * 
381
   * @param session - the Session object containing the credentials for the Subject
382
   * @param pid - The object identifier to be deleted
383
   * 
384
   * @return pid - the identifier of the object used for the deletion
385
   * 
386
   * @throws InvalidToken
387
   * @throws ServiceFailure
388
   * @throws NotAuthorized
389
   * @throws NotFound
390
   * @throws NotImplemented
391
   * @throws InvalidRequest
392
   */
393
  @Override
394
  public Identifier archive(Session session, Identifier pid) 
395
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
396

    
397
	  // check that it is CN/admin
398
	  boolean allowed = isAdminAuthorized(session);
399
	  
400
	  if (!allowed) {
401
		  String msg = "The subject is not allowed to call delete() on a Coordinating Node.";
402
		  logMetacat.info(msg);
403
		  throw new NotAuthorized("1320", msg);
404
	  }
405
	  
406
	  // defer to superclass implementation
407
	  Identifier retId =  super.archive(session, pid);
408
      
409
	  // notify the replicas
410
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
411
	  if (systemMetadata.getReplicaList() != null) {
412
		  for (Replica replica: systemMetadata.getReplicaList()) {
413
			  NodeReference replicaNode = replica.getReplicaMemberNode();
414
			  try {
415
				  // TODO: implement in the clients
416
				  //Identifier mnRetId = D1Client.getMN(replicaNode).archive(null, pid);
417
			  } catch (Exception e) {
418
				  // all we can really do is log errors and carry on with life
419
				  logMetacat.error("Error archiving pid: " +  pid.getValue() + " from replica MN: " + replicaNode.getValue(), e);
420
			}
421
			  
422
		  }
423
	  }
424
	  
425
	  return retId;
426
      
427
  }
428
  
429
  /**
430
   * Set the obsoletedBy attribute in System Metadata
431
   * @param session
432
   * @param pid
433
   * @param obsoletedByPid
434
   * @param serialVersion
435
   * @return
436
   * @throws NotImplemented
437
   * @throws NotFound
438
   * @throws NotAuthorized
439
   * @throws ServiceFailure
440
   * @throws InvalidRequest
441
   * @throws InvalidToken
442
   * @throws VersionMismatch
443
   */
444
  @Override
445
  public boolean setObsoletedBy(Session session, Identifier pid,
446
			Identifier obsoletedByPid, long serialVersion)
447
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
448
			InvalidRequest, InvalidToken, VersionMismatch {
449

    
450
		// The lock to be used for this identifier
451
		Lock lock = null;
452

    
453
		// get the subject
454
		Subject subject = session.getSubject();
455

    
456
		// are we allowed to do this?
457
		if (!isAuthorized(session, pid, Permission.WRITE)) {
458
			throw new NotAuthorized("4881", Permission.WRITE
459
					+ " not allowed by " + subject.getValue() + " on "
460
					+ pid.getValue());
461

    
462
		}
463

    
464

    
465
		SystemMetadata systemMetadata = null;
466
		try {
467
			lock = HazelcastService.getInstance().getLock(pid.getValue());
468
			lock.lock();
469
			logMetacat.debug("Locked identifier " + pid.getValue());
470

    
471
			try {
472
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
473
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
474
				}
475

    
476
				// did we get it correctly?
477
				if (systemMetadata == null) {
478
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
479
				}
480

    
481
				// does the request have the most current system metadata?
482
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
483
					String msg = "The requested system metadata version number "
484
							+ serialVersion
485
							+ " differs from the current version at "
486
							+ systemMetadata.getSerialVersion().longValue()
487
							+ ". Please get the latest copy in order to modify it.";
488
					throw new VersionMismatch("4886", msg);
489

    
490
				}
491

    
492
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
493
				throw new NotFound("4884", "No record found for: " + pid.getValue());
494

    
495
			}
496

    
497
			// set the new policy
498
			systemMetadata.setObsoletedBy(obsoletedByPid);
499

    
500
			// update the metadata
501
			try {
502
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
503
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
504
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
505
			} catch (RuntimeException e) {
506
				throw new ServiceFailure("4882", e.getMessage());
507
			}
508

    
509
		} catch (RuntimeException e) {
510
			throw new ServiceFailure("4882", e.getMessage());
511
		} finally {
512
			lock.unlock();
513
			logMetacat.debug("Unlocked identifier " + pid.getValue());
514
		}
515

    
516
		return true;
517
	}
518
  
519
  
520
  /**
521
   * Set the replication status for an object given the object identifier
522
   * 
523
   * @param session - the Session object containing the credentials for the Subject
524
   * @param pid - the object identifier for the given object
525
   * @param status - the replication status to be applied
526
   * 
527
   * @return true or false
528
   * 
529
   * @throws NotImplemented
530
   * @throws NotAuthorized
531
   * @throws ServiceFailure
532
   * @throws InvalidRequest
533
   * @throws InvalidToken
534
   * @throws NotFound
535
   * 
536
   */
537
  @Override
538
  public boolean setReplicationStatus(Session session, Identifier pid,
539
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
540
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
541
      InvalidRequest, NotFound {
542
	  
543
	  // cannot be called by public
544
	  if (session == null) {
545
		  throw new NotAuthorized("4720", "Session cannot be null");
546
	  }
547
      
548
      // The lock to be used for this identifier
549
      Lock lock = null;
550
      
551
      boolean allowed = false;
552
      int replicaEntryIndex = -1;
553
      List<Replica> replicas = null;
554
      // get the subject
555
      Subject subject = session.getSubject();
556
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
557
          " is " + status.toString());
558
      
559
      SystemMetadata systemMetadata = null;
560

    
561
      try {
562
          lock = HazelcastService.getInstance().getLock(pid.getValue());
563
          lock.lock();
564
          logMetacat.debug("Locked identifier " + pid.getValue());
565

    
566
          try {      
567
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
568

    
569
              // did we get it correctly?
570
              if ( systemMetadata == null ) {
571
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
572
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
573
                  
574
              }
575
              replicas = systemMetadata.getReplicaList();
576
              int count = 0;
577
              
578
              // was there a failure? log it
579
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
580
                 String msg = "The replication request of the object identified by " + 
581
                     pid.getValue() + " failed.  The error message was " +
582
                     failure.getMessage() + ".";
583
              }
584
              
585
              if (replicas.size() > 0 && replicas != null) {
586
                  // find the target replica index in the replica list
587
                  for (Replica replica : replicas) {
588
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
589
                      String targetNodeStr = targetNode.getValue();
590
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
591
                  
592
                      if (replicaNodeStr.equals(targetNodeStr)) {
593
                          replicaEntryIndex = count;
594
                          logMetacat.debug("replica entry index is: "
595
                                  + replicaEntryIndex);
596
                          break;
597
                      }
598
                      count++;
599
                  
600
                  }
601
              }
602
              // are we allowed to do this? only CNs and target MNs are allowed
603
              CNode cn = D1Client.getCN();
604
              List<Node> nodes = cn.listNodes().getNodeList();
605
              
606
              // find the node in the node list
607
              for ( Node node : nodes ) {
608
                  
609
                  NodeReference nodeReference = node.getIdentifier();
610
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
611
                      nodeReference.getValue());
612
                  
613
                  // allow target MN certs
614
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
615
                      node.getType().equals(NodeType.MN)) {
616
                      List<Subject> nodeSubjects = node.getSubjectList();
617
                      
618
                      // check if the session subject is in the node subject list
619
                      for (Subject nodeSubject : nodeSubjects) {
620
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
621
                                  nodeSubject.getValue() + " and " + subject.getValue());
622
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
623
                              
624
                              // lastly limit to COMPLETED, INVALIDATED,
625
                              // and FAILED status updates from MNs only
626
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
627
                                   status.equals(ReplicationStatus.INVALIDATED) ||
628
                                   status.equals(ReplicationStatus.FAILED)) {
629
                                  allowed = true;
630
                                  break;
631
                                  
632
                              }                              
633
                          }
634
                      }                 
635
                  }
636
              }
637

    
638
              if ( !allowed ) {
639
                  //check for CN admin access
640
                  allowed = isAuthorized(session, pid, Permission.WRITE);
641
                  
642
              }              
643
              
644
              if ( !allowed ) {
645
                  String msg = "The subject identified by "
646
                          + subject.getValue()
647
                          + " does not have permission to set the replication status for "
648
                          + "the replica identified by "
649
                          + targetNode.getValue() + ".";
650
                  logMetacat.info(msg);
651
                  throw new NotAuthorized("4720", msg);
652
                  
653
              }
654

    
655
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
656
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
657
                " : " + e.getMessage());
658
            
659
          }
660
          
661
          Replica targetReplica = new Replica();
662
          // set the status for the replica
663
          if ( replicaEntryIndex != -1 ) {
664
              targetReplica = replicas.get(replicaEntryIndex);
665
              
666
              // don't allow status to change from COMPLETED to anything other
667
              // than INVALIDATED: prevents overwrites from race conditions
668
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
669
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
670
            	  throw new InvalidRequest("4730", "Status state change from " +
671
            			  targetReplica.getReplicationStatus() + " to " +
672
            			  status.toString() + "is prohibited for identifier " +
673
            			  pid.getValue() + " and target node " + 
674
            			  targetReplica.getReplicaMemberNode().getValue());
675
              }
676
              
677
              targetReplica.setReplicationStatus(status);
678
              
679
              logMetacat.debug("Set the replication status for " + 
680
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
681
                  targetReplica.getReplicationStatus() + " for identifier " +
682
                  pid.getValue());
683
              
684
          } else {
685
              // this is a new entry, create it
686
              targetReplica.setReplicaMemberNode(targetNode);
687
              targetReplica.setReplicationStatus(status);
688
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
689
              replicas.add(targetReplica);
690
              
691
          }
692
          
693
          systemMetadata.setReplicaList(replicas);
694
                
695
          // update the metadata
696
          try {
697
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
698
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
699
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
700

    
701
              if ( !status.equals(ReplicationStatus.QUEUED) && 
702
            	   !status.equals(ReplicationStatus.REQUESTED)) {
703
                  
704
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
705
                          "\tNODE:\t" + targetNode.getValue() + 
706
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
707
                
708
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
709
                          "\tPID:\t"  + pid.getValue() + 
710
                          "\tNODE:\t" + targetNode.getValue() + 
711
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
712
              }
713

    
714
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
715
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
716
                      " on target node " + targetNode + ". The exception was: " +
717
                      failure.getMessage());
718
              }
719
              
720
			  // update the replica nodes about the completed replica when complete
721
              if (status.equals(ReplicationStatus.COMPLETED)) {
722
				broadcastSystemMetadataChange(systemMetadata);
723
			}
724
          
725
          } catch (RuntimeException e) {
726
              throw new ServiceFailure("4700", e.getMessage());
727
          
728
          }
729
          
730
    } catch (RuntimeException e) {
731
        String msg = "There was a RuntimeException getting the lock for " +
732
            pid.getValue();
733
        logMetacat.info(msg);
734
        
735
    } finally {
736
        lock.unlock();
737
        logMetacat.debug("Unlocked identifier " + pid.getValue());
738
        
739
    }
740
      
741
      return true;
742
  }
743
  
744
  /*
745
   * Inform each replica node that system metadata has changed
746
   * 
747
   * @param systemMetadata  the system metadata object with the replica list
748
   */
749
  private void broadcastSystemMetadataChange(SystemMetadata systemMetadata) {
750
	  
751
      CNode cn = null;
752
      NodeList nodeList = new NodeList();
753
      List<Node> nodes = new ArrayList<Node>();
754

    
755
	  List<Replica> replicaList = systemMetadata.getReplicaList();
756

    
757
	  // get the node list so we know the node type
758
	  try {
759
		cn = D1Client.getCN();
760
		nodeList = cn.listNodes();
761
	    nodes = nodeList.getNodeList();
762
		
763
	    // iterate through the replica list and inform each MN of the system metadata change
764
		for (Replica replica : replicaList) {
765
		    NodeReference nodeId = replica.getReplicaMemberNode();
766
		    try {
767
		        for (Node node : nodes) {
768
		      	    if ( node.getIdentifier().equals(nodeId) ) {
769
		      		    if ( node.getType().equals(NodeType.MN) ) {
770
		      		        MNode replicaNode = D1Client.getMN(nodeId);
771
		      		        // call MN.systemMetadataChanged();
772
		      		        replicaNode.systemMetadataChanged(null, 
773
		      		            systemMetadata.getIdentifier(), 
774
		      		            systemMetadata.getSerialVersion().longValue(), 
775
		      		            systemMetadata.getDateSysMetadataModified());
776
		      		        if (logMetacat.isDebugEnabled()) {
777
								logMetacat.debug("Called systemMetadataChanged() for identifier " + 
778
		      		                systemMetadata.getIdentifier().getValue() + 
779
		      		                " for node " + nodeId.getValue());
780
							}
781
		      		    }
782
		      	    }
783
		        }
784
		        
785
		    } catch (BaseException e) {
786
			    logMetacat.error("Couldn't contact " + nodeId.getValue() + 
787
		            " to inform it of the system metadata change for identifier " + 
788
			  	  systemMetadata.getIdentifier().getValue());
789
		    }     
790
		}
791

    
792
	  } catch (BaseException e1) {
793
		  logMetacat.error("Couldn't get the node list from the CN to broadcast the system " +
794
				    "metadata change for identifier " + systemMetadata.getIdentifier().getValue());
795
	  }
796
	  
797
	  
798
}
799

    
800
/**
801
   * Return the checksum of the object given the identifier 
802
   * 
803
   * @param session - the Session object containing the credentials for the Subject
804
   * @param pid - the object identifier for the given object
805
   * 
806
   * @return checksum - the checksum of the object
807
   * 
808
   * @throws InvalidToken
809
   * @throws ServiceFailure
810
   * @throws NotAuthorized
811
   * @throws NotFound
812
   * @throws NotImplemented
813
   */
814
  @Override
815
  public Checksum getChecksum(Session session, Identifier pid)
816
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
817
    NotImplemented {
818
    
819
	boolean isAuthorized = false;
820
	try {
821
		isAuthorized = isAuthorized(session, pid, Permission.READ);
822
	} catch (InvalidRequest e) {
823
		throw new ServiceFailure("1410", e.getDescription());
824
	}  
825
    if (!isAuthorized) {
826
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
827
    }
828
    
829
    SystemMetadata systemMetadata = null;
830
    Checksum checksum = null;
831
    
832
    try {
833
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
834

    
835
        if (systemMetadata == null ) {
836
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue());
837
        }
838
        checksum = systemMetadata.getChecksum();
839
        
840
    } catch (RuntimeException e) {
841
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
842
            pid.getValue() + ". The error message was: " + e.getMessage());
843
      
844
    }
845
    
846
    return checksum;
847
  }
848

    
849
  /**
850
   * Resolve the location of a given object
851
   * 
852
   * @param session - the Session object containing the credentials for the Subject
853
   * @param pid - the object identifier for the given object
854
   * 
855
   * @return objectLocationList - the list of nodes known to contain the object
856
   * 
857
   * @throws InvalidToken
858
   * @throws ServiceFailure
859
   * @throws NotAuthorized
860
   * @throws NotFound
861
   * @throws NotImplemented
862
   */
863
  @Override
864
  public ObjectLocationList resolve(Session session, Identifier pid)
865
    throws InvalidToken, ServiceFailure, NotAuthorized,
866
    NotFound, NotImplemented {
867

    
868
    throw new NotImplemented("4131", "resolve not implemented");
869

    
870
  }
871

    
872
  /**
873
   * Metacat does not implement this method at the CN level
874
   */
875
  @Override
876
  public ObjectList search(Session session, String queryType, String query)
877
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
878
    NotImplemented {
879

    
880
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
881
	  
882
//    ObjectList objectList = null;
883
//    try {
884
//        objectList = 
885
//          IdentifierManager.getInstance().querySystemMetadata(
886
//              null, //startTime, 
887
//              null, //endTime,
888
//              null, //objectFormat, 
889
//              false, //replicaStatus, 
890
//              0, //start, 
891
//              1000 //count
892
//              );
893
//        
894
//    } catch (Exception e) {
895
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
896
//    }
897
//
898
//      return objectList;
899
		  
900
  }
901
  
902
  /**
903
   * Returns the object format registered in the DataONE Object Format 
904
   * Vocabulary for the given format identifier
905
   * 
906
   * @param fmtid - the identifier of the format requested
907
   * 
908
   * @return objectFormat - the object format requested
909
   * 
910
   * @throws ServiceFailure
911
   * @throws NotFound
912
   * @throws InsufficientResources
913
   * @throws NotImplemented
914
   */
915
  @Override
916
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
917
    throws ServiceFailure, NotFound, NotImplemented {
918
     
919
      return ObjectFormatService.getInstance().getFormat(fmtid);
920
      
921
  }
922

    
923
  /**
924
   * Returns a list of all object formats registered in the DataONE Object 
925
   * Format Vocabulary
926
    * 
927
   * @return objectFormatList - The list of object formats registered in 
928
   *                            the DataONE Object Format Vocabulary
929
   * 
930
   * @throws ServiceFailure
931
   * @throws NotImplemented
932
   * @throws InsufficientResources
933
   */
934
  @Override
935
  public ObjectFormatList listFormats() 
936
    throws ServiceFailure, NotImplemented {
937

    
938
    return ObjectFormatService.getInstance().listFormats();
939
  }
940

    
941
  /**
942
   * Returns a list of nodes that have been registered with the DataONE infrastructure
943
    * 
944
   * @return nodeList - List of nodes from the registry
945
   * 
946
   * @throws ServiceFailure
947
   * @throws NotImplemented
948
   */
949
  @Override
950
  public NodeList listNodes() 
951
    throws NotImplemented, ServiceFailure {
952

    
953
    throw new NotImplemented("4800", "listNodes not implemented");
954
  }
955

    
956
  /**
957
   * Provides a mechanism for adding system metadata independently of its 
958
   * associated object, such as when adding system metadata for data objects.
959
    * 
960
   * @param session - the Session object containing the credentials for the Subject
961
   * @param pid - The identifier of the object to register the system metadata against
962
   * @param sysmeta - The system metadata to be registered
963
   * 
964
   * @return true if the registration succeeds
965
   * 
966
   * @throws NotImplemented
967
   * @throws NotAuthorized
968
   * @throws ServiceFailure
969
   * @throws InvalidRequest
970
   * @throws InvalidSystemMetadata
971
   */
972
  @Override
973
  public Identifier registerSystemMetadata(Session session, Identifier pid,
974
      SystemMetadata sysmeta) 
975
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
976
      InvalidSystemMetadata {
977

    
978
      // The lock to be used for this identifier
979
      Lock lock = null;
980

    
981
      // TODO: control who can call this?
982
      if (session == null) {
983
          //TODO: many of the thrown exceptions do not use the correct error codes
984
          //check these against the docs and correct them
985
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
986
                  "  If you are not logged in, please do so and retry the request.");
987
      }
988
      
989
      // verify that guid == SystemMetadata.getIdentifier()
990
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
991
          "|" + sysmeta.getIdentifier().getValue());
992
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
993
          throw new InvalidRequest("4863", 
994
              "The identifier in method call (" + pid.getValue() + 
995
              ") does not match identifier in system metadata (" +
996
              sysmeta.getIdentifier().getValue() + ").");
997
      }
998

    
999
      try {
1000
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1001
          lock.lock();
1002
          logMetacat.debug("Locked identifier " + pid.getValue());
1003
          logMetacat.debug("Checking if identifier exists...");
1004
          // Check that the identifier does not already exist
1005
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1006
              throw new InvalidRequest("4863", 
1007
                  "The identifier is already in use by an existing object.");
1008
          
1009
          }
1010
          
1011
          // insert the system metadata into the object store
1012
          logMetacat.debug("Starting to insert SystemMetadata...");
1013
          try {
1014
              sysmeta.setSerialVersion(BigInteger.ONE);
1015
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1016
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1017
              
1018
          } catch (RuntimeException e) {
1019
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1020
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1021
                  e.getClass() + ": " + e.getMessage());
1022
              
1023
          }
1024
          
1025
      } catch (RuntimeException e) {
1026
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1027
                  e.getClass() + ": " + e.getMessage());
1028
          
1029
      }  finally {
1030
          lock.unlock();
1031
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1032
          
1033
      }
1034

    
1035
      
1036
      logMetacat.debug("Returning from registerSystemMetadata");
1037
      EventLog.getInstance().log(request.getRemoteAddr(), 
1038
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1039
          pid.getValue(), "registerSystemMetadata");
1040
      return pid;
1041
  }
1042
  
1043
  /**
1044
   * Given an optional scope and format, reserves and returns an identifier 
1045
   * within that scope and format that is unique and will not be 
1046
   * used by any other sessions. 
1047
    * 
1048
   * @param session - the Session object containing the credentials for the Subject
1049
   * @param pid - The identifier of the object to register the system metadata against
1050
   * @param scope - An optional string to be used to qualify the scope of 
1051
   *                the identifier namespace, which is applied differently 
1052
   *                depending on the format requested. If scope is not 
1053
   *                supplied, a default scope will be used.
1054
   * @param format - The optional name of the identifier format to be used, 
1055
   *                  drawn from a DataONE-specific vocabulary of identifier 
1056
   *                 format names, including several common syntaxes such 
1057
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1058
   *                 format is not supplied by the caller, the CN service 
1059
   *                 will use a default identifier format, which may change 
1060
   *                 over time.
1061
   * 
1062
   * @return true if the registration succeeds
1063
   * 
1064
   * @throws InvalidToken
1065
   * @throws ServiceFailure
1066
   * @throws NotAuthorized
1067
   * @throws IdentifierNotUnique
1068
   * @throws NotImplemented
1069
   */
1070
  @Override
1071
  public Identifier reserveIdentifier(Session session, Identifier pid)
1072
  throws InvalidToken, ServiceFailure,
1073
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1074

    
1075
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1076
  }
1077
  
1078
  @Override
1079
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1080
  throws InvalidToken, ServiceFailure,
1081
        NotAuthorized, NotImplemented, InvalidRequest {
1082
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1083
  }
1084
  
1085
  /**
1086
    * Checks whether the pid is reserved by the subject in the session param
1087
    * If the reservation is held on the pid by the subject, we return true.
1088
    * 
1089
   * @param session - the Session object containing the Subject
1090
   * @param pid - The identifier to check
1091
   * 
1092
   * @return true if the reservation exists for the subject/pid
1093
   * 
1094
   * @throws InvalidToken
1095
   * @throws ServiceFailure
1096
   * @throws NotFound - when the pid is not found (in use or in reservation)
1097
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1098
   * @throws IdentifierNotUnique - when the pid is in use
1099
   * @throws NotImplemented
1100
   */
1101

    
1102
  @Override
1103
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1104
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1105
      NotImplemented, InvalidRequest {
1106
  
1107
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1108
  }
1109

    
1110
  /**
1111
   * Changes ownership (RightsHolder) of the specified object to the 
1112
   * subject specified by userId
1113
    * 
1114
   * @param session - the Session object containing the credentials for the Subject
1115
   * @param pid - Identifier of the object to be modified
1116
   * @param userId - The subject that will be taking ownership of the specified object.
1117
   *
1118
   * @return pid - the identifier of the modified object
1119
   * 
1120
   * @throws ServiceFailure
1121
   * @throws InvalidToken
1122
   * @throws NotFound
1123
   * @throws NotAuthorized
1124
   * @throws NotImplemented
1125
   * @throws InvalidRequest
1126
   */  
1127
  @Override
1128
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1129
      long serialVersion)
1130
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1131
      NotImplemented, InvalidRequest, VersionMismatch {
1132
      
1133
      // The lock to be used for this identifier
1134
      Lock lock = null;
1135

    
1136
      // get the subject
1137
      Subject subject = session.getSubject();
1138
      
1139
      // are we allowed to do this?
1140
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1141
          throw new NotAuthorized("4440", "not allowed by "
1142
                  + subject.getValue() + " on " + pid.getValue());
1143
          
1144
      }
1145
      
1146
      SystemMetadata systemMetadata = null;
1147
      try {
1148
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1149
          lock.lock();
1150
          logMetacat.debug("Locked identifier " + pid.getValue());
1151

    
1152
          try {
1153
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1154
              
1155
              // does the request have the most current system metadata?
1156
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1157
                 String msg = "The requested system metadata version number " + 
1158
                     serialVersion + " differs from the current version at " +
1159
                     systemMetadata.getSerialVersion().longValue() +
1160
                     ". Please get the latest copy in order to modify it.";
1161
                 throw new VersionMismatch("4443", msg);
1162
              }
1163
              
1164
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1165
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1166
              
1167
          }
1168
              
1169
          // set the new rights holder
1170
          systemMetadata.setRightsHolder(userId);
1171
          
1172
          // update the metadata
1173
          try {
1174
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1175
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1176
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1177
              notifyReplicaNodes(systemMetadata);
1178
              
1179
          } catch (RuntimeException e) {
1180
              throw new ServiceFailure("4490", e.getMessage());
1181
          
1182
          }
1183
          
1184
      } catch (RuntimeException e) {
1185
          throw new ServiceFailure("4490", e.getMessage());
1186
          
1187
      } finally {
1188
          lock.unlock();
1189
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1190
      
1191
      }
1192
      
1193
      return pid;
1194
  }
1195

    
1196
  /**
1197
   * Verify that a replication task is authorized by comparing the target node's
1198
   * Subject (from the X.509 certificate-derived Session) with the list of 
1199
   * subjects in the known, pending replication tasks map.
1200
   * 
1201
   * @param originatingNodeSession - Session information that contains the 
1202
   *                                 identity of the calling user
1203
   * @param targetNodeSubject - Subject identifying the target node
1204
   * @param pid - the identifier of the object to be replicated
1205
   * @param replicatePermission - the execute permission to be granted
1206
   * 
1207
   * @throws ServiceFailure
1208
   * @throws NotImplemented
1209
   * @throws InvalidToken
1210
   * @throws NotAuthorized
1211
   * @throws InvalidRequest
1212
   * @throws NotFound
1213
   */
1214
  @Override
1215
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1216
    Subject targetNodeSubject, Identifier pid) 
1217
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1218
    NotFound, InvalidRequest {
1219
    
1220
    boolean isAllowed = false;
1221
    SystemMetadata sysmeta = null;
1222
    NodeReference targetNode = null;
1223
    
1224
    try {
1225
      // get the target node reference from the nodes list
1226
      CNode cn = D1Client.getCN();
1227
      List<Node> nodes = cn.listNodes().getNodeList();
1228
      
1229
      if ( nodes != null ) {
1230
        for (Node node : nodes) {
1231
            
1232
        	if (node.getSubjectList() != null) {
1233
        		
1234
	            for (Subject nodeSubject : node.getSubjectList()) {
1235
	            	
1236
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1237
	                    targetNode = node.getIdentifier();
1238
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1239
	                    break;
1240
	                }
1241
	            }
1242
        	}
1243
            
1244
            if ( targetNode != null) { break; }
1245
        }
1246
        
1247
      } else {
1248
          String msg = "Couldn't get the node list from the CN";
1249
          logMetacat.debug(msg);
1250
          throw new ServiceFailure("4872", msg);
1251
          
1252
      }
1253
      
1254
      // can't find a node listed with the given subject
1255
      if ( targetNode == null ) {
1256
          String msg = "There is no Member Node registered with a node subject " +
1257
              "matching " + targetNodeSubject.getValue();
1258
          logMetacat.info(msg);
1259
          throw new NotAuthorized("4871", msg);
1260
          
1261
      }
1262
      
1263
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1264
      
1265
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1266

    
1267
      if ( sysmeta != null ) {
1268
          
1269
          List<Replica> replicaList = sysmeta.getReplicaList();
1270
          
1271
          if ( replicaList != null ) {
1272
              
1273
              // find the replica with the status set to 'requested'
1274
              for (Replica replica : replicaList) {
1275
                  ReplicationStatus status = replica.getReplicationStatus();
1276
                  NodeReference listedNode = replica.getReplicaMemberNode();
1277
                  if ( listedNode != null && targetNode != null ) {
1278
                      logMetacat.debug("Comparing " + listedNode.getValue()
1279
                              + " to " + targetNode.getValue());
1280
                      
1281
                      if (listedNode.getValue().equals(targetNode.getValue())
1282
                              && status.equals(ReplicationStatus.REQUESTED)) {
1283
                          isAllowed = true;
1284
                          break;
1285

    
1286
                      }
1287
                  }
1288
              }
1289
          }
1290
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1291
              "to replicate: " + isAllowed + " for " + pid.getValue());
1292

    
1293
          
1294
      } else {
1295
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1296
          " is null.");          
1297
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue());
1298
          
1299
      }
1300

    
1301
    } catch (RuntimeException e) {
1302
    	  ServiceFailure sf = new ServiceFailure("4872", 
1303
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1304
                e.getMessage());
1305
    	  sf.initCause(e);
1306
        throw sf;
1307
        
1308
    }
1309
      
1310
    return isAllowed;
1311
    
1312
  }
1313

    
1314
  /**
1315
   * Adds a new object to the Node, where the object is a science metadata object.
1316
   * 
1317
   * @param session - the Session object containing the credentials for the Subject
1318
   * @param pid - The object identifier to be created
1319
   * @param object - the object bytes
1320
   * @param sysmeta - the system metadata that describes the object  
1321
   * 
1322
   * @return pid - the object identifier created
1323
   * 
1324
   * @throws InvalidToken
1325
   * @throws ServiceFailure
1326
   * @throws NotAuthorized
1327
   * @throws IdentifierNotUnique
1328
   * @throws UnsupportedType
1329
   * @throws InsufficientResources
1330
   * @throws InvalidSystemMetadata
1331
   * @throws NotImplemented
1332
   * @throws InvalidRequest
1333
   */
1334
  public Identifier create(Session session, Identifier pid, InputStream object,
1335
    SystemMetadata sysmeta) 
1336
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1337
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1338
    NotImplemented, InvalidRequest {
1339
                  
1340
      // The lock to be used for this identifier
1341
      Lock lock = null;
1342

    
1343
      try {
1344
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1345
          // are we allowed?
1346
          boolean isAllowed = false;
1347
          isAllowed = isAdminAuthorized(session);
1348

    
1349
          // proceed if we're called by a CN
1350
          if ( isAllowed ) {
1351
              // create the coordinating node version of the document      
1352
              lock.lock();
1353
              logMetacat.debug("Locked identifier " + pid.getValue());
1354
              sysmeta.setSerialVersion(BigInteger.ONE);
1355
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1356
              sysmeta.setArchived(false); // this is a create op, not update
1357
              
1358
              // the CN should have set the origin and authoritative member node fields
1359
              try {
1360
                  sysmeta.getOriginMemberNode().getValue();
1361
                  sysmeta.getAuthoritativeMemberNode().getValue();
1362
                  
1363
              } catch (NullPointerException npe) {
1364
                  throw new InvalidSystemMetadata("4896", 
1365
                      "Both the origin and authoritative member node identifiers need to be set.");
1366
                  
1367
              }
1368
              pid = super.create(session, pid, object, sysmeta);
1369

    
1370
          } else {
1371
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1372
                  " isn't allowed to call create() on a Coordinating Node.";
1373
              logMetacat.info(msg);
1374
              throw new NotAuthorized("1100", msg);
1375
          }
1376
          
1377
      } catch (RuntimeException e) {
1378
          // Convert Hazelcast runtime exceptions to service failures
1379
          String msg = "There was a problem creating the object identified by " +
1380
              pid.getValue() + ". There error message was: " + e.getMessage();
1381
          throw new ServiceFailure("4893", msg);
1382
          
1383
      } finally {
1384
    	  if (lock != null) {
1385
	          lock.unlock();
1386
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1387
    	  }
1388
      }
1389
      
1390
      return pid;
1391

    
1392
  }
1393

    
1394
  /**
1395
   * Set access for a given object using the object identifier and a Subject
1396
   * under a given Session.
1397
   * 
1398
   * @param session - the Session object containing the credentials for the Subject
1399
   * @param pid - the object identifier for the given object to apply the policy
1400
   * @param policy - the access policy to be applied
1401
   * 
1402
   * @return true if the application of the policy succeeds
1403
   * @throws InvalidToken
1404
   * @throws ServiceFailure
1405
   * @throws NotFound
1406
   * @throws NotAuthorized
1407
   * @throws NotImplemented
1408
   * @throws InvalidRequest
1409
   */
1410
  public boolean setAccessPolicy(Session session, Identifier pid, 
1411
      AccessPolicy accessPolicy, long serialVersion) 
1412
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1413
      NotImplemented, InvalidRequest, VersionMismatch {
1414
      
1415
      // The lock to be used for this identifier
1416
      Lock lock = null;
1417
      SystemMetadata systemMetadata = null;
1418
      
1419
      boolean success = false;
1420
      
1421
      // get the subject
1422
      Subject subject = session.getSubject();
1423
      
1424
      // are we allowed to do this?
1425
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1426
          throw new NotAuthorized("4420", "not allowed by "
1427
                  + subject.getValue() + " on " + pid.getValue());
1428
      }
1429
      
1430
      try {
1431
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1432
          lock.lock();
1433
          logMetacat.debug("Locked identifier " + pid.getValue());
1434

    
1435
          try {
1436
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1437

    
1438
              if ( systemMetadata == null ) {
1439
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1440
                  
1441
              }
1442
              // does the request have the most current system metadata?
1443
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1444
                 String msg = "The requested system metadata version number " + 
1445
                     serialVersion + " differs from the current version at " +
1446
                     systemMetadata.getSerialVersion().longValue() +
1447
                     ". Please get the latest copy in order to modify it.";
1448
                 throw new VersionMismatch("4402", msg);
1449
                 
1450
              }
1451
              
1452
          } catch (RuntimeException e) {
1453
              // convert Hazelcast RuntimeException to NotFound
1454
              throw new NotFound("4400", "No record found for: " + pid);
1455
            
1456
          }
1457
              
1458
          // set the access policy
1459
          systemMetadata.setAccessPolicy(accessPolicy);
1460
          
1461
          // update the system metadata
1462
          try {
1463
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1464
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1465
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1466
              notifyReplicaNodes(systemMetadata);
1467
              
1468
          } catch (RuntimeException e) {
1469
              // convert Hazelcast RuntimeException to ServiceFailure
1470
              throw new ServiceFailure("4430", e.getMessage());
1471
            
1472
          }
1473
          
1474
      } catch (RuntimeException e) {
1475
          throw new ServiceFailure("4430", e.getMessage());
1476
          
1477
      } finally {
1478
          lock.unlock();
1479
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1480
        
1481
      }
1482

    
1483
    
1484
    // TODO: how do we know if the map was persisted?
1485
    success = true;
1486
    
1487
    return success;
1488
  }
1489

    
1490
  /**
1491
   * Full replacement of replication metadata in the system metadata for the 
1492
   * specified object, changes date system metadata modified
1493
   * 
1494
   * @param session - the Session object containing the credentials for the Subject
1495
   * @param pid - the object identifier for the given object to apply the policy
1496
   * @param replica - the replica to be updated
1497
   * @return
1498
   * @throws NotImplemented
1499
   * @throws NotAuthorized
1500
   * @throws ServiceFailure
1501
   * @throws InvalidRequest
1502
   * @throws NotFound
1503
   * @throws VersionMismatch
1504
   */
1505
  @Override
1506
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1507
      Replica replica, long serialVersion) 
1508
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1509
      NotFound, VersionMismatch {
1510
      
1511
      // The lock to be used for this identifier
1512
      Lock lock = null;
1513
      
1514
      // get the subject
1515
      Subject subject = session.getSubject();
1516
      
1517
      // are we allowed to do this?
1518
      try {
1519

    
1520
          // what is the controlling permission?
1521
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1522
              throw new NotAuthorized("4851", "not allowed by "
1523
                      + subject.getValue() + " on " + pid.getValue());
1524
          }
1525

    
1526
        
1527
      } catch (InvalidToken e) {
1528
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1529
                  " on " + pid.getValue());  
1530
          
1531
      }
1532

    
1533
      SystemMetadata systemMetadata = null;
1534
      try {
1535
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1536
          lock.lock();
1537
          logMetacat.debug("Locked identifier " + pid.getValue());
1538

    
1539
          try {      
1540
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1541

    
1542
              // does the request have the most current system metadata?
1543
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1544
                 String msg = "The requested system metadata version number " + 
1545
                     serialVersion + " differs from the current version at " +
1546
                     systemMetadata.getSerialVersion().longValue() +
1547
                     ". Please get the latest copy in order to modify it.";
1548
                 throw new VersionMismatch("4855", msg);
1549
              }
1550
              
1551
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1552
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1553
                  " : " + e.getMessage());
1554
            
1555
          }
1556
              
1557
          // set the status for the replica
1558
          List<Replica> replicas = systemMetadata.getReplicaList();
1559
          NodeReference replicaNode = replica.getReplicaMemberNode();
1560
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
1561
          int index = 0;
1562
          for (Replica listedReplica: replicas) {
1563
              
1564
              // remove the replica that we are replacing
1565
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1566
                      // don't allow status to change from COMPLETED to anything other
1567
                      // than INVALIDATED: prevents overwrites from race conditions
1568
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
1569
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
1570
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
1571
                	  throw new InvalidRequest("4853", "Status state change from " +
1572
                			  listedReplica.getReplicationStatus() + " to " +
1573
                			  replicaStatus.toString() + "is prohibited for identifier " +
1574
                			  pid.getValue() + " and target node " + 
1575
                			  listedReplica.getReplicaMemberNode().getValue());
1576

    
1577
            	  }
1578
                  replicas.remove(index);
1579
                  break;
1580
                  
1581
              }
1582
              index++;
1583
          }
1584
          
1585
          // add the new replica item
1586
          replicas.add(replica);
1587
          systemMetadata.setReplicaList(replicas);
1588
          
1589
          // update the metadata
1590
          try {
1591
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1592
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1593
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1594
              
1595
              // inform replica nodes of the change if the status is complete
1596
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
1597
            	  broadcastSystemMetadataChange(systemMetadata);
1598
            	  
1599
              }
1600
          } catch (RuntimeException e) {
1601
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1602
              throw new ServiceFailure("4852", e.getMessage());
1603
          
1604
          }
1605
          
1606
      } catch (RuntimeException e) {
1607
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1608
          throw new ServiceFailure("4852", e.getMessage());
1609
      
1610
      } finally {
1611
          lock.unlock();
1612
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1613
          
1614
      }
1615
    
1616
      return true;
1617
      
1618
  }
1619
  
1620
  /**
1621
   * 
1622
   */
1623
  @Override
1624
  public ObjectList listObjects(Session session, Date startTime, 
1625
      Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1626
      Integer start, Integer count)
1627
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1628
      ServiceFailure {
1629
      
1630
      ObjectList objectList = null;
1631
        try {
1632
        	if (count == null || count > MAXIMUM_DB_RECORD_COUNT) {
1633
            	count = MAXIMUM_DB_RECORD_COUNT;
1634
            }
1635
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1636
        } catch (Exception e) {
1637
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1638
        }
1639

    
1640
        return objectList;
1641
  }
1642

    
1643
  
1644
 	/**
1645
 	 * Returns a list of checksum algorithms that are supported by DataONE.
1646
 	 * @return cal  the list of checksum algorithms
1647
 	 * 
1648
 	 * @throws ServiceFailure
1649
 	 * @throws NotImplemented
1650
 	 */
1651
  @Override
1652
  public ChecksumAlgorithmList listChecksumAlgorithms()
1653
			throws ServiceFailure, NotImplemented {
1654
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1655
		cal.addAlgorithm("MD5");
1656
		cal.addAlgorithm("SHA-1");
1657
		return cal;
1658
		
1659
	}
1660

    
1661
  /**
1662
   * Notify replica Member Nodes of system metadata changes for a given pid
1663
   * 
1664
   * @param currentSystemMetadata - the up to date system metadata
1665
   */
1666
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
1667
      
1668
      Session session = null;
1669
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
1670
      MNode mn = null;
1671
      NodeReference replicaNodeRef = null;
1672
      CNode cn = null;
1673
      NodeType nodeType = null;
1674
      List<Node> nodeList = null;
1675
      
1676
      try {
1677
          cn = D1Client.getCN();
1678
          nodeList = cn.listNodes().getNodeList();
1679
          
1680
      } catch (Exception e) { // handle BaseException and other I/O issues
1681
          
1682
          // swallow errors since the call is not critical
1683
          logMetacat.error("Can't inform MNs of system metadata changes due " +
1684
              "to communication issues with the CN: " + e.getMessage());
1685
          
1686
      }
1687
      
1688
      if ( replicaList != null ) {
1689
          
1690
          // iterate through the replicas and inform  MN replica nodes
1691
          for (Replica replica : replicaList) {
1692
              
1693
              replicaNodeRef = replica.getReplicaMemberNode();
1694
              try {
1695
                  if (nodeList != null) {
1696
                      // find the node type
1697
                      for (Node node : nodeList) {
1698
                          if (node.getIdentifier().getValue() == 
1699
                              replicaNodeRef.getValue()) {
1700
                              nodeType = node.getType();
1701
                              break;
1702
              
1703
                          }
1704
                      }
1705
                  }
1706
              
1707
                  // notify only MNs
1708
                  if (nodeType != null && nodeType == NodeType.MN) {
1709
                      mn = D1Client.getMN(replicaNodeRef);
1710
                      mn.systemMetadataChanged(session, 
1711
                          currentSystemMetadata.getIdentifier(), 
1712
                          currentSystemMetadata.getSerialVersion().longValue(),
1713
                          currentSystemMetadata.getDateSysMetadataModified());
1714
                  }
1715
              
1716
              } catch (Exception e) { // handle BaseException and other I/O issues
1717
              
1718
                  // swallow errors since the call is not critical
1719
                  logMetacat.error("Can't inform "
1720
                          + replicaNodeRef.getValue()
1721
                          + " of system metadata changes due "
1722
                          + "to communication issues with the CN: "
1723
                          + e.getMessage());
1724
              
1725
              }
1726
          }
1727
      }
1728
  }
1729

    
1730
	@Override
1731
	public boolean isAuthorized(Identifier pid, Permission permission)
1732
			throws ServiceFailure, InvalidToken, NotFound, NotAuthorized,
1733
			NotImplemented, InvalidRequest {
1734
		
1735
		return isAuthorized(null, pid, permission);
1736
	}
1737
	
1738
	@Override
1739
	public boolean setAccessPolicy(Identifier pid, AccessPolicy accessPolicy, long serialVersion)
1740
			throws InvalidToken, NotFound, NotImplemented, NotAuthorized,
1741
			ServiceFailure, InvalidRequest, VersionMismatch {
1742
		
1743
		return setAccessPolicy(null, pid, accessPolicy, serialVersion);
1744
	}
1745
	
1746
	@Override
1747
	public Identifier setRightsHolder(Identifier pid, Subject userId, long serialVersion)
1748
			throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1749
			NotImplemented, InvalidRequest, VersionMismatch {
1750
		
1751
		return setRightsHolder(null, pid, userId, serialVersion);
1752
	}
1753
	
1754
	@Override
1755
	public Identifier create(Identifier pid, InputStream object, SystemMetadata sysmeta)
1756
			throws InvalidToken, ServiceFailure, NotAuthorized,
1757
			IdentifierNotUnique, UnsupportedType, InsufficientResources,
1758
			InvalidSystemMetadata, NotImplemented, InvalidRequest {
1759

    
1760
		return create(null, pid, object, sysmeta);
1761
	}
1762
	
1763
	@Override
1764
	public Identifier delete(Identifier pid) throws InvalidToken, ServiceFailure,
1765
			NotAuthorized, NotFound, NotImplemented {
1766

    
1767
		return delete(null, pid);
1768
	}
1769
	
1770
	@Override
1771
	public Identifier generateIdentifier(String scheme, String fragment)
1772
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1773
			InvalidRequest {
1774

    
1775
		return generateIdentifier(null, scheme, fragment);
1776
	}
1777
	
1778
	@Override
1779
	public Log getLogRecords(Date fromDate, Date toDate, Event event, String pidFilter,
1780
			Integer start, Integer count) throws InvalidToken, InvalidRequest,
1781
			ServiceFailure, NotAuthorized, NotImplemented, InsufficientResources {
1782

    
1783
		return getLogRecords(null, fromDate, toDate, event, pidFilter, start, count);
1784
	}
1785
	
1786
	@Override
1787
	public boolean hasReservation(Subject subject, Identifier pid)
1788
			throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1789
			NotImplemented, InvalidRequest, IdentifierNotUnique {
1790

    
1791
		return hasReservation(null, subject, pid);
1792
	}
1793
	
1794
	@Override
1795
	public Identifier registerSystemMetadata(Identifier pid, SystemMetadata sysmeta)
1796
			throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1797
			InvalidSystemMetadata, InvalidToken {
1798

    
1799
		return registerSystemMetadata(null, pid, sysmeta);
1800
	}
1801
	
1802
	@Override
1803
	public Identifier reserveIdentifier(Identifier pid) throws InvalidToken,
1804
			ServiceFailure, NotAuthorized, IdentifierNotUnique, NotImplemented,
1805
			InvalidRequest {
1806

    
1807
		return reserveIdentifier(null, pid);
1808
	}
1809
	
1810
	@Override
1811
	public boolean setObsoletedBy(Identifier pid, Identifier obsoletedByPid, long serialVersion)
1812
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
1813
			InvalidRequest, InvalidToken, VersionMismatch {
1814

    
1815
		return setObsoletedBy(null, pid, obsoletedByPid, serialVersion);
1816
	}
1817
	
1818
	@Override
1819
	public DescribeResponse describe(Identifier pid) throws InvalidToken,
1820
			NotAuthorized, NotImplemented, ServiceFailure, NotFound {
1821

    
1822
		return describe(null, pid);
1823
	}
1824
	
1825
	@Override
1826
	public InputStream get(Identifier pid) throws InvalidToken, ServiceFailure,
1827
			NotAuthorized, NotFound, NotImplemented {
1828

    
1829
		return get(null, pid);
1830
	}
1831
	
1832
	@Override
1833
	public Checksum getChecksum(Identifier pid) throws InvalidToken,
1834
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1835

    
1836
		return getChecksum(null, pid);
1837
	}
1838
	
1839
	@Override
1840
	public SystemMetadata getSystemMetadata(Identifier pid) throws InvalidToken,
1841
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1842

    
1843
		return getSystemMetadata(null, pid);
1844
	}
1845
	
1846
	@Override
1847
	public ObjectList listObjects(Date startTime, Date endTime,
1848
			ObjectFormatIdentifier formatid, Boolean replicaStatus, Integer start, Integer count)
1849
			throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1850
			ServiceFailure {
1851

    
1852
		return listObjects(null, startTime, endTime, formatid, replicaStatus, start, count);
1853
	}
1854
	
1855
	@Override
1856
	public ObjectLocationList resolve(Identifier pid) throws InvalidToken,
1857
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1858

    
1859
		return resolve(null, pid);
1860
	}
1861
	
1862
	@Override
1863
	public ObjectList search(String queryType, String query) throws InvalidToken,
1864
			ServiceFailure, NotAuthorized, InvalidRequest, NotImplemented {
1865

    
1866
		return search(null, queryType, query);
1867
	}
1868
	
1869
	@Override
1870
	public boolean deleteReplicationMetadata(Identifier pid, NodeReference nodeId,
1871
			long serialVersion) throws InvalidToken, InvalidRequest, ServiceFailure,
1872
			NotAuthorized, NotFound, NotImplemented, VersionMismatch {
1873

    
1874
		return deleteReplicationMetadata(null, pid, nodeId, serialVersion);
1875
	}
1876
	
1877
	@Override
1878
	public boolean isNodeAuthorized(Subject targetNodeSubject, Identifier pid)
1879
			throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure,
1880
			NotFound, InvalidRequest {
1881

    
1882
		return isNodeAuthorized(null, targetNodeSubject, pid);
1883
	}
1884
	
1885
	@Override
1886
	public boolean setReplicationPolicy(Identifier pid, ReplicationPolicy policy,
1887
			long serialVersion) throws NotImplemented, NotFound, NotAuthorized,
1888
			ServiceFailure, InvalidRequest, InvalidToken, VersionMismatch {
1889

    
1890
		return setReplicationPolicy(null, pid, policy, serialVersion);
1891
	}
1892
	
1893
	@Override
1894
	public boolean setReplicationStatus(Identifier pid, NodeReference targetNode,
1895
			ReplicationStatus status, BaseException failure) throws ServiceFailure,
1896
			NotImplemented, InvalidToken, NotAuthorized, InvalidRequest, NotFound {
1897

    
1898
		return setReplicationStatus(null, pid, targetNode, status, failure);
1899
	}
1900
	
1901
	@Override
1902
	public boolean updateReplicationMetadata(Identifier pid, Replica replica,
1903
			long serialVersion) throws NotImplemented, NotAuthorized, ServiceFailure,
1904
			NotFound, InvalidRequest, InvalidToken, VersionMismatch {
1905

    
1906
		return updateReplicationMetadata(null, pid, replica, serialVersion);
1907
	}
1908

    
1909
  @Override
1910
  public QueryEngineDescription getQueryEngineDescription(String arg0)
1911
          throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1912
          NotFound {
1913
      throw new NotImplemented("4410", "getQueryEngineDescription not implemented");
1914
      
1915
  }
1916

    
1917
  @Override
1918
  public QueryEngineList listQueryEngines() throws InvalidToken, ServiceFailure,
1919
          NotAuthorized, NotImplemented {
1920
      throw new NotImplemented("4420", "listQueryEngines not implemented");
1921
      
1922
  }
1923

    
1924
  @Override
1925
  public InputStream query(String arg0, String arg1) throws InvalidToken,
1926
          ServiceFailure, NotAuthorized, InvalidRequest, NotImplemented, NotFound {
1927
      throw new NotImplemented("4324", "query not implemented");
1928
      
1929
  }
1930
}
(1-1/6)