Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.ArrayList;
29
import java.util.Calendar;
30
import java.util.Date;
31
import java.util.List;
32
import java.util.concurrent.locks.Lock;
33

    
34
import javax.servlet.http.HttpServletRequest;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.client.CNode;
38
import org.dataone.client.D1Client;
39
import org.dataone.client.MNode;
40
import org.dataone.service.cn.v1.CNAuthorization;
41
import org.dataone.service.cn.v1.CNCore;
42
import org.dataone.service.cn.v1.CNRead;
43
import org.dataone.service.cn.v1.CNReplication;
44
import org.dataone.service.exceptions.BaseException;
45
import org.dataone.service.exceptions.IdentifierNotUnique;
46
import org.dataone.service.exceptions.InsufficientResources;
47
import org.dataone.service.exceptions.InvalidRequest;
48
import org.dataone.service.exceptions.InvalidSystemMetadata;
49
import org.dataone.service.exceptions.InvalidToken;
50
import org.dataone.service.exceptions.NotAuthorized;
51
import org.dataone.service.exceptions.NotFound;
52
import org.dataone.service.exceptions.NotImplemented;
53
import org.dataone.service.exceptions.ServiceFailure;
54
import org.dataone.service.exceptions.UnsupportedType;
55
import org.dataone.service.exceptions.VersionMismatch;
56
import org.dataone.service.types.v1.AccessPolicy;
57
import org.dataone.service.types.v1.Checksum;
58
import org.dataone.service.types.v1.ChecksumAlgorithmList;
59
import org.dataone.service.types.v1.DescribeResponse;
60
import org.dataone.service.types.v1.Event;
61
import org.dataone.service.types.v1.Identifier;
62
import org.dataone.service.types.v1.Log;
63
import org.dataone.service.types.v1.Node;
64
import org.dataone.service.types.v1.NodeList;
65
import org.dataone.service.types.v1.NodeReference;
66
import org.dataone.service.types.v1.NodeType;
67
import org.dataone.service.types.v1.ObjectFormat;
68
import org.dataone.service.types.v1.ObjectFormatIdentifier;
69
import org.dataone.service.types.v1.ObjectFormatList;
70
import org.dataone.service.types.v1.ObjectList;
71
import org.dataone.service.types.v1.ObjectLocationList;
72
import org.dataone.service.types.v1.Permission;
73
import org.dataone.service.types.v1.Replica;
74
import org.dataone.service.types.v1.ReplicationPolicy;
75
import org.dataone.service.types.v1.ReplicationStatus;
76
import org.dataone.service.types.v1.Session;
77
import org.dataone.service.types.v1.Subject;
78
import org.dataone.service.types.v1.SystemMetadata;
79
import org.dataone.service.types.v1.util.ServiceMethodRestrictionUtil;
80
import org.dataone.service.types.v1_1.QueryEngineDescription;
81
import org.dataone.service.types.v1_1.QueryEngineList;
82

    
83
import edu.ucsb.nceas.metacat.EventLog;
84
import edu.ucsb.nceas.metacat.IdentifierManager;
85
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
86

    
87
/**
88
 * Represents Metacat's implementation of the DataONE Coordinating Node 
89
 * service API. Methods implement the various CN* interfaces, and methods common
90
 * to both Member Node and Coordinating Node interfaces are found in the
91
 * D1NodeService super class.
92
 *
93
 */
94
public class CNodeService extends D1NodeService implements CNAuthorization,
95
    CNCore, CNRead, CNReplication {
96

    
97
  /* the logger instance */
98
  private Logger logMetacat = null;
99

    
100
  /**
101
   * singleton accessor
102
   */
103
  public static CNodeService getInstance(HttpServletRequest request) { 
104
    return new CNodeService(request);
105
  }
106
  
107
  /**
108
   * Constructor, private for singleton access
109
   */
110
  private CNodeService(HttpServletRequest request) {
111
    super(request);
112
    logMetacat = Logger.getLogger(CNodeService.class);
113
        
114
  }
115
    
116
  /**
117
   * Set the replication policy for an object given the object identifier
118
   * 
119
   * @param session - the Session object containing the credentials for the Subject
120
   * @param pid - the object identifier for the given object
121
   * @param policy - the replication policy to be applied
122
   * 
123
   * @return true or false
124
   * 
125
   * @throws NotImplemented
126
   * @throws NotAuthorized
127
   * @throws ServiceFailure
128
   * @throws InvalidRequest
129
   * @throws VersionMismatch
130
   * 
131
   */
132
  @Override
133
  public boolean setReplicationPolicy(Session session, Identifier pid,
134
      ReplicationPolicy policy, long serialVersion) 
135
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
136
      InvalidRequest, InvalidToken, VersionMismatch {
137
      
138
      // The lock to be used for this identifier
139
      Lock lock = null;
140
      
141
      // get the subject
142
      Subject subject = session.getSubject();
143
      
144
      // are we allowed to do this?
145
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
146
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
147
                  + " not allowed by " + subject.getValue() + " on "
148
                  + pid.getValue());
149
          
150
      }
151
      
152
      SystemMetadata systemMetadata = null;
153
      try {
154
          lock = HazelcastService.getInstance().getLock(pid.getValue());
155
          lock.lock();
156
          logMetacat.debug("Locked identifier " + pid.getValue());
157

    
158
          try {
159
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
160
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
161
                  
162
              }
163
              
164
              // did we get it correctly?
165
              if ( systemMetadata == null ) {
166
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
167
                  
168
              }
169

    
170
              // does the request have the most current system metadata?
171
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
172
                 String msg = "The requested system metadata version number " + 
173
                     serialVersion + " differs from the current version at " +
174
                     systemMetadata.getSerialVersion().longValue() +
175
                     ". Please get the latest copy in order to modify it.";
176
                 throw new VersionMismatch("4886", msg);
177
                 
178
              }
179
              
180
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
181
              throw new NotFound("4884", "No record found for: " + pid.getValue());
182
            
183
          }
184
          
185
          // set the new policy
186
          systemMetadata.setReplicationPolicy(policy);
187
          
188
          // update the metadata
189
          try {
190
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
191
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
192
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
193
              notifyReplicaNodes(systemMetadata);
194
              
195
          } catch (RuntimeException e) {
196
              throw new ServiceFailure("4882", e.getMessage());
197
          
198
          }
199
          
200
      } catch (RuntimeException e) {
201
          throw new ServiceFailure("4882", e.getMessage());
202
          
203
      } finally {
204
          lock.unlock();
205
          logMetacat.debug("Unlocked identifier " + pid.getValue());
206
          
207
      }
208
    
209
      return true;
210
  }
211

    
212
  /**
213
   * Deletes the replica from the given Member Node
214
   * NOTE: MN.delete() may be an "archive" operation. TBD.
215
   * @param session
216
   * @param pid
217
   * @param nodeId
218
   * @param serialVersion
219
   * @return
220
   * @throws InvalidToken
221
   * @throws ServiceFailure
222
   * @throws NotAuthorized
223
   * @throws NotFound
224
   * @throws NotImplemented
225
   * @throws VersionMismatch
226
   */
227
  @Override
228
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
229
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
230
	  
231
	  	// The lock to be used for this identifier
232
		Lock lock = null;
233

    
234
		// get the subject
235
		Subject subject = session.getSubject();
236

    
237
		// are we allowed to do this?
238
		boolean isAuthorized = false;
239
		try {
240
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
241
		} catch (InvalidRequest e) {
242
			throw new ServiceFailure("4882", e.getDescription());
243
		}
244
		if (!isAuthorized) {
245
			throw new NotAuthorized("4881", Permission.WRITE
246
					+ " not allowed by " + subject.getValue() + " on "
247
					+ pid.getValue());
248

    
249
		}
250

    
251
		SystemMetadata systemMetadata = null;
