Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.PipedInputStream;
29
import java.io.PipedOutputStream;
30
import java.lang.reflect.InvocationTargetException;
31
import java.math.BigInteger;
32
import java.sql.SQLException;
33
import java.util.ArrayList;
34
import java.util.Calendar;
35
import java.util.Date;
36
import java.util.List;
37
import java.util.concurrent.locks.Lock;
38

    
39
import javax.servlet.http.HttpServletRequest;
40

    
41
import org.apache.commons.beanutils.BeanUtils;
42
import org.apache.commons.io.IOUtils;
43
import org.apache.log4j.Logger;
44
import org.dataone.client.v2.CNode;
45
import org.dataone.client.v2.MNode;
46
import org.dataone.client.v2.itk.D1Client;
47
import org.dataone.exceptions.MarshallingException;
48
import org.dataone.service.cn.v2.CNAuthorization;
49
import org.dataone.service.cn.v2.CNCore;
50
import org.dataone.service.cn.v2.CNRead;
51
import org.dataone.service.cn.v2.CNReplication;
52
import org.dataone.service.cn.v2.CNView;
53
import org.dataone.service.exceptions.BaseException;
54
import org.dataone.service.exceptions.IdentifierNotUnique;
55
import org.dataone.service.exceptions.InsufficientResources;
56
import org.dataone.service.exceptions.InvalidRequest;
57
import org.dataone.service.exceptions.InvalidSystemMetadata;
58
import org.dataone.service.exceptions.InvalidToken;
59
import org.dataone.service.exceptions.NotAuthorized;
60
import org.dataone.service.exceptions.NotFound;
61
import org.dataone.service.exceptions.NotImplemented;
62
import org.dataone.service.exceptions.ServiceFailure;
63
import org.dataone.service.exceptions.UnsupportedType;
64
import org.dataone.service.exceptions.VersionMismatch;
65
import org.dataone.service.types.v1.AccessPolicy;
66
import org.dataone.service.types.v1.Checksum;
67
import org.dataone.service.types.v1.ChecksumAlgorithmList;
68
import org.dataone.service.types.v1.Event;
69
import org.dataone.service.types.v1.Identifier;
70
import org.dataone.service.types.v1.NodeReference;
71
import org.dataone.service.types.v1.NodeType;
72
import org.dataone.service.types.v1.ObjectFormatIdentifier;
73
import org.dataone.service.types.v1.ObjectList;
74
import org.dataone.service.types.v1.ObjectLocationList;
75
import org.dataone.service.types.v1.Permission;
76
import org.dataone.service.types.v1.Replica;
77
import org.dataone.service.types.v1.ReplicationPolicy;
78
import org.dataone.service.types.v1.ReplicationStatus;
79
import org.dataone.service.types.v1.Session;
80
import org.dataone.service.types.v1.Subject;
81
import org.dataone.service.types.v1_1.QueryEngineDescription;
82
import org.dataone.service.types.v1_1.QueryEngineList;
83
import org.dataone.service.types.v2.Node;
84
import org.dataone.service.types.v2.NodeList;
85
import org.dataone.service.types.v2.ObjectFormat;
86
import org.dataone.service.types.v2.ObjectFormatList;
87
import org.dataone.service.types.v2.SystemMetadata;
88
import org.dataone.service.util.TypeMarshaller;
89

    
90

    
91
import edu.ucsb.nceas.metacat.DBUtil;
92
import edu.ucsb.nceas.metacat.EventLog;
93
import edu.ucsb.nceas.metacat.IdentifierManager;
94
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
95
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
96
import edu.ucsb.nceas.metacat.properties.PropertyService;
97
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
98

    
99
/**
100
 * Represents Metacat's implementation of the DataONE Coordinating Node 
101
 * service API. Methods implement the various CN* interfaces, and methods common
102
 * to both Member Node and Coordinating Node interfaces are found in the
103
 * D1NodeService super class.
104
 *
105
 */
106
public class CNodeService extends D1NodeService implements CNAuthorization,
107
    CNCore, CNRead, CNReplication, CNView {
108

    
109
  /* the logger instance */
110
  private Logger logMetacat = null;
111
  public final static String V2V1MISSMATCH = "The Coordinating Node is not authorized to make systemMetadata changes on this object. Please make changes directly on the authoritative Member Node.";
112

    
113
  /**
114
   * singleton accessor
115
   */
116
  public static CNodeService getInstance(HttpServletRequest request) { 
117
    return new CNodeService(request);
118
  }
119
  
120
  /**
121
   * Constructor, private for singleton access
122
   */
123
  private CNodeService(HttpServletRequest request) {
124
    super(request);
125
    logMetacat = Logger.getLogger(CNodeService.class);
126
        
127
  }
128
    
129
  /**
130
   * Set the replication policy for an object given the object identifier
131
   * It only is applied to objects whose authoritative mn is a v1 node.
132
   * @param session - the Session object containing the credentials for the Subject
133
   * @param pid - the object identifier for the given object
134
   * @param policy - the replication policy to be applied
135
   * 
136
   * @return true or false
137
   * 
138
   * @throws NotImplemented
139
   * @throws NotAuthorized
140
   * @throws ServiceFailure
141
   * @throws InvalidRequest
142
   * @throws VersionMismatch
143
   * 
144
   */
145
  @Override
146
  public boolean setReplicationPolicy(Session session, Identifier pid,
147
      ReplicationPolicy policy, long serialVersion) 
148
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
149
      InvalidRequest, InvalidToken, VersionMismatch {
150
      
151
      // do we have a valid pid?
152
      if (pid == null || pid.getValue().trim().equals("")) {
153
          throw new InvalidRequest("4883", "The provided identifier was invalid.");
154
          
155
      }
156
      
157
      //only allow pid to be passed
158
      String serviceFailure = "4882";
159
      String notFound = "4884";
160
      checkV1SystemMetaPidExist(pid, serviceFailure, "The object for given PID "+pid.getValue()+" couldn't be identified if it exists",  notFound, 
161
              "No object could be found for given PID: "+pid.getValue());
162
      
163
      /*String serviceFailureCode = "4882";
164
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
165
      if(sid != null) {
166
          pid = sid;
167
      }*/
168
      // The lock to be used for this identifier
169
      Lock lock = null;
170
      
171
      // get the subject
172
      Subject subject = session.getSubject();
173
      
174
      // are we allowed to do this?
175
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
176
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
177
                  + " not allowed by " + subject.getValue() + " on "
178
                  + pid.getValue());
179
          
180
      }
181
      
182
      SystemMetadata systemMetadata = null;
183
      try {
184
          lock = HazelcastService.getInstance().getLock(pid.getValue());
185
          lock.lock();
186
          logMetacat.debug("Locked identifier " + pid.getValue());
187

    
188
          try {
189
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
190
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
191
                  
192
              }
193
              
194
              // did we get it correctly?
195
              if ( systemMetadata == null ) {
196
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
197
                  
198
              }
199
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
200
              String version = checker.getVersion("MNRead");
201
              if(version == null) {
202
                  throw new ServiceFailure("4882", "Couldn't determine the MNRead version of the authoritative member node for the pid "+pid.getValue());
203
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
204
                  //we don't apply this method to an object whose authoritative node is v2
205
                  throw new NotAuthorized("4881", V2V1MISSMATCH);
206
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
207
                  //we don't understand this version (it is not v1 or v2)
208
                  throw new InvalidRequest("4883", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
209
              }
210
              // does the request have the most current system metadata?
211
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
212
                 String msg = "The requested system metadata version number " + 
213
                     serialVersion + " differs from the current version at " +
214
                     systemMetadata.getSerialVersion().longValue() +
215
                     ". Please get the latest copy in order to modify it.";
216
                 throw new VersionMismatch("4886", msg);
217
                 
218
              }
219
              
220
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
221
              throw new NotFound("4884", "No record found for: " + pid.getValue());
222
            
223
          }
224
          
225
          // set the new policy
226
          systemMetadata.setReplicationPolicy(policy);
227
          
228
          // update the metadata
229
          try {
230
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
231
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
232
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
233
              notifyReplicaNodes(systemMetadata);
234
              
235
          } catch (RuntimeException e) {
236
              throw new ServiceFailure("4882", e.getMessage());
237
          
238
          }
239
          
240
      } catch (RuntimeException e) {
241
          throw new ServiceFailure("4882", e.getMessage());
242
          
243
      } finally {
244
          lock.unlock();
245
          logMetacat.debug("Unlocked identifier " + pid.getValue());
246
          
247
      }
248
    
249
      return true;
250
  }
251

    
252
  /**
253
   * Deletes the replica from the given Member Node
254
   * NOTE: MN.delete() may be an "archive" operation. TBD.
255
   * @param session
256
   * @param pid
257
   * @param nodeId
258
   * @param serialVersion
259
   * @return
260
   * @throws InvalidToken
261
   * @throws ServiceFailure
262
   * @throws NotAuthorized
263
   * @throws NotFound
264
   * @throws NotImplemented
265
   * @throws VersionMismatch
266
   */
267
  @Override
268
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
269
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
270
	  
271
	  	// The lock to be used for this identifier
272
		Lock lock = null;
273

    
274
		// get the subject
275
		Subject subject = session.getSubject();
276

    
277
		// are we allowed to do this?
278
		/*boolean isAuthorized = false;
279
		try {
280
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
281
		} catch (InvalidRequest e) {
282
			throw new ServiceFailure("4882", e.getDescription());
283
		}
284
		if (!isAuthorized) {
285
			throw new NotAuthorized("4881", Permission.WRITE
286
					+ " not allowed by " + subject.getValue() + " on "
287
					+ pid.getValue());
288

    
289
		}*/
