Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.PipedInputStream;
29
import java.io.PipedOutputStream;
30
import java.lang.reflect.InvocationTargetException;
31
import java.math.BigInteger;
32
import java.sql.SQLException;
33
import java.util.ArrayList;
34
import java.util.Calendar;
35
import java.util.Date;
36
import java.util.List;
37
import java.util.concurrent.locks.Lock;
38

    
39
import javax.servlet.http.HttpServletRequest;
40

    
41
import org.apache.commons.beanutils.BeanUtils;
42
import org.apache.log4j.Logger;
43
import org.dataone.client.v2.CNode;
44
import org.dataone.client.v2.MNode;
45
import org.dataone.client.v2.itk.D1Client;
46
import org.dataone.service.cn.v2.CNAuthorization;
47
import org.dataone.service.cn.v2.CNCore;
48
import org.dataone.service.cn.v2.CNRead;
49
import org.dataone.service.cn.v2.CNReplication;
50
import org.dataone.service.cn.v2.CNView;
51
import org.dataone.service.exceptions.BaseException;
52
import org.dataone.service.exceptions.IdentifierNotUnique;
53
import org.dataone.service.exceptions.InsufficientResources;
54
import org.dataone.service.exceptions.InvalidRequest;
55
import org.dataone.service.exceptions.InvalidSystemMetadata;
56
import org.dataone.service.exceptions.InvalidToken;
57
import org.dataone.service.exceptions.NotAuthorized;
58
import org.dataone.service.exceptions.NotFound;
59
import org.dataone.service.exceptions.NotImplemented;
60
import org.dataone.service.exceptions.ServiceFailure;
61
import org.dataone.service.exceptions.UnsupportedType;
62
import org.dataone.service.exceptions.VersionMismatch;
63
import org.dataone.service.types.v1.AccessPolicy;
64
import org.dataone.service.types.v1.Checksum;
65
import org.dataone.service.types.v1.ChecksumAlgorithmList;
66
import org.dataone.service.types.v1.Event;
67
import org.dataone.service.types.v1.Identifier;
68
import org.dataone.service.types.v1.NodeReference;
69
import org.dataone.service.types.v1.NodeType;
70
import org.dataone.service.types.v1.ObjectFormatIdentifier;
71
import org.dataone.service.types.v1.ObjectList;
72
import org.dataone.service.types.v1.ObjectLocationList;
73
import org.dataone.service.types.v1.Permission;
74
import org.dataone.service.types.v1.Replica;
75
import org.dataone.service.types.v1.ReplicationPolicy;
76
import org.dataone.service.types.v1.ReplicationStatus;
77
import org.dataone.service.types.v1.Session;
78
import org.dataone.service.types.v1.Subject;
79
import org.dataone.service.types.v1_1.QueryEngineDescription;
80
import org.dataone.service.types.v1_1.QueryEngineList;
81
import org.dataone.service.types.v2.Node;
82
import org.dataone.service.types.v2.NodeList;
83
import org.dataone.service.types.v2.ObjectFormat;
84
import org.dataone.service.types.v2.ObjectFormatList;
85
import org.dataone.service.types.v2.SystemMetadata;
86
import org.dataone.service.types.v2.util.ServiceMethodRestrictionUtil;
87
import org.dataone.service.util.TypeMarshaller;
88
import org.jibx.runtime.JiBXException;
89

    
90
import edu.ucsb.nceas.metacat.DBUtil;
91
import edu.ucsb.nceas.metacat.EventLog;
92
import edu.ucsb.nceas.metacat.IdentifierManager;
93
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
94
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
95
import edu.ucsb.nceas.metacat.properties.PropertyService;
96
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
97

    
98
/**
99
 * Represents Metacat's implementation of the DataONE Coordinating Node 
100
 * service API. Methods implement the various CN* interfaces, and methods common
101
 * to both Member Node and Coordinating Node interfaces are found in the
102
 * D1NodeService super class.
103
 *
104
 */
105
public class CNodeService extends D1NodeService implements CNAuthorization,
106
    CNCore, CNRead, CNReplication, CNView {
107

    
108
  /* the logger instance */
109
  private Logger logMetacat = null;
110
  public final static String V2V1MISSMATCH = "The Coordinating Node is not authorized to make systemMetadata changes on this object. Please make changes directly on the authoritative Member Node.";
111

    
112
  /**
113
   * singleton accessor
114
   */
115
  public static CNodeService getInstance(HttpServletRequest request) { 
116
    return new CNodeService(request);
117
  }
118
  
119
  /**
120
   * Constructor, private for singleton access
121
   */
122
  private CNodeService(HttpServletRequest request) {
123
    super(request);
124
    logMetacat = Logger.getLogger(CNodeService.class);
125
        
126
  }
127
    
128
  /**
129
   * Set the replication policy for an object given the object identifier
130
   * It only is applied to objects whose authoritative mn is a v1 node.
131
   * @param session - the Session object containing the credentials for the Subject
132
   * @param pid - the object identifier for the given object
133
   * @param policy - the replication policy to be applied
134
   * 
135
   * @return true or false
136
   * 
137
   * @throws NotImplemented
138
   * @throws NotAuthorized
139
   * @throws ServiceFailure
140
   * @throws InvalidRequest
141
   * @throws VersionMismatch
142
   * 
143
   */
144
  @Override
145
  public boolean setReplicationPolicy(Session session, Identifier pid,
146
      ReplicationPolicy policy, long serialVersion) 
147
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
148
      InvalidRequest, InvalidToken, VersionMismatch {
149
      
150
      // do we have a valid pid?
151
      if (pid == null || pid.getValue().trim().equals("")) {
152
          throw new InvalidRequest("4883", "The provided identifier was invalid.");
153
          
154
      }
155
      
156
      //only allow pid to be passed
157
      String serviceFailure = "4882";
158
      String notFound = "4884";
159
      checkV1SystemMetaPidExist(pid, serviceFailure, "The object for given PID "+pid.getValue()+" couldn't be identified if it exists",  notFound, 
160
              "No object could be found for given PID: "+pid.getValue());
161
      
162
      /*String serviceFailureCode = "4882";
163
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
164
      if(sid != null) {
165
          pid = sid;
166
      }*/
167
      // The lock to be used for this identifier
168
      Lock lock = null;
169
      
170
      // get the subject
171
      Subject subject = session.getSubject();
172
      
173
      // are we allowed to do this?
174
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
175
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
176
                  + " not allowed by " + subject.getValue() + " on "
177
                  + pid.getValue());
178
          
179
      }
180
      
181
      SystemMetadata systemMetadata = null;
182
      try {
183
          lock = HazelcastService.getInstance().getLock(pid.getValue());
184
          lock.lock();
185
          logMetacat.debug("Locked identifier " + pid.getValue());
186

    
187
          try {
188
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
189
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
190
                  
191
              }
192
              
193
              // did we get it correctly?
194
              if ( systemMetadata == null ) {
195
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
196
                  
197
              }
198
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
199
              String version = checker.getVersion("MNStorage");
200
              if(version == null) {
201
                  throw new ServiceFailure("4882", "Couldn't determine the authoritative member node storage version for the pid "+pid.getValue());
202
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
203
                  //we don't apply this method to an object whose authoritative node is v2
204
                  throw new NotAuthorized("4881", V2V1MISSMATCH);
205
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
206
                  //we don't understand this version (it is not v1 or v2)
207
                  throw new InvalidRequest("4883", "The version of the MNStorage is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
208
              }
209
              // does the request have the most current system metadata?
210
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
211
                 String msg = "The requested system metadata version number " + 
212
                     serialVersion + " differs from the current version at " +
213
                     systemMetadata.getSerialVersion().longValue() +
214
                     ". Please get the latest copy in order to modify it.";
215
                 throw new VersionMismatch("4886", msg);
216
                 
217
              }
218
              
219
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
220
              throw new NotFound("4884", "No record found for: " + pid.getValue());
221
            
222
          }
223
          
224
          // set the new policy
225
          systemMetadata.setReplicationPolicy(policy);
226
          
227
          // update the metadata
228
          try {
229
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
230
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
231
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
232
              notifyReplicaNodes(systemMetadata);
233
              
234
          } catch (RuntimeException e) {
235
              throw new ServiceFailure("4882", e.getMessage());
236
          
237
          }
238
          
239
      } catch (RuntimeException e) {
240
          throw new ServiceFailure("4882", e.getMessage());
241
          
242
      } finally {
243
          lock.unlock();
244
          logMetacat.debug("Unlocked identifier " + pid.getValue());
245
          
246
      }
247
    
248
      return true;
249
  }
250

    
251
  /**
252
   * Deletes the replica from the given Member Node
253
   * NOTE: MN.delete() may be an "archive" operation. TBD.
254
   * @param session
255
   * @param pid
256
   * @param nodeId
257
   * @param serialVersion
258
   * @return
259
   * @throws InvalidToken
260
   * @throws ServiceFailure
261
   * @throws NotAuthorized
262
   * @throws NotFound
263
   * @throws NotImplemented
264
   * @throws VersionMismatch
265
   */
266
  @Override
267
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
268
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
269
	  
270
	  	// The lock to be used for this identifier
271
		Lock lock = null;
272

    
273
		// get the subject
274
		Subject subject = session.getSubject();
275

    
276
		// are we allowed to do this?
277
		/*boolean isAuthorized = false;
278
		try {
279
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
280
		} catch (InvalidRequest e) {
281
			throw new ServiceFailure("4882", e.getDescription());
282
		}
283
		if (!isAuthorized) {
284
			throw new NotAuthorized("4881", Permission.WRITE
285
					+ " not allowed by " + subject.getValue() + " on "
286
					+ pid.getValue());
287

    
288
		}*/