252
		try {
253
			lock = HazelcastService.getInstance().getLock(pid.getValue());
254
			lock.lock();
255
			logMetacat.debug("Locked identifier " + pid.getValue());
256

    
257
			try {
258
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
259
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
260
				}
261

    
262
				// did we get it correctly?
263
				if (systemMetadata == null) {
264
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
265
				}
266

    
267
				// does the request have the most current system metadata?
268
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
269
					String msg = "The requested system metadata version number "
270
							+ serialVersion
271
							+ " differs from the current version at "
272
							+ systemMetadata.getSerialVersion().longValue()
273
							+ ". Please get the latest copy in order to modify it.";
274
					throw new VersionMismatch("4886", msg);
275

    
276
				}
277

    
278
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
279
				throw new NotFound("4884", "No record found for: " + pid.getValue());
280

    
281
			}
282
			  
283
			// check permissions
284
			// TODO: is this necessary?
285
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
286
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
287
			if (!isAllowed) {
288
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
289
			}
290
			  
291
			// delete the replica from the given node
292
			// CSJ: use CN.delete() to truly delete a replica, semantically
293
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
294
			//D1Client.getMN(nodeId).delete(session, pid);
295
			  
296
			// reflect that change in the system metadata
297
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
298
			for (Replica r: systemMetadata.getReplicaList()) {
299
				  if (r.getReplicaMemberNode().equals(nodeId)) {
300
					  updatedReplicas.remove(r);
301
					  break;
302
				  }
303
			}
304
			systemMetadata.setReplicaList(updatedReplicas);
305

    
306
			// update the metadata
307
			try {
308
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
309
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
310
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
311
			} catch (RuntimeException e) {
312
				throw new ServiceFailure("4882", e.getMessage());
313
			}
314

    
315
		} catch (RuntimeException e) {
316
			throw new ServiceFailure("4882", e.getMessage());
317
		} finally {
318
			lock.unlock();
319
			logMetacat.debug("Unlocked identifier " + pid.getValue());
320
		}
321

    
322
		return true;	  
323
	  
324
  }
325
  
326
  /**
327
   * Deletes an object from the Coordinating Node, where the object is a 
328
   * a science metadata object.
329
   * 
330
   * @param session - the Session object containing the credentials for the Subject
331
   * @param pid - The object identifier to be deleted
332
   * 
333
   * @return pid - the identifier of the object used for the deletion
334
   * 
335
   * @throws InvalidToken
336
   * @throws ServiceFailure
337
   * @throws NotAuthorized
338
   * @throws NotFound
339
   * @throws NotImplemented
340
   * @throws InvalidRequest
341
   */
342
  @Override
343
  public Identifier delete(Session session, Identifier pid) 
344
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
345

    
346
	  // check that it is CN/admin
347
	  boolean allowed = isAdminAuthorized(session);
348
	  
349
	  // additional check if it is the authoritative node if it is not the admin
350
      if(!allowed) {
351
          allowed = isAuthoritativeMNodeAdmin(session, pid);
352
      }
353
	  
354
	  if (!allowed) {
355
		  String msg = "The subject is not allowed to call delete() on a Coordinating Node.";
356
		  logMetacat.info(msg);
357
		  throw new NotAuthorized("1320", msg);
358
	  }
359
	  
360
	  // defer to superclass implementation
361
	  Identifier retId =  super.delete(session, pid);
362
      
363
	  // notify the replicas
364
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
365
	  if (systemMetadata.getReplicaList() != null) {
366
		  for (Replica replica: systemMetadata.getReplicaList()) {
367
			  NodeReference replicaNode = replica.getReplicaMemberNode();
368
			  try {
369
				  Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
370
			  } catch (Exception e) {
371
				  // all we can really do is log errors and carry on with life
372
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + " from replica MN: " + replicaNode.getValue(), e);
373
			}
374
			  
375
		  }
376
	  }
377
	  
378
	  return retId;
379
      
380
  }
381
  
382
  /**
383
   * Deletes an object from the Coordinating Node, where the object is a 
384
   * a science metadata object.
385
   * 
386
   * @param session - the Session object containing the credentials for the Subject
387
   * @param pid - The object identifier to be deleted
388
   * 
389
   * @return pid - the identifier of the object used for the deletion
390
   * 
391
   * @throws InvalidToken
392
   * @throws ServiceFailure
393
   * @throws NotAuthorized
394
   * @throws NotFound
395
   * @throws NotImplemented
396
   * @throws InvalidRequest
397
   */
398
  @Override
399
  public Identifier archive(Session session, Identifier pid) 
400
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
401

    
402
	  // check that it is CN/admin
403
	  boolean allowed = isAdminAuthorized(session);
404
	  
405
	  //check if it is the authoritative member node
406
	  if(!allowed) {
407
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
408
	  }
409
	  
410
	  if (!allowed) {
411
		  String msg = "The subject is not allowed to call archive() on a Coordinating Node.";
412
		  logMetacat.info(msg);
413
		  throw new NotAuthorized("1320", msg);
414
	  }
415
	  
416
	  // defer to superclass implementation
417
	  Identifier retId =  super.archive(session, pid);
418
      
419
	  // notify the replicas
420
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
421
	  if (systemMetadata.getReplicaList() != null) {
422
		  for (Replica replica: systemMetadata.getReplicaList()) {
423
			  NodeReference replicaNode = replica.getReplicaMemberNode();
424
			  try {
425
				  // TODO: implement in the clients
426
				  //Identifier mnRetId = D1Client.getMN(replicaNode).archive(null, pid);
427
			  } catch (Exception e) {
428
				  // all we can really do is log errors and carry on with life
429
				  logMetacat.error("Error archiving pid: " +  pid.getValue() + " from replica MN: " + replicaNode.getValue(), e);
430
			}
431
			  
432
		  }
433
	  }
434
	  
435
	  return retId;
436
      
437
  }
438
  
439
  /**
440
   * Set the obsoletedBy attribute in System Metadata
441
   * @param session
442
   * @param pid
443
   * @param obsoletedByPid
444
   * @param serialVersion
445
   * @return
446
   * @throws NotImplemented
447
   * @throws NotFound
448
   * @throws NotAuthorized
449
   * @throws ServiceFailure
450
   * @throws InvalidRequest
451
   * @throws InvalidToken
452
   * @throws VersionMismatch
453
   */
454
  @Override
455
  public boolean setObsoletedBy(Session session, Identifier pid,
456
			Identifier obsoletedByPid, long serialVersion)
457
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
458
			InvalidRequest, InvalidToken, VersionMismatch {
459

    
460
		// The lock to be used for this identifier
461
		Lock lock = null;
462

    
463
		// get the subject
464
		Subject subject = session.getSubject();
465

    
466
		// are we allowed to do this?
467
		if (!isAuthorized(session, pid, Permission.WRITE)) {
468
			throw new NotAuthorized("4881", Permission.WRITE
469
					+ " not allowed by " + subject.getValue() + " on "
470
					+ pid.getValue());
471

    
472
		}
473

    
474

    
475
		SystemMetadata systemMetadata = null;
476
		try {
477
			lock = HazelcastService.getInstance().getLock(pid.getValue());
478
			lock.lock();
479
			logMetacat.debug("Locked identifier " + pid.getValue());
480

    
481
			try {
482
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
483
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
484
				}
485

    
486
				// did we get it correctly?
487
				if (systemMetadata == null) {
488
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
489
				}
490

    
491
				// does the request have the most current system metadata?
492
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
493
					String msg = "The requested system metadata version number "
494
							+ serialVersion
495
							+ " differs from the current version at "
496
							+ systemMetadata.getSerialVersion().longValue()
497
							+ ". Please get the latest copy in order to modify it.";
498
					throw new VersionMismatch("4886", msg);
499

    
500
				}
501

    
502
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
503
				throw new NotFound("4884", "No record found for: " + pid.getValue());
504

    
505
			}
506

    
507
			// set the new policy
508
			systemMetadata.setObsoletedBy(obsoletedByPid);
509

    
510
			// update the metadata