290
		if(session == null) {
291
		    throw new NotAuthorized("4882", "Session cannot be null. It is not authorized for deleting the replication metadata of the object "+pid.getValue());
292
		} else {
293
		    if(!isCNAdmin(session)) {
294
		        throw new NotAuthorized("4882", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for deleting the replication metadata of the object "+pid.getValue());
295
		    }
296
		}
297

    
298
		SystemMetadata systemMetadata = null;
299
		try {
300
			lock = HazelcastService.getInstance().getLock(pid.getValue());
301
			lock.lock();
302
			logMetacat.debug("Locked identifier " + pid.getValue());
303

    
304
			try {
305
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
306
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
307
				}
308

    
309
				// did we get it correctly?
310
				if (systemMetadata == null) {
311
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
312
				}
313

    
314
				// does the request have the most current system metadata?
315
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
316
					String msg = "The requested system metadata version number "
317
							+ serialVersion
318
							+ " differs from the current version at "
319
							+ systemMetadata.getSerialVersion().longValue()
320
							+ ". Please get the latest copy in order to modify it.";
321
					throw new VersionMismatch("4886", msg);
322

    
323
				}
324

    
325
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
326
				throw new NotFound("4884", "No record found for: " + pid.getValue());
327

    
328
			}
329
			  
330
			// check permissions
331
			// TODO: is this necessary?
332
			/*List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
333
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
334
			if (!isAllowed) {
335
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
336
			}*/
337
			  
338
			// delete the replica from the given node
339
			// CSJ: use CN.delete() to truly delete a replica, semantically
340
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
341
			//D1Client.getMN(nodeId).delete(session, pid);
342
			  
343
			// reflect that change in the system metadata
344
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
345
			for (Replica r: systemMetadata.getReplicaList()) {
346
				  if (r.getReplicaMemberNode().equals(nodeId)) {
347
					  updatedReplicas.remove(r);
348
					  break;
349
				  }
350
			}
351
			systemMetadata.setReplicaList(updatedReplicas);
352

    
353
			// update the metadata
354
			try {
355
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
356
				//we don't need to update the modification date.
357
				//systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
358
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
359
			} catch (RuntimeException e) {
360
				throw new ServiceFailure("4882", e.getMessage());
361
			}
362

    
363
		} catch (RuntimeException e) {
364
			throw new ServiceFailure("4882", e.getMessage());
365
		} finally {
366
			lock.unlock();
367
			logMetacat.debug("Unlocked identifier " + pid.getValue());
368
		}
369

    
370
		return true;	  
371
	  
372
  }
373
  
374
  /**
375
   * Deletes an object from the Coordinating Node
376
   * 
377
   * @param session - the Session object containing the credentials for the Subject
378
   * @param pid - The object identifier to be deleted
379
   * 
380
   * @return pid - the identifier of the object used for the deletion
381
   * 
382
   * @throws InvalidToken
383
   * @throws ServiceFailure
384
   * @throws NotAuthorized
385
   * @throws NotFound
386
   * @throws NotImplemented
387
   * @throws InvalidRequest
388
   */
389
  @Override
390
  public Identifier delete(Session session, Identifier pid) 
391
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
392
      
393
      String localId = null;      // The corresponding docid for this pid
394
	  Lock lock = null;           // The lock to be used for this identifier
395
      CNode cn = null;            // a reference to the CN to get the node list    
396
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
397
      List<Node> nodeList = null; // the list of nodes in this CN environment
398

    
399
      // check for a valid session
400
      if (session == null) {
401
        	throw new InvalidToken("4963", "No session has been provided");
402
        	
403
      }
404

    
405
      // do we have a valid pid?
406
      if (pid == null || pid.getValue().trim().equals("")) {
407
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
408
          
409
      }
410
      
411
      String serviceFailureCode = "4962";
412
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
413
      if(sid != null) {
414
          pid = sid;
415
      }
416

    
417
	  // check that it is CN/admin
418
	  boolean allowed = isAdminAuthorized(session);
419
	  
420
	  // additional check if it is the authoritative node if it is not the admin
421
      if(!allowed) {
422
          allowed = isAuthoritativeMNodeAdmin(session, pid);
423
          
424
      }
425
	  
426
	  if (!allowed) {
427
		  String msg = "The subject " + session.getSubject().getValue() + 
428
			  " is not allowed to call delete() on a Coordinating Node.";
429
		  logMetacat.info(msg);
430
		  throw new NotAuthorized("4960", msg);
431
		  
432
	  }
433
	  
434
	  // Don't defer to superclass implementation without a locally registered identifier
435
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
436
      // Check for the existing identifier
437
      try {
438
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
439
          super.delete(session, pid);
440
          
441
      } catch (McdbDocNotFoundException e) {
442
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
443
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
444
    	  
445
          try {
446
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
447
  			  lock.lock();
448
  			  logMetacat.debug("Locked identifier " + pid.getValue());
449

    
450
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
451
			  if ( sysMeta != null ) {
452
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
453
				sysMeta.setArchived(true);
454
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
455
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
456
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
457
	            //since this is cn, we don't need worry about the mn solr index.
458
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
459
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
460
	            String username = session.getSubject().getValue();//just for logging purpose
461
                //since data objects were not registered in the identifier table, we use pid as the docid
462
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
463
				
464
			  } else {
465
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
466
					  ". Couldn't obtain the system metadata record.");
467
				  
468
			  }
469
			  
470
		  } catch (RuntimeException re) {
471
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
472
				  ". The error message was: " + re.getMessage());
473
			  
474
		  } finally {
475
			  lock.unlock();
476
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
477

    
478
		  }
479

    
480
          // NOTE: cannot log the delete without localId
481
//          EventLog.getInstance().log(request.getRemoteAddr(), 
482
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
483
//                  pid.getValue(), Event.DELETE.xmlValue());
484

    
485
      } catch (SQLException e) {
486
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
487
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
488
      }
489

    
490
      // get the node list
491
      try {
492
          cn = D1Client.getCN();
493
          nodeList = cn.listNodes().getNodeList();
494
          
495
      } catch (Exception e) { // handle BaseException and other I/O issues
496
          
497
          // swallow errors since the call is not critical
498
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
499
              " due to communication issues with the CN: " + e.getMessage());
500
          
501
      }
502

    
503
	  // notify the replicas
504
	  if (systemMetadata.getReplicaList() != null) {
505
		  for (Replica replica: systemMetadata.getReplicaList()) {
506
			  NodeReference replicaNode = replica.getReplicaMemberNode();
507
			  try {
508
                  if (nodeList != null) {
509
                      // find the node type
510
                      for (Node node : nodeList) {
511
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
512
                              nodeType = node.getType();
513
                              break;
514
              
515
                          }
516
                      }
517
                  }
518
                  
519
                  // only send call MN.delete() to avoid an infinite loop with the CN
520
                  if (nodeType != null && nodeType == NodeType.MN) {
521
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
522
                  }
523
                  
524
			  } catch (Exception e) {
525
				  // all we can really do is log errors and carry on with life
526
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
527
					  " from replica MN: " + replicaNode.getValue(), e);
528
			}
529
			  
530
		  }
531
	  }
532
	  
533
	  return pid;
534
      
535
  }
536
  
537
  /**
538
   * Archives an object from the Coordinating Node
539
   * 
540
   * @param session - the Session object containing the credentials for the Subject
541
   * @param pid - The object identifier to be deleted
542
   * 
543
   * @return pid - the identifier of the object used for the deletion
544
   * 
545
   * @throws InvalidToken
546
   * @throws ServiceFailure
547
   * @throws NotAuthorized
548
   * @throws NotFound
549
   * @throws NotImplemented
550
   * @throws InvalidRequest
551
   */
552
  @Override
553
  public Identifier archive(Session session, Identifier pid) 
554
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
555

    
556
      String localId = null; // The corresponding docid for this pid
557
	  Lock lock = null;      // The lock to be used for this identifier
558
      CNode cn = null;            // a reference to the CN to get the node list    
559
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
560
      List<Node> nodeList = null; // the list of nodes in this CN environment
561
      
562

    
563
      // check for a valid session
564
      if (session == null) {
565
        	throw new InvalidToken("4973", "No session has been provided");
566
        	
567
      }
568

    
569
      // do we have a valid pid?
570
      if (pid == null || pid.getValue().trim().equals("")) {
571
          throw new InvalidToken("4973", "The provided identifier was invalid.");
572
          
573
      }
574

    
575
	  // check that it is CN/admin
576
	  boolean allowed = isAdminAuthorized(session);
577
	  
578
	  String serviceFailureCode = "4972";
579
	  Identifier sid = getPIDForSID(pid, serviceFailureCode);
580
	  if(sid != null) {
581
	        pid = sid;
582
	  }
583
	  
584
	  //check if it is the authoritative member node
585
	  if(!allowed) {
586
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
587
	  }
588
	  
589
	  //check if the session has the change permission
590
	  if(!allowed) {
591
	      try {
592
	          allowed = userHasPermission(session, pid, Permission.CHANGE_PERMISSION);
593
	      } catch (InvalidRequest e) {
594
	          throw new InvalidToken("4973","CN.archive method couldn't determine if the client has the change permssion on this pid "+pid.getValue()+ " since "+e.getMessage());
595
	      }
596
	      
597
	  }
598
	  if (!allowed) {
599
		  String msg = "The subject " + session.getSubject().getValue() + 
600
				  " doesn't have the change permission to archive the object "+pid.getValue();
601
		  logMetacat.warn(msg);
602
		  throw new NotAuthorized("4970", msg);
603
	  }
604
	  
605
      try {
606
          HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
607
          logMetacat.debug("CNodeService.archive - lock the system metadata for "+pid.getValue());
608
          SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
609
          D1NodeVersionChecker checker = new D1NodeVersionChecker(sysMeta.getAuthoritativeMemberNode());
610
          String version = checker.getVersion("MNRead");
611
          if(version == null) {
612
              throw new ServiceFailure("4972", "Couldn't determine the MNRead version of the authoritative member node for the pid "+pid.getValue());
613
          } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
614
              //we don't apply this method to an object whose authoritative node is v2
615
              throw new NotAuthorized("4970", V2V1MISSMATCH);
616
          } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
617
              //we don't understand this version (it is not v1 or v2)
618
              throw new NotImplemented("4974", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
619
          }
620
          boolean needModifyDate = true;