289
		if(session == null) {
290
		    throw new NotAuthorized("4882", "Session cannot be null. It is not authorized for deleting the replication metadata of the object "+pid.getValue());
291
		} else {
292
		    if(!isCNAdmin(session)) {
293
		        throw new NotAuthorized("4882", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for deleting the replication metadata of the object "+pid.getValue());
294
		    }
295
		}
296

    
297
		SystemMetadata systemMetadata = null;
298
		try {
299
			lock = HazelcastService.getInstance().getLock(pid.getValue());
300
			lock.lock();
301
			logMetacat.debug("Locked identifier " + pid.getValue());
302

    
303
			try {
304
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
305
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
306
				}
307

    
308
				// did we get it correctly?
309
				if (systemMetadata == null) {
310
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
311
				}
312

    
313
				// does the request have the most current system metadata?
314
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
315
					String msg = "The requested system metadata version number "
316
							+ serialVersion
317
							+ " differs from the current version at "
318
							+ systemMetadata.getSerialVersion().longValue()
319
							+ ". Please get the latest copy in order to modify it.";
320
					throw new VersionMismatch("4886", msg);
321

    
322
				}
323

    
324
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
325
				throw new NotFound("4884", "No record found for: " + pid.getValue());
326

    
327
			}
328
			  
329
			// check permissions
330
			// TODO: is this necessary?
331
			/*List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
332
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
333
			if (!isAllowed) {
334
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
335
			}*/
336
			  
337
			// delete the replica from the given node
338
			// CSJ: use CN.delete() to truly delete a replica, semantically
339
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
340
			//D1Client.getMN(nodeId).delete(session, pid);
341
			  
342
			// reflect that change in the system metadata
343
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
344
			for (Replica r: systemMetadata.getReplicaList()) {
345
				  if (r.getReplicaMemberNode().equals(nodeId)) {
346
					  updatedReplicas.remove(r);
347
					  break;
348
				  }
349
			}
350
			systemMetadata.setReplicaList(updatedReplicas);
351

    
352
			// update the metadata
353
			try {
354
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
355
				//we don't need to update the modification date.
356
				//systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
357
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
358
			} catch (RuntimeException e) {
359
				throw new ServiceFailure("4882", e.getMessage());
360
			}
361

    
362
		} catch (RuntimeException e) {
363
			throw new ServiceFailure("4882", e.getMessage());
364
		} finally {
365
			lock.unlock();
366
			logMetacat.debug("Unlocked identifier " + pid.getValue());
367
		}
368

    
369
		return true;	  
370
	  
371
  }
372
  
373
  /**
374
   * Deletes an object from the Coordinating Node
375
   * 
376
   * @param session - the Session object containing the credentials for the Subject
377
   * @param pid - The object identifier to be deleted
378
   * 
379
   * @return pid - the identifier of the object used for the deletion
380
   * 
381
   * @throws InvalidToken
382
   * @throws ServiceFailure
383
   * @throws NotAuthorized
384
   * @throws NotFound
385
   * @throws NotImplemented
386
   * @throws InvalidRequest
387
   */
388
  @Override
389
  public Identifier delete(Session session, Identifier pid) 
390
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
391
      
392
      String localId = null;      // The corresponding docid for this pid
393
	  Lock lock = null;           // The lock to be used for this identifier
394
      CNode cn = null;            // a reference to the CN to get the node list    
395
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
396
      List<Node> nodeList = null; // the list of nodes in this CN environment
397

    
398
      // check for a valid session
399
      if (session == null) {
400
        	throw new InvalidToken("4963", "No session has been provided");
401
        	
402
      }
403

    
404
      // do we have a valid pid?
405
      if (pid == null || pid.getValue().trim().equals("")) {
406
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
407
          
408
      }
409
      
410
      String serviceFailureCode = "4962";
411
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
412
      if(sid != null) {
413
          pid = sid;
414
      }
415

    
416
	  // check that it is CN/admin
417
	  boolean allowed = isAdminAuthorized(session);
418
	  
419
	  // additional check if it is the authoritative node if it is not the admin
420
      if(!allowed) {
421
          allowed = isAuthoritativeMNodeAdmin(session, pid);
422
          
423
      }
424
	  
425
	  if (!allowed) {
426
		  String msg = "The subject " + session.getSubject().getValue() + 
427
			  " is not allowed to call delete() on a Coordinating Node.";
428
		  logMetacat.info(msg);
429
		  throw new NotAuthorized("4960", msg);
430
		  
431
	  }
432
	  
433
	  // Don't defer to superclass implementation without a locally registered identifier
434
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
435
      // Check for the existing identifier
436
      try {
437
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
438
          super.delete(session, pid);
439
          
440
      } catch (McdbDocNotFoundException e) {
441
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
442
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
443
    	  
444
          try {
445
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
446
  			  lock.lock();
447
  			  logMetacat.debug("Locked identifier " + pid.getValue());
448

    
449
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
450
			  if ( sysMeta != null ) {
451
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
452
				sysMeta.setArchived(true);
453
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
454
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
455
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
456
	            //since this is cn, we don't need worry about the mn solr index.
457
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
458
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
459
	            String username = session.getSubject().getValue();//just for logging purpose
460
                //since data objects were not registered in the identifier table, we use pid as the docid
461
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
462
				
463
			  } else {
464
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
465
					  ". Couldn't obtain the system metadata record.");
466
				  
467
			  }
468
			  
469
		  } catch (RuntimeException re) {
470
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
471
				  ". The error message was: " + re.getMessage());
472
			  
473
		  } finally {
474
			  lock.unlock();
475
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
476

    
477
		  }
478

    
479
          // NOTE: cannot log the delete without localId
480
//          EventLog.getInstance().log(request.getRemoteAddr(), 
481
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
482
//                  pid.getValue(), Event.DELETE.xmlValue());
483

    
484
      } catch (SQLException e) {
485
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
486
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
487
      }
488

    
489
      // get the node list
490
      try {
491
          cn = D1Client.getCN();
492
          nodeList = cn.listNodes().getNodeList();
493
          
494
      } catch (Exception e) { // handle BaseException and other I/O issues
495
          
496
          // swallow errors since the call is not critical
497
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
498
              " due to communication issues with the CN: " + e.getMessage());
499
          
500
      }
501

    
502
	  // notify the replicas
503
	  if (systemMetadata.getReplicaList() != null) {
504
		  for (Replica replica: systemMetadata.getReplicaList()) {
505
			  NodeReference replicaNode = replica.getReplicaMemberNode();
506
			  try {
507
                  if (nodeList != null) {
508
                      // find the node type
509
                      for (Node node : nodeList) {
510
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
511
                              nodeType = node.getType();
512
                              break;
513
              
514
                          }
515
                      }
516
                  }
517
                  
518
                  // only send call MN.delete() to avoid an infinite loop with the CN
519
                  if (nodeType != null && nodeType == NodeType.MN) {
520
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
521
                  }
522
                  
523
			  } catch (Exception e) {
524
				  // all we can really do is log errors and carry on with life
525
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
526
					  " from replica MN: " + replicaNode.getValue(), e);
527
			}
528
			  
529
		  }
530
	  }
531
	  
532
	  return pid;
533
      
534
  }
535
  
536
  /**
537
   * Archives an object from the Coordinating Node
538
   * 
539
   * @param session - the Session object containing the credentials for the Subject
540
   * @param pid - The object identifier to be deleted
541
   * 
542
   * @return pid - the identifier of the object used for the deletion
543
   * 
544
   * @throws InvalidToken
545
   * @throws ServiceFailure
546
   * @throws NotAuthorized
547
   * @throws NotFound
548
   * @throws NotImplemented
549
   * @throws InvalidRequest
550
   */
551
  @Override
552
  public Identifier archive(Session session, Identifier pid) 
553
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
554

    
555
      String localId = null; // The corresponding docid for this pid
556
	  Lock lock = null;      // The lock to be used for this identifier
557
      CNode cn = null;            // a reference to the CN to get the node list    
558
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
559
      List<Node> nodeList = null; // the list of nodes in this CN environment
560
      
561

    
562
      // check for a valid session
563
      if (session == null) {
564
        	throw new InvalidToken("4973", "No session has been provided");
565
        	
566
      }
567

    
568
      // do we have a valid pid?
569
      if (pid == null || pid.getValue().trim().equals("")) {
570
          throw new ServiceFailure("4972", "The provided identifier was invalid.");
571
          
572
      }
573

    
574
	  // check that it is CN/admin
575
	  boolean allowed = isAdminAuthorized(session);
576
	  
577
	  String serviceFailureCode = "4972";
578
	  Identifier sid = getPIDForSID(pid, serviceFailureCode);
579
	  if(sid != null) {
580
	        pid = sid;
581
	  }
582
	  
583
	  //check if it is the authoritative member node
584
	  if(!allowed) {
585
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
586
	  }
587
	  
588
	  if (!allowed) {
589
		  String msg = "The subject " + session.getSubject().getValue() + 
590
				  " is not allowed to call archive() on a Coordinating Node.";
591
		  logMetacat.info(msg);
592
		  throw new NotAuthorized("4970", msg);
593
	  }
594
	  
595
      try {
596
          HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
597
          logMetacat.debug("CNodeService.archive - lock the system metadata for "+pid.getValue());
598
          SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
599
          D1NodeVersionChecker checker = new D1NodeVersionChecker(sysMeta.getAuthoritativeMemberNode());
600
          String version = checker.getVersion("MNStorage");
601
          if(version == null) {
602
              throw new ServiceFailure("4972", "Couldn't determine the authoritative member node storage version for the pid "+pid.getValue());
603
          } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
604
              //we don't apply this method to an object whose authoritative node is v2
605
              throw new NotAuthorized("4970", V2V1MISSMATCH);
606
          } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
607
              //we don't understand this version (it is not v1 or v2)
608
              throw new NotImplemented("4974", "The version of the MNStorage is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
609
          }
610
          boolean needModifyDate = true;
611
          archiveCNObjectWithNotificationReplica(session, pid, sysMeta, needModifyDate);
612
      
613
      } finally {
614
          HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
615
          logMetacat.debug("CNodeService.archive - unlock the system metadata for "+pid.getValue());
616
      }