511
			try {
512
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
513
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
514
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
515
			} catch (RuntimeException e) {
516
				throw new ServiceFailure("4882", e.getMessage());
517
			}
518

    
519
		} catch (RuntimeException e) {
520
			throw new ServiceFailure("4882", e.getMessage());
521
		} finally {
522
			lock.unlock();
523
			logMetacat.debug("Unlocked identifier " + pid.getValue());
524
		}
525

    
526
		return true;
527
	}
528
  
529
  
530
  /**
531
   * Set the replication status for an object given the object identifier
532
   * 
533
   * @param session - the Session object containing the credentials for the Subject
534
   * @param pid - the object identifier for the given object
535
   * @param status - the replication status to be applied
536
   * 
537
   * @return true or false
538
   * 
539
   * @throws NotImplemented
540
   * @throws NotAuthorized
541
   * @throws ServiceFailure
542
   * @throws InvalidRequest
543
   * @throws InvalidToken
544
   * @throws NotFound
545
   * 
546
   */
547
  @Override
548
  public boolean setReplicationStatus(Session session, Identifier pid,
549
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
550
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
551
      InvalidRequest, NotFound {
552
	  
553
	  // cannot be called by public
554
	  if (session == null) {
555
		  throw new NotAuthorized("4720", "Session cannot be null");
556
	  }
557
      
558
      // The lock to be used for this identifier
559
      Lock lock = null;
560
      
561
      boolean allowed = false;
562
      int replicaEntryIndex = -1;
563
      List<Replica> replicas = null;
564
      // get the subject
565
      Subject subject = session.getSubject();
566
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
567
          " is " + status.toString());
568
      
569
      SystemMetadata systemMetadata = null;
570

    
571
      try {
572
          lock = HazelcastService.getInstance().getLock(pid.getValue());
573
          lock.lock();
574
          logMetacat.debug("Locked identifier " + pid.getValue());
575

    
576
          try {      
577
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
578

    
579
              // did we get it correctly?
580
              if ( systemMetadata == null ) {
581
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
582
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
583
                  
584
              }
585
              replicas = systemMetadata.getReplicaList();
586
              int count = 0;
587
              
588
              // was there a failure? log it
589
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
590
                 String msg = "The replication request of the object identified by " + 
591
                     pid.getValue() + " failed.  The error message was " +
592
                     failure.getMessage() + ".";
593
              }
594
              
595
              if (replicas.size() > 0 && replicas != null) {
596
                  // find the target replica index in the replica list
597
                  for (Replica replica : replicas) {
598
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
599
                      String targetNodeStr = targetNode.getValue();
600
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
601
                  
602
                      if (replicaNodeStr.equals(targetNodeStr)) {
603
                          replicaEntryIndex = count;
604
                          logMetacat.debug("replica entry index is: "
605
                                  + replicaEntryIndex);
606
                          break;
607
                      }
608
                      count++;
609
                  
610
                  }
611
              }
612
              // are we allowed to do this? only CNs and target MNs are allowed
613
              CNode cn = D1Client.getCN();
614
              List<Node> nodes = cn.listNodes().getNodeList();
615
              
616
              // find the node in the node list
617
              for ( Node node : nodes ) {
618
                  
619
                  NodeReference nodeReference = node.getIdentifier();
620
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
621
                      nodeReference.getValue());
622
                  
623
                  // allow target MN certs
624
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
625
                      node.getType().equals(NodeType.MN)) {
626
                      List<Subject> nodeSubjects = node.getSubjectList();
627
                      
628
                      // check if the session subject is in the node subject list
629
                      for (Subject nodeSubject : nodeSubjects) {
630
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
631
                                  nodeSubject.getValue() + " and " + subject.getValue());
632
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
633
                              
634
                              // lastly limit to COMPLETED, INVALIDATED,
635
                              // and FAILED status updates from MNs only
636
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
637
                                   status.equals(ReplicationStatus.INVALIDATED) ||
638
                                   status.equals(ReplicationStatus.FAILED)) {
639
                                  allowed = true;
640
                                  break;
641
                                  
642
                              }                              
643
                          }
644
                      }                 
645
                  }
646
              }
647

    
648
              if ( !allowed ) {
649
                  //check for CN admin access
650
                  allowed = isAuthorized(session, pid, Permission.WRITE);
651
                  
652
              }              
653
              
654
              if ( !allowed ) {
655
                  String msg = "The subject identified by "
656
                          + subject.getValue()
657
                          + " does not have permission to set the replication status for "
658
                          + "the replica identified by "
659
                          + targetNode.getValue() + ".";
660
                  logMetacat.info(msg);
661
                  throw new NotAuthorized("4720", msg);
662
                  
663
              }
664

    
665
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
666
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
667
                " : " + e.getMessage());
668
            
669
          }
670
          
671
          Replica targetReplica = new Replica();
672
          // set the status for the replica
673
          if ( replicaEntryIndex != -1 ) {
674
              targetReplica = replicas.get(replicaEntryIndex);
675
              
676
              // don't allow status to change from COMPLETED to anything other
677
              // than INVALIDATED: prevents overwrites from race conditions
678
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
679
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
680
            	  throw new InvalidRequest("4730", "Status state change from " +
681
            			  targetReplica.getReplicationStatus() + " to " +
682
            			  status.toString() + "is prohibited for identifier " +
683
            			  pid.getValue() + " and target node " + 
684
            			  targetReplica.getReplicaMemberNode().getValue());
685
              }
686
              
687
              targetReplica.setReplicationStatus(status);
688
              
689
              logMetacat.debug("Set the replication status for " + 
690
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
691
                  targetReplica.getReplicationStatus() + " for identifier " +
692
                  pid.getValue());
693
              
694
          } else {
695
              // this is a new entry, create it
696
              targetReplica.setReplicaMemberNode(targetNode);
697
              targetReplica.setReplicationStatus(status);
698
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
699
              replicas.add(targetReplica);
700
              
701
          }
702
          
703
          systemMetadata.setReplicaList(replicas);
704
                
705
          // update the metadata
706
          try {
707
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
708
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
709
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
710

    
711
              if ( !status.equals(ReplicationStatus.QUEUED) && 
712
            	   !status.equals(ReplicationStatus.REQUESTED)) {
713
                  
714
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
715
                          "\tNODE:\t" + targetNode.getValue() + 
716
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
717
                
718
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
719
                          "\tPID:\t"  + pid.getValue() + 
720
                          "\tNODE:\t" + targetNode.getValue() + 
721
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
722
              }
723

    
724
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
725
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
726
                      " on target node " + targetNode + ". The exception was: " +
727
                      failure.getMessage());
728
              }
729
              
730
			  // update the replica nodes about the completed replica when complete
731
              if (status.equals(ReplicationStatus.COMPLETED)) {
732
				broadcastSystemMetadataChange(systemMetadata);
733
			}
734
          
735
          } catch (RuntimeException e) {
736
              throw new ServiceFailure("4700", e.getMessage());
737
          
738
          }
739
          
740
    } catch (RuntimeException e) {
741
        String msg = "There was a RuntimeException getting the lock for " +
742
            pid.getValue();
743
        logMetacat.info(msg);
744
        
745
    } finally {
746
        lock.unlock();
747
        logMetacat.debug("Unlocked identifier " + pid.getValue());
748
        
749
    }
750
      
751
      return true;
752
  }
753
  
754
  /*
755
   * Inform each replica node that system metadata has changed
756
   * 
757
   * @param systemMetadata  the system metadata object with the replica list
758
   */