621
          archiveCNObjectWithNotificationReplica(session, pid, sysMeta, needModifyDate);
622
      
623
      } finally {
624
          HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
625
          logMetacat.debug("CNodeService.archive - unlock the system metadata for "+pid.getValue());
626
      }
627

    
628
	  return pid;
629
      
630
  }
631
  
632
  
633
  /**
634
   * Archive a object on cn and notify the replica. This method doesn't lock the system metadata map. The caller should lock it.
635
   * This method doesn't check the authorization; this method only accept a pid.
636
   * @param session
637
   * @param pid
638
   * @param sysMeta
639
   * @param notifyReplica
640
   * @return
641
   * @throws InvalidToken
642
   * @throws ServiceFailure
643
   * @throws NotAuthorized
644
   * @throws NotFound
645
   * @throws NotImplemented
646
   */
647
  private Identifier archiveCNObjectWithNotificationReplica(Session session, Identifier pid, SystemMetadata sysMeta, boolean needModifyDate) 
648
                  throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
649
          boolean logArchive = true;
650
          archiveCNObject(logArchive, session, pid, sysMeta, needModifyDate);
651
          // notify the replicas
652
          notifyReplicaNodes(sysMeta);
653
          return pid;
654
    }
655
  
656
  
657
  
658
  /**
659
   * Set the obsoletedBy attribute in System Metadata
660
   * @param session
661
   * @param pid
662
   * @param obsoletedByPid
663
   * @param serialVersion
664
   * @return
665
   * @throws NotImplemented
666
   * @throws NotFound
667
   * @throws NotAuthorized
668
   * @throws ServiceFailure
669
   * @throws InvalidRequest
670
   * @throws InvalidToken
671
   * @throws VersionMismatch
672
   */
673
  @Override
674
  public boolean setObsoletedBy(Session session, Identifier pid,
675
			Identifier obsoletedByPid, long serialVersion)
676
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
677
			InvalidRequest, InvalidToken, VersionMismatch {
678
      
679
      // do we have a valid pid?
680
      if (pid == null || pid.getValue().trim().equals("")) {
681
          throw new InvalidRequest("4942", "The provided identifier was invalid.");
682
          
683
      }
684
      
685
      /*String serviceFailureCode = "4941";
686
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
687
      if(sid != null) {
688
          pid = sid;
689
      }*/
690
      
691
      // do we have a valid pid?
692
      if (obsoletedByPid == null || obsoletedByPid.getValue().trim().equals("")) {
693
          throw new InvalidRequest("4942", "The provided obsoletedByPid was invalid.");
694
          
695
      }
696
      
697
      try {
698
          if(IdentifierManager.getInstance().systemMetadataSIDExists(obsoletedByPid)) {
699
              throw new InvalidRequest("4942", "The provided obsoletedByPid "+obsoletedByPid.getValue()+" is an existing SID. However, it must NOT be an SID.");
700
          }
701
      } catch (SQLException ee) {
702
          throw new ServiceFailure("4941", "Couldn't determine if the obsoletedByPid "+obsoletedByPid.getValue()+" is an SID or not. The id shouldn't be an SID.");
703
      }
704
      
705

    
706
		// The lock to be used for this identifier
707
		Lock lock = null;
708

    
709
		// get the subject
710
		Subject subject = session.getSubject();
711

    
712
		// are we allowed to do this?
713
		if (!isAuthorized(session, pid, Permission.WRITE)) {
714
			throw new NotAuthorized("4881", Permission.WRITE
715
					+ " not allowed by " + subject.getValue() + " on "
716
					+ pid.getValue());
717

    
718
		}
719

    
720

    
721
		SystemMetadata systemMetadata = null;
722
		try {
723
			lock = HazelcastService.getInstance().getLock(pid.getValue());
724
			lock.lock();
725
			logMetacat.debug("Locked identifier " + pid.getValue());
726

    
727
			try {
728
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
729
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
730
				}
731

    
732
				// did we get it correctly?
733
				if (systemMetadata == null) {
734
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
735
				}
736

    
737
				// does the request have the most current system metadata?
738
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
739
					String msg = "The requested system metadata version number "
740
							+ serialVersion
741
							+ " differs from the current version at "
742
							+ systemMetadata.getSerialVersion().longValue()
743
							+ ". Please get the latest copy in order to modify it.";
744
					throw new VersionMismatch("4886", msg);
745

    
746
				}
747
				
748
				 //only apply to the object whose authoritative member node is v1.
749
	              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
750
	              String version = checker.getVersion("MNRead");
751
	              if(version == null) {
752
	                  throw new ServiceFailure("4941", "Couldn't determine the MNRead version of the authoritative member node for the pid "+pid.getValue());
753
	              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
754
	                  //we don't apply this method to an object whose authoritative node is v2
755
	                  throw new NotAuthorized("4945", V2V1MISSMATCH);
756
	              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
757
	                  //we don't understand this version (it is not v1 or v2)
758
	                  throw new InvalidRequest("4942", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
759
	              }
760

    
761
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
762
				throw new NotFound("4884", "No record found for: " + pid.getValue());
763

    
764
			}
765

    
766
			// set the new policy
767
			systemMetadata.setObsoletedBy(obsoletedByPid);
768

    
769
			// update the metadata
770
			try {
771
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
772
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
773
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
774
			} catch (RuntimeException e) {
775
				throw new ServiceFailure("4882", e.getMessage());
776
			}
777

    
778
		} catch (RuntimeException e) {
779
			throw new ServiceFailure("4882", e.getMessage());
780
		} finally {
781
			lock.unlock();
782
			logMetacat.debug("Unlocked identifier " + pid.getValue());
783
		}
784

    
785
		return true;
786
	}
787
  
788
  
789
  /**
790
   * Set the replication status for an object given the object identifier
791
   * 
792
   * @param session - the Session object containing the credentials for the Subject
793
   * @param pid - the object identifier for the given object
794
   * @param status - the replication status to be applied
795
   * 
796
   * @return true or false
797
   * 
798
   * @throws NotImplemented
799
   * @throws NotAuthorized
800
   * @throws ServiceFailure
801
   * @throws InvalidRequest
802
   * @throws InvalidToken
803
   * @throws NotFound
804
   * 
805
   */
806
  @Override
807
  public boolean setReplicationStatus(Session session, Identifier pid,
808
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
809
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
810
      InvalidRequest, NotFound {
811
	  
812
	  // cannot be called by public
813
	  if (session == null) {
814
		  throw new NotAuthorized("4720", "Session cannot be null");
815
	  } 
816
	  
817
	  /*else {
818
	      if(!isCNAdmin(session)) {
819
              throw new NotAuthorized("4720", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for setting the replication status of the object "+pid.getValue());
820
        }
821
	  }*/
822
	  
823
	// do we have a valid pid?
824
      if (pid == null || pid.getValue().trim().equals("")) {
825
          throw new InvalidRequest("4730", "The provided identifier was invalid.");
826
          
827
      }
828
      
829
      /*String serviceFailureCode = "4700";
830
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
831
      if(sid != null) {
832
          pid = sid;
833
      }*/
834
      
835
      // The lock to be used for this identifier
836
      Lock lock = null;
837
      
838
      boolean allowed = false;
839
      int replicaEntryIndex = -1;
840
      List<Replica> replicas = null;
841
      // get the subject
842
      Subject subject = session.getSubject();
843
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
844
          " is " + status.toString());
845
      
846
      SystemMetadata systemMetadata = null;
847

    
848
      try {
849
          lock = HazelcastService.getInstance().getLock(pid.getValue());
850
          lock.lock();
851
          logMetacat.debug("Locked identifier " + pid.getValue());
852

    
853
          try {      
854
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
855

    
856
              // did we get it correctly?
857
              if ( systemMetadata == null ) {
858
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
859
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
860
                  
861
              }
862
              replicas = systemMetadata.getReplicaList();
863
              int count = 0;
864
              
865
              // was there a failure? log it
866
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
867
                 String msg = "The replication request of the object identified by " + 
868
                     pid.getValue() + " failed.  The error message was " +
869
                     failure.getMessage() + ".";
870
                 logMetacat.error(msg);
871
              }
872
              
873
              if (replicas.size() > 0 && replicas != null) {
874
                  // find the target replica index in the replica list
875
                  for (Replica replica : replicas) {
876
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
877
                      String targetNodeStr = targetNode.getValue();
878
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
879
                  
880
                      if (replicaNodeStr.equals(targetNodeStr)) {
881
                          replicaEntryIndex = count;
882
                          logMetacat.debug("replica entry index is: "
883
                                  + replicaEntryIndex);
884
                          break;
885
                      }
886
                      count++;
887
                  
888
                  }
889
              }
890
              // are we allowed to do this? only CNs and target MNs are allowed
891
              CNode cn = D1Client.getCN();
892
              List<Node> nodes = cn.listNodes().getNodeList();
893
              
894
              // find the node in the node list
895
              for ( Node node : nodes ) {
896
                  
897
                  NodeReference nodeReference = node.getIdentifier();
898
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
899
                      nodeReference.getValue());
900
                  
901
                  // allow target MN certs
902
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
903
                      node.getType().equals(NodeType.MN)) {
904
                      List<Subject> nodeSubjects = node.getSubjectList();
905
                      
906
                      // check if the session subject is in the node subject list
907
                      for (Subject nodeSubject : nodeSubjects) {
908
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
909
                                  nodeSubject.getValue() + " and " + subject.getValue());
910
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
911
                              
912
                              // lastly limit to COMPLETED, INVALIDATED,
913
                              // and FAILED status updates from MNs only
914
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
915
                                   status.equals(ReplicationStatus.INVALIDATED) ||
916
                                   status.equals(ReplicationStatus.FAILED)) {
917
                                  allowed = true;
918
                                  break;
919
                                  
920
                              }                              
921
                          }
922
                      }                 
923
                  }
924
              }
925

    
926
              if ( !allowed ) {
927
                  //check for CN admin access
928
                  //allowed = isAuthorized(session, pid, Permission.WRITE);
929
                  allowed = isCNAdmin(session);
930
                  
931
              }              