617

    
618
	  return pid;
619
      
620
  }
621
  
622
  
623
  /**
624
   * Archive a object on cn and notify the replica. This method doesn't lock the system metadata map. The caller should lock it.
625
   * This method doesn't check the authorization; this method only accept a pid.
626
   * @param session
627
   * @param pid
628
   * @param sysMeta
629
   * @param notifyReplica
630
   * @return
631
   * @throws InvalidToken
632
   * @throws ServiceFailure
633
   * @throws NotAuthorized
634
   * @throws NotFound
635
   * @throws NotImplemented
636
   */
637
  private Identifier archiveCNObjectWithNotificationReplica(Session session, Identifier pid, SystemMetadata sysMeta, boolean needModifyDate) 
638
                  throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
639
          boolean logArchive = true;
640
          archiveCNObject(logArchive, session, pid, sysMeta, needModifyDate);
641
          // notify the replicas
642
          notifyReplicaNodes(sysMeta);
643
          return pid;
644
    }
645
  
646
  
647
  
648
  /**
649
   * Set the obsoletedBy attribute in System Metadata
650
   * @param session
651
   * @param pid
652
   * @param obsoletedByPid
653
   * @param serialVersion
654
   * @return
655
   * @throws NotImplemented
656
   * @throws NotFound
657
   * @throws NotAuthorized
658
   * @throws ServiceFailure
659
   * @throws InvalidRequest
660
   * @throws InvalidToken
661
   * @throws VersionMismatch
662
   */
663
  @Override
664
  public boolean setObsoletedBy(Session session, Identifier pid,
665
			Identifier obsoletedByPid, long serialVersion)
666
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
667
			InvalidRequest, InvalidToken, VersionMismatch {
668
      
669
      // do we have a valid pid?
670
      if (pid == null || pid.getValue().trim().equals("")) {
671
          throw new InvalidRequest("4942", "The provided identifier was invalid.");
672
          
673
      }
674
      
675
      /*String serviceFailureCode = "4941";
676
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
677
      if(sid != null) {
678
          pid = sid;
679
      }*/
680
      
681
      // do we have a valid pid?
682
      if (obsoletedByPid == null || obsoletedByPid.getValue().trim().equals("")) {
683
          throw new InvalidRequest("4942", "The provided obsoletedByPid was invalid.");
684
          
685
      }
686
      
687
      try {
688
          if(IdentifierManager.getInstance().systemMetadataSIDExists(obsoletedByPid)) {
689
              throw new InvalidRequest("4942", "The provided obsoletedByPid "+obsoletedByPid.getValue()+" is an existing SID. However, it must NOT be an SID.");
690
          }
691
      } catch (SQLException ee) {
692
          throw new ServiceFailure("4941", "Couldn't determine if the obsoletedByPid "+obsoletedByPid.getValue()+" is an SID or not. The id shouldn't be an SID.");
693
      }
694
      
695

    
696
		// The lock to be used for this identifier
697
		Lock lock = null;
698

    
699
		// get the subject
700
		Subject subject = session.getSubject();
701

    
702
		// are we allowed to do this?
703
		if (!isAuthorized(session, pid, Permission.WRITE)) {
704
			throw new NotAuthorized("4881", Permission.WRITE
705
					+ " not allowed by " + subject.getValue() + " on "
706
					+ pid.getValue());
707

    
708
		}
709

    
710

    
711
		SystemMetadata systemMetadata = null;
712
		try {
713
			lock = HazelcastService.getInstance().getLock(pid.getValue());
714
			lock.lock();
715
			logMetacat.debug("Locked identifier " + pid.getValue());
716

    
717
			try {
718
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
719
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
720
				}
721

    
722
				// did we get it correctly?
723
				if (systemMetadata == null) {
724
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
725
				}
726

    
727
				// does the request have the most current system metadata?
728
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
729
					String msg = "The requested system metadata version number "
730
							+ serialVersion
731
							+ " differs from the current version at "
732
							+ systemMetadata.getSerialVersion().longValue()
733
							+ ". Please get the latest copy in order to modify it.";
734
					throw new VersionMismatch("4886", msg);
735

    
736
				}
737
				
738
				 //only apply to the object whose authoritative member node is v1.
739
	              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
740
	              String version = checker.getVersion("MNStorage");
741
	              if(version == null) {
742
	                  throw new ServiceFailure("4941", "Couldn't determine the authoritative member node storage version for the pid "+pid.getValue());
743
	              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
744
	                  //we don't apply this method to an object whose authoritative node is v2
745
	                  throw new NotAuthorized("4945", V2V1MISSMATCH);
746
	              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
747
	                  //we don't understand this version (it is not v1 or v2)
748
	                  throw new InvalidRequest("4942", "The version of the MNStorage is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
749
	              }
750

    
751
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
752
				throw new NotFound("4884", "No record found for: " + pid.getValue());
753

    
754
			}
755

    
756
			// set the new policy
757
			systemMetadata.setObsoletedBy(obsoletedByPid);
758

    
759
			// update the metadata
760
			try {
761
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
762
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
763
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
764
			} catch (RuntimeException e) {
765
				throw new ServiceFailure("4882", e.getMessage());
766
			}
767

    
768
		} catch (RuntimeException e) {
769
			throw new ServiceFailure("4882", e.getMessage());
770
		} finally {
771
			lock.unlock();
772
			logMetacat.debug("Unlocked identifier " + pid.getValue());
773
		}
774

    
775
		return true;
776
	}
777
  
778
  
779
  /**
780
   * Set the replication status for an object given the object identifier
781
   * 
782
   * @param session - the Session object containing the credentials for the Subject
783
   * @param pid - the object identifier for the given object
784
   * @param status - the replication status to be applied
785
   * 
786
   * @return true or false
787
   * 
788
   * @throws NotImplemented
789
   * @throws NotAuthorized
790
   * @throws ServiceFailure
791
   * @throws InvalidRequest
792
   * @throws InvalidToken
793
   * @throws NotFound
794
   * 
795
   */
796
  @Override
797
  public boolean setReplicationStatus(Session session, Identifier pid,
798
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
799
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
800
      InvalidRequest, NotFound {
801
	  
802
	  // cannot be called by public
803
	  if (session == null) {
804
		  throw new NotAuthorized("4720", "Session cannot be null");
805
	  } 
806
	  
807
	  /*else {
808
	      if(!isCNAdmin(session)) {
809
              throw new NotAuthorized("4720", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for setting the replication status of the object "+pid.getValue());
810
        }
811
	  }*/
812
	  
813
	// do we have a valid pid?
814
      if (pid == null || pid.getValue().trim().equals("")) {
815
          throw new InvalidRequest("4730", "The provided identifier was invalid.");
816
          
817
      }
818
      
819
      /*String serviceFailureCode = "4700";
820
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
821
      if(sid != null) {
822
          pid = sid;
823
      }*/
824
      
825
      // The lock to be used for this identifier
826
      Lock lock = null;
827
      
828
      boolean allowed = false;
829
      int replicaEntryIndex = -1;
830
      List<Replica> replicas = null;
831
      // get the subject
832
      Subject subject = session.getSubject();
833
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
834
          " is " + status.toString());
835
      
836
      SystemMetadata systemMetadata = null;
837

    
838
      try {
839
          lock = HazelcastService.getInstance().getLock(pid.getValue());
840
          lock.lock();
841
          logMetacat.debug("Locked identifier " + pid.getValue());
842

    
843
          try {      
844
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
845

    
846
              // did we get it correctly?
847
              if ( systemMetadata == null ) {
848
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
849
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
850
                  
851
              }
852
              replicas = systemMetadata.getReplicaList();
853
              int count = 0;
854
              
855
              // was there a failure? log it
856
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
857
                 String msg = "The replication request of the object identified by " + 
858
                     pid.getValue() + " failed.  The error message was " +
859
                     failure.getMessage() + ".";
860
                 logMetacat.error(msg);
861
              }
862
              
863
              if (replicas.size() > 0 && replicas != null) {
864
                  // find the target replica index in the replica list
865
                  for (Replica replica : replicas) {
866
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
867
                      String targetNodeStr = targetNode.getValue();
868
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
869
                  
870
                      if (replicaNodeStr.equals(targetNodeStr)) {
871
                          replicaEntryIndex = count;
872
                          logMetacat.debug("replica entry index is: "
873
                                  + replicaEntryIndex);
874
                          break;
875
                      }
876
                      count++;
877
                  
878
                  }
879
              }
880
              // are we allowed to do this? only CNs and target MNs are allowed
881
              CNode cn = D1Client.getCN();
882
              List<Node> nodes = cn.listNodes().getNodeList();
883
              
884
              // find the node in the node list
885
              for ( Node node : nodes ) {
886
                  
887
                  NodeReference nodeReference = node.getIdentifier();
888
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
889
                      nodeReference.getValue());
890
                  
891
                  // allow target MN certs
892
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
893
                      node.getType().equals(NodeType.MN)) {
894
                      List<Subject> nodeSubjects = node.getSubjectList();
895
                      
896
                      // check if the session subject is in the node subject list
897
                      for (Subject nodeSubject : nodeSubjects) {
898
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
899
                                  nodeSubject.getValue() + " and " + subject.getValue());
900
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
901
                              
902
                              // lastly limit to COMPLETED, INVALIDATED,
903
                              // and FAILED status updates from MNs only
904
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
905
                                   status.equals(ReplicationStatus.INVALIDATED) ||
906
                                   status.equals(ReplicationStatus.FAILED)) {
907
                                  allowed = true;
908
                                  break;
909
                                  
910
                              }                              
911
                          }
912
                      }                 
913
                  }
914
              }
915

    
916
              if ( !allowed ) {
917
                  //check for CN admin access
918
                  //allowed = isAuthorized(session, pid, Permission.WRITE);
919
                  allowed = isCNAdmin(session);
920
                  
921
              }              
922
              