759
  private void broadcastSystemMetadataChange(SystemMetadata systemMetadata) {
760
	  
761
      CNode cn = null;
762
      NodeList nodeList = new NodeList();
763
      List<Node> nodes = new ArrayList<Node>();
764

    
765
	  List<Replica> replicaList = systemMetadata.getReplicaList();
766

    
767
	  // get the node list so we know the node type
768
	  try {
769
		cn = D1Client.getCN();
770
		nodeList = cn.listNodes();
771
	    nodes = nodeList.getNodeList();
772
		
773
	    // iterate through the replica list and inform each MN of the system metadata change
774
		for (Replica replica : replicaList) {
775
		    NodeReference nodeId = replica.getReplicaMemberNode();
776
		    try {
777
		        for (Node node : nodes) {
778
		      	    if ( node.getIdentifier().equals(nodeId) ) {
779
		      		    if ( node.getType().equals(NodeType.MN) ) {
780
		      		        MNode replicaNode = D1Client.getMN(nodeId);
781
		      		        // call MN.systemMetadataChanged();
782
		      		        replicaNode.systemMetadataChanged(null, 
783
		      		            systemMetadata.getIdentifier(), 
784
		      		            systemMetadata.getSerialVersion().longValue(), 
785
		      		            systemMetadata.getDateSysMetadataModified());
786
		      		        if (logMetacat.isDebugEnabled()) {
787
								logMetacat.debug("Called systemMetadataChanged() for identifier " + 
788
		      		                systemMetadata.getIdentifier().getValue() + 
789
		      		                " for node " + nodeId.getValue());
790
							}
791
		      		    }
792
		      	    }
793
		        }
794
		        
795
		    } catch (BaseException e) {
796
			    logMetacat.error("Couldn't contact " + nodeId.getValue() + 
797
		            " to inform it of the system metadata change for identifier " + 
798
			  	  systemMetadata.getIdentifier().getValue());
799
		    }     
800
		}
801

    
802
	  } catch (BaseException e1) {
803
		  logMetacat.error("Couldn't get the node list from the CN to broadcast the system " +
804
				    "metadata change for identifier " + systemMetadata.getIdentifier().getValue());
805
	  }
806
	  
807
	  
808
}
809

    
810
/**
811
   * Return the checksum of the object given the identifier 
812
   * 
813
   * @param session - the Session object containing the credentials for the Subject
814
   * @param pid - the object identifier for the given object
815
   * 
816
   * @return checksum - the checksum of the object
817
   * 
818
   * @throws InvalidToken
819
   * @throws ServiceFailure
820
   * @throws NotAuthorized
821
   * @throws NotFound
822
   * @throws NotImplemented
823
   */
824
  @Override
825
  public Checksum getChecksum(Session session, Identifier pid)
826
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
827
    NotImplemented {
828
    
829
	boolean isAuthorized = false;
830
	try {
831
		isAuthorized = isAuthorized(session, pid, Permission.READ);
832
	} catch (InvalidRequest e) {
833
		throw new ServiceFailure("1410", e.getDescription());
834
	}  
835
    if (!isAuthorized) {
836
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
837
    }
838
    
839
    SystemMetadata systemMetadata = null;
840
    Checksum checksum = null;
841
    
842
    try {
843
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
844

    
845
        if (systemMetadata == null ) {
846
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue());
847
        }
848
        checksum = systemMetadata.getChecksum();
849
        
850
    } catch (RuntimeException e) {
851
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
852
            pid.getValue() + ". The error message was: " + e.getMessage());
853
      
854
    }
855
    
856
    return checksum;
857
  }
858

    
859
  /**
860
   * Resolve the location of a given object
861
   * 
862
   * @param session - the Session object containing the credentials for the Subject
863
   * @param pid - the object identifier for the given object
864
   * 
865
   * @return objectLocationList - the list of nodes known to contain the object
866
   * 
867
   * @throws InvalidToken
868
   * @throws ServiceFailure
869
   * @throws NotAuthorized
870
   * @throws NotFound
871
   * @throws NotImplemented
872
   */
873
  @Override
874
  public ObjectLocationList resolve(Session session, Identifier pid)
875
    throws InvalidToken, ServiceFailure, NotAuthorized,
876
    NotFound, NotImplemented {
877

    
878
    throw new NotImplemented("4131", "resolve not implemented");
879

    
880
  }
881

    
882
  /**
883
   * Metacat does not implement this method at the CN level
884
   */
885
  @Override
886
  public ObjectList search(Session session, String queryType, String query)
887
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
888
    NotImplemented {
889

    
890
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
891
	  
892
//    ObjectList objectList = null;
893
//    try {
894
//        objectList = 
895
//          IdentifierManager.getInstance().querySystemMetadata(
896
//              null, //startTime, 
897
//              null, //endTime,
898
//              null, //objectFormat, 
899
//              false, //replicaStatus, 
900
//              0, //start, 
901
//              1000 //count
902
//              );
903
//        
904
//    } catch (Exception e) {
905
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
906
//    }
907
//
908
//      return objectList;
909
		  
910
  }
911
  
912
  /**
913
   * Returns the object format registered in the DataONE Object Format 
914
   * Vocabulary for the given format identifier
915
   * 
916
   * @param fmtid - the identifier of the format requested
917
   * 
918
   * @return objectFormat - the object format requested
919
   * 
920
   * @throws ServiceFailure
921
   * @throws NotFound
922
   * @throws InsufficientResources
923
   * @throws NotImplemented
924
   */
925
  @Override
926
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
927
    throws ServiceFailure, NotFound, NotImplemented {
928
     
929
      return ObjectFormatService.getInstance().getFormat(fmtid);
930
      
931
  }
932

    
933
  /**
934
   * Returns a list of all object formats registered in the DataONE Object 
935
   * Format Vocabulary
936
    * 
937
   * @return objectFormatList - The list of object formats registered in 
938
   *                            the DataONE Object Format Vocabulary
939
   * 
940
   * @throws ServiceFailure
941
   * @throws NotImplemented
942
   * @throws InsufficientResources
943
   */
944
  @Override
945
  public ObjectFormatList listFormats() 
946
    throws ServiceFailure, NotImplemented {
947

    
948
    return ObjectFormatService.getInstance().listFormats();
949
  }
950

    
951
  /**
952
   * Returns a list of nodes that have been registered with the DataONE infrastructure
953
    * 
954
   * @return nodeList - List of nodes from the registry
955
   * 
956
   * @throws ServiceFailure
957
   * @throws NotImplemented
958
   */
959
  @Override
960
  public NodeList listNodes() 
961
    throws NotImplemented, ServiceFailure {
962

    
963
    throw new NotImplemented("4800", "listNodes not implemented");
964
  }
965

    
966
  /**
967
   * Provides a mechanism for adding system metadata independently of its 
968
   * associated object, such as when adding system metadata for data objects.
969
    * 
970
   * @param session - the Session object containing the credentials for the Subject
971
   * @param pid - The identifier of the object to register the system metadata against
972
   * @param sysmeta - The system metadata to be registered
973
   * 
974
   * @return true if the registration succeeds
975
   * 
976
   * @throws NotImplemented
977
   * @throws NotAuthorized
978
   * @throws ServiceFailure
979
   * @throws InvalidRequest
980
   * @throws InvalidSystemMetadata
981
   */
982
  @Override
983
  public Identifier registerSystemMetadata(Session session, Identifier pid,
984
      SystemMetadata sysmeta) 
985
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
986
      InvalidSystemMetadata {
987

    
988
      // The lock to be used for this identifier
989
      Lock lock = null;
990

    
991
      // TODO: control who can call this?
992
      if (session == null) {
993
          //TODO: many of the thrown exceptions do not use the correct error codes
994
          //check these against the docs and correct them
995
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
996
                  "  If you are not logged in, please do so and retry the request.");
997
      }
998
      
999
      // verify that guid == SystemMetadata.getIdentifier()
1000
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1001
          "|" + sysmeta.getIdentifier().getValue());
1002
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1003
          throw new InvalidRequest("4863", 
1004
              "The identifier in method call (" + pid.getValue() + 
1005
              ") does not match identifier in system metadata (" +
1006
              sysmeta.getIdentifier().getValue() + ").");
1007
      }