932
              
933
              if ( !allowed ) {
934
                  String msg = "The subject identified by "
935
                          + subject.getValue()
936
                          + " is not a CN or MN, and does not have permission to set the replication status for "
937
                          + "the replica identified by "
938
                          + targetNode.getValue() + ".";
939
                  logMetacat.info(msg);
940
                  throw new NotAuthorized("4720", msg);
941
                  
942
              }
943

    
944
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
945
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
946
                " : " + e.getMessage());
947
            
948
          }
949
          
950
          Replica targetReplica = new Replica();
951
          // set the status for the replica
952
          if ( replicaEntryIndex != -1 ) {
953
              targetReplica = replicas.get(replicaEntryIndex);
954
              
955
              // don't allow status to change from COMPLETED to anything other
956
              // than INVALIDATED: prevents overwrites from race conditions
957
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
958
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
959
            	  throw new InvalidRequest("4730", "Status state change from " +
960
            			  targetReplica.getReplicationStatus() + " to " +
961
            			  status.toString() + "is prohibited for identifier " +
962
            			  pid.getValue() + " and target node " + 
963
            			  targetReplica.getReplicaMemberNode().getValue());
964
              }
965
              
966
              if(targetReplica.getReplicationStatus().equals(status)) {
967
                  //There is no change in the status, we do nothing.
968
                  return true;
969
              }
970
              
971
              targetReplica.setReplicationStatus(status);
972
              
973
              logMetacat.debug("Set the replication status for " + 
974
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
975
                  targetReplica.getReplicationStatus() + " for identifier " +
976
                  pid.getValue());
977
              
978
          } else {
979
              // this is a new entry, create it
980
              targetReplica.setReplicaMemberNode(targetNode);
981
              targetReplica.setReplicationStatus(status);
982
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
983
              replicas.add(targetReplica);
984
              
985
          }
986
          
987
          systemMetadata.setReplicaList(replicas);
988
                
989
          // update the metadata
990
          try {
991
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
992
              // Based on CN behavior discussion 9/16/15, we no longer want to 
993
              // update the modified date for changes to the replica list
994
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
995
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
996

    
997
              if ( !status.equals(ReplicationStatus.QUEUED) && 
998
            	   !status.equals(ReplicationStatus.REQUESTED)) {
999
                  
1000
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
1001
                          "\tNODE:\t" + targetNode.getValue() + 
1002
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
1003
                
1004
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
1005
                          "\tPID:\t"  + pid.getValue() + 
1006
                          "\tNODE:\t" + targetNode.getValue() + 
1007
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
1008
              }
1009

    
1010
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
1011
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
1012
                      " on target node " + targetNode + ". The exception was: " +
1013
                      failure.getMessage());
1014
              }
1015
              
1016
			  // update the replica nodes about the completed replica when complete, failed or invalid
1017
              if (status.equals(ReplicationStatus.COMPLETED) || status.equals(ReplicationStatus.FAILED) ||
1018
                      status.equals(ReplicationStatus.INVALIDATED)) {
1019
				notifyReplicaNodes(systemMetadata);
1020
			}
1021
          
1022
          } catch (RuntimeException e) {
1023
              throw new ServiceFailure("4700", e.getMessage());
1024
          
1025
          }
1026
          
1027
    } catch (RuntimeException e) {
1028
        String msg = "There was a RuntimeException getting the lock for " +
1029
            pid.getValue();
1030
        logMetacat.info(msg);
1031
        
1032
    } finally {
1033
        lock.unlock();
1034
        logMetacat.debug("Unlocked identifier " + pid.getValue());
1035
        
1036
    }
1037
      
1038
      return true;
1039
  }
1040
  
1041
/**
1042
   * Return the checksum of the object given the identifier 
1043
   * 
1044
   * @param session - the Session object containing the credentials for the Subject
1045
   * @param pid - the object identifier for the given object
1046
   * 
1047
   * @return checksum - the checksum of the object
1048
   * 
1049
   * @throws InvalidToken
1050
   * @throws ServiceFailure
1051
   * @throws NotAuthorized
1052
   * @throws NotFound
1053
   * @throws NotImplemented
1054
   */
1055
  @Override
1056
  public Checksum getChecksum(Session session, Identifier pid)
1057
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
1058
    NotImplemented {
1059
    
1060
	boolean isAuthorized = false;
1061
	try {
1062
		isAuthorized = isAuthorized(session, pid, Permission.READ);
1063
	} catch (InvalidRequest e) {
1064
		throw new ServiceFailure("1410", e.getDescription());
1065
	}  
1066
    if (!isAuthorized) {
1067
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
1068
    }
1069
    
1070
    SystemMetadata systemMetadata = null;
1071
    Checksum checksum = null;
1072
    
1073
    try {
1074
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
1075

    
1076
        if (systemMetadata == null ) {
1077
            String error ="";
1078
            String localId = null;
1079
            try {
1080
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1081
              
1082
             } catch (Exception e) {
1083
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1084
            }
1085
            
1086
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1087
                error = DELETEDMESSAGE;
1088
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1089
                error = DELETEDMESSAGE;
1090
            }
1091
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1092
        }
1093
        checksum = systemMetadata.getChecksum();
1094
        
1095
    } catch (RuntimeException e) {
1096
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
1097
            pid.getValue() + ". The error message was: " + e.getMessage());
1098
      
1099
    }
1100
    
1101
    return checksum;
1102
  }
1103

    
1104
  /**
1105
   * Resolve the location of a given object
1106
   * 
1107
   * @param session - the Session object containing the credentials for the Subject
1108
   * @param pid - the object identifier for the given object
1109
   * 
1110
   * @return objectLocationList - the list of nodes known to contain the object
1111
   * 
1112
   * @throws InvalidToken
1113
   * @throws ServiceFailure
1114
   * @throws NotAuthorized
1115
   * @throws NotFound
1116
   * @throws NotImplemented
1117
   */
1118
  @Override
1119
  public ObjectLocationList resolve(Session session, Identifier pid)
1120
    throws InvalidToken, ServiceFailure, NotAuthorized,
1121
    NotFound, NotImplemented {
1122

    
1123
    throw new NotImplemented("4131", "resolve not implemented");
1124

    
1125
  }
1126

    
1127
  /**
1128
   * Metacat does not implement this method at the CN level
1129
   */
1130
  @Override
1131
  public ObjectList search(Session session, String queryType, String query)
1132
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1133
    NotImplemented {
1134

    
1135
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1136
	  
1137
//    ObjectList objectList = null;
1138
//    try {
1139
//        objectList = 
1140
//          IdentifierManager.getInstance().querySystemMetadata(
1141
//              null, //startTime, 
1142
//              null, //endTime,
1143
//              null, //objectFormat, 
1144
//              false, //replicaStatus, 
1145
//              0, //start, 
1146
//              1000 //count
1147
//              );
1148
//        
1149
//    } catch (Exception e) {
1150
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1151
//    }
1152
//
1153
//      return objectList;
1154
		  
1155
  }
1156
  
1157
  /**
1158
   * Returns the object format registered in the DataONE Object Format 
1159
   * Vocabulary for the given format identifier
1160
   * 
1161
   * @param fmtid - the identifier of the format requested
1162
   * 
1163
   * @return objectFormat - the object format requested
1164
   * 
1165
   * @throws ServiceFailure
1166
   * @throws NotFound
1167
   * @throws InsufficientResources
1168
   * @throws NotImplemented
1169
   */
1170
  @Override
1171
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1172
    throws ServiceFailure, NotFound, NotImplemented {
1173
     
1174
      return ObjectFormatService.getInstance().getFormat(fmtid);
1175
      
1176
  }
1177

    
1178
    @Override
1179
    public ObjectFormatIdentifier addFormat(Session session, ObjectFormatIdentifier formatId, ObjectFormat format)
1180
            throws ServiceFailure, NotFound, NotImplemented, NotAuthorized, InvalidToken {
1181

    
1182
        logMetacat.debug("CNodeService.addFormat() called.\n" + 
1183
                "format ID: " + format.getFormatId() + "\n" + 
1184
                "format name: " + format.getFormatName() + "\n" + 
1185
                "format type: " + format.getFormatType() );
1186
        
1187
        // FIXME remove:
1188
        if (true)
1189
            throw new NotImplemented("0000", "Implementation underway... Will need testing too...");
1190
        
1191
        if (!isAdminAuthorized(session))
1192
            throw new NotAuthorized("0000", "Not authorized to call addFormat()");
1193

    
1194
        String separator = ".";
1195
        try {
1196
            separator = PropertyService.getProperty("document.accNumSeparator");
1197
        } catch (PropertyNotFoundException e) {
1198
            logMetacat.warn("Unable to find property \"document.accNumSeparator\"\n" + e.getMessage());
1199
        }
1200

    
1201
        // find pids of last and next ObjectFormatList
1202
        String OBJECT_FORMAT_DOCID = ObjectFormatService.OBJECT_FORMAT_DOCID;
1203
        int lastRev = -1;
1204
        try {
1205
            lastRev = DBUtil.getLatestRevisionInDocumentTable(OBJECT_FORMAT_DOCID);
1206
        } catch (SQLException e) {
1207
            throw new ServiceFailure("0000", "Unable to locate last revision of the object format list.\n" + e.getMessage());
1208
        }
1209
        int nextRev = lastRev + 1;
1210
        String lastDocID = OBJECT_FORMAT_DOCID + separator + lastRev;
1211
        String nextDocID = OBJECT_FORMAT_DOCID + separator + nextRev;
1212
        
1213
        Identifier lastPid = new Identifier();
1214
        lastPid.setValue(lastDocID);
1215
        Identifier nextPid = new Identifier();
1216
        nextPid.setValue(nextDocID);
1217
        
1218
        logMetacat.debug("Last ObjectFormatList document ID: " + lastDocID + "\n" 
1219
                + "Next ObjectFormatList document ID: " + nextDocID);
1220
        
1221
        // add new format to the current ObjectFormatList
1222
        ObjectFormatList objectFormatList = ObjectFormatService.getInstance().listFormats();
1223
        List<ObjectFormat> innerList = objectFormatList.getObjectFormatList();
1224
        innerList.add(format);
1225

    
1226
        // get existing (last) sysmeta and make a copy
1227
        SystemMetadata lastSysmeta = getSystemMetadata(session, lastPid);
1228
        SystemMetadata nextSysmeta = new SystemMetadata();
1229
        try {
1230
            BeanUtils.copyProperties(nextSysmeta, lastSysmeta);
1231
        } catch (IllegalAccessException | InvocationTargetException e) {
1232
            throw new ServiceFailure("0000", "Unable to create system metadata for updated object format list.\n" + e.getMessage());
1233
        }
1234
        
1235
        // create the new object format list, and update the old sysmeta with obsoletedBy
1236
        createNewObjectFormatList(session, lastPid, nextPid, objectFormatList, nextSysmeta);
1237
        updateOldObjectFormatList(session, lastPid, nextPid, lastSysmeta);
1238
        
1239
        // TODO add to ObjectFormatService local cache?
1240
        
1241
        return formatId;
1242
    }