923
              if ( !allowed ) {
924
                  String msg = "The subject identified by "
925
                          + subject.getValue()
926
                          + " is not a CN or MN, and does not have permission to set the replication status for "
927
                          + "the replica identified by "
928
                          + targetNode.getValue() + ".";
929
                  logMetacat.info(msg);
930
                  throw new NotAuthorized("4720", msg);
931
                  
932
              }
933

    
934
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
935
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
936
                " : " + e.getMessage());
937
            
938
          }
939
          
940
          Replica targetReplica = new Replica();
941
          // set the status for the replica
942
          if ( replicaEntryIndex != -1 ) {
943
              targetReplica = replicas.get(replicaEntryIndex);
944
              
945
              // don't allow status to change from COMPLETED to anything other
946
              // than INVALIDATED: prevents overwrites from race conditions
947
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
948
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
949
            	  throw new InvalidRequest("4730", "Status state change from " +
950
            			  targetReplica.getReplicationStatus() + " to " +
951
            			  status.toString() + "is prohibited for identifier " +
952
            			  pid.getValue() + " and target node " + 
953
            			  targetReplica.getReplicaMemberNode().getValue());
954
              }
955
              
956
              if(targetReplica.getReplicationStatus().equals(status)) {
957
                  //There is no change in the status, we do nothing.
958
                  return true;
959
              }
960
              
961
              targetReplica.setReplicationStatus(status);
962
              
963
              logMetacat.debug("Set the replication status for " + 
964
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
965
                  targetReplica.getReplicationStatus() + " for identifier " +
966
                  pid.getValue());
967
              
968
          } else {
969
              // this is a new entry, create it
970
              targetReplica.setReplicaMemberNode(targetNode);
971
              targetReplica.setReplicationStatus(status);
972
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
973
              replicas.add(targetReplica);
974
              
975
          }
976
          
977
          systemMetadata.setReplicaList(replicas);
978
                
979
          // update the metadata
980
          try {
981
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
982
              // Based on CN behavior discussion 9/16/15, we no longer want to 
983
              // update the modified date for changes to the replica list
984
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
985
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
986

    
987
              if ( !status.equals(ReplicationStatus.QUEUED) && 
988
            	   !status.equals(ReplicationStatus.REQUESTED)) {
989
                  
990
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
991
                          "\tNODE:\t" + targetNode.getValue() + 
992
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
993
                
994
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
995
                          "\tPID:\t"  + pid.getValue() + 
996
                          "\tNODE:\t" + targetNode.getValue() + 
997
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
998
              }
999

    
1000
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
1001
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
1002
                      " on target node " + targetNode + ". The exception was: " +
1003
                      failure.getMessage());
1004
              }
1005
              
1006
			  // update the replica nodes about the completed replica when complete, failed or invalid
1007
              if (status.equals(ReplicationStatus.COMPLETED) || status.equals(ReplicationStatus.FAILED) ||
1008
                      status.equals(ReplicationStatus.INVALIDATED)) {
1009
				notifyReplicaNodes(systemMetadata);
1010
			}
1011
          
1012
          } catch (RuntimeException e) {
1013
              throw new ServiceFailure("4700", e.getMessage());
1014
          
1015
          }
1016
          
1017
    } catch (RuntimeException e) {
1018
        String msg = "There was a RuntimeException getting the lock for " +
1019
            pid.getValue();
1020
        logMetacat.info(msg);
1021
        
1022
    } finally {
1023
        lock.unlock();
1024
        logMetacat.debug("Unlocked identifier " + pid.getValue());
1025
        
1026
    }
1027
      
1028
      return true;
1029
  }
1030
  
1031
/**
1032
   * Return the checksum of the object given the identifier 
1033
   * 
1034
   * @param session - the Session object containing the credentials for the Subject
1035
   * @param pid - the object identifier for the given object
1036
   * 
1037
   * @return checksum - the checksum of the object
1038
   * 
1039
   * @throws InvalidToken
1040
   * @throws ServiceFailure
1041
   * @throws NotAuthorized
1042
   * @throws NotFound
1043
   * @throws NotImplemented
1044
   */
1045
  @Override
1046
  public Checksum getChecksum(Session session, Identifier pid)
1047
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
1048
    NotImplemented {
1049
    
1050
	boolean isAuthorized = false;
1051
	try {
1052
		isAuthorized = isAuthorized(session, pid, Permission.READ);
1053
	} catch (InvalidRequest e) {
1054
		throw new ServiceFailure("1410", e.getDescription());
1055
	}  
1056
    if (!isAuthorized) {
1057
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
1058
    }
1059
    
1060
    SystemMetadata systemMetadata = null;
1061
    Checksum checksum = null;
1062
    
1063
    try {
1064
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
1065

    
1066
        if (systemMetadata == null ) {
1067
            String error ="";
1068
            String localId = null;
1069
            try {
1070
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1071
              
1072
             } catch (Exception e) {
1073
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1074
            }
1075
            
1076
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1077
                error = DELETEDMESSAGE;
1078
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1079
                error = DELETEDMESSAGE;
1080
            }
1081
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1082
        }
1083
        checksum = systemMetadata.getChecksum();
1084
        
1085
    } catch (RuntimeException e) {
1086
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
1087
            pid.getValue() + ". The error message was: " + e.getMessage());
1088
      
1089
    }
1090
    
1091
    return checksum;
1092
  }
1093

    
1094
  /**
1095
   * Resolve the location of a given object
1096
   * 
1097
   * @param session - the Session object containing the credentials for the Subject
1098
   * @param pid - the object identifier for the given object
1099
   * 
1100
   * @return objectLocationList - the list of nodes known to contain the object
1101
   * 
1102
   * @throws InvalidToken
1103
   * @throws ServiceFailure
1104
   * @throws NotAuthorized
1105
   * @throws NotFound
1106
   * @throws NotImplemented
1107
   */
1108
  @Override
1109
  public ObjectLocationList resolve(Session session, Identifier pid)
1110
    throws InvalidToken, ServiceFailure, NotAuthorized,
1111
    NotFound, NotImplemented {
1112

    
1113
    throw new NotImplemented("4131", "resolve not implemented");
1114

    
1115
  }
1116

    
1117
  /**
1118
   * Metacat does not implement this method at the CN level
1119
   */
1120
  @Override
1121
  public ObjectList search(Session session, String queryType, String query)
1122
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1123
    NotImplemented {
1124

    
1125
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1126
	  
1127
//    ObjectList objectList = null;
1128
//    try {
1129
//        objectList = 
1130
//          IdentifierManager.getInstance().querySystemMetadata(
1131
//              null, //startTime, 
1132
//              null, //endTime,
1133
//              null, //objectFormat, 
1134
//              false, //replicaStatus, 
1135
//              0, //start, 
1136
//              1000 //count
1137
//              );
1138
//        
1139
//    } catch (Exception e) {
1140
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1141
//    }
1142
//
1143
//      return objectList;
1144
		  
1145
  }
1146
  
1147
  /**
1148
   * Returns the object format registered in the DataONE Object Format 
1149
   * Vocabulary for the given format identifier
1150
   * 
1151
   * @param fmtid - the identifier of the format requested
1152
   * 
1153
   * @return objectFormat - the object format requested
1154
   * 
1155
   * @throws ServiceFailure
1156
   * @throws NotFound
1157
   * @throws InsufficientResources
1158
   * @throws NotImplemented
1159
   */
1160
  @Override
1161
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1162
    throws ServiceFailure, NotFound, NotImplemented {
1163
     
1164
      return ObjectFormatService.getInstance().getFormat(fmtid);
1165
      
1166
  }
1167

    
1168
    @Override
1169
    public ObjectFormatIdentifier addFormat(Session session, ObjectFormatIdentifier formatId, ObjectFormat format)
1170
            throws ServiceFailure, NotFound, NotImplemented, NotAuthorized, InvalidToken {
1171

    
1172
        logMetacat.debug("CNodeService.addFormat() called.\n" + 
1173
                "format ID: " + format.getFormatId() + "\n" + 
1174
                "format name: " + format.getFormatName() + "\n" + 
1175
                "format type: " + format.getFormatType() );
1176
        
1177
        // FIXME remove:
1178
        if (true)
1179
            throw new NotImplemented("0000", "Implementation underway... Will need testing too...");
1180
        
1181
        if (!isAdminAuthorized(session))
1182
            throw new NotAuthorized("0000", "Not authorized to call addFormat()");
1183

    
1184
        String separator = ".";
1185
        try {
1186
            separator = PropertyService.getProperty("document.accNumSeparator");
1187
        } catch (PropertyNotFoundException e) {
1188
            logMetacat.warn("Unable to find property \"document.accNumSeparator\"\n" + e.getMessage());
1189
        }
1190

    
1191
        // find pids of last and next ObjectFormatList
1192
        String OBJECT_FORMAT_DOCID = ObjectFormatService.OBJECT_FORMAT_DOCID;
1193
        int lastRev = -1;
1194
        try {
1195
            lastRev = DBUtil.getLatestRevisionInDocumentTable(OBJECT_FORMAT_DOCID);
1196
        } catch (SQLException e) {
1197
            throw new ServiceFailure("0000", "Unable to locate last revision of the object format list.\n" + e.getMessage());
1198
        }
1199
        int nextRev = lastRev + 1;
1200
        String lastDocID = OBJECT_FORMAT_DOCID + separator + lastRev;
1201
        String nextDocID = OBJECT_FORMAT_DOCID + separator + nextRev;
1202
        
1203
        Identifier lastPid = new Identifier();
1204
        lastPid.setValue(lastDocID);
1205
        Identifier nextPid = new Identifier();
1206
        nextPid.setValue(nextDocID);
1207
        
1208
        logMetacat.debug("Last ObjectFormatList document ID: " + lastDocID + "\n" 
1209
                + "Next ObjectFormatList document ID: " + nextDocID);
1210
        
1211
        // add new format to the current ObjectFormatList
1212
        ObjectFormatList objectFormatList = ObjectFormatService.getInstance().listFormats();
1213
        List<ObjectFormat> innerList = objectFormatList.getObjectFormatList();
1214
        innerList.add(format);
1215

    
1216
        // get existing (last) sysmeta and make a copy
1217
        SystemMetadata lastSysmeta = getSystemMetadata(session, lastPid);
1218
        SystemMetadata nextSysmeta = new SystemMetadata();
1219
        try {
1220
            BeanUtils.copyProperties(nextSysmeta, lastSysmeta);
1221
        } catch (IllegalAccessException | InvocationTargetException e) {
1222
            throw new ServiceFailure("0000", "Unable to create system metadata for updated object format list.\n" + e.getMessage());
1223
        }
1224
        
1225
        // create the new object format list, and update the old sysmeta with obsoletedBy
1226
        createNewObjectFormatList(session, lastPid, nextPid, objectFormatList, nextSysmeta);
1227
        updateOldObjectFormatList(session, lastPid, nextPid, lastSysmeta);
1228
        
1229
        // TODO add to ObjectFormatService local cache?
1230
        
1231
        return formatId;
1232
    }