1008

    
1009
      try {
1010
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1011
          lock.lock();
1012
          logMetacat.debug("Locked identifier " + pid.getValue());
1013
          logMetacat.debug("Checking if identifier exists...");
1014
          // Check that the identifier does not already exist
1015
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1016
              throw new InvalidRequest("4863", 
1017
                  "The identifier is already in use by an existing object.");
1018
          
1019
          }
1020
          
1021
          // insert the system metadata into the object store
1022
          logMetacat.debug("Starting to insert SystemMetadata...");
1023
          try {
1024
              sysmeta.setSerialVersion(BigInteger.ONE);
1025
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1026
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1027
              
1028
          } catch (RuntimeException e) {
1029
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1030
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1031
                  e.getClass() + ": " + e.getMessage());
1032
              
1033
          }
1034
          
1035
      } catch (RuntimeException e) {
1036
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1037
                  e.getClass() + ": " + e.getMessage());
1038
          
1039
      }  finally {
1040
          lock.unlock();
1041
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1042
          
1043
      }
1044

    
1045
      
1046
      logMetacat.debug("Returning from registerSystemMetadata");
1047
      EventLog.getInstance().log(request.getRemoteAddr(), 
1048
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1049
          pid.getValue(), "registerSystemMetadata");
1050
      return pid;
1051
  }
1052
  
1053
  /**
1054
   * Given an optional scope and format, reserves and returns an identifier 
1055
   * within that scope and format that is unique and will not be 
1056
   * used by any other sessions. 
1057
    * 
1058
   * @param session - the Session object containing the credentials for the Subject
1059
   * @param pid - The identifier of the object to register the system metadata against
1060
   * @param scope - An optional string to be used to qualify the scope of 
1061
   *                the identifier namespace, which is applied differently 
1062
   *                depending on the format requested. If scope is not 
1063
   *                supplied, a default scope will be used.
1064
   * @param format - The optional name of the identifier format to be used, 
1065
   *                  drawn from a DataONE-specific vocabulary of identifier 
1066
   *                 format names, including several common syntaxes such 
1067
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1068
   *                 format is not supplied by the caller, the CN service 
1069
   *                 will use a default identifier format, which may change 
1070
   *                 over time.
1071
   * 
1072
   * @return true if the registration succeeds
1073
   * 
1074
   * @throws InvalidToken
1075
   * @throws ServiceFailure
1076
   * @throws NotAuthorized
1077
   * @throws IdentifierNotUnique
1078
   * @throws NotImplemented
1079
   */
1080
  @Override
1081
  public Identifier reserveIdentifier(Session session, Identifier pid)
1082
  throws InvalidToken, ServiceFailure,
1083
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1084

    
1085
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1086
  }
1087
  
1088
  @Override
1089
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1090
  throws InvalidToken, ServiceFailure,
1091
        NotAuthorized, NotImplemented, InvalidRequest {
1092
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1093
  }
1094
  
1095
  /**
1096
    * Checks whether the pid is reserved by the subject in the session param
1097
    * If the reservation is held on the pid by the subject, we return true.
1098
    * 
1099
   * @param session - the Session object containing the Subject
1100
   * @param pid - The identifier to check
1101
   * 
1102
   * @return true if the reservation exists for the subject/pid
1103
   * 
1104
   * @throws InvalidToken
1105
   * @throws ServiceFailure
1106
   * @throws NotFound - when the pid is not found (in use or in reservation)
1107
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1108
   * @throws IdentifierNotUnique - when the pid is in use
1109
   * @throws NotImplemented
1110
   */
1111

    
1112
  @Override
1113
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1114
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1115
      NotImplemented, InvalidRequest {
1116
  
1117
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1118
  }
1119

    
1120
  /**
1121
   * Changes ownership (RightsHolder) of the specified object to the 
1122
   * subject specified by userId
1123
    * 
1124
   * @param session - the Session object containing the credentials for the Subject
1125
   * @param pid - Identifier of the object to be modified
1126
   * @param userId - The subject that will be taking ownership of the specified object.
1127
   *
1128
   * @return pid - the identifier of the modified object
1129
   * 
1130
   * @throws ServiceFailure
1131
   * @throws InvalidToken
1132
   * @throws NotFound
1133
   * @throws NotAuthorized
1134
   * @throws NotImplemented
1135
   * @throws InvalidRequest
1136
   */  
1137
  @Override
1138
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1139
      long serialVersion)
1140
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1141
      NotImplemented, InvalidRequest, VersionMismatch {
1142
      
1143
      // The lock to be used for this identifier
1144
      Lock lock = null;
1145

    
1146
      // get the subject
1147
      Subject subject = session.getSubject();
1148
      
1149
      // are we allowed to do this?
1150
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1151
          throw new NotAuthorized("4440", "not allowed by "
1152
                  + subject.getValue() + " on " + pid.getValue());
1153
          
1154
      }
1155
      
1156
      SystemMetadata systemMetadata = null;
1157
      try {
1158
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1159
          lock.lock();
1160
          logMetacat.debug("Locked identifier " + pid.getValue());
1161

    
1162
          try {
1163
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1164
              
1165
              // does the request have the most current system metadata?
1166
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1167
                 String msg = "The requested system metadata version number " + 
1168
                     serialVersion + " differs from the current version at " +
1169
                     systemMetadata.getSerialVersion().longValue() +
1170
                     ". Please get the latest copy in order to modify it.";
1171
                 throw new VersionMismatch("4443", msg);
1172
              }
1173
              
1174
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1175
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1176
              
1177
          }
1178
              
1179
          // set the new rights holder
1180
          systemMetadata.setRightsHolder(userId);
1181
          
1182
          // update the metadata
1183
          try {
1184
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1185
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1186
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1187
              notifyReplicaNodes(systemMetadata);
1188
              
1189
          } catch (RuntimeException e) {
1190
              throw new ServiceFailure("4490", e.getMessage());
1191
          
1192
          }
1193
          
1194
      } catch (RuntimeException e) {
1195
          throw new ServiceFailure("4490", e.getMessage());
1196
          
1197
      } finally {
1198
          lock.unlock();
1199
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1200
      
1201
      }
1202
      
1203
      return pid;
1204
  }
1205

    
1206
  /**
1207
   * Verify that a replication task is authorized by comparing the target node's
1208
   * Subject (from the X.509 certificate-derived Session) with the list of 
1209
   * subjects in the known, pending replication tasks map.
1210
   * 
1211
   * @param originatingNodeSession - Session information that contains the 
1212
   *                                 identity of the calling user
1213
   * @param targetNodeSubject - Subject identifying the target node
1214
   * @param pid - the identifier of the object to be replicated
1215
   * @param replicatePermission - the execute permission to be granted
1216
   * 
1217
   * @throws ServiceFailure
1218
   * @throws NotImplemented
1219
   * @throws InvalidToken
1220
   * @throws NotAuthorized
1221
   * @throws InvalidRequest
1222
   * @throws NotFound
1223
   */
1224
  @Override
1225
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1226
    Subject targetNodeSubject, Identifier pid) 