1243

    
1244
    /**
1245
     * Creates the object for the next / updated version of the ObjectFormatList.
1246
     * 
1247
     * @param session
1248
     * @param lastPid
1249
     * @param nextPid
1250
     * @param objectFormatList
1251
     * @param lastSysmeta
1252
     */
1253
    private void createNewObjectFormatList(Session session, Identifier lastPid, Identifier nextPid,
1254
            ObjectFormatList objectFormatList, SystemMetadata lastSysmeta) 
1255
                    throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented {
1256
        
1257
        PipedInputStream is = new PipedInputStream();
1258
        PipedOutputStream os = null;
1259
        
1260
        try {
1261
            os = new PipedOutputStream(is);
1262
            TypeMarshaller.marshalTypeToOutputStream(objectFormatList, os);
1263
        } catch (MarshallingException | IOException e) {
1264
            throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + e.getMessage());
1265
        } finally {
1266
            try {
1267
                os.flush();
1268
                os.close();
1269
            } catch (IOException ioe) {
1270
                throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + ioe.getMessage());
1271
            }
1272
        }
1273
        
1274
        BigInteger docSize = lastSysmeta.getSize();
1275
        try {
1276
            docSize = BigInteger.valueOf(is.available());
1277
        } catch (IOException e) {
1278
            logMetacat.warn("Unable to set an accurate size for the new object format list.", e);
1279
        }
1280
        
1281
        lastSysmeta.setIdentifier(nextPid);
1282
        lastSysmeta.setObsoletes(lastPid);
1283
        lastSysmeta.setSize(docSize); 
1284
        lastSysmeta.setSubmitter(session.getSubject());
1285
        lastSysmeta.setDateUploaded(new Date());
1286
        
1287
        // create new object format list
1288
        try {
1289
            create(session, nextPid, is, lastSysmeta);
1290
        } catch (IdentifierNotUnique | UnsupportedType | InsufficientResources
1291
                | InvalidSystemMetadata | InvalidRequest e) {
1292
            throw new ServiceFailure("0000", "Unable to create() new object format list" + e.getMessage());
1293
        }
1294
    }
1295
  
1296
    /**
1297
     * Updates the SystemMetadata for the old version of the ObjectFormatList
1298
     * by setting the obsoletedBy value to the pid of the new version of the 
1299
     * ObjectFormatList.
1300
     * 
1301
     * @param session
1302
     * @param lastPid
1303
     * @param obsoletedByPid
1304
     * @param lastSysmeta
1305
     * @throws ServiceFailure
1306
     */
1307
    private void updateOldObjectFormatList(Session session, Identifier lastPid, Identifier obsoletedByPid, SystemMetadata lastSysmeta) 
1308
            throws ServiceFailure {
1309
        
1310
        lastSysmeta.setObsoletedBy(obsoletedByPid);
1311
        
1312
        try {
1313
            this.updateSystemMetadata(session, lastPid, lastSysmeta);
1314
        } catch (NotImplemented | NotAuthorized | ServiceFailure | InvalidRequest
1315
                | InvalidSystemMetadata | InvalidToken e) {
1316
            throw new ServiceFailure("0000", "Unable to update metadata of old object format list.\n" + e.getMessage());
1317
        }
1318
    }
1319
  /**
1320
   * Returns a list of all object formats registered in the DataONE Object 
1321
   * Format Vocabulary
1322
    * 
1323
   * @return objectFormatList - The list of object formats registered in 
1324
   *                            the DataONE Object Format Vocabulary
1325
   * 
1326
   * @throws ServiceFailure
1327
   * @throws NotImplemented
1328
   * @throws InsufficientResources
1329
   */
1330
  @Override
1331
  public ObjectFormatList listFormats() 
1332
    throws ServiceFailure, NotImplemented {
1333

    
1334
    return ObjectFormatService.getInstance().listFormats();
1335
  }
1336

    
1337
  /**
1338
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1339
    * 
1340
   * @return nodeList - List of nodes from the registry
1341
   * 
1342
   * @throws ServiceFailure
1343
   * @throws NotImplemented
1344
   */
1345
  @Override
1346
  public NodeList listNodes() 
1347
    throws NotImplemented, ServiceFailure {
1348

    
1349
    throw new NotImplemented("4800", "listNodes not implemented");
1350
  }
1351

    
1352
  /**
1353
   * Provides a mechanism for adding system metadata independently of its 
1354
   * associated object, such as when adding system metadata for data objects.
1355
    * 
1356
   * @param session - the Session object containing the credentials for the Subject
1357
   * @param pid - The identifier of the object to register the system metadata against
1358
   * @param sysmeta - The system metadata to be registered
1359
   * 
1360
   * @return true if the registration succeeds
1361
   * 
1362
   * @throws NotImplemented
1363
   * @throws NotAuthorized
1364
   * @throws ServiceFailure
1365
   * @throws InvalidRequest
1366
   * @throws InvalidSystemMetadata
1367
   */
1368
  @Override
1369
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1370
      SystemMetadata sysmeta) 
1371
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1372
      InvalidSystemMetadata {
1373

    
1374
      // The lock to be used for this identifier
1375
      Lock lock = null;
1376

    
1377
      // TODO: control who can call this?
1378
      if (session == null) {
1379
          //TODO: many of the thrown exceptions do not use the correct error codes
1380
          //check these against the docs and correct them
1381
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1382
                  "  If you are not logged in, please do so and retry the request.");
1383
      } else {
1384
          //only CN is allwoed
1385
          if(!isCNAdmin(session)) {
1386
                throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for registering the system metadata of the object "+pid.getValue());
1387
          }
1388
      }
1389
      // the identifier can't be an SID
1390
      try {
1391
          if(IdentifierManager.getInstance().systemMetadataSIDExists(pid)) {
1392
              throw new InvalidRequest("4863", "The provided identifier "+pid.getValue()+" is a series id which is not allowed.");
1393
          }
1394
      } catch (SQLException sqle) {
1395
          throw new ServiceFailure("4862", "Couldn't determine if the pid "+pid.getValue()+" is a series id since "+sqle.getMessage());
1396
      }
1397
      
1398
      // verify that guid == SystemMetadata.getIdentifier()
1399
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1400
          "|" + sysmeta.getIdentifier().getValue());
1401
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1402
          throw new InvalidRequest("4863", 
1403
              "The identifier in method call (" + pid.getValue() + 
1404
              ") does not match identifier in system metadata (" +
1405
              sysmeta.getIdentifier().getValue() + ").");
1406
      }
1407
      
1408
      //check if the sid is legitimate in the system metadata
1409
      //checkSidInModifyingSystemMetadata(sysmeta, "4864", "4862");
1410
      Identifier sid = sysmeta.getSeriesId();
1411
      if(sid != null) {
1412
          if (!isValidIdentifier(sid)) {
1413
              throw new InvalidRequest("4863", "The series id in the system metadata is invalid in the request.");
1414
          }
1415
      }
1416

    
1417
      try {
1418
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1419
          lock.lock();
1420
          logMetacat.debug("Locked identifier " + pid.getValue());
1421
          logMetacat.debug("Checking if identifier exists...");
1422
          // Check that the identifier does not already exist
1423
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1424
              throw new InvalidRequest("4863", 
1425
                  "The identifier is already in use by an existing object.");
1426
          
1427
          }
1428
          
1429
          // insert the system metadata into the object store
1430
          logMetacat.debug("Starting to insert SystemMetadata...");
1431
          try {
1432
              //for the object whose authoriative mn is v1. we need reset the modification date.
1433
              //d1-sync already set the serial version. so we don't need do again.
1434
              D1NodeVersionChecker checker = new D1NodeVersionChecker(sysmeta.getAuthoritativeMemberNode());
1435
              String version = checker.getVersion("MNRead");
1436
              if(version != null && version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1437
                  sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1438
              }
1439
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1440
              
1441
          } catch (RuntimeException e) {
1442
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1443
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1444
                  e.getClass() + ": " + e.getMessage());
1445
              
1446
          }
1447
          
1448
      } catch (RuntimeException e) {
1449
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1450
                  e.getClass() + ": " + e.getMessage());
1451
          
1452
      }  finally {
1453
          lock.unlock();
1454
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1455
          
1456
      }
1457

    
1458
      
1459
      logMetacat.debug("Returning from registerSystemMetadata");
1460
      
1461
      try {
1462
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1463
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1464
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1465
    	          localId, "registerSystemMetadata");
1466
      } catch (McdbDocNotFoundException e) {
1467
    	  // do nothing, no localId to log with
1468
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1469
      } catch (SQLException ee) {
1470
          // do nothing, no localId to log with
1471
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1472
      }
1473
      
1474
      
1475
      return pid;
1476
  }
1477
  