1233

    
1234
    /**
1235
     * Creates the object for the next / updated version of the ObjectFormatList.
1236
     * 
1237
     * @param session
1238
     * @param lastPid
1239
     * @param nextPid
1240
     * @param objectFormatList
1241
     * @param lastSysmeta
1242
     */
1243
    private void createNewObjectFormatList(Session session, Identifier lastPid, Identifier nextPid,
1244
            ObjectFormatList objectFormatList, SystemMetadata lastSysmeta) 
1245
                    throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented {
1246
        
1247
        PipedInputStream is = new PipedInputStream();
1248
        PipedOutputStream os = null;
1249
        
1250
        try {
1251
            os = new PipedOutputStream(is);
1252
            TypeMarshaller.marshalTypeToOutputStream(objectFormatList, os);
1253
        } catch (JiBXException | IOException e) {
1254
            throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + e.getMessage());
1255
        } finally {
1256
            try {
1257
                os.flush();
1258
                os.close();
1259
            } catch (IOException ioe) {
1260
                throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + ioe.getMessage());
1261
            }
1262
        }
1263
        
1264
        BigInteger docSize = lastSysmeta.getSize();
1265
        try {
1266
            docSize = BigInteger.valueOf(is.available());
1267
        } catch (IOException e) {
1268
            logMetacat.warn("Unable to set an accurate size for the new object format list.", e);
1269
        }
1270
        
1271
        lastSysmeta.setIdentifier(nextPid);
1272
        lastSysmeta.setObsoletes(lastPid);
1273
        lastSysmeta.setSize(docSize); 
1274
        lastSysmeta.setSubmitter(session.getSubject());
1275
        lastSysmeta.setDateUploaded(new Date());
1276
        
1277
        // create new object format list
1278
        try {
1279
            create(session, nextPid, is, lastSysmeta);
1280
        } catch (IdentifierNotUnique | UnsupportedType | InsufficientResources
1281
                | InvalidSystemMetadata | InvalidRequest e) {
1282
            throw new ServiceFailure("0000", "Unable to create() new object format list" + e.getMessage());
1283
        }
1284
    }
1285
  
1286
    /**
1287
     * Updates the SystemMetadata for the old version of the ObjectFormatList
1288
     * by setting the obsoletedBy value to the pid of the new version of the 
1289
     * ObjectFormatList.
1290
     * 
1291
     * @param session
1292
     * @param lastPid
1293
     * @param obsoletedByPid
1294
     * @param lastSysmeta
1295
     * @throws ServiceFailure
1296
     */
1297
    private void updateOldObjectFormatList(Session session, Identifier lastPid, Identifier obsoletedByPid, SystemMetadata lastSysmeta) 
1298
            throws ServiceFailure {
1299
        
1300
        lastSysmeta.setObsoletedBy(obsoletedByPid);
1301
        
1302
        try {
1303
            this.updateSystemMetadata(session, lastPid, lastSysmeta);
1304
        } catch (NotImplemented | NotAuthorized | ServiceFailure | InvalidRequest
1305
                | InvalidSystemMetadata | InvalidToken e) {
1306
            throw new ServiceFailure("0000", "Unable to update metadata of old object format list.\n" + e.getMessage());
1307
        }
1308
    }
1309
  /**
1310
   * Returns a list of all object formats registered in the DataONE Object 
1311
   * Format Vocabulary
1312
    * 
1313
   * @return objectFormatList - The list of object formats registered in 
1314
   *                            the DataONE Object Format Vocabulary
1315
   * 
1316
   * @throws ServiceFailure
1317
   * @throws NotImplemented
1318
   * @throws InsufficientResources
1319
   */
1320
  @Override
1321
  public ObjectFormatList listFormats() 
1322
    throws ServiceFailure, NotImplemented {
1323

    
1324
    return ObjectFormatService.getInstance().listFormats();
1325
  }
1326

    
1327
  /**
1328
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1329
    * 
1330
   * @return nodeList - List of nodes from the registry
1331
   * 
1332
   * @throws ServiceFailure
1333
   * @throws NotImplemented
1334
   */
1335
  @Override
1336
  public NodeList listNodes() 
1337
    throws NotImplemented, ServiceFailure {
1338

    
1339
    throw new NotImplemented("4800", "listNodes not implemented");
1340
  }
1341

    
1342
  /**
1343
   * Provides a mechanism for adding system metadata independently of its 
1344
   * associated object, such as when adding system metadata for data objects.
1345
    * 
1346
   * @param session - the Session object containing the credentials for the Subject
1347
   * @param pid - The identifier of the object to register the system metadata against
1348
   * @param sysmeta - The system metadata to be registered
1349
   * 
1350
   * @return true if the registration succeeds
1351
   * 
1352
   * @throws NotImplemented
1353
   * @throws NotAuthorized
1354
   * @throws ServiceFailure
1355
   * @throws InvalidRequest
1356
   * @throws InvalidSystemMetadata
1357
   */
1358
  @Override
1359
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1360
      SystemMetadata sysmeta) 
1361
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1362
      InvalidSystemMetadata {
1363

    
1364
      // The lock to be used for this identifier
1365
      Lock lock = null;
1366

    
1367
      // TODO: control who can call this?
1368
      if (session == null) {
1369
          //TODO: many of the thrown exceptions do not use the correct error codes
1370
          //check these against the docs and correct them
1371
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1372
                  "  If you are not logged in, please do so and retry the request.");
1373
      } else {
1374
          //only CN is allwoed
1375
          if(!isCNAdmin(session)) {
1376
                throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for registering the system metadata of the object "+pid.getValue());
1377
          }
1378
      }
1379
      // the identifier can't be an SID
1380
      try {
1381
          if(IdentifierManager.getInstance().systemMetadataSIDExists(pid)) {
1382
              throw new InvalidRequest("4863", "The provided identifier "+pid.getValue()+" is a series id which is not allowed.");
1383
          }
1384
      } catch (SQLException sqle) {
1385
          throw new ServiceFailure("4862", "Couldn't determine if the pid "+pid.getValue()+" is a series id since "+sqle.getMessage());
1386
      }
1387
      
1388
      // verify that guid == SystemMetadata.getIdentifier()
1389
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1390
          "|" + sysmeta.getIdentifier().getValue());
1391
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1392
          throw new InvalidRequest("4863", 
1393
              "The identifier in method call (" + pid.getValue() + 
1394
              ") does not match identifier in system metadata (" +
1395
              sysmeta.getIdentifier().getValue() + ").");
1396
      }
1397
      
1398
      //check if the sid is legitimate in the system metadata
1399
      //checkSidInModifyingSystemMetadata(sysmeta, "4864", "4862");
1400
      Identifier sid = sysmeta.getSeriesId();
1401
      if(sid != null) {
1402
          if (!isValidIdentifier(sid)) {
1403
              throw new InvalidRequest("4863", "The series id in the system metadata is invalid in the request.");
1404
          }
1405
      }
1406

    
1407
      try {
1408
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1409
          lock.lock();
1410
          logMetacat.debug("Locked identifier " + pid.getValue());
1411
          logMetacat.debug("Checking if identifier exists...");
1412
          // Check that the identifier does not already exist
1413
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1414
              throw new InvalidRequest("4863", 
1415
                  "The identifier is already in use by an existing object.");
1416
          
1417
          }
1418
          
1419
          // insert the system metadata into the object store
1420
          logMetacat.debug("Starting to insert SystemMetadata...");
1421
          try {
1422
              //for the object whose authoriative mn is v1. we need reset the modification date.
1423
              //d1-sync already set the serial version. so we don't need do again.
1424
              D1NodeVersionChecker checker = new D1NodeVersionChecker(sysmeta.getAuthoritativeMemberNode());
1425
              String version = checker.getVersion("MNStorage");
1426
              if(version != null && version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1427
                  sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1428
              }
1429
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1430
              
1431
          } catch (RuntimeException e) {
1432
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1433
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1434
                  e.getClass() + ": " + e.getMessage());
1435
              
1436
          }
1437
          
1438
      } catch (RuntimeException e) {
1439
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1440
                  e.getClass() + ": " + e.getMessage());
1441
          
1442
      }  finally {
1443
          lock.unlock();
1444
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1445
          
1446
      }
1447

    
1448
      
1449
      logMetacat.debug("Returning from registerSystemMetadata");
1450
      
1451
      try {
1452
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1453
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1454
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1455
    	          localId, "registerSystemMetadata");
1456
      } catch (McdbDocNotFoundException e) {
1457
    	  // do nothing, no localId to log with
1458
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1459
      } catch (SQLException ee) {
1460
          // do nothing, no localId to log with
1461
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1462
      }
1463
      
1464
      
1465
      return pid;
1466
  }
1467
  