1227
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1228
    NotFound, InvalidRequest {
1229
    
1230
    boolean isAllowed = false;
1231
    SystemMetadata sysmeta = null;
1232
    NodeReference targetNode = null;
1233
    
1234
    try {
1235
      // get the target node reference from the nodes list
1236
      CNode cn = D1Client.getCN();
1237
      List<Node> nodes = cn.listNodes().getNodeList();
1238
      
1239
      if ( nodes != null ) {
1240
        for (Node node : nodes) {
1241
            
1242
        	if (node.getSubjectList() != null) {
1243
        		
1244
	            for (Subject nodeSubject : node.getSubjectList()) {
1245
	            	
1246
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1247
	                    targetNode = node.getIdentifier();
1248
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1249
	                    break;
1250
	                }
1251
	            }
1252
        	}
1253
            
1254
            if ( targetNode != null) { break; }
1255
        }
1256
        
1257
      } else {
1258
          String msg = "Couldn't get the node list from the CN";
1259
          logMetacat.debug(msg);
1260
          throw new ServiceFailure("4872", msg);
1261
          
1262
      }
1263
      
1264
      // can't find a node listed with the given subject
1265
      if ( targetNode == null ) {
1266
          String msg = "There is no Member Node registered with a node subject " +
1267
              "matching " + targetNodeSubject.getValue();
1268
          logMetacat.info(msg);
1269
          throw new NotAuthorized("4871", msg);
1270
          
1271
      }
1272
      
1273
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1274
      
1275
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1276

    
1277
      if ( sysmeta != null ) {
1278
          
1279
          List<Replica> replicaList = sysmeta.getReplicaList();
1280
          
1281
          if ( replicaList != null ) {
1282
              
1283
              // find the replica with the status set to 'requested'
1284
              for (Replica replica : replicaList) {
1285
                  ReplicationStatus status = replica.getReplicationStatus();
1286
                  NodeReference listedNode = replica.getReplicaMemberNode();
1287
                  if ( listedNode != null && targetNode != null ) {
1288
                      logMetacat.debug("Comparing " + listedNode.getValue()
1289
                              + " to " + targetNode.getValue());
1290
                      
1291
                      if (listedNode.getValue().equals(targetNode.getValue())
1292
                              && status.equals(ReplicationStatus.REQUESTED)) {
1293
                          isAllowed = true;
1294
                          break;
1295

    
1296
                      }
1297
                  }
1298
              }
1299
          }
1300
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1301
              "to replicate: " + isAllowed + " for " + pid.getValue());
1302

    
1303
          
1304
      } else {
1305
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1306
          " is null.");          
1307
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue());
1308
          
1309
      }
1310

    
1311
    } catch (RuntimeException e) {
1312
    	  ServiceFailure sf = new ServiceFailure("4872", 
1313
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1314
                e.getMessage());
1315
    	  sf.initCause(e);
1316
        throw sf;
1317
        
1318
    }
1319
      
1320
    return isAllowed;
1321
    
1322
  }
1323

    
1324
  /**
1325
   * Adds a new object to the Node, where the object is a science metadata object.
1326
   * 
1327
   * @param session - the Session object containing the credentials for the Subject
1328
   * @param pid - The object identifier to be created
1329
   * @param object - the object bytes
1330
   * @param sysmeta - the system metadata that describes the object  
1331
   * 
1332
   * @return pid - the object identifier created
1333
   * 
1334
   * @throws InvalidToken
1335
   * @throws ServiceFailure
1336
   * @throws NotAuthorized
1337
   * @throws IdentifierNotUnique
1338
   * @throws UnsupportedType
1339
   * @throws InsufficientResources
1340
   * @throws InvalidSystemMetadata
1341
   * @throws NotImplemented
1342
   * @throws InvalidRequest
1343
   */
1344
  public Identifier create(Session session, Identifier pid, InputStream object,
1345
    SystemMetadata sysmeta) 
1346
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1347
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1348
    NotImplemented, InvalidRequest {
1349
                  
1350
      // The lock to be used for this identifier
1351
      Lock lock = null;
1352

    
1353
      try {
1354
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1355
          // are we allowed?
1356
          boolean isAllowed = false;
1357
          isAllowed = isAdminAuthorized(session);
1358
          
1359
          // additional check if it is the authoritative node if it is not the admin
1360
          if(!isAllowed) {
1361
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1362
          }
1363

    
1364
          // proceed if we're called by a CN
1365
          if ( isAllowed ) {
1366
              // create the coordinating node version of the document      
1367
              lock.lock();
1368
              logMetacat.debug("Locked identifier " + pid.getValue());
1369
              sysmeta.setSerialVersion(BigInteger.ONE);
1370
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1371
              sysmeta.setArchived(false); // this is a create op, not update
1372
              
1373
              // the CN should have set the origin and authoritative member node fields
1374
              try {
1375
                  sysmeta.getOriginMemberNode().getValue();
1376
                  sysmeta.getAuthoritativeMemberNode().getValue();
1377
                  
1378
              } catch (NullPointerException npe) {
1379
                  throw new InvalidSystemMetadata("4896", 
1380
                      "Both the origin and authoritative member node identifiers need to be set.");
1381
                  
1382
              }
1383
              pid = super.create(session, pid, object, sysmeta);
1384

    
1385
          } else {
1386
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1387
                  " isn't allowed to call create() on a Coordinating Node.";
1388
              logMetacat.info(msg);
1389
              throw new NotAuthorized("1100", msg);
1390
          }
1391
          
1392
      } catch (RuntimeException e) {
1393
          // Convert Hazelcast runtime exceptions to service failures
1394
          String msg = "There was a problem creating the object identified by " +
1395
              pid.getValue() + ". There error message was: " + e.getMessage();
1396
          throw new ServiceFailure("4893", msg);
1397
          
1398
      } finally {
1399
    	  if (lock != null) {
1400
	          lock.unlock();
1401
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1402
    	  }
1403
      }
1404
      
1405
      return pid;
1406

    
1407
  }
1408

    
1409
  /**
1410
   * Set access for a given object using the object identifier and a Subject
1411
   * under a given Session.
1412
   * 
1413
   * @param session - the Session object containing the credentials for the Subject
1414
   * @param pid - the object identifier for the given object to apply the policy
1415
   * @param policy - the access policy to be applied
1416
   * 
1417
   * @return true if the application of the policy succeeds
1418
   * @throws InvalidToken
1419
   * @throws ServiceFailure
1420
   * @throws NotFound
1421
   * @throws NotAuthorized
1422
   * @throws NotImplemented
1423
   * @throws InvalidRequest
1424
   */
1425
  public boolean setAccessPolicy(Session session, Identifier pid, 
1426
      AccessPolicy accessPolicy, long serialVersion) 
1427
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1428
      NotImplemented, InvalidRequest, VersionMismatch {
1429
      
1430
      // The lock to be used for this identifier
1431
      Lock lock = null;
1432
      SystemMetadata systemMetadata = null;
1433
      
1434
      boolean success = false;
1435
      
1436
      // get the subject
1437
      Subject subject = session.getSubject();
1438
      
1439
      // are we allowed to do this?
1440
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1441
          throw new NotAuthorized("4420", "not allowed by "
1442
                  + subject.getValue() + " on " + pid.getValue());
1443
      }
1444
      
1445
      try {
1446
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1447
          lock.lock();
1448
          logMetacat.debug("Locked identifier " + pid.getValue());
1449

    
1450
          try {
1451
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1452

    
1453
              if ( systemMetadata == null ) {
1454
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1455
                  
1456
              }
1457
              // does the request have the most current system metadata?
1458
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1459
                 String msg = "The requested system metadata version number " + 
1460
                     serialVersion + " differs from the current version at " +
1461
                     systemMetadata.getSerialVersion().longValue() +
1462
                     ". Please get the latest copy in order to modify it.";
1463
                 throw new VersionMismatch("4402", msg);
1464
                 
1465
              }
1466
              
1467
          } catch (RuntimeException e) {
1468
              // convert Hazelcast RuntimeException to NotFound
1469
              throw new NotFound("4400", "No record found for: " + pid);
1470
            
1471
          }
1472
              
1473
          // set the access policy
1474
          systemMetadata.setAccessPolicy(accessPolicy);
1475
          
1476
          // update the system metadata
1477
          try {
1478
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1479
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1480
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1481
              notifyReplicaNodes(systemMetadata);
1482
              
1483
          } catch (RuntimeException e) {
1484
              // convert Hazelcast RuntimeException to ServiceFailure
1485
              throw new ServiceFailure("4430", e.getMessage());
1486
            
1487
          }
1488
          
1489
      } catch (RuntimeException e) {
1490
          throw new ServiceFailure("4430", e.getMessage());
1491
          
1492
      } finally {
1493
          lock.unlock();
1494
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1495
        
1496
      }