1478
  /**
1479
   * Given an optional scope and format, reserves and returns an identifier 
1480
   * within that scope and format that is unique and will not be 
1481
   * used by any other sessions. 
1482
    * 
1483
   * @param session - the Session object containing the credentials for the Subject
1484
   * @param pid - The identifier of the object to register the system metadata against
1485
   * @param scope - An optional string to be used to qualify the scope of 
1486
   *                the identifier namespace, which is applied differently 
1487
   *                depending on the format requested. If scope is not 
1488
   *                supplied, a default scope will be used.
1489
   * @param format - The optional name of the identifier format to be used, 
1490
   *                  drawn from a DataONE-specific vocabulary of identifier 
1491
   *                 format names, including several common syntaxes such 
1492
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1493
   *                 format is not supplied by the caller, the CN service 
1494
   *                 will use a default identifier format, which may change 
1495
   *                 over time.
1496
   * 
1497
   * @return true if the registration succeeds
1498
   * 
1499
   * @throws InvalidToken
1500
   * @throws ServiceFailure
1501
   * @throws NotAuthorized
1502
   * @throws IdentifierNotUnique
1503
   * @throws NotImplemented
1504
   */
1505
  @Override
1506
  public Identifier reserveIdentifier(Session session, Identifier pid)
1507
  throws InvalidToken, ServiceFailure,
1508
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1509

    
1510
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1511
  }
1512
  
1513
  @Override
1514
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1515
  throws InvalidToken, ServiceFailure,
1516
        NotAuthorized, NotImplemented, InvalidRequest {
1517
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1518
  }
1519
  
1520
  /**
1521
    * Checks whether the pid is reserved by the subject in the session param
1522
    * If the reservation is held on the pid by the subject, we return true.
1523
    * 
1524
   * @param session - the Session object containing the Subject
1525
   * @param pid - The identifier to check
1526
   * 
1527
   * @return true if the reservation exists for the subject/pid
1528
   * 
1529
   * @throws InvalidToken
1530
   * @throws ServiceFailure
1531
   * @throws NotFound - when the pid is not found (in use or in reservation)
1532
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1533
   * @throws IdentifierNotUnique - when the pid is in use
1534
   * @throws NotImplemented
1535
   */
1536

    
1537
  @Override
1538
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1539
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1540
      NotImplemented, InvalidRequest {
1541
  
1542
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1543
  }
1544

    
1545
  /**
1546
   * Changes ownership (RightsHolder) of the specified object to the 
1547
   * subject specified by userId
1548
    * 
1549
   * @param session - the Session object containing the credentials for the Subject
1550
   * @param pid - Identifier of the object to be modified
1551
   * @param userId - The subject that will be taking ownership of the specified object.
1552
   *
1553
   * @return pid - the identifier of the modified object
1554
   * 
1555
   * @throws ServiceFailure
1556
   * @throws InvalidToken
1557
   * @throws NotFound
1558
   * @throws NotAuthorized
1559
   * @throws NotImplemented
1560
   * @throws InvalidRequest
1561
   */  
1562
  @Override
1563
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1564
      long serialVersion)
1565
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1566
      NotImplemented, InvalidRequest, VersionMismatch {
1567
      
1568
      // The lock to be used for this identifier
1569
      Lock lock = null;
1570
      
1571
      // do we have a valid pid?
1572
      if (pid == null || pid.getValue().trim().equals("")) {
1573
          throw new InvalidRequest("4442", "The provided identifier was invalid.");
1574
          
1575
      }
1576

    
1577
      // get the subject
1578
      Subject subject = session.getSubject();
1579
      
1580
      String serviceFailureCode = "4490";
1581
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1582
      if(sid != null) {
1583
          pid = sid;
1584
      }
1585
      
1586
      // are we allowed to do this?
1587
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1588
          throw new NotAuthorized("4440", "not allowed by "
1589
                  + subject.getValue() + " on " + pid.getValue());
1590
          
1591
      }
1592
      
1593
      SystemMetadata systemMetadata = null;
1594
      try {
1595
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1596
          lock.lock();
1597
          logMetacat.debug("Locked identifier " + pid.getValue());
1598

    
1599
          try {
1600
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1601
              
1602
              if(systemMetadata == null) {
1603
                  throw new NotFound("4460", "The object "+pid.getValue()+" doesn't exist in the node.");
1604
              }
1605
              // does the request have the most current system metadata?
1606
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1607
                 String msg = "The requested system metadata version number " + 
1608
                     serialVersion + " differs from the current version at " +
1609
                     systemMetadata.getSerialVersion().longValue() +
1610
                     ". Please get the latest copy in order to modify it.";
1611
                 throw new VersionMismatch("4443", msg);
1612
              }
1613
              
1614
              //only apply to the object whose authoritative member node is v1.
1615
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
1616
              String version = checker.getVersion("MNRead");
1617
              if(version == null) {
1618
                  throw new ServiceFailure("4490", "Couldn't determine the MNRead version of the authoritative member node storage version for the pid "+pid.getValue());
1619
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
1620
                  //we don't apply this method to an object whose authoritative node is v2
1621
                  throw new NotAuthorized("4440", V2V1MISSMATCH);
1622
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1623
                  //we don't understand this version (it is not v1 or v2)
1624
                  throw new InvalidRequest("4442", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
1625
              }
1626
              
1627
              
1628
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1629
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1630
              
1631
          }
1632
              
1633
          // set the new rights holder
1634
          systemMetadata.setRightsHolder(userId);
1635
          
1636
          // update the metadata
1637
          try {
1638
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1639
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1640
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1641
              notifyReplicaNodes(systemMetadata);
1642
              
1643
          } catch (RuntimeException e) {
1644
              throw new ServiceFailure("4490", e.getMessage());
1645
          
1646
          }
1647
          
1648
      } catch (RuntimeException e) {
1649
          throw new ServiceFailure("4490", e.getMessage());
1650
          
1651
      } finally {
1652
          lock.unlock();
1653
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1654
      
1655
      }
1656
      
1657
      return pid;
1658
  }
1659

    
1660
  /**
1661
   * Verify that a replication task is authorized by comparing the target node's
1662
   * Subject (from the X.509 certificate-derived Session) with the list of 
1663
   * subjects in the known, pending replication tasks map.
1664
   * 
1665
   * @param originatingNodeSession - Session information that contains the 
1666
   *                                 identity of the calling user
1667
   * @param targetNodeSubject - Subject identifying the target node
1668
   * @param pid - the identifier of the object to be replicated
1669
   * @param replicatePermission - the execute permission to be granted
1670
   * 
1671
   * @throws ServiceFailure
1672
   * @throws NotImplemented
1673
   * @throws InvalidToken
1674
   * @throws NotAuthorized
1675
   * @throws InvalidRequest
1676
   * @throws NotFound
1677
   */
1678
  @Override
1679
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1680
    Subject targetNodeSubject, Identifier pid) 
1681
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1682
    NotFound, InvalidRequest {
1683
    
1684
    boolean isAllowed = false;
1685
    SystemMetadata sysmeta = null;
1686
    NodeReference targetNode = null;
1687
    
1688
    try {
1689
      // get the target node reference from the nodes list
1690
      CNode cn = D1Client.getCN();
1691
      List<Node> nodes = cn.listNodes().getNodeList();
1692
      
1693
      if ( nodes != null ) {
1694
        for (Node node : nodes) {
1695
            
1696
        	if (node.getSubjectList() != null) {
1697
        		
1698
	            for (Subject nodeSubject : node.getSubjectList()) {
1699
	            	
1700
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1701
	                    targetNode = node.getIdentifier();
1702
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1703
	                    break;
1704
	                }
1705
	            }
1706
        	}
1707
            
1708
            if ( targetNode != null) { break; }
1709
        }
1710
        
1711
      } else {
1712
          String msg = "Couldn't get the node list from the CN";
1713
          logMetacat.debug(msg);
1714
          throw new ServiceFailure("4872", msg);
1715
          
1716
      }
1717
      
1718
      // can't find a node listed with the given subject
1719
      if ( targetNode == null ) {
1720
          String msg = "There is no Member Node registered with a node subject " +
1721
              "matching " + targetNodeSubject.getValue();
1722
          logMetacat.info(msg);
1723
          throw new NotAuthorized("4871", msg);
1724
          
1725
      }
1726
      
1727
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1728
      
1729
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1730

    
1731
      if ( sysmeta != null ) {
1732
          
1733
          List<Replica> replicaList = sysmeta.getReplicaList();
1734
          
1735
          if ( replicaList != null ) {
1736
              
1737
              // find the replica with the status set to 'requested'
1738
              for (Replica replica : replicaList) {
1739
                  ReplicationStatus status = replica.getReplicationStatus();
1740
                  NodeReference listedNode = replica.getReplicaMemberNode();
1741
                  if ( listedNode != null && targetNode != null ) {
1742
                      logMetacat.debug("Comparing " + listedNode.getValue()
1743
                              + " to " + targetNode.getValue());
1744
                      
1745
                      if (listedNode.getValue().equals(targetNode.getValue())
1746
                              && status.equals(ReplicationStatus.REQUESTED)) {
1747
                          isAllowed = true;
1748
                          break;
1749

    
1750
                      }
1751
                  }
1752
              }
1753
          }
1754
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1755
              "to replicate: " + isAllowed + " for " + pid.getValue());
1756

    
1757
          
1758
      } else {
1759
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1760
          " is null.");
1761
          String error ="";
1762
          String localId = null;
1763
          try {
1764
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1765
            
1766
           } catch (Exception e) {
1767
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1768
          }
1769
          
1770
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1771
              error = DELETEDMESSAGE;
1772
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1773
              error = DELETEDMESSAGE;
1774
          }
1775
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1776
          
1777
      }
1778

    
1779
    } catch (RuntimeException e) {
1780
    	  ServiceFailure sf = new ServiceFailure("4872", 
1781
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1782
                e.getMessage());
1783
    	  sf.initCause(e);
1784
        throw sf;
1785
        
1786
    }
1787
      
1788
    return isAllowed;
1789
    
1790
  }