1468
  /**
1469
   * Given an optional scope and format, reserves and returns an identifier 
1470
   * within that scope and format that is unique and will not be 
1471
   * used by any other sessions. 
1472
    * 
1473
   * @param session - the Session object containing the credentials for the Subject
1474
   * @param pid - The identifier of the object to register the system metadata against
1475
   * @param scope - An optional string to be used to qualify the scope of 
1476
   *                the identifier namespace, which is applied differently 
1477
   *                depending on the format requested. If scope is not 
1478
   *                supplied, a default scope will be used.
1479
   * @param format - The optional name of the identifier format to be used, 
1480
   *                  drawn from a DataONE-specific vocabulary of identifier 
1481
   *                 format names, including several common syntaxes such 
1482
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1483
   *                 format is not supplied by the caller, the CN service 
1484
   *                 will use a default identifier format, which may change 
1485
   *                 over time.
1486
   * 
1487
   * @return true if the registration succeeds
1488
   * 
1489
   * @throws InvalidToken
1490
   * @throws ServiceFailure
1491
   * @throws NotAuthorized
1492
   * @throws IdentifierNotUnique
1493
   * @throws NotImplemented
1494
   */
1495
  @Override
1496
  public Identifier reserveIdentifier(Session session, Identifier pid)
1497
  throws InvalidToken, ServiceFailure,
1498
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1499

    
1500
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1501
  }
1502
  
1503
  @Override
1504
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1505
  throws InvalidToken, ServiceFailure,
1506
        NotAuthorized, NotImplemented, InvalidRequest {
1507
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1508
  }
1509
  
1510
  /**
1511
    * Checks whether the pid is reserved by the subject in the session param
1512
    * If the reservation is held on the pid by the subject, we return true.
1513
    * 
1514
   * @param session - the Session object containing the Subject
1515
   * @param pid - The identifier to check
1516
   * 
1517
   * @return true if the reservation exists for the subject/pid
1518
   * 
1519
   * @throws InvalidToken
1520
   * @throws ServiceFailure
1521
   * @throws NotFound - when the pid is not found (in use or in reservation)
1522
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1523
   * @throws IdentifierNotUnique - when the pid is in use
1524
   * @throws NotImplemented
1525
   */
1526

    
1527
  @Override
1528
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1529
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1530
      NotImplemented, InvalidRequest {
1531
  
1532
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1533
  }
1534

    
1535
  /**
1536
   * Changes ownership (RightsHolder) of the specified object to the 
1537
   * subject specified by userId
1538
    * 
1539
   * @param session - the Session object containing the credentials for the Subject
1540
   * @param pid - Identifier of the object to be modified
1541
   * @param userId - The subject that will be taking ownership of the specified object.
1542
   *
1543
   * @return pid - the identifier of the modified object
1544
   * 
1545
   * @throws ServiceFailure
1546
   * @throws InvalidToken
1547
   * @throws NotFound
1548
   * @throws NotAuthorized
1549
   * @throws NotImplemented
1550
   * @throws InvalidRequest
1551
   */  
1552
  @Override
1553
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1554
      long serialVersion)
1555
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1556
      NotImplemented, InvalidRequest, VersionMismatch {
1557
      
1558
      // The lock to be used for this identifier
1559
      Lock lock = null;
1560
      
1561
      // do we have a valid pid?
1562
      if (pid == null || pid.getValue().trim().equals("")) {
1563
          throw new InvalidRequest("4442", "The provided identifier was invalid.");
1564
          
1565
      }
1566

    
1567
      // get the subject
1568
      Subject subject = session.getSubject();
1569
      
1570
      String serviceFailureCode = "4490";
1571
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1572
      if(sid != null) {
1573
          pid = sid;
1574
      }
1575
      
1576
      // are we allowed to do this?
1577
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1578
          throw new NotAuthorized("4440", "not allowed by "
1579
                  + subject.getValue() + " on " + pid.getValue());
1580
          
1581
      }
1582
      
1583
      SystemMetadata systemMetadata = null;
1584
      try {
1585
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1586
          lock.lock();
1587
          logMetacat.debug("Locked identifier " + pid.getValue());
1588

    
1589
          try {
1590
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1591
              
1592
              if(systemMetadata == null) {
1593
                  throw new NotFound("4460", "The object "+pid.getValue()+" doesn't exist in the node.");
1594
              }
1595
              // does the request have the most current system metadata?
1596
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1597
                 String msg = "The requested system metadata version number " + 
1598
                     serialVersion + " differs from the current version at " +
1599
                     systemMetadata.getSerialVersion().longValue() +
1600
                     ". Please get the latest copy in order to modify it.";
1601
                 throw new VersionMismatch("4443", msg);
1602
              }
1603
              
1604
              //only apply to the object whose authoritative member node is v1.
1605
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
1606
              String version = checker.getVersion("MNStorage");
1607
              if(version == null) {
1608
                  throw new ServiceFailure("4490", "Couldn't determine the authoritative member node storage version for the pid "+pid.getValue());
1609
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
1610
                  //we don't apply this method to an object whose authoritative node is v2
1611
                  throw new NotAuthorized("4440", V2V1MISSMATCH);
1612
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1613
                  //we don't understand this version (it is not v1 or v2)
1614
                  throw new InvalidRequest("4442", "The version of the MNStorage is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
1615
              }
1616
              
1617
              
1618
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1619
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1620
              
1621
          }
1622
              
1623
          // set the new rights holder
1624
          systemMetadata.setRightsHolder(userId);
1625
          
1626
          // update the metadata
1627
          try {
1628
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1629
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1630
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1631
              notifyReplicaNodes(systemMetadata);
1632
              
1633
          } catch (RuntimeException e) {
1634
              throw new ServiceFailure("4490", e.getMessage());
1635
          
1636
          }
1637
          
1638
      } catch (RuntimeException e) {
1639
          throw new ServiceFailure("4490", e.getMessage());
1640
          
1641
      } finally {
1642
          lock.unlock();
1643
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1644
      
1645
      }
1646
      
1647
      return pid;
1648
  }
1649

    
1650
  /**
1651
   * Verify that a replication task is authorized by comparing the target node's
1652
   * Subject (from the X.509 certificate-derived Session) with the list of 
1653
   * subjects in the known, pending replication tasks map.
1654
   * 
1655
   * @param originatingNodeSession - Session information that contains the 
1656
   *                                 identity of the calling user
1657
   * @param targetNodeSubject - Subject identifying the target node
1658
   * @param pid - the identifier of the object to be replicated
1659
   * @param replicatePermission - the execute permission to be granted
1660
   * 
1661
   * @throws ServiceFailure
1662
   * @throws NotImplemented
1663
   * @throws InvalidToken
1664
   * @throws NotAuthorized
1665
   * @throws InvalidRequest
1666
   * @throws NotFound
1667
   */
1668
  @Override
1669
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1670
    Subject targetNodeSubject, Identifier pid) 
1671
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1672
    NotFound, InvalidRequest {
1673
    
1674
    boolean isAllowed = false;
1675
    SystemMetadata sysmeta = null;
1676
    NodeReference targetNode = null;
1677
    
1678
    try {
1679
      // get the target node reference from the nodes list
1680
      CNode cn = D1Client.getCN();
1681
      List<Node> nodes = cn.listNodes().getNodeList();
1682
      
1683
      if ( nodes != null ) {
1684
        for (Node node : nodes) {
1685
            
1686
        	if (node.getSubjectList() != null) {
1687
        		
1688
	            for (Subject nodeSubject : node.getSubjectList()) {
1689
	            	
1690
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1691
	                    targetNode = node.getIdentifier();
1692
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1693
	                    break;
1694
	                }
1695
	            }
1696
        	}
1697
            
1698
            if ( targetNode != null) { break; }
1699
        }
1700
        
1701
      } else {
1702
          String msg = "Couldn't get the node list from the CN";
1703
          logMetacat.debug(msg);
1704
          throw new ServiceFailure("4872", msg);
1705
          
1706
      }
1707
      
1708
      // can't find a node listed with the given subject
1709
      if ( targetNode == null ) {
1710
          String msg = "There is no Member Node registered with a node subject " +
1711
              "matching " + targetNodeSubject.getValue();
1712
          logMetacat.info(msg);
1713
          throw new NotAuthorized("4871", msg);
1714
          
1715
      }
1716
      
1717
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1718
      
1719
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1720

    
1721
      if ( sysmeta != null ) {
1722
          
1723
          List<Replica> replicaList = sysmeta.getReplicaList();
1724
          
1725
          if ( replicaList != null ) {
1726
              
1727
              // find the replica with the status set to 'requested'
1728
              for (Replica replica : replicaList) {
1729
                  ReplicationStatus status = replica.getReplicationStatus();
1730
                  NodeReference listedNode = replica.getReplicaMemberNode();
1731
                  if ( listedNode != null && targetNode != null ) {
1732
                      logMetacat.debug("Comparing " + listedNode.getValue()
1733
                              + " to " + targetNode.getValue());
1734
                      
1735
                      if (listedNode.getValue().equals(targetNode.getValue())
1736
                              && status.equals(ReplicationStatus.REQUESTED)) {
1737
                          isAllowed = true;
1738
                          break;
1739

    
1740
                      }
1741
                  }
1742
              }
1743
          }
1744
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1745
              "to replicate: " + isAllowed + " for " + pid.getValue());
1746

    
1747
          
1748
      } else {
1749
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1750
          " is null.");
1751
          String error ="";
1752
          String localId = null;
1753
          try {
1754
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1755
            
1756
           } catch (Exception e) {
1757
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1758
          }
1759
          
1760
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1761
              error = DELETEDMESSAGE;
1762
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1763
              error = DELETEDMESSAGE;
1764
          }
1765
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1766
          
1767
      }
1768

    
1769
    } catch (RuntimeException e) {
1770
    	  ServiceFailure sf = new ServiceFailure("4872", 
1771
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1772
                e.getMessage());
1773
    	  sf.initCause(e);
1774
        throw sf;
1775
        
1776
    }
1777
      
1778
    return isAllowed;
1779
    
1780
  }