1497

    
1498
    
1499
    // TODO: how do we know if the map was persisted?
1500
    success = true;
1501
    
1502
    return success;
1503
  }
1504

    
1505
  /**
1506
   * Full replacement of replication metadata in the system metadata for the 
1507
   * specified object, changes date system metadata modified
1508
   * 
1509
   * @param session - the Session object containing the credentials for the Subject
1510
   * @param pid - the object identifier for the given object to apply the policy
1511
   * @param replica - the replica to be updated
1512
   * @return
1513
   * @throws NotImplemented
1514
   * @throws NotAuthorized
1515
   * @throws ServiceFailure
1516
   * @throws InvalidRequest
1517
   * @throws NotFound
1518
   * @throws VersionMismatch
1519
   */
1520
  @Override
1521
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1522
      Replica replica, long serialVersion) 
1523
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1524
      NotFound, VersionMismatch {
1525
      
1526
      // The lock to be used for this identifier
1527
      Lock lock = null;
1528
      
1529
      // get the subject
1530
      Subject subject = session.getSubject();
1531
      
1532
      // are we allowed to do this?
1533
      try {
1534

    
1535
          // what is the controlling permission?
1536
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1537
              throw new NotAuthorized("4851", "not allowed by "
1538
                      + subject.getValue() + " on " + pid.getValue());
1539
          }
1540

    
1541
        
1542
      } catch (InvalidToken e) {
1543
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1544
                  " on " + pid.getValue());  
1545
          
1546
      }
1547

    
1548
      SystemMetadata systemMetadata = null;
1549
      try {
1550
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1551
          lock.lock();
1552
          logMetacat.debug("Locked identifier " + pid.getValue());
1553

    
1554
          try {      
1555
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1556

    
1557
              // does the request have the most current system metadata?
1558
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1559
                 String msg = "The requested system metadata version number " + 
1560
                     serialVersion + " differs from the current version at " +
1561
                     systemMetadata.getSerialVersion().longValue() +
1562
                     ". Please get the latest copy in order to modify it.";
1563
                 throw new VersionMismatch("4855", msg);
1564
              }
1565
              
1566
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1567
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1568
                  " : " + e.getMessage());
1569
            
1570
          }
1571
              
1572
          // set the status for the replica
1573
          List<Replica> replicas = systemMetadata.getReplicaList();
1574
          NodeReference replicaNode = replica.getReplicaMemberNode();
1575
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
1576
          int index = 0;
1577
          for (Replica listedReplica: replicas) {
1578
              
1579
              // remove the replica that we are replacing
1580
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1581
                      // don't allow status to change from COMPLETED to anything other
1582
                      // than INVALIDATED: prevents overwrites from race conditions
1583
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
1584
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
1585
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
1586
                	  throw new InvalidRequest("4853", "Status state change from " +
1587
                			  listedReplica.getReplicationStatus() + " to " +
1588
                			  replicaStatus.toString() + "is prohibited for identifier " +
1589
                			  pid.getValue() + " and target node " + 
1590
                			  listedReplica.getReplicaMemberNode().getValue());
1591

    
1592
            	  }
1593
                  replicas.remove(index);
1594
                  break;
1595
                  
1596
              }
1597
              index++;
1598
          }
1599
          
1600
          // add the new replica item
1601
          replicas.add(replica);
1602
          systemMetadata.setReplicaList(replicas);
1603
          
1604
          // update the metadata
1605
          try {
1606
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1607
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1608
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1609
              
1610
              // inform replica nodes of the change if the status is complete
1611
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
1612
            	  broadcastSystemMetadataChange(systemMetadata);
1613
            	  
1614
              }
1615
          } catch (RuntimeException e) {
1616
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1617
              throw new ServiceFailure("4852", e.getMessage());
1618
          
1619
          }
1620
          
1621
      } catch (RuntimeException e) {
1622
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1623
          throw new ServiceFailure("4852", e.getMessage());
1624
      
1625
      } finally {
1626
          lock.unlock();
1627
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1628
          
1629
      }
1630
    
1631
      return true;
1632
      
1633
  }
1634
  
1635
  /**
1636
   * 
1637
   */
1638
  @Override
1639
  public ObjectList listObjects(Session session, Date startTime, 
1640
      Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1641
      Integer start, Integer count)
1642
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1643
      ServiceFailure {
1644
      
1645
      ObjectList objectList = null;
1646
        try {
1647
        	if (count == null || count > MAXIMUM_DB_RECORD_COUNT) {
1648
            	count = MAXIMUM_DB_RECORD_COUNT;
1649
            }
1650
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1651
        } catch (Exception e) {
1652
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1653
        }
1654

    
1655
        return objectList;
1656
  }
1657

    
1658
  
1659
 	/**
1660
 	 * Returns a list of checksum algorithms that are supported by DataONE.
1661
 	 * @return cal  the list of checksum algorithms
1662
 	 * 
1663
 	 * @throws ServiceFailure
1664
 	 * @throws NotImplemented
1665
 	 */
1666
  @Override
1667
  public ChecksumAlgorithmList listChecksumAlgorithms()
1668
			throws ServiceFailure, NotImplemented {
1669
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1670
		cal.addAlgorithm("MD5");
1671
		cal.addAlgorithm("SHA-1");
1672
		return cal;
1673
		
1674
	}
1675

    
1676
  /**
1677
   * Notify replica Member Nodes of system metadata changes for a given pid
1678
   * 
1679
   * @param currentSystemMetadata - the up to date system metadata
1680
   */
1681
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
1682
      
1683
      Session session = null;
1684
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
1685
      MNode mn = null;
1686
      NodeReference replicaNodeRef = null;
1687
      CNode cn = null;
1688
      NodeType nodeType = null;
1689
      List<Node> nodeList = null;
1690
      
1691
      try {
1692
          cn = D1Client.getCN();
1693
          nodeList = cn.listNodes().getNodeList();
1694
          
1695
      } catch (Exception e) { // handle BaseException and other I/O issues
1696
          
1697
          // swallow errors since the call is not critical
1698
          logMetacat.error("Can't inform MNs of system metadata changes due " +
1699
              "to communication issues with the CN: " + e.getMessage());
1700
          
1701
      }
1702
      
1703
      if ( replicaList != null ) {
1704
          
1705
          // iterate through the replicas and inform  MN replica nodes
1706
          for (Replica replica : replicaList) {
1707
              
1708
              replicaNodeRef = replica.getReplicaMemberNode();
1709
              try {
1710
                  if (nodeList != null) {
1711
                      // find the node type
1712
                      for (Node node : nodeList) {
1713
                          if (node.getIdentifier().getValue() == 
1714
                              replicaNodeRef.getValue()) {
1715
                              nodeType = node.getType();
1716
                              break;
1717
              
1718
                          }
1719
                      }
1720
                  }
1721
              
1722
                  // notify only MNs
1723
                  if (nodeType != null && nodeType == NodeType.MN) {
1724
                      mn = D1Client.getMN(replicaNodeRef);
1725
                      mn.systemMetadataChanged(session, 
1726
                          currentSystemMetadata.getIdentifier(), 
1727
                          currentSystemMetadata.getSerialVersion().longValue(),
1728
                          currentSystemMetadata.getDateSysMetadataModified());
1729
                  }
1730
              
1731
              } catch (Exception e) { // handle BaseException and other I/O issues
1732
              
1733
                  // swallow errors since the call is not critical
1734
                  logMetacat.error("Can't inform "
1735
                          + replicaNodeRef.getValue()
1736
                          + " of system metadata changes due "
1737
                          + "to communication issues with the CN: "
1738
                          + e.getMessage());
1739
              
1740
              }
1741
          }
1742
      }
1743
  }
1744

    
1745
	@Override
1746
	public boolean isAuthorized(Identifier pid, Permission permission)
1747
			throws ServiceFailure, InvalidToken, NotFound, NotAuthorized,