1791

    
1792
  /**
1793
   * Adds a new object to the Node, where the object is a science metadata object.
1794
   * 
1795
   * @param session - the Session object containing the credentials for the Subject
1796
   * @param pid - The object identifier to be created
1797
   * @param object - the object bytes
1798
   * @param sysmeta - the system metadata that describes the object  
1799
   * 
1800
   * @return pid - the object identifier created
1801
   * 
1802
   * @throws InvalidToken
1803
   * @throws ServiceFailure
1804
   * @throws NotAuthorized
1805
   * @throws IdentifierNotUnique
1806
   * @throws UnsupportedType
1807
   * @throws InsufficientResources
1808
   * @throws InvalidSystemMetadata
1809
   * @throws NotImplemented
1810
   * @throws InvalidRequest
1811
   */
1812
  public Identifier create(Session session, Identifier pid, InputStream object,
1813
    SystemMetadata sysmeta) 
1814
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1815
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1816
    NotImplemented, InvalidRequest {
1817
    
1818
    try {
1819
      // verify the pid is valid format
1820
      if (!isValidIdentifier(pid)) {
1821
          throw new InvalidRequest("4891", "The provided identifier is invalid.");
1822
      }
1823
      logMetacat.debug("CN.create -start to create the object with pid "+pid.getValue());
1824
      // The lock to be used for this identifier
1825
      Lock lock = null;
1826

    
1827
      try {
1828
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1829
          lock.lock();
1830
          // are we allowed?
1831
          boolean isAllowed = false;
1832
          isAllowed = isAdminAuthorized(session);
1833
          
1834
          // additional check if it is the authoritative node if it is not the admin
1835
          if(!isAllowed) {
1836
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1837
          }
1838

    
1839
          // proceed if we're called by a CN
1840
          if ( isAllowed ) {
1841
              //check if the series id is legitimate. It uses the same rules of the method registerSystemMetadata
1842
              //checkSidInModifyingSystemMetadata(sysmeta, "4896", "4893");
1843
              Identifier sid = sysmeta.getSeriesId();
1844
              if(sid != null) {
1845
                  if (!isValidIdentifier(sid)) {
1846
                      throw new InvalidRequest("4891", "The series id in the system metadata is invalid in the request.");
1847
                  }
1848
              }
1849
              // create the coordinating node version of the document      
1850
              logMetacat.debug("CN.create - after locking identifier, passing authorization check, continue to create the object " + pid.getValue());
1851
              sysmeta.setSerialVersion(BigInteger.ONE);
1852
              //for the object whose authoritative mn is v1. we need reset the modification date.
1853
              //for the object whose authoritative mn is v2. we just accept the modification date.
1854
              D1NodeVersionChecker checker = new D1NodeVersionChecker(sysmeta.getAuthoritativeMemberNode());
1855
              String version = checker.getVersion("MNRead");
1856
              if(version != null && version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1857
                  sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1858
              }
1859
              //sysmeta.setArchived(false); // this is a create op, not update
1860
              
1861
              // the CN should have set the origin and authoritative member node fields
1862
              try {
1863
                  sysmeta.getOriginMemberNode().getValue();
1864
                  sysmeta.getAuthoritativeMemberNode().getValue();
1865
                  
1866
              } catch (NullPointerException npe) {
1867
                  throw new InvalidSystemMetadata("4896", 
1868
                      "Both the origin and authoritative member node identifiers need to be set.");
1869
                  
1870
              }
1871
              pid = super.create(session, pid, object, sysmeta);
1872

    
1873
          } else {
1874
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1875
                  " isn't allowed to call create() on a Coordinating Node for pid "+pid.getValue();
1876
              logMetacat.error(msg);
1877
              throw new NotAuthorized("1100", msg);
1878
          }
1879
          
1880
      } catch (RuntimeException e) {
1881
          // Convert Hazelcast runtime exceptions to service failures
1882
          String msg = "There was a problem creating the object identified by " +
1883
              pid.getValue() + ". There error message was: " + e.getMessage();
1884
          throw new ServiceFailure("4893", msg);
1885
          
1886
      } finally {
1887
    	  if (lock != null) {
1888
	          lock.unlock();
1889
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1890
    	  }
1891
      }
1892
    } finally {
1893
        IOUtils.closeQuietly(object);
1894
    }
1895
    return pid;
1896

    
1897
  }
1898

    
1899
  /**
1900
   * Set access for a given object using the object identifier and a Subject
1901
   * under a given Session.
1902
   * This method only applies the objects whose authoritative mn is a v1 node.
1903
   * @param session - the Session object containing the credentials for the Subject
1904
   * @param pid - the object identifier for the given object to apply the policy
1905
   * @param policy - the access policy to be applied
1906
   * 
1907
   * @return true if the application of the policy succeeds
1908
   * @throws InvalidToken
1909
   * @throws ServiceFailure
1910
   * @throws NotFound
1911
   * @throws NotAuthorized
1912
   * @throws NotImplemented
1913
   * @throws InvalidRequest
1914
   */
1915
  public boolean setAccessPolicy(Session session, Identifier pid, 
1916
      AccessPolicy accessPolicy, long serialVersion) 
1917
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1918
      NotImplemented, InvalidRequest, VersionMismatch {
1919
      
1920
   // do we have a valid pid?
1921
      if (pid == null || pid.getValue().trim().equals("")) {
1922
          throw new InvalidRequest("4402", "The provided identifier was invalid.");
1923
          
1924
      }
1925
      
1926
      String serviceFailureCode = "4430";
1927
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1928
      if(sid != null) {
1929
          pid = sid;
1930
      }
1931
      // The lock to be used for this identifier
1932
      Lock lock = null;
1933
      SystemMetadata systemMetadata = null;
1934
      
1935
      boolean success = false;
1936
      
1937
      // get the subject
1938
      Subject subject = session.getSubject();
1939
      
1940
      // are we allowed to do this?
1941
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1942
          throw new NotAuthorized("4420", "not allowed by "
1943
                  + subject.getValue() + " on " + pid.getValue());
1944
      }
1945
      
1946
      try {
1947
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1948
          lock.lock();
1949
          logMetacat.debug("Locked identifier " + pid.getValue());
1950

    
1951
          try {
1952
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1953

    
1954
              if ( systemMetadata == null ) {
1955
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1956
                  
1957
              }
1958
              // does the request have the most current system metadata?
1959
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1960
                 String msg = "The requested system metadata version number " + 
1961
                     serialVersion + " differs from the current version at " +
1962
                     systemMetadata.getSerialVersion().longValue() +
1963
                     ". Please get the latest copy in order to modify it.";
1964
                 throw new VersionMismatch("4402", msg);
1965
                 
1966
              }
1967
              
1968
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
1969
              String version = checker.getVersion("MNRead");
1970
              if(version == null) {
1971
                  throw new ServiceFailure("4430", "Couldn't determine the version of MNRead of the authoritative member node for the pid "+pid.getValue());
1972
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
1973
                  //we don't apply this method to an object whose authoritative node is v2
1974
                  throw new NotAuthorized("4420", V2V1MISSMATCH);
1975
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1976
                  //we don't understand this version (it is not v1 or v2)
1977
                  throw new InvalidRequest("4402", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
1978
              }
1979
              
1980
          } catch (RuntimeException e) {
1981
              // convert Hazelcast RuntimeException to NotFound
1982
              throw new NotFound("4400", "No record found for: " + pid);
1983
            
1984
          }
1985
              
1986
          // set the access policy
1987
          systemMetadata.setAccessPolicy(accessPolicy);
1988
          
1989
          // update the system metadata
1990
          try {
1991
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1992
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1993
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1994
              notifyReplicaNodes(systemMetadata);
1995
              
1996
          } catch (RuntimeException e) {
1997
              // convert Hazelcast RuntimeException to ServiceFailure
1998
              throw new ServiceFailure("4430", e.getMessage());
1999
            
2000
          }
2001
          
2002
      } catch (RuntimeException e) {
2003
          throw new ServiceFailure("4430", e.getMessage());
2004
          
2005
      } finally {
2006
          lock.unlock();
2007
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2008
        
2009
      }
2010

    
2011
    
2012
    // TODO: how do we know if the map was persisted?
2013
    success = true;
2014
    
2015
    return success;
2016
  }
2017

    
2018
  /**
2019
   * Full replacement of replication metadata in the system metadata for the 
2020
   * specified object, changes date system metadata modified
2021
   * 
2022
   * @param session - the Session object containing the credentials for the Subject
2023
   * @param pid - the object identifier for the given object to apply the policy
2024
   * @param replica - the replica to be updated
2025
   * @return
2026
   * @throws NotImplemented
2027
   * @throws NotAuthorized
2028
   * @throws ServiceFailure
2029
   * @throws InvalidRequest
2030
   * @throws NotFound
2031
   * @throws VersionMismatch
2032
   */
2033
  @Override
2034
  public boolean updateReplicationMetadata(Session session, Identifier pid,
2035
      Replica replica, long serialVersion) 