1781

    
1782
  /**
1783
   * Adds a new object to the Node, where the object is a science metadata object.
1784
   * 
1785
   * @param session - the Session object containing the credentials for the Subject
1786
   * @param pid - The object identifier to be created
1787
   * @param object - the object bytes
1788
   * @param sysmeta - the system metadata that describes the object  
1789
   * 
1790
   * @return pid - the object identifier created
1791
   * 
1792
   * @throws InvalidToken
1793
   * @throws ServiceFailure
1794
   * @throws NotAuthorized
1795
   * @throws IdentifierNotUnique
1796
   * @throws UnsupportedType
1797
   * @throws InsufficientResources
1798
   * @throws InvalidSystemMetadata
1799
   * @throws NotImplemented
1800
   * @throws InvalidRequest
1801
   */
1802
  public Identifier create(Session session, Identifier pid, InputStream object,
1803
    SystemMetadata sysmeta) 
1804
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1805
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1806
    NotImplemented, InvalidRequest {
1807
       
1808
   // verify the pid is valid format
1809
      if (!isValidIdentifier(pid)) {
1810
          throw new InvalidRequest("4891", "The provided identifier is invalid.");
1811
      }
1812
      // The lock to be used for this identifier
1813
      Lock lock = null;
1814

    
1815
      try {
1816
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1817
          lock.lock();
1818
          // are we allowed?
1819
          boolean isAllowed = false;
1820
          isAllowed = isAdminAuthorized(session);
1821
          
1822
          // additional check if it is the authoritative node if it is not the admin
1823
          if(!isAllowed) {
1824
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1825
          }
1826

    
1827
          // proceed if we're called by a CN
1828
          if ( isAllowed ) {
1829
              //check if the series id is legitimate. It uses the same rules of the method registerSystemMetadata
1830
              //checkSidInModifyingSystemMetadata(sysmeta, "4896", "4893");
1831
              Identifier sid = sysmeta.getSeriesId();
1832
              if(sid != null) {
1833
                  if (!isValidIdentifier(sid)) {
1834
                      throw new InvalidRequest("4891", "The series id in the system metadata is invalid in the request.");
1835
                  }
1836
              }
1837
              // create the coordinating node version of the document      
1838
              logMetacat.debug("Locked identifier " + pid.getValue());
1839
              sysmeta.setSerialVersion(BigInteger.ONE);
1840
              //for the object whose authoritative mn is v1. we need reset the modification date.
1841
              //for the object whose authoritative mn is v2. we just accept the modification date.
1842
              D1NodeVersionChecker checker = new D1NodeVersionChecker(sysmeta.getAuthoritativeMemberNode());
1843
              String version = checker.getVersion("MNStorage");
1844
              if(version != null && version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1845
                  sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1846
              }
1847
              //sysmeta.setArchived(false); // this is a create op, not update
1848
              
1849
              // the CN should have set the origin and authoritative member node fields
1850
              try {
1851
                  sysmeta.getOriginMemberNode().getValue();
1852
                  sysmeta.getAuthoritativeMemberNode().getValue();
1853
                  
1854
              } catch (NullPointerException npe) {
1855
                  throw new InvalidSystemMetadata("4896", 
1856
                      "Both the origin and authoritative member node identifiers need to be set.");
1857
                  
1858
              }
1859
              pid = super.create(session, pid, object, sysmeta);
1860

    
1861
          } else {
1862
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1863
                  " isn't allowed to call create() on a Coordinating Node.";
1864
              logMetacat.info(msg);
1865
              throw new NotAuthorized("1100", msg);
1866
          }
1867
          
1868
      } catch (RuntimeException e) {
1869
          // Convert Hazelcast runtime exceptions to service failures
1870
          String msg = "There was a problem creating the object identified by " +
1871
              pid.getValue() + ". There error message was: " + e.getMessage();
1872
          throw new ServiceFailure("4893", msg);
1873
          
1874
      } finally {
1875
    	  if (lock != null) {
1876
	          lock.unlock();
1877
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1878
    	  }
1879
      }
1880
      
1881
      return pid;
1882

    
1883
  }
1884

    
1885
  /**
1886
   * Set access for a given object using the object identifier and a Subject
1887
   * under a given Session.
1888
   * This method only applies the objects whose authoritative mn is a v1 node.
1889
   * @param session - the Session object containing the credentials for the Subject
1890
   * @param pid - the object identifier for the given object to apply the policy
1891
   * @param policy - the access policy to be applied
1892
   * 
1893
   * @return true if the application of the policy succeeds
1894
   * @throws InvalidToken
1895
   * @throws ServiceFailure
1896
   * @throws NotFound
1897
   * @throws NotAuthorized
1898
   * @throws NotImplemented
1899
   * @throws InvalidRequest
1900
   */
1901
  public boolean setAccessPolicy(Session session, Identifier pid, 
1902
      AccessPolicy accessPolicy, long serialVersion) 
1903
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1904
      NotImplemented, InvalidRequest, VersionMismatch {
1905
      
1906
   // do we have a valid pid?
1907
      if (pid == null || pid.getValue().trim().equals("")) {
1908
          throw new InvalidRequest("4402", "The provided identifier was invalid.");
1909
          
1910
      }
1911
      
1912
      String serviceFailureCode = "4430";
1913
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1914
      if(sid != null) {
1915
          pid = sid;
1916
      }
1917
      // The lock to be used for this identifier
1918
      Lock lock = null;
1919
      SystemMetadata systemMetadata = null;
1920
      
1921
      boolean success = false;
1922
      
1923
      // get the subject
1924
      Subject subject = session.getSubject();
1925
      
1926
      // are we allowed to do this?
1927
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1928
          throw new NotAuthorized("4420", "not allowed by "
1929
                  + subject.getValue() + " on " + pid.getValue());
1930
      }
1931
      
1932
      try {
1933
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1934
          lock.lock();
1935
          logMetacat.debug("Locked identifier " + pid.getValue());
1936

    
1937
          try {
1938
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1939

    
1940
              if ( systemMetadata == null ) {
1941
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1942
                  
1943
              }
1944
              // does the request have the most current system metadata?
1945
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1946
                 String msg = "The requested system metadata version number " + 
1947
                     serialVersion + " differs from the current version at " +
1948
                     systemMetadata.getSerialVersion().longValue() +
1949
                     ". Please get the latest copy in order to modify it.";
1950
                 throw new VersionMismatch("4402", msg);
1951
                 
1952
              }
1953
              
1954
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
1955
              String version = checker.getVersion("MNStorage");
1956
              if(version == null) {
1957
                  throw new ServiceFailure("4430", "Couldn't determine the authoritative member node storage version for the pid "+pid.getValue());
1958
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
1959
                  //we don't apply this method to an object whose authoritative node is v2
1960
                  throw new NotAuthorized("4420", V2V1MISSMATCH);
1961
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1962
                  //we don't understand this version (it is not v1 or v2)
1963
                  throw new InvalidRequest("4402", "The version of the MNStorage is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
1964
              }
1965
              
1966
          } catch (RuntimeException e) {
1967
              // convert Hazelcast RuntimeException to NotFound
1968
              throw new NotFound("4400", "No record found for: " + pid);
1969
            
1970
          }
1971
              
1972
          // set the access policy
1973
          systemMetadata.setAccessPolicy(accessPolicy);
1974
          
1975
          // update the system metadata
1976
          try {
1977
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1978
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1979
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1980
              notifyReplicaNodes(systemMetadata);
1981
              
1982
          } catch (RuntimeException e) {
1983
              // convert Hazelcast RuntimeException to ServiceFailure
1984
              throw new ServiceFailure("4430", e.getMessage());
1985
            
1986
          }
1987
          
1988
      } catch (RuntimeException e) {
1989
          throw new ServiceFailure("4430", e.getMessage());
1990
          
1991
      } finally {
1992
          lock.unlock();
1993
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1994
        
1995
      }
1996

    
1997
    
1998
    // TODO: how do we know if the map was persisted?
1999
    success = true;
2000
    
2001
    return success;
2002
  }
2003

    
2004
  /**
2005
   * Full replacement of replication metadata in the system metadata for the 
2006
   * specified object, changes date system metadata modified
2007
   * 
2008
   * @param session - the Session object containing the credentials for the Subject
2009
   * @param pid - the object identifier for the given object to apply the policy
2010
   * @param replica - the replica to be updated
2011
   * @return
2012
   * @throws NotImplemented
2013
   * @throws NotAuthorized
2014
   * @throws ServiceFailure
2015
   * @throws InvalidRequest
2016
   * @throws NotFound
2017
   * @throws VersionMismatch
2018
   */
2019
  @Override
2020
  public boolean updateReplicationMetadata(Session session, Identifier pid,
2021
      Replica replica, long serialVersion) 