1748
			NotImplemented, InvalidRequest {
1749
		
1750
		return isAuthorized(null, pid, permission);
1751
	}
1752
	
1753
	@Override
1754
	public boolean setAccessPolicy(Identifier pid, AccessPolicy accessPolicy, long serialVersion)
1755
			throws InvalidToken, NotFound, NotImplemented, NotAuthorized,
1756
			ServiceFailure, InvalidRequest, VersionMismatch {
1757
		
1758
		return setAccessPolicy(null, pid, accessPolicy, serialVersion);
1759
	}
1760
	
1761
	@Override
1762
	public Identifier setRightsHolder(Identifier pid, Subject userId, long serialVersion)
1763
			throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1764
			NotImplemented, InvalidRequest, VersionMismatch {
1765
		
1766
		return setRightsHolder(null, pid, userId, serialVersion);
1767
	}
1768
	
1769
	@Override
1770
	public Identifier create(Identifier pid, InputStream object, SystemMetadata sysmeta)
1771
			throws InvalidToken, ServiceFailure, NotAuthorized,
1772
			IdentifierNotUnique, UnsupportedType, InsufficientResources,
1773
			InvalidSystemMetadata, NotImplemented, InvalidRequest {
1774

    
1775
		return create(null, pid, object, sysmeta);
1776
	}
1777
	
1778
	@Override
1779
	public Identifier delete(Identifier pid) throws InvalidToken, ServiceFailure,
1780
			NotAuthorized, NotFound, NotImplemented {
1781

    
1782
		return delete(null, pid);
1783
	}
1784
	
1785
	@Override
1786
	public Identifier generateIdentifier(String scheme, String fragment)
1787
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1788
			InvalidRequest {
1789

    
1790
		return generateIdentifier(null, scheme, fragment);
1791
	}
1792
	
1793
	@Override
1794
	public Log getLogRecords(Date fromDate, Date toDate, Event event, String pidFilter,
1795
			Integer start, Integer count) throws InvalidToken, InvalidRequest,
1796
			ServiceFailure, NotAuthorized, NotImplemented, InsufficientResources {
1797

    
1798
		return getLogRecords(null, fromDate, toDate, event, pidFilter, start, count);
1799
	}
1800
	
1801
	@Override
1802
	public boolean hasReservation(Subject subject, Identifier pid)
1803
			throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1804
			NotImplemented, InvalidRequest, IdentifierNotUnique {
1805

    
1806
		return hasReservation(null, subject, pid);
1807
	}
1808
	
1809
	@Override
1810
	public Identifier registerSystemMetadata(Identifier pid, SystemMetadata sysmeta)
1811
			throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1812
			InvalidSystemMetadata, InvalidToken {
1813

    
1814
		return registerSystemMetadata(null, pid, sysmeta);
1815
	}
1816
	
1817
	@Override
1818
	public Identifier reserveIdentifier(Identifier pid) throws InvalidToken,
1819
			ServiceFailure, NotAuthorized, IdentifierNotUnique, NotImplemented,
1820
			InvalidRequest {
1821

    
1822
		return reserveIdentifier(null, pid);
1823
	}
1824
	
1825
	@Override
1826
	public boolean setObsoletedBy(Identifier pid, Identifier obsoletedByPid, long serialVersion)
1827
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
1828
			InvalidRequest, InvalidToken, VersionMismatch {
1829

    
1830
		return setObsoletedBy(null, pid, obsoletedByPid, serialVersion);
1831
	}
1832
	
1833
	@Override
1834
	public DescribeResponse describe(Identifier pid) throws InvalidToken,
1835
			NotAuthorized, NotImplemented, ServiceFailure, NotFound {
1836

    
1837
		return describe(null, pid);
1838
	}
1839
	
1840
	@Override
1841
	public InputStream get(Identifier pid) throws InvalidToken, ServiceFailure,
1842
			NotAuthorized, NotFound, NotImplemented {
1843

    
1844
		return get(null, pid);
1845
	}
1846
	
1847
	@Override
1848
	public Checksum getChecksum(Identifier pid) throws InvalidToken,
1849
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1850

    
1851
		return getChecksum(null, pid);
1852
	}
1853
	
1854
	@Override
1855
	public SystemMetadata getSystemMetadata(Identifier pid) throws InvalidToken,
1856
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1857

    
1858
		return getSystemMetadata(null, pid);
1859
	}
1860
	
1861
	@Override
1862
	public ObjectList listObjects(Date startTime, Date endTime,
1863
			ObjectFormatIdentifier formatid, Boolean replicaStatus, Integer start, Integer count)
1864
			throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1865
			ServiceFailure {
1866

    
1867
		return listObjects(null, startTime, endTime, formatid, replicaStatus, start, count);
1868
	}
1869
	
1870
	@Override
1871
	public ObjectLocationList resolve(Identifier pid) throws InvalidToken,
1872
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1873

    
1874
		return resolve(null, pid);
1875
	}
1876
	
1877
	@Override
1878
	public ObjectList search(String queryType, String query) throws InvalidToken,
1879
			ServiceFailure, NotAuthorized, InvalidRequest, NotImplemented {
1880

    
1881
		return search(null, queryType, query);
1882
	}
1883
	
1884
	@Override
1885
	public boolean deleteReplicationMetadata(Identifier pid, NodeReference nodeId,
1886
			long serialVersion) throws InvalidToken, InvalidRequest, ServiceFailure,
1887
			NotAuthorized, NotFound, NotImplemented, VersionMismatch {
1888

    
1889
		return deleteReplicationMetadata(null, pid, nodeId, serialVersion);
1890
	}
1891
	
1892
	@Override
1893
	public boolean isNodeAuthorized(Subject targetNodeSubject, Identifier pid)
1894
			throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure,
1895
			NotFound, InvalidRequest {
1896

    
1897
		return isNodeAuthorized(null, targetNodeSubject, pid);
1898
	}
1899
	
1900
	@Override
1901
	public boolean setReplicationPolicy(Identifier pid, ReplicationPolicy policy,
1902
			long serialVersion) throws NotImplemented, NotFound, NotAuthorized,
1903
			ServiceFailure, InvalidRequest, InvalidToken, VersionMismatch {
1904

    
1905
		return setReplicationPolicy(null, pid, policy, serialVersion);
1906
	}
1907
	
1908
	@Override
1909
	public boolean setReplicationStatus(Identifier pid, NodeReference targetNode,
1910
			ReplicationStatus status, BaseException failure) throws ServiceFailure,
1911
			NotImplemented, InvalidToken, NotAuthorized, InvalidRequest, NotFound {
1912

    
1913
		return setReplicationStatus(null, pid, targetNode, status, failure);
1914
	}
1915
	
1916
	@Override
1917
	public boolean updateReplicationMetadata(Identifier pid, Replica replica,
1918
			long serialVersion) throws NotImplemented, NotAuthorized, ServiceFailure,
1919
			NotFound, InvalidRequest, InvalidToken, VersionMismatch {
1920

    
1921
		return updateReplicationMetadata(null, pid, replica, serialVersion);
1922
	}
1923

    
1924
  @Override
1925
  public QueryEngineDescription getQueryEngineDescription(String arg0)
1926
          throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1927
          NotFound {
1928
      throw new NotImplemented("4410", "getQueryEngineDescription not implemented");
1929
      
1930
  }
1931

    
1932
  @Override
1933
  public QueryEngineList listQueryEngines() throws InvalidToken, ServiceFailure,
1934
          NotAuthorized, NotImplemented {
1935
      throw new NotImplemented("4420", "listQueryEngines not implemented");
1936
      
1937
  }
1938

    
1939
  @Override
1940
  public InputStream query(String arg0, String arg1) throws InvalidToken,
1941
          ServiceFailure, NotAuthorized, InvalidRequest, NotImplemented, NotFound {
1942
      throw new NotImplemented("4324", "query not implemented");
1943
      
1944
  }
1945
}
(1-1/6)