2036
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
2037
      NotFound, VersionMismatch {
2038
      
2039
      // The lock to be used for this identifier
2040
      Lock lock = null;
2041
      
2042
      // get the subject
2043
      Subject subject = session.getSubject();
2044
      
2045
      // are we allowed to do this?
2046
      if(session == null) {
2047
          throw new NotAuthorized("4851", "Session cannot be null. It is not authorized for updating the replication metadata of the object "+pid.getValue());
2048
      } else {
2049
          if(!isCNAdmin(session)) {
2050
              throw new NotAuthorized("4851", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for updating the replication metadata of the object "+pid.getValue());
2051
        }
2052
      }
2053
      /*try {
2054

    
2055
          // what is the controlling permission?
2056
          if (!isAuthorized(session, pid, Permission.WRITE)) {
2057
              throw new NotAuthorized("4851", "not allowed by "
2058
                      + subject.getValue() + " on " + pid.getValue());
2059
          }
2060

    
2061
        
2062
      } catch (InvalidToken e) {
2063
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
2064
                  " on " + pid.getValue());  
2065
          
2066
      }*/
2067

    
2068
      SystemMetadata systemMetadata = null;
2069
      try {
2070
          lock = HazelcastService.getInstance().getLock(pid.getValue());
2071
          lock.lock();
2072
          logMetacat.debug("Locked identifier " + pid.getValue());
2073

    
2074
          try {      
2075
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2076

    
2077
              // does the request have the most current system metadata?
2078
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
2079
                 String msg = "The requested system metadata version number " + 
2080
                     serialVersion + " differs from the current version at " +
2081
                     systemMetadata.getSerialVersion().longValue() +
2082
                     ". Please get the latest copy in order to modify it.";
2083
                 throw new VersionMismatch("4855", msg);
2084
              }
2085
              
2086
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
2087
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
2088
                  " : " + e.getMessage());
2089
            
2090
          }
2091
              
2092
          // set the status for the replica
2093
          List<Replica> replicas = systemMetadata.getReplicaList();
2094
          NodeReference replicaNode = replica.getReplicaMemberNode();
2095
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
2096
          int index = 0;
2097
          for (Replica listedReplica: replicas) {
2098
              
2099
              // remove the replica that we are replacing
2100
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
2101
                      // don't allow status to change from COMPLETED to anything other
2102
                      // than INVALIDATED: prevents overwrites from race conditions
2103
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
2104
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
2105
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
2106
                	  throw new InvalidRequest("4853", "Status state change from " +
2107
                			  listedReplica.getReplicationStatus() + " to " +
2108
                			  replicaStatus.toString() + "is prohibited for identifier " +
2109
                			  pid.getValue() + " and target node " + 
2110
                			  listedReplica.getReplicaMemberNode().getValue());
2111

    
2112
            	  }
2113
                  replicas.remove(index);
2114
                  break;
2115
                  
2116
              }
2117
              index++;
2118
          }
2119
          
2120
          // add the new replica item
2121
          replicas.add(replica);
2122
          systemMetadata.setReplicaList(replicas);
2123
          
2124
          // update the metadata
2125
          try {
2126
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
2127
              // Based on CN behavior discussion 9/16/15, we no longer want to 
2128
              // update the modified date for changes to the replica list
2129
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
2130
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
2131
              
2132
              // inform replica nodes of the change if the status is complete
2133
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
2134
            	  notifyReplicaNodes(systemMetadata);
2135
            	  
2136
              }
2137
          } catch (RuntimeException e) {
2138
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2139
              throw new ServiceFailure("4852", e.getMessage());
2140
          
2141
          }
2142
          
2143
      } catch (RuntimeException e) {
2144
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2145
          throw new ServiceFailure("4852", e.getMessage());
2146
      
2147
      } finally {
2148
          lock.unlock();
2149
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2150
          
2151
      }
2152
    
2153
      return true;
2154
      
2155
  }
2156
  
2157
  /**
2158
   * 
2159
   */
2160
  @Override
2161
  public ObjectList listObjects(Session session, Date startTime, 
2162
      Date endTime, ObjectFormatIdentifier formatid, NodeReference nodeId,Identifier identifier,
2163
      Integer start, Integer count)
2164
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
2165
      ServiceFailure {
2166

    
2167
      return super.listObjects(session, startTime, endTime, formatid, identifier, nodeId, start, count);
2168
  }
2169

    
2170
  
2171
 	/**
2172
 	 * Returns a list of checksum algorithms that are supported by DataONE.
2173
 	 * @return cal  the list of checksum algorithms
2174
 	 * 
2175
 	 * @throws ServiceFailure
2176
 	 * @throws NotImplemented
2177
 	 */
2178
  @Override
2179
  public ChecksumAlgorithmList listChecksumAlgorithms()
2180
			throws ServiceFailure, NotImplemented {
2181
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
2182
		cal.addAlgorithm("MD5");
2183
		cal.addAlgorithm("SHA-1");
2184
		return cal;
2185
		
2186
	}
2187

    
2188
  /**
2189
   * Notify replica Member Nodes of system metadata changes for a given pid
2190
   * 
2191
   * @param currentSystemMetadata - the up to date system metadata
2192
   */
2193
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
2194
      
2195
      Session session = null;
2196
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
2197
      //MNode mn = null;
2198
      NodeReference replicaNodeRef = null;
2199
      CNode cn = null;
2200
      NodeType nodeType = null;
2201
      List<Node> nodeList = null;
2202
      
2203
      try {
2204
          cn = D1Client.getCN();
2205
          nodeList = cn.listNodes().getNodeList();
2206
          
2207
      } catch (Exception e) { // handle BaseException and other I/O issues
2208
          
2209
          // swallow errors since the call is not critical
2210
          logMetacat.error("Can't inform MNs of system metadata changes due " +
2211
              "to communication issues with the CN: " + e.getMessage());
2212
          
2213
      }
2214
      
2215
      if ( replicaList != null ) {
2216
          
2217
          // iterate through the replicas and inform  MN replica nodes
2218
          for (Replica replica : replicaList) {
2219
              String replicationVersion = null;
2220
              replicaNodeRef = replica.getReplicaMemberNode();
2221
              try {
2222
                  if (nodeList != null) {
2223
                      // find the node type
2224
                      for (Node node : nodeList) {
2225
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
2226
                              nodeType = node.getType();
2227
                              D1NodeVersionChecker checker = new D1NodeVersionChecker(replicaNodeRef);
2228
                              replicationVersion = checker.getVersion("MNRead");
2229
                              break;
2230
              
2231
                          }
2232
                      }
2233
                  }
2234
              
2235
                  // notify only MNs
2236
                  if (replicationVersion != null && nodeType != null && nodeType == NodeType.MN) {
2237
                      if(replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
2238
                          //connect to a v2 mn
2239
                          MNode mn = D1Client.getMN(replicaNodeRef);
2240
                          mn.systemMetadataChanged(session, 
2241
                              currentSystemMetadata.getIdentifier(), 
2242
                              currentSystemMetadata.getSerialVersion().longValue(),
2243
                              currentSystemMetadata.getDateSysMetadataModified());
2244
                      } else if (replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
2245
                          //connect to a v1 mn
2246
                          org.dataone.client.v1.MNode mn = org.dataone.client.v1.itk.D1Client.getMN(replicaNodeRef);
2247
                          mn.systemMetadataChanged(session, 
2248
                                  currentSystemMetadata.getIdentifier(), 
2249
                                  currentSystemMetadata.getSerialVersion().longValue(),
2250
                                  currentSystemMetadata.getDateSysMetadataModified());
2251
                      }
2252
                      
2253
                  }
2254
              
2255
              } catch (Exception e) { // handle BaseException and other I/O issues
2256
              
2257
                  // swallow errors since the call is not critical
2258
                  logMetacat.error("Can't inform "
2259
                          + replicaNodeRef.getValue()
2260
                          + " of system metadata changes due "
2261
                          + "to communication issues with the CN: "
2262
                          + e.getMessage());
2263
              
2264
              }
2265
          }
2266
      }
2267
  }
2268
  
2269
  /**
2270
   * Update the system metadata of the specified pid.
2271
   * Note: the serial version and the replica list in the new system metadata will be ignored and the old values will be kept.
2272
   */
2273
  @Override
2274
  public boolean updateSystemMetadata(Session session, Identifier pid,
2275
          SystemMetadata sysmeta) throws NotImplemented, NotAuthorized,
2276
          ServiceFailure, InvalidRequest, InvalidSystemMetadata, InvalidToken {
2277
   if(sysmeta == null) {
2278
       throw  new InvalidRequest("4863", "The system metadata object should NOT be null in the updateSystemMetadata request.");
2279
   }
2280
   if(pid == null || pid.getValue() == null) {
2281
       throw new InvalidRequest("4863", "Please specify the id in the updateSystemMetadata request ") ;
2282
   }
2283

    
2284
   if (session == null) {
2285
       //TODO: many of the thrown exceptions do not use the correct error codes
2286
       //check these against the docs and correct them
2287
       throw new NotAuthorized("4861", "No Session - could not authorize for updating system metadata." +
2288
               "  If you are not logged in, please do so and retry the request.");
2289
   } else {
2290
         //only CN is allwoed
2291
         if(!isCNAdmin(session)) {
2292
               throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not authorized for updating the system metadata of the object "+pid.getValue());
2293
         }
2294
   }
2295

    
2296
    //update the system metadata locally
2297
    boolean success = false;
2298
    try {
2299
        HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
2300
        SystemMetadata currentSysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2301
       
2302
        if(currentSysmeta == null) {
2303
            throw  new InvalidRequest("4863", "We can't find the current system metadata on the member node for the id "+pid.getValue());
2304
        }
2305
        // CN will ignore the comming serial version and replica list fields from the mn node. 
2306
        BigInteger currentSerialVersion = currentSysmeta.getSerialVersion();
2307
        sysmeta.setSerialVersion(currentSerialVersion);
2308
        List<Replica> replicas = currentSysmeta.getReplicaList();
2309
        sysmeta.setReplicaList(replicas);
2310
        boolean needUpdateModificationDate = false;//cn doesn't need to change the modification date.
2311
        boolean fromCN = true;
2312
        success = updateSystemMetadata(session, pid, sysmeta, needUpdateModificationDate, currentSysmeta, fromCN);
2313
    } finally {
2314
        HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
2315
    }
2316
    return success;
2317
  }
2318
  
2319
    @Override
2320
    public boolean synchronize(Session session, Identifier pid) throws NotAuthorized, InvalidRequest, NotImplemented{
2321
        throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2322

    
2323
    }
2324

    
2325
	@Override
2326
	public QueryEngineDescription getQueryEngineDescription(Session session,
2327
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
2328
			NotImplemented, NotFound {
2329
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2330

    
2331
	}
2332
	
2333
	@Override
2334
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
2335
			ServiceFailure, NotAuthorized, NotImplemented {
2336
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2337

    
2338
	}
2339
	
2340
	@Override
2341
	public InputStream query(Session session, String queryEngine, String query)
2342
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
2343
			NotImplemented, NotFound {
2344
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2345

    
2346
	}
2347
	
2348
	@Override
2349
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
2350
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
2351
	}
2352

    
2353
}
(1-1/8)