2022
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
2023
      NotFound, VersionMismatch {
2024
      
2025
      // The lock to be used for this identifier
2026
      Lock lock = null;
2027
      
2028
      // get the subject
2029
      Subject subject = session.getSubject();
2030
      
2031
      // are we allowed to do this?
2032
      if(session == null) {
2033
          throw new NotAuthorized("4851", "Session cannot be null. It is not authorized for updating the replication metadata of the object "+pid.getValue());
2034
      } else {
2035
          if(!isCNAdmin(session)) {
2036
              throw new NotAuthorized("4851", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for updating the replication metadata of the object "+pid.getValue());
2037
        }
2038
      }
2039
      /*try {
2040

    
2041
          // what is the controlling permission?
2042
          if (!isAuthorized(session, pid, Permission.WRITE)) {
2043
              throw new NotAuthorized("4851", "not allowed by "
2044
                      + subject.getValue() + " on " + pid.getValue());
2045
          }
2046

    
2047
        
2048
      } catch (InvalidToken e) {
2049
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
2050
                  " on " + pid.getValue());  
2051
          
2052
      }*/
2053

    
2054
      SystemMetadata systemMetadata = null;
2055
      try {
2056
          lock = HazelcastService.getInstance().getLock(pid.getValue());
2057
          lock.lock();
2058
          logMetacat.debug("Locked identifier " + pid.getValue());
2059

    
2060
          try {      
2061
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2062

    
2063
              // does the request have the most current system metadata?
2064
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
2065
                 String msg = "The requested system metadata version number " + 
2066
                     serialVersion + " differs from the current version at " +
2067
                     systemMetadata.getSerialVersion().longValue() +
2068
                     ". Please get the latest copy in order to modify it.";
2069
                 throw new VersionMismatch("4855", msg);
2070
              }
2071
              
2072
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
2073
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
2074
                  " : " + e.getMessage());
2075
            
2076
          }
2077
              
2078
          // set the status for the replica
2079
          List<Replica> replicas = systemMetadata.getReplicaList();
2080
          NodeReference replicaNode = replica.getReplicaMemberNode();
2081
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
2082
          int index = 0;
2083
          for (Replica listedReplica: replicas) {
2084
              
2085
              // remove the replica that we are replacing
2086
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
2087
                      // don't allow status to change from COMPLETED to anything other
2088
                      // than INVALIDATED: prevents overwrites from race conditions
2089
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
2090
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
2091
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
2092
                	  throw new InvalidRequest("4853", "Status state change from " +
2093
                			  listedReplica.getReplicationStatus() + " to " +
2094
                			  replicaStatus.toString() + "is prohibited for identifier " +
2095
                			  pid.getValue() + " and target node " + 
2096
                			  listedReplica.getReplicaMemberNode().getValue());
2097

    
2098
            	  }
2099
                  replicas.remove(index);
2100
                  break;
2101
                  
2102
              }
2103
              index++;
2104
          }
2105
          
2106
          // add the new replica item
2107
          replicas.add(replica);
2108
          systemMetadata.setReplicaList(replicas);
2109
          
2110
          // update the metadata
2111
          try {
2112
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
2113
              // Based on CN behavior discussion 9/16/15, we no longer want to 
2114
              // update the modified date for changes to the replica list
2115
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
2116
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
2117
              
2118
              // inform replica nodes of the change if the status is complete
2119
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
2120
            	  notifyReplicaNodes(systemMetadata);
2121
            	  
2122
              }
2123
          } catch (RuntimeException e) {
2124
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2125
              throw new ServiceFailure("4852", e.getMessage());
2126
          
2127
          }
2128
          
2129
      } catch (RuntimeException e) {
2130
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2131
          throw new ServiceFailure("4852", e.getMessage());
2132
      
2133
      } finally {
2134
          lock.unlock();
2135
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2136
          
2137
      }
2138
    
2139
      return true;
2140
      
2141
  }
2142
  
2143
  /**
2144
   * 
2145
   */
2146
  @Override
2147
  public ObjectList listObjects(Session session, Date startTime, 
2148
      Date endTime, ObjectFormatIdentifier formatid, NodeReference nodeId,Identifier identifier,
2149
      Integer start, Integer count)
2150
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
2151
      ServiceFailure {
2152

    
2153
      return super.listObjects(session, startTime, endTime, formatid, identifier, nodeId, start, count);
2154
  }
2155

    
2156
  
2157
 	/**
2158
 	 * Returns a list of checksum algorithms that are supported by DataONE.
2159
 	 * @return cal  the list of checksum algorithms
2160
 	 * 
2161
 	 * @throws ServiceFailure
2162
 	 * @throws NotImplemented
2163
 	 */
2164
  @Override
2165
  public ChecksumAlgorithmList listChecksumAlgorithms()
2166
			throws ServiceFailure, NotImplemented {
2167
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
2168
		cal.addAlgorithm("MD5");
2169
		cal.addAlgorithm("SHA-1");
2170
		return cal;
2171
		
2172
	}
2173

    
2174
  /**
2175
   * Notify replica Member Nodes of system metadata changes for a given pid
2176
   * 
2177
   * @param currentSystemMetadata - the up to date system metadata
2178
   */
2179
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
2180
      
2181
      Session session = null;
2182
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
2183
      //MNode mn = null;
2184
      NodeReference replicaNodeRef = null;
2185
      CNode cn = null;
2186
      NodeType nodeType = null;
2187
      List<Node> nodeList = null;
2188
      
2189
      try {
2190
          cn = D1Client.getCN();
2191
          nodeList = cn.listNodes().getNodeList();
2192
          
2193
      } catch (Exception e) { // handle BaseException and other I/O issues
2194
          
2195
          // swallow errors since the call is not critical
2196
          logMetacat.error("Can't inform MNs of system metadata changes due " +
2197
              "to communication issues with the CN: " + e.getMessage());
2198
          
2199
      }
2200
      
2201
      if ( replicaList != null ) {
2202
          
2203
          // iterate through the replicas and inform  MN replica nodes
2204
          for (Replica replica : replicaList) {
2205
              String replicationVersion = null;
2206
              replicaNodeRef = replica.getReplicaMemberNode();
2207
              try {
2208
                  if (nodeList != null) {
2209
                      // find the node type
2210
                      for (Node node : nodeList) {
2211
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
2212
                              nodeType = node.getType();
2213
                              D1NodeVersionChecker checker = new D1NodeVersionChecker(replicaNodeRef);
2214
                              replicationVersion = checker.getVersion("MNRead");
2215
                              break;
2216
              
2217
                          }
2218
                      }
2219
                  }
2220
              
2221
                  // notify only MNs
2222
                  if (replicationVersion != null && nodeType != null && nodeType == NodeType.MN) {
2223
                      if(replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
2224
                          //connect to a v2 mn
2225
                          MNode mn = D1Client.getMN(replicaNodeRef);
2226
                          mn.systemMetadataChanged(session, 
2227
                              currentSystemMetadata.getIdentifier(), 
2228
                              currentSystemMetadata.getSerialVersion().longValue(),
2229
                              currentSystemMetadata.getDateSysMetadataModified());
2230
                      } else if (replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
2231
                          //connect to a v1 mn
2232
                          org.dataone.client.v1.MNode mn = org.dataone.client.v1.itk.D1Client.getMN(replicaNodeRef);
2233
                          mn.systemMetadataChanged(session, 
2234
                                  currentSystemMetadata.getIdentifier(), 
2235
                                  currentSystemMetadata.getSerialVersion().longValue(),
2236
                                  currentSystemMetadata.getDateSysMetadataModified());
2237
                      }
2238
                      
2239
                  }
2240
              
2241
              } catch (Exception e) { // handle BaseException and other I/O issues
2242
              
2243
                  // swallow errors since the call is not critical
2244
                  logMetacat.error("Can't inform "
2245
                          + replicaNodeRef.getValue()
2246
                          + " of system metadata changes due "
2247
                          + "to communication issues with the CN: "
2248
                          + e.getMessage());
2249
              
2250
              }
2251
          }
2252
      }
2253
  }
2254
  
2255
  /**
2256
   * Update the system metadata of the specified pid.
2257
   * Note: the serial version and the replica list in the new system metadata will be ignored and the old values will be kept.
2258
   */
2259
  @Override
2260
  public boolean updateSystemMetadata(Session session, Identifier pid,
2261
          SystemMetadata sysmeta) throws NotImplemented, NotAuthorized,
2262
          ServiceFailure, InvalidRequest, InvalidSystemMetadata, InvalidToken {
2263
   if(sysmeta == null) {
2264
       throw  new InvalidRequest("4863", "The system metadata object should NOT be null in the updateSystemMetadata request.");
2265
   }
2266
   if(pid == null || pid.getValue() == null) {
2267
       throw new InvalidRequest("4863", "Please specify the id in the updateSystemMetadata request ") ;
2268
   }
2269

    
2270
   if (session == null) {
2271
       //TODO: many of the thrown exceptions do not use the correct error codes
2272
       //check these against the docs and correct them
2273
       throw new NotAuthorized("4861", "No Session - could not authorize for updating system metadata." +
2274
               "  If you are not logged in, please do so and retry the request.");
2275
   } else {
2276
         //only CN is allwoed
2277
         if(!isCNAdmin(session)) {
2278
               throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not authorized for updating the system metadata of the object "+pid.getValue());
2279
         }
2280
   }
2281

    
2282
    //update the system metadata locally
2283
    boolean success = false;
2284
    try {
2285
        HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
2286
        SystemMetadata currentSysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2287
       
2288
        if(currentSysmeta == null) {
2289
            throw  new InvalidRequest("4863", "We can't find the current system metadata on the member node for the id "+pid.getValue());
2290
        }
2291
        // CN will ignore the comming serial version and replica list fields from the mn node. 
2292
        BigInteger currentSerialVersion = currentSysmeta.getSerialVersion();
2293
        sysmeta.setSerialVersion(currentSerialVersion);
2294
        List<Replica> replicas = currentSysmeta.getReplicaList();
2295
        sysmeta.setReplicaList(replicas);
2296
        boolean needUpdateModificationDate = false;//cn doesn't need to change the modification date.
2297
        boolean fromCN = true;
2298
        success = updateSystemMetadata(session, pid, sysmeta, needUpdateModificationDate, currentSysmeta, fromCN);
2299
    } finally {
2300
        HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
2301
    }
2302
    return success;
2303
  }
2304
  
2305
    @Override
2306
    public boolean synchronize(Session session, Identifier pid) throws NotAuthorized, InvalidRequest, NotImplemented{
2307
        throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2308

    
2309
    }
2310

    
2311
	@Override
2312
	public QueryEngineDescription getQueryEngineDescription(Session session,
2313
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
2314
			NotImplemented, NotFound {
2315
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2316

    
2317
	}
2318
	
2319
	@Override
2320
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
2321
			ServiceFailure, NotAuthorized, NotImplemented {
2322
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2323

    
2324
	}
2325
	
2326
	@Override
2327
	public InputStream query(Session session, String queryEngine, String query)
2328
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
2329
			NotImplemented, NotFound {
2330
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2331

    
2332
	}
2333
	
2334
	@Override
2335
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
2336
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
2337
	}
2338

    
2339
}
(1-1/8)