Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.sql.SQLException;
29
import java.util.ArrayList;
30
import java.util.Calendar;
31
import java.util.Date;
32
import java.util.List;
33
import java.util.concurrent.locks.Lock;
34

    
35
import javax.servlet.http.HttpServletRequest;
36

    
37
import org.apache.log4j.Logger;
38
import org.dataone.client.v2.CNode;
39
import org.dataone.client.v2.itk.D1Client;
40
import org.dataone.client.v2.MNode;
41
import org.dataone.service.cn.v2.CNAuthorization;
42
import org.dataone.service.cn.v2.CNCore;
43
import org.dataone.service.cn.v2.CNRead;
44
import org.dataone.service.cn.v2.CNReplication;
45
import org.dataone.service.exceptions.BaseException;
46
import org.dataone.service.exceptions.IdentifierNotUnique;
47
import org.dataone.service.exceptions.InsufficientResources;
48
import org.dataone.service.exceptions.InvalidRequest;
49
import org.dataone.service.exceptions.InvalidSystemMetadata;
50
import org.dataone.service.exceptions.InvalidToken;
51
import org.dataone.service.exceptions.NotAuthorized;
52
import org.dataone.service.exceptions.NotFound;
53
import org.dataone.service.exceptions.NotImplemented;
54
import org.dataone.service.exceptions.ServiceFailure;
55
import org.dataone.service.exceptions.UnsupportedType;
56
import org.dataone.service.exceptions.VersionMismatch;
57
import org.dataone.service.types.v1.AccessPolicy;
58
import org.dataone.service.types.v1.Checksum;
59
import org.dataone.service.types.v1.ChecksumAlgorithmList;
60
import org.dataone.service.types.v1.DescribeResponse;
61
import org.dataone.service.types.v1.Event;
62
import org.dataone.service.types.v1.Identifier;
63
import org.dataone.service.types.v2.Log;
64
import org.dataone.service.types.v2.Node;
65
import org.dataone.service.types.v2.NodeList;
66
import org.dataone.service.types.v1.NodeReference;
67
import org.dataone.service.types.v1.NodeType;
68
import org.dataone.service.types.v2.ObjectFormat;
69
import org.dataone.service.types.v1.ObjectFormatIdentifier;
70
import org.dataone.service.types.v2.ObjectFormatList;
71
import org.dataone.service.types.v1.ObjectList;
72
import org.dataone.service.types.v1.ObjectLocationList;
73
import org.dataone.service.types.v1.Permission;
74
import org.dataone.service.types.v1.Replica;
75
import org.dataone.service.types.v1.ReplicationPolicy;
76
import org.dataone.service.types.v1.ReplicationStatus;
77
import org.dataone.service.types.v1.Session;
78
import org.dataone.service.types.v1.Subject;
79
import org.dataone.service.types.v2.SystemMetadata;
80
import org.dataone.service.types.v2.util.ServiceMethodRestrictionUtil;
81
import org.dataone.service.types.v1_1.QueryEngineDescription;
82
import org.dataone.service.types.v1_1.QueryEngineList;
83

    
84
import edu.ucsb.nceas.metacat.EventLog;
85
import edu.ucsb.nceas.metacat.IdentifierManager;
86
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
87
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
88
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
89

    
90
/**
91
 * Represents Metacat's implementation of the DataONE Coordinating Node 
92
 * service API. Methods implement the various CN* interfaces, and methods common
93
 * to both Member Node and Coordinating Node interfaces are found in the
94
 * D1NodeService super class.
95
 *
96
 */
97
public class CNodeService extends D1NodeService implements CNAuthorization,
98
    CNCore, CNRead, CNReplication {
99

    
100
  /* the logger instance */
101
  private Logger logMetacat = null;
102

    
103
  /**
104
   * singleton accessor
105
   */
106
  public static CNodeService getInstance(HttpServletRequest request) { 
107
    return new CNodeService(request);
108
  }
109
  
110
  /**
111
   * Constructor, private for singleton access
112
   */
113
  private CNodeService(HttpServletRequest request) {
114
    super(request);
115
    logMetacat = Logger.getLogger(CNodeService.class);
116
        
117
  }
118
    
119
  /**
120
   * Set the replication policy for an object given the object identifier
121
   * 
122
   * @param session - the Session object containing the credentials for the Subject
123
   * @param pid - the object identifier for the given object
124
   * @param policy - the replication policy to be applied
125
   * 
126
   * @return true or false
127
   * 
128
   * @throws NotImplemented
129
   * @throws NotAuthorized
130
   * @throws ServiceFailure
131
   * @throws InvalidRequest
132
   * @throws VersionMismatch
133
   * 
134
   */
135
  @Override
136
  public boolean setReplicationPolicy(Session session, Identifier pid,
137
      ReplicationPolicy policy, long serialVersion) 
138
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
139
      InvalidRequest, InvalidToken, VersionMismatch {
140
      
141
      // The lock to be used for this identifier
142
      Lock lock = null;
143
      
144
      // get the subject
145
      Subject subject = session.getSubject();
146
      
147
      // are we allowed to do this?
148
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
149
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
150
                  + " not allowed by " + subject.getValue() + " on "
151
                  + pid.getValue());
152
          
153
      }
154
      
155
      SystemMetadata systemMetadata = null;
156
      try {
157
          lock = HazelcastService.getInstance().getLock(pid.getValue());
158
          lock.lock();
159
          logMetacat.debug("Locked identifier " + pid.getValue());
160

    
161
          try {
162
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
163
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
164
                  
165
              }
166
              
167
              // did we get it correctly?
168
              if ( systemMetadata == null ) {
169
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
170
                  
171
              }
172

    
173
              // does the request have the most current system metadata?
174
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
175
                 String msg = "The requested system metadata version number " + 
176
                     serialVersion + " differs from the current version at " +
177
                     systemMetadata.getSerialVersion().longValue() +
178
                     ". Please get the latest copy in order to modify it.";
179
                 throw new VersionMismatch("4886", msg);
180
                 
181
              }
182
              
183
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
184
              throw new NotFound("4884", "No record found for: " + pid.getValue());
185
            
186
          }
187
          
188
          // set the new policy
189
          systemMetadata.setReplicationPolicy(policy);
190
          
191
          // update the metadata
192
          try {
193
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
194
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
195
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
196
              notifyReplicaNodes(systemMetadata);
197
              
198
          } catch (RuntimeException e) {
199
              throw new ServiceFailure("4882", e.getMessage());
200
          
201
          }
202
          
203
      } catch (RuntimeException e) {
204
          throw new ServiceFailure("4882", e.getMessage());
205
          
206
      } finally {
207
          lock.unlock();
208
          logMetacat.debug("Unlocked identifier " + pid.getValue());
209
          
210
      }
211
    
212
      return true;
213
  }
214

    
215
  /**
216
   * Deletes the replica from the given Member Node
217
   * NOTE: MN.delete() may be an "archive" operation. TBD.
218
   * @param session
219
   * @param pid
220
   * @param nodeId
221
   * @param serialVersion
222
   * @return
223
   * @throws InvalidToken
224
   * @throws ServiceFailure
225
   * @throws NotAuthorized
226
   * @throws NotFound
227
   * @throws NotImplemented
228
   * @throws VersionMismatch
229
   */
230
  @Override
231
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
232
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
233
	  
234
	  	// The lock to be used for this identifier
235
		Lock lock = null;
236

    
237
		// get the subject
238
		Subject subject = session.getSubject();
239

    
240
		// are we allowed to do this?
241
		boolean isAuthorized = false;
242
		try {
243
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
244
		} catch (InvalidRequest e) {
245
			throw new ServiceFailure("4882", e.getDescription());
246
		}
247
		if (!isAuthorized) {
248
			throw new NotAuthorized("4881", Permission.WRITE
249
					+ " not allowed by " + subject.getValue() + " on "
250
					+ pid.getValue());
251

    
252
		}
253

    
254
		SystemMetadata systemMetadata = null;
255
		try {
256
			lock = HazelcastService.getInstance().getLock(pid.getValue());
257
			lock.lock();
258
			logMetacat.debug("Locked identifier " + pid.getValue());
259

    
260
			try {
261
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
262
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
263
				}
264

    
265
				// did we get it correctly?
266
				if (systemMetadata == null) {
267
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
268
				}
269

    
270
				// does the request have the most current system metadata?
271
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
272
					String msg = "The requested system metadata version number "
273
							+ serialVersion
274
							+ " differs from the current version at "
275
							+ systemMetadata.getSerialVersion().longValue()
276
							+ ". Please get the latest copy in order to modify it.";
277
					throw new VersionMismatch("4886", msg);
278

    
279
				}
280

    
281
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
282
				throw new NotFound("4884", "No record found for: " + pid.getValue());
283

    
284
			}
285
			  
286
			// check permissions
287
			// TODO: is this necessary?
288
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
289
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
290
			if (!isAllowed) {
291
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
292
			}
293
			  
294
			// delete the replica from the given node
295
			// CSJ: use CN.delete() to truly delete a replica, semantically
296
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
297
			//D1Client.getMN(nodeId).delete(session, pid);
298
			  
299
			// reflect that change in the system metadata
300
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
301
			for (Replica r: systemMetadata.getReplicaList()) {
302
				  if (r.getReplicaMemberNode().equals(nodeId)) {
303
					  updatedReplicas.remove(r);
304
					  break;
305
				  }
306
			}
307
			systemMetadata.setReplicaList(updatedReplicas);
308

    
309
			// update the metadata
310
			try {
311
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
312
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
313
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
314
			} catch (RuntimeException e) {
315
				throw new ServiceFailure("4882", e.getMessage());
316
			}
317

    
318
		} catch (RuntimeException e) {
319
			throw new ServiceFailure("4882", e.getMessage());
320
		} finally {
321
			lock.unlock();
322
			logMetacat.debug("Unlocked identifier " + pid.getValue());
323
		}
324

    
325
		return true;	  
326
	  
327
  }
328
  
329
  /**
330
   * Deletes an object from the Coordinating Node
331
   * 
332
   * @param session - the Session object containing the credentials for the Subject
333
   * @param pid - The object identifier to be deleted
334
   * 
335
   * @return pid - the identifier of the object used for the deletion
336
   * 
337
   * @throws InvalidToken
338
   * @throws ServiceFailure
339
   * @throws NotAuthorized
340
   * @throws NotFound
341
   * @throws NotImplemented
342
   * @throws InvalidRequest
343
   */
344
  @Override
345
  public Identifier delete(Session session, Identifier pid) 
346
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
347
      
348
      String localId = null;      // The corresponding docid for this pid
349
	  Lock lock = null;           // The lock to be used for this identifier
350
      CNode cn = null;            // a reference to the CN to get the node list    
351
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
352
      List<Node> nodeList = null; // the list of nodes in this CN environment
353

    
354
      // check for a valid session
355
      if (session == null) {
356
        	throw new InvalidToken("4963", "No session has been provided");
357
        	
358
      }
359

    
360
      // do we have a valid pid?
361
      if (pid == null || pid.getValue().trim().equals("")) {
362
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
363
          
364
      }
365

    
366
	  // check that it is CN/admin
367
	  boolean allowed = isAdminAuthorized(session);
368
	  
369
	  // additional check if it is the authoritative node if it is not the admin
370
      if(!allowed) {
371
          allowed = isAuthoritativeMNodeAdmin(session, pid);
372
          
373
      }
374
	  
375
	  if (!allowed) {
376
		  String msg = "The subject " + session.getSubject().getValue() + 
377
			  " is not allowed to call delete() on a Coordinating Node.";
378
		  logMetacat.info(msg);
379
		  throw new NotAuthorized("4960", msg);
380
		  
381
	  }
382
	  
383
	  // Don't defer to superclass implementation without a locally registered identifier
384
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
385
      // Check for the existing identifier
386
      try {
387
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
388
          super.delete(session, pid);
389
          
390
      } catch (McdbDocNotFoundException e) {
391
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
392
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
393
    	  
394
          try {
395
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
396
  			  lock.lock();
397
  			  logMetacat.debug("Locked identifier " + pid.getValue());
398

    
399
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
400
			  if ( sysMeta != null ) {
401
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
402
				sysMeta.setArchived(true);
403
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
404
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
405
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
406
	            //since this is cn, we don't need worry about the mn solr index.
407
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
408
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
409
	            String username = session.getSubject().getValue();//just for logging purpose
410
                //since data objects were not registered in the identifier table, we use pid as the docid
411
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
412
				
413
			  } else {
414
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
415
					  ". Couldn't obtain the system metadata record.");
416
				  
417
			  }
418
			  
419
		  } catch (RuntimeException re) {
420
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
421
				  ". The error message was: " + re.getMessage());
422
			  
423
		  } finally {
424
			  lock.unlock();
425
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
426

    
427
		  }
428

    
429
          // NOTE: cannot log the delete without localId
430
//          EventLog.getInstance().log(request.getRemoteAddr(), 
431
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
432
//                  pid.getValue(), Event.DELETE.xmlValue());
433

    
434
      } catch (SQLException e) {
435
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
436
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
437
      }
438

    
439
      // get the node list
440
      try {
441
          cn = D1Client.getCN();
442
          nodeList = cn.listNodes().getNodeList();
443
          
444
      } catch (Exception e) { // handle BaseException and other I/O issues
445
          
446
          // swallow errors since the call is not critical
447
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
448
              " due to communication issues with the CN: " + e.getMessage());
449
          
450
      }
451

    
452
	  // notify the replicas
453
	  if (systemMetadata.getReplicaList() != null) {
454
		  for (Replica replica: systemMetadata.getReplicaList()) {
455
			  NodeReference replicaNode = replica.getReplicaMemberNode();
456
			  try {
457
                  if (nodeList != null) {
458
                      // find the node type
459
                      for (Node node : nodeList) {
460
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
461
                              nodeType = node.getType();
462
                              break;
463
              
464
                          }
465
                      }
466
                  }
467
                  
468
                  // only send call MN.delete() to avoid an infinite loop with the CN
469
                  if (nodeType != null && nodeType == NodeType.MN) {
470
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
471
                  }
472
                  
473
			  } catch (Exception e) {
474
				  // all we can really do is log errors and carry on with life
475
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
476
					  " from replica MN: " + replicaNode.getValue(), e);
477
			}
478
			  
479
		  }
480
	  }
481
	  
482
	  return pid;
483
      
484
  }
485
  
486
  /**
487
   * Archives an object from the Coordinating Node
488
   * 
489
   * @param session - the Session object containing the credentials for the Subject
490
   * @param pid - The object identifier to be deleted
491
   * 
492
   * @return pid - the identifier of the object used for the deletion
493
   * 
494
   * @throws InvalidToken
495
   * @throws ServiceFailure
496
   * @throws NotAuthorized
497
   * @throws NotFound
498
   * @throws NotImplemented
499
   * @throws InvalidRequest
500
   */
501
  @Override
502
  public Identifier archive(Session session, Identifier pid) 
503
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
504

    
505
      String localId = null; // The corresponding docid for this pid
506
	  Lock lock = null;      // The lock to be used for this identifier
507
      CNode cn = null;            // a reference to the CN to get the node list    
508
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
509
      List<Node> nodeList = null; // the list of nodes in this CN environment
510
      
511

    
512
      // check for a valid session
513
      if (session == null) {
514
        	throw new InvalidToken("4973", "No session has been provided");
515
        	
516
      }
517

    
518
      // do we have a valid pid?
519
      if (pid == null || pid.getValue().trim().equals("")) {
520
          throw new ServiceFailure("4972", "The provided identifier was invalid.");
521
          
522
      }
523

    
524
	  // check that it is CN/admin
525
	  boolean allowed = isAdminAuthorized(session);
526
	  
527
	  //check if it is the authoritative member node
528
	  if(!allowed) {
529
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
530
	  }
531
	  
532
	  if (!allowed) {
533
		  String msg = "The subject " + session.getSubject().getValue() + 
534
				  " is not allowed to call archive() on a Coordinating Node.";
535
		  logMetacat.info(msg);
536
		  throw new NotAuthorized("4970", msg);
537
	  }
538
	  
539
      // Check for the existing identifier
540
      try {
541
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
542
          super.archive(session, pid);
543
          
544
      } catch (McdbDocNotFoundException e) {
545
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
546
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
547
    	  
548
          try {
549
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
550
  			  lock.lock();
551
  			  logMetacat.debug("Locked identifier " + pid.getValue());
552

    
553
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
554
			  if ( sysMeta != null ) {
555
				sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
556
				sysMeta.setArchived(true);
557
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
558
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
559
			    // notify the replicas
560
				notifyReplicaNodes(sysMeta);
561
				  
562
			  } else {
563
				  throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
564
					  ". Couldn't obtain the system metadata record.");
565
				  
566
			  }
567
			  
568
		  } catch (RuntimeException re) {
569
			  throw new ServiceFailure("4972", "Couldn't archive " + pid.getValue() + 
570
				  ". The error message was: " + re.getMessage());
571
			  
572
		  } finally {
573
			  lock.unlock();
574
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
575

    
576
		  }
577

    
578
          // NOTE: cannot log the archive without localId
579
//          EventLog.getInstance().log(request.getRemoteAddr(), 
580
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
581
//                  pid.getValue(), Event.DELETE.xmlValue());
582

    
583
      } catch (SQLException e) {
584
          throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
585
                  ". The local id of the object with the identifier can't be identified since "+e.getMessage());
586
      }
587

    
588
	  return pid;
589
      
590
  }
591
  
592
  /**
593
   * Set the obsoletedBy attribute in System Metadata
594
   * @param session
595
   * @param pid
596
   * @param obsoletedByPid
597
   * @param serialVersion
598
   * @return
599
   * @throws NotImplemented
600
   * @throws NotFound
601
   * @throws NotAuthorized
602
   * @throws ServiceFailure
603
   * @throws InvalidRequest
604
   * @throws InvalidToken
605
   * @throws VersionMismatch
606
   */
607
  @Override
608
  public boolean setObsoletedBy(Session session, Identifier pid,
609
			Identifier obsoletedByPid, long serialVersion)
610
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
611
			InvalidRequest, InvalidToken, VersionMismatch {
612

    
613
		// The lock to be used for this identifier
614
		Lock lock = null;
615

    
616
		// get the subject
617
		Subject subject = session.getSubject();
618

    
619
		// are we allowed to do this?
620
		if (!isAuthorized(session, pid, Permission.WRITE)) {
621
			throw new NotAuthorized("4881", Permission.WRITE
622
					+ " not allowed by " + subject.getValue() + " on "
623
					+ pid.getValue());
624

    
625
		}
626

    
627

    
628
		SystemMetadata systemMetadata = null;
629
		try {
630
			lock = HazelcastService.getInstance().getLock(pid.getValue());
631
			lock.lock();
632
			logMetacat.debug("Locked identifier " + pid.getValue());
633

    
634
			try {
635
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
636
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
637
				}
638

    
639
				// did we get it correctly?
640
				if (systemMetadata == null) {
641
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
642
				}
643

    
644
				// does the request have the most current system metadata?
645
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
646
					String msg = "The requested system metadata version number "
647
							+ serialVersion
648
							+ " differs from the current version at "
649
							+ systemMetadata.getSerialVersion().longValue()
650
							+ ". Please get the latest copy in order to modify it.";
651
					throw new VersionMismatch("4886", msg);
652

    
653
				}
654

    
655
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
656
				throw new NotFound("4884", "No record found for: " + pid.getValue());
657

    
658
			}
659

    
660
			// set the new policy
661
			systemMetadata.setObsoletedBy(obsoletedByPid);
662

    
663
			// update the metadata
664
			try {
665
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
666
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
667
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
668
			} catch (RuntimeException e) {
669
				throw new ServiceFailure("4882", e.getMessage());
670
			}
671

    
672
		} catch (RuntimeException e) {
673
			throw new ServiceFailure("4882", e.getMessage());
674
		} finally {
675
			lock.unlock();
676
			logMetacat.debug("Unlocked identifier " + pid.getValue());
677
		}
678

    
679
		return true;
680
	}
681
  
682
  
683
  /**
684
   * Set the replication status for an object given the object identifier
685
   * 
686
   * @param session - the Session object containing the credentials for the Subject
687
   * @param pid - the object identifier for the given object
688
   * @param status - the replication status to be applied
689
   * 
690
   * @return true or false
691
   * 
692
   * @throws NotImplemented
693
   * @throws NotAuthorized
694
   * @throws ServiceFailure
695
   * @throws InvalidRequest
696
   * @throws InvalidToken
697
   * @throws NotFound
698
   * 
699
   */
700
  @Override
701
  public boolean setReplicationStatus(Session session, Identifier pid,
702
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
703
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
704
      InvalidRequest, NotFound {
705
	  
706
	  // cannot be called by public
707
	  if (session == null) {
708
		  throw new NotAuthorized("4720", "Session cannot be null");
709
	  }
710
      
711
      // The lock to be used for this identifier
712
      Lock lock = null;
713
      
714
      boolean allowed = false;
715
      int replicaEntryIndex = -1;
716
      List<Replica> replicas = null;
717
      // get the subject
718
      Subject subject = session.getSubject();
719
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
720
          " is " + status.toString());
721
      
722
      SystemMetadata systemMetadata = null;
723

    
724
      try {
725
          lock = HazelcastService.getInstance().getLock(pid.getValue());
726
          lock.lock();
727
          logMetacat.debug("Locked identifier " + pid.getValue());
728

    
729
          try {      
730
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
731

    
732
              // did we get it correctly?
733
              if ( systemMetadata == null ) {
734
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
735
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
736
                  
737
              }
738
              replicas = systemMetadata.getReplicaList();
739
              int count = 0;
740
              
741
              // was there a failure? log it
742
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
743
                 String msg = "The replication request of the object identified by " + 
744
                     pid.getValue() + " failed.  The error message was " +
745
                     failure.getMessage() + ".";
746
              }
747
              
748
              if (replicas.size() > 0 && replicas != null) {
749
                  // find the target replica index in the replica list
750
                  for (Replica replica : replicas) {
751
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
752
                      String targetNodeStr = targetNode.getValue();
753
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
754
                  
755
                      if (replicaNodeStr.equals(targetNodeStr)) {
756
                          replicaEntryIndex = count;
757
                          logMetacat.debug("replica entry index is: "
758
                                  + replicaEntryIndex);
759
                          break;
760
                      }
761
                      count++;
762
                  
763
                  }
764
              }
765
              // are we allowed to do this? only CNs and target MNs are allowed
766
              CNode cn = D1Client.getCN();
767
              List<Node> nodes = cn.listNodes().getNodeList();
768
              
769
              // find the node in the node list
770
              for ( Node node : nodes ) {
771
                  
772
                  NodeReference nodeReference = node.getIdentifier();
773
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
774
                      nodeReference.getValue());
775
                  
776
                  // allow target MN certs
777
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
778
                      node.getType().equals(NodeType.MN)) {
779
                      List<Subject> nodeSubjects = node.getSubjectList();
780
                      
781
                      // check if the session subject is in the node subject list
782
                      for (Subject nodeSubject : nodeSubjects) {
783
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
784
                                  nodeSubject.getValue() + " and " + subject.getValue());
785
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
786
                              
787
                              // lastly limit to COMPLETED, INVALIDATED,
788
                              // and FAILED status updates from MNs only
789
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
790
                                   status.equals(ReplicationStatus.INVALIDATED) ||
791
                                   status.equals(ReplicationStatus.FAILED)) {
792
                                  allowed = true;
793
                                  break;
794
                                  
795
                              }                              
796
                          }
797
                      }                 
798
                  }
799
              }
800

    
801
              if ( !allowed ) {
802
                  //check for CN admin access
803
                  allowed = isAuthorized(session, pid, Permission.WRITE);
804
                  
805
              }              
806
              
807
              if ( !allowed ) {
808
                  String msg = "The subject identified by "
809
                          + subject.getValue()
810
                          + " does not have permission to set the replication status for "
811
                          + "the replica identified by "
812
                          + targetNode.getValue() + ".";
813
                  logMetacat.info(msg);
814
                  throw new NotAuthorized("4720", msg);
815
                  
816
              }
817

    
818
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
819
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
820
                " : " + e.getMessage());
821
            
822
          }
823
          
824
          Replica targetReplica = new Replica();
825
          // set the status for the replica
826
          if ( replicaEntryIndex != -1 ) {
827
              targetReplica = replicas.get(replicaEntryIndex);
828
              
829
              // don't allow status to change from COMPLETED to anything other
830
              // than INVALIDATED: prevents overwrites from race conditions
831
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
832
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
833
            	  throw new InvalidRequest("4730", "Status state change from " +
834
            			  targetReplica.getReplicationStatus() + " to " +
835
            			  status.toString() + "is prohibited for identifier " +
836
            			  pid.getValue() + " and target node " + 
837
            			  targetReplica.getReplicaMemberNode().getValue());
838
              }
839
              
840
              targetReplica.setReplicationStatus(status);
841
              
842
              logMetacat.debug("Set the replication status for " + 
843
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
844
                  targetReplica.getReplicationStatus() + " for identifier " +
845
                  pid.getValue());
846
              
847
          } else {
848
              // this is a new entry, create it
849
              targetReplica.setReplicaMemberNode(targetNode);
850
              targetReplica.setReplicationStatus(status);
851
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
852
              replicas.add(targetReplica);
853
              
854
          }
855
          
856
          systemMetadata.setReplicaList(replicas);
857
                
858
          // update the metadata
859
          try {
860
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
861
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
862
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
863

    
864
              if ( !status.equals(ReplicationStatus.QUEUED) && 
865
            	   !status.equals(ReplicationStatus.REQUESTED)) {
866
                  
867
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
868
                          "\tNODE:\t" + targetNode.getValue() + 
869
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
870
                
871
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
872
                          "\tPID:\t"  + pid.getValue() + 
873
                          "\tNODE:\t" + targetNode.getValue() + 
874
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
875
              }
876

    
877
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
878
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
879
                      " on target node " + targetNode + ". The exception was: " +
880
                      failure.getMessage());
881
              }
882
              
883
			  // update the replica nodes about the completed replica when complete
884
              if (status.equals(ReplicationStatus.COMPLETED)) {
885
				notifyReplicaNodes(systemMetadata);
886
			}
887
          
888
          } catch (RuntimeException e) {
889
              throw new ServiceFailure("4700", e.getMessage());
890
          
891
          }
892
          
893
    } catch (RuntimeException e) {
894
        String msg = "There was a RuntimeException getting the lock for " +
895
            pid.getValue();
896
        logMetacat.info(msg);
897
        
898
    } finally {
899
        lock.unlock();
900
        logMetacat.debug("Unlocked identifier " + pid.getValue());
901
        
902
    }
903
      
904
      return true;
905
  }
906
  
907
/**
908
   * Return the checksum of the object given the identifier 
909
   * 
910
   * @param session - the Session object containing the credentials for the Subject
911
   * @param pid - the object identifier for the given object
912
   * 
913
   * @return checksum - the checksum of the object
914
   * 
915
   * @throws InvalidToken
916
   * @throws ServiceFailure
917
   * @throws NotAuthorized
918
   * @throws NotFound
919
   * @throws NotImplemented
920
   */
921
  @Override
922
  public Checksum getChecksum(Session session, Identifier pid)
923
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
924
    NotImplemented {
925
    
926
	boolean isAuthorized = false;
927
	try {
928
		isAuthorized = isAuthorized(session, pid, Permission.READ);
929
	} catch (InvalidRequest e) {
930
		throw new ServiceFailure("1410", e.getDescription());
931
	}  
932
    if (!isAuthorized) {
933
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
934
    }
935
    
936
    SystemMetadata systemMetadata = null;
937
    Checksum checksum = null;
938
    
939
    try {
940
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
941

    
942
        if (systemMetadata == null ) {
943
            String error ="";
944
            String localId = null;
945
            try {
946
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
947
              
948
             } catch (Exception e) {
949
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
950
            }
951
            
952
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
953
                error = DELETEDMESSAGE;
954
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
955
                error = DELETEDMESSAGE;
956
            }
957
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
958
        }
959
        checksum = systemMetadata.getChecksum();
960
        
961
    } catch (RuntimeException e) {
962
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
963
            pid.getValue() + ". The error message was: " + e.getMessage());
964
      
965
    }
966
    
967
    return checksum;
968
  }
969

    
970
  /**
971
   * Resolve the location of a given object
972
   * 
973
   * @param session - the Session object containing the credentials for the Subject
974
   * @param pid - the object identifier for the given object
975
   * 
976
   * @return objectLocationList - the list of nodes known to contain the object
977
   * 
978
   * @throws InvalidToken
979
   * @throws ServiceFailure
980
   * @throws NotAuthorized
981
   * @throws NotFound
982
   * @throws NotImplemented
983
   */
984
  @Override
985
  public ObjectLocationList resolve(Session session, Identifier pid)
986
    throws InvalidToken, ServiceFailure, NotAuthorized,
987
    NotFound, NotImplemented {
988

    
989
    throw new NotImplemented("4131", "resolve not implemented");
990

    
991
  }
992

    
993
  /**
994
   * Metacat does not implement this method at the CN level
995
   */
996
  @Override
997
  public ObjectList search(Session session, String queryType, String query)
998
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
999
    NotImplemented {
1000

    
1001
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1002
	  
1003
//    ObjectList objectList = null;
1004
//    try {
1005
//        objectList = 
1006
//          IdentifierManager.getInstance().querySystemMetadata(
1007
//              null, //startTime, 
1008
//              null, //endTime,
1009
//              null, //objectFormat, 
1010
//              false, //replicaStatus, 
1011
//              0, //start, 
1012
//              1000 //count
1013
//              );
1014
//        
1015
//    } catch (Exception e) {
1016
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1017
//    }
1018
//
1019
//      return objectList;
1020
		  
1021
  }
1022
  
1023
  /**
1024
   * Returns the object format registered in the DataONE Object Format 
1025
   * Vocabulary for the given format identifier
1026
   * 
1027
   * @param fmtid - the identifier of the format requested
1028
   * 
1029
   * @return objectFormat - the object format requested
1030
   * 
1031
   * @throws ServiceFailure
1032
   * @throws NotFound
1033
   * @throws InsufficientResources
1034
   * @throws NotImplemented
1035
   */
1036
  @Override
1037
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1038
    throws ServiceFailure, NotFound, NotImplemented {
1039
     
1040
      return ObjectFormatService.getInstance().getFormat(fmtid);
1041
      
1042
  }
1043

    
1044
  /**
1045
   * Returns a list of all object formats registered in the DataONE Object 
1046
   * Format Vocabulary
1047
    * 
1048
   * @return objectFormatList - The list of object formats registered in 
1049
   *                            the DataONE Object Format Vocabulary
1050
   * 
1051
   * @throws ServiceFailure
1052
   * @throws NotImplemented
1053
   * @throws InsufficientResources
1054
   */
1055
  @Override
1056
  public ObjectFormatList listFormats() 
1057
    throws ServiceFailure, NotImplemented {
1058

    
1059
    return ObjectFormatService.getInstance().listFormats();
1060
  }
1061

    
1062
  /**
1063
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1064
    * 
1065
   * @return nodeList - List of nodes from the registry
1066
   * 
1067
   * @throws ServiceFailure
1068
   * @throws NotImplemented
1069
   */
1070
  @Override
1071
  public NodeList listNodes() 
1072
    throws NotImplemented, ServiceFailure {
1073

    
1074
    throw new NotImplemented("4800", "listNodes not implemented");
1075
  }
1076

    
1077
  /**
1078
   * Provides a mechanism for adding system metadata independently of its 
1079
   * associated object, such as when adding system metadata for data objects.
1080
    * 
1081
   * @param session - the Session object containing the credentials for the Subject
1082
   * @param pid - The identifier of the object to register the system metadata against
1083
   * @param sysmeta - The system metadata to be registered
1084
   * 
1085
   * @return true if the registration succeeds
1086
   * 
1087
   * @throws NotImplemented
1088
   * @throws NotAuthorized
1089
   * @throws ServiceFailure
1090
   * @throws InvalidRequest
1091
   * @throws InvalidSystemMetadata
1092
   */
1093
  @Override
1094
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1095
      SystemMetadata sysmeta) 
1096
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1097
      InvalidSystemMetadata {
1098

    
1099
      // The lock to be used for this identifier
1100
      Lock lock = null;
1101

    
1102
      // TODO: control who can call this?
1103
      if (session == null) {
1104
          //TODO: many of the thrown exceptions do not use the correct error codes
1105
          //check these against the docs and correct them
1106
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1107
                  "  If you are not logged in, please do so and retry the request.");
1108
      }
1109
      
1110
      // verify that guid == SystemMetadata.getIdentifier()
1111
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1112
          "|" + sysmeta.getIdentifier().getValue());
1113
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1114
          throw new InvalidRequest("4863", 
1115
              "The identifier in method call (" + pid.getValue() + 
1116
              ") does not match identifier in system metadata (" +
1117
              sysmeta.getIdentifier().getValue() + ").");
1118
      }
1119

    
1120
      try {
1121
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1122
          lock.lock();
1123
          logMetacat.debug("Locked identifier " + pid.getValue());
1124
          logMetacat.debug("Checking if identifier exists...");
1125
          // Check that the identifier does not already exist
1126
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1127
              throw new InvalidRequest("4863", 
1128
                  "The identifier is already in use by an existing object.");
1129
          
1130
          }
1131
          
1132
          // insert the system metadata into the object store
1133
          logMetacat.debug("Starting to insert SystemMetadata...");
1134
          try {
1135
              sysmeta.setSerialVersion(BigInteger.ONE);
1136
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1137
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1138
              
1139
          } catch (RuntimeException e) {
1140
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1141
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1142
                  e.getClass() + ": " + e.getMessage());
1143
              
1144
          }
1145
          
1146
      } catch (RuntimeException e) {
1147
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1148
                  e.getClass() + ": " + e.getMessage());
1149
          
1150
      }  finally {
1151
          lock.unlock();
1152
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1153
          
1154
      }
1155

    
1156
      
1157
      logMetacat.debug("Returning from registerSystemMetadata");
1158
      
1159
      try {
1160
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1161
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1162
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1163
    	          localId, "registerSystemMetadata");
1164
      } catch (McdbDocNotFoundException e) {
1165
    	  // do nothing, no localId to log with
1166
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1167
      } catch (SQLException ee) {
1168
          // do nothing, no localId to log with
1169
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1170
      }
1171
      
1172
      
1173
      return pid;
1174
  }
1175
  
1176
  /**
1177
   * Given an optional scope and format, reserves and returns an identifier 
1178
   * within that scope and format that is unique and will not be 
1179
   * used by any other sessions. 
1180
    * 
1181
   * @param session - the Session object containing the credentials for the Subject
1182
   * @param pid - The identifier of the object to register the system metadata against
1183
   * @param scope - An optional string to be used to qualify the scope of 
1184
   *                the identifier namespace, which is applied differently 
1185
   *                depending on the format requested. If scope is not 
1186
   *                supplied, a default scope will be used.
1187
   * @param format - The optional name of the identifier format to be used, 
1188
   *                  drawn from a DataONE-specific vocabulary of identifier 
1189
   *                 format names, including several common syntaxes such 
1190
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1191
   *                 format is not supplied by the caller, the CN service 
1192
   *                 will use a default identifier format, which may change 
1193
   *                 over time.
1194
   * 
1195
   * @return true if the registration succeeds
1196
   * 
1197
   * @throws InvalidToken
1198
   * @throws ServiceFailure
1199
   * @throws NotAuthorized
1200
   * @throws IdentifierNotUnique
1201
   * @throws NotImplemented
1202
   */
1203
  @Override
1204
  public Identifier reserveIdentifier(Session session, Identifier pid)
1205
  throws InvalidToken, ServiceFailure,
1206
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1207

    
1208
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1209
  }
1210
  
1211
  @Override
1212
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1213
  throws InvalidToken, ServiceFailure,
1214
        NotAuthorized, NotImplemented, InvalidRequest {
1215
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1216
  }
1217
  
1218
  /**
1219
    * Checks whether the pid is reserved by the subject in the session param
1220
    * If the reservation is held on the pid by the subject, we return true.
1221
    * 
1222
   * @param session - the Session object containing the Subject
1223
   * @param pid - The identifier to check
1224
   * 
1225
   * @return true if the reservation exists for the subject/pid
1226
   * 
1227
   * @throws InvalidToken
1228
   * @throws ServiceFailure
1229
   * @throws NotFound - when the pid is not found (in use or in reservation)
1230
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1231
   * @throws IdentifierNotUnique - when the pid is in use
1232
   * @throws NotImplemented
1233
   */
1234

    
1235
  @Override
1236
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1237
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1238
      NotImplemented, InvalidRequest {
1239
  
1240
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1241
  }
1242

    
1243
  /**
1244
   * Changes ownership (RightsHolder) of the specified object to the 
1245
   * subject specified by userId
1246
    * 
1247
   * @param session - the Session object containing the credentials for the Subject
1248
   * @param pid - Identifier of the object to be modified
1249
   * @param userId - The subject that will be taking ownership of the specified object.
1250
   *
1251
   * @return pid - the identifier of the modified object
1252
   * 
1253
   * @throws ServiceFailure
1254
   * @throws InvalidToken
1255
   * @throws NotFound
1256
   * @throws NotAuthorized
1257
   * @throws NotImplemented
1258
   * @throws InvalidRequest
1259
   */  
1260
  @Override
1261
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1262
      long serialVersion)
1263
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1264
      NotImplemented, InvalidRequest, VersionMismatch {
1265
      
1266
      // The lock to be used for this identifier
1267
      Lock lock = null;
1268

    
1269
      // get the subject
1270
      Subject subject = session.getSubject();
1271
      
1272
      // are we allowed to do this?
1273
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1274
          throw new NotAuthorized("4440", "not allowed by "
1275
                  + subject.getValue() + " on " + pid.getValue());
1276
          
1277
      }
1278
      
1279
      SystemMetadata systemMetadata = null;
1280
      try {
1281
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1282
          lock.lock();
1283
          logMetacat.debug("Locked identifier " + pid.getValue());
1284

    
1285
          try {
1286
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1287
              
1288
              // does the request have the most current system metadata?
1289
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1290
                 String msg = "The requested system metadata version number " + 
1291
                     serialVersion + " differs from the current version at " +
1292
                     systemMetadata.getSerialVersion().longValue() +
1293
                     ". Please get the latest copy in order to modify it.";
1294
                 throw new VersionMismatch("4443", msg);
1295
              }
1296
              
1297
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1298
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1299
              
1300
          }
1301
              
1302
          // set the new rights holder
1303
          systemMetadata.setRightsHolder(userId);
1304
          
1305
          // update the metadata
1306
          try {
1307
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1308
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1309
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1310
              notifyReplicaNodes(systemMetadata);
1311
              
1312
          } catch (RuntimeException e) {
1313
              throw new ServiceFailure("4490", e.getMessage());
1314
          
1315
          }
1316
          
1317
      } catch (RuntimeException e) {
1318
          throw new ServiceFailure("4490", e.getMessage());
1319
          
1320
      } finally {
1321
          lock.unlock();
1322
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1323
      
1324
      }
1325
      
1326
      return pid;
1327
  }
1328

    
1329
  /**
1330
   * Verify that a replication task is authorized by comparing the target node's
1331
   * Subject (from the X.509 certificate-derived Session) with the list of 
1332
   * subjects in the known, pending replication tasks map.
1333
   * 
1334
   * @param originatingNodeSession - Session information that contains the 
1335
   *                                 identity of the calling user
1336
   * @param targetNodeSubject - Subject identifying the target node
1337
   * @param pid - the identifier of the object to be replicated
1338
   * @param replicatePermission - the execute permission to be granted
1339
   * 
1340
   * @throws ServiceFailure
1341
   * @throws NotImplemented
1342
   * @throws InvalidToken
1343
   * @throws NotAuthorized
1344
   * @throws InvalidRequest
1345
   * @throws NotFound
1346
   */
1347
  @Override
1348
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1349
    Subject targetNodeSubject, Identifier pid) 
1350
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1351
    NotFound, InvalidRequest {
1352
    
1353
    boolean isAllowed = false;
1354
    SystemMetadata sysmeta = null;
1355
    NodeReference targetNode = null;
1356
    
1357
    try {
1358
      // get the target node reference from the nodes list
1359
      CNode cn = D1Client.getCN();
1360
      List<Node> nodes = cn.listNodes().getNodeList();
1361
      
1362
      if ( nodes != null ) {
1363
        for (Node node : nodes) {
1364
            
1365
        	if (node.getSubjectList() != null) {
1366
        		
1367
	            for (Subject nodeSubject : node.getSubjectList()) {
1368
	            	
1369
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1370
	                    targetNode = node.getIdentifier();
1371
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1372
	                    break;
1373
	                }
1374
	            }
1375
        	}
1376
            
1377
            if ( targetNode != null) { break; }
1378
        }
1379
        
1380
      } else {
1381
          String msg = "Couldn't get the node list from the CN";
1382
          logMetacat.debug(msg);
1383
          throw new ServiceFailure("4872", msg);
1384
          
1385
      }
1386
      
1387
      // can't find a node listed with the given subject
1388
      if ( targetNode == null ) {
1389
          String msg = "There is no Member Node registered with a node subject " +
1390
              "matching " + targetNodeSubject.getValue();
1391
          logMetacat.info(msg);
1392
          throw new NotAuthorized("4871", msg);
1393
          
1394
      }
1395
      
1396
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1397
      
1398
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1399

    
1400
      if ( sysmeta != null ) {
1401
          
1402
          List<Replica> replicaList = sysmeta.getReplicaList();
1403
          
1404
          if ( replicaList != null ) {
1405
              
1406
              // find the replica with the status set to 'requested'
1407
              for (Replica replica : replicaList) {
1408
                  ReplicationStatus status = replica.getReplicationStatus();
1409
                  NodeReference listedNode = replica.getReplicaMemberNode();
1410
                  if ( listedNode != null && targetNode != null ) {
1411
                      logMetacat.debug("Comparing " + listedNode.getValue()
1412
                              + " to " + targetNode.getValue());
1413
                      
1414
                      if (listedNode.getValue().equals(targetNode.getValue())
1415
                              && status.equals(ReplicationStatus.REQUESTED)) {
1416
                          isAllowed = true;
1417
                          break;
1418

    
1419
                      }
1420
                  }
1421
              }
1422
          }
1423
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1424
              "to replicate: " + isAllowed + " for " + pid.getValue());
1425

    
1426
          
1427
      } else {
1428
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1429
          " is null.");
1430
          String error ="";
1431
          String localId = null;
1432
          try {
1433
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1434
            
1435
           } catch (Exception e) {
1436
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1437
          }
1438
          
1439
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1440
              error = DELETEDMESSAGE;
1441
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1442
              error = DELETEDMESSAGE;
1443
          }
1444
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1445
          
1446
      }
1447

    
1448
    } catch (RuntimeException e) {
1449
    	  ServiceFailure sf = new ServiceFailure("4872", 
1450
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1451
                e.getMessage());
1452
    	  sf.initCause(e);
1453
        throw sf;
1454
        
1455
    }
1456
      
1457
    return isAllowed;
1458
    
1459
  }
1460

    
1461
  /**
1462
   * Adds a new object to the Node, where the object is a science metadata object.
1463
   * 
1464
   * @param session - the Session object containing the credentials for the Subject
1465
   * @param pid - The object identifier to be created
1466
   * @param object - the object bytes
1467
   * @param sysmeta - the system metadata that describes the object  
1468
   * 
1469
   * @return pid - the object identifier created
1470
   * 
1471
   * @throws InvalidToken
1472
   * @throws ServiceFailure
1473
   * @throws NotAuthorized
1474
   * @throws IdentifierNotUnique
1475
   * @throws UnsupportedType
1476
   * @throws InsufficientResources
1477
   * @throws InvalidSystemMetadata
1478
   * @throws NotImplemented
1479
   * @throws InvalidRequest
1480
   */
1481
  public Identifier create(Session session, Identifier pid, InputStream object,
1482
    SystemMetadata sysmeta) 
1483
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1484
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1485
    NotImplemented, InvalidRequest {
1486
                  
1487
      // The lock to be used for this identifier
1488
      Lock lock = null;
1489

    
1490
      try {
1491
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1492
          // are we allowed?
1493
          boolean isAllowed = false;
1494
          isAllowed = isAdminAuthorized(session);
1495
          
1496
          // additional check if it is the authoritative node if it is not the admin
1497
          if(!isAllowed) {
1498
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1499
          }
1500

    
1501
          // proceed if we're called by a CN
1502
          if ( isAllowed ) {
1503
              // create the coordinating node version of the document      
1504
              lock.lock();
1505
              logMetacat.debug("Locked identifier " + pid.getValue());
1506
              sysmeta.setSerialVersion(BigInteger.ONE);
1507
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1508
              //sysmeta.setArchived(false); // this is a create op, not update
1509
              
1510
              // the CN should have set the origin and authoritative member node fields
1511
              try {
1512
                  sysmeta.getOriginMemberNode().getValue();
1513
                  sysmeta.getAuthoritativeMemberNode().getValue();
1514
                  
1515
              } catch (NullPointerException npe) {
1516
                  throw new InvalidSystemMetadata("4896", 
1517
                      "Both the origin and authoritative member node identifiers need to be set.");
1518
                  
1519
              }
1520
              pid = super.create(session, pid, object, sysmeta);
1521

    
1522
          } else {
1523
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1524
                  " isn't allowed to call create() on a Coordinating Node.";
1525
              logMetacat.info(msg);
1526
              throw new NotAuthorized("1100", msg);
1527
          }
1528
          
1529
      } catch (RuntimeException e) {
1530
          // Convert Hazelcast runtime exceptions to service failures
1531
          String msg = "There was a problem creating the object identified by " +
1532
              pid.getValue() + ". There error message was: " + e.getMessage();
1533
          throw new ServiceFailure("4893", msg);
1534
          
1535
      } finally {
1536
    	  if (lock != null) {
1537
	          lock.unlock();
1538
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1539
    	  }
1540
      }
1541
      
1542
      return pid;
1543

    
1544
  }
1545

    
1546
  /**
1547
   * Set access for a given object using the object identifier and a Subject
1548
   * under a given Session.
1549
   * 
1550
   * @param session - the Session object containing the credentials for the Subject
1551
   * @param pid - the object identifier for the given object to apply the policy
1552
   * @param policy - the access policy to be applied
1553
   * 
1554
   * @return true if the application of the policy succeeds
1555
   * @throws InvalidToken
1556
   * @throws ServiceFailure
1557
   * @throws NotFound
1558
   * @throws NotAuthorized
1559
   * @throws NotImplemented
1560
   * @throws InvalidRequest
1561
   */
1562
  public boolean setAccessPolicy(Session session, Identifier pid, 
1563
      AccessPolicy accessPolicy, long serialVersion) 
1564
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1565
      NotImplemented, InvalidRequest, VersionMismatch {
1566
      
1567
      // The lock to be used for this identifier
1568
      Lock lock = null;
1569
      SystemMetadata systemMetadata = null;
1570
      
1571
      boolean success = false;
1572
      
1573
      // get the subject
1574
      Subject subject = session.getSubject();
1575
      
1576
      // are we allowed to do this?
1577
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1578
          throw new NotAuthorized("4420", "not allowed by "
1579
                  + subject.getValue() + " on " + pid.getValue());
1580
      }
1581
      
1582
      try {
1583
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1584
          lock.lock();
1585
          logMetacat.debug("Locked identifier " + pid.getValue());
1586

    
1587
          try {
1588
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1589

    
1590
              if ( systemMetadata == null ) {
1591
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1592
                  
1593
              }
1594
              // does the request have the most current system metadata?
1595
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1596
                 String msg = "The requested system metadata version number " + 
1597
                     serialVersion + " differs from the current version at " +
1598
                     systemMetadata.getSerialVersion().longValue() +
1599
                     ". Please get the latest copy in order to modify it.";
1600
                 throw new VersionMismatch("4402", msg);
1601
                 
1602
              }
1603
              
1604
          } catch (RuntimeException e) {
1605
              // convert Hazelcast RuntimeException to NotFound
1606
              throw new NotFound("4400", "No record found for: " + pid);
1607
            
1608
          }
1609
              
1610
          // set the access policy
1611
          systemMetadata.setAccessPolicy(accessPolicy);
1612
          
1613
          // update the system metadata
1614
          try {
1615
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1616
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1617
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1618
              notifyReplicaNodes(systemMetadata);
1619
              
1620
          } catch (RuntimeException e) {
1621
              // convert Hazelcast RuntimeException to ServiceFailure
1622
              throw new ServiceFailure("4430", e.getMessage());
1623
            
1624
          }
1625
          
1626
      } catch (RuntimeException e) {
1627
          throw new ServiceFailure("4430", e.getMessage());
1628
          
1629
      } finally {
1630
          lock.unlock();
1631
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1632
        
1633
      }
1634

    
1635
    
1636
    // TODO: how do we know if the map was persisted?
1637
    success = true;
1638
    
1639
    return success;
1640
  }
1641

    
1642
  /**
1643
   * Full replacement of replication metadata in the system metadata for the 
1644
   * specified object, changes date system metadata modified
1645
   * 
1646
   * @param session - the Session object containing the credentials for the Subject
1647
   * @param pid - the object identifier for the given object to apply the policy
1648
   * @param replica - the replica to be updated
1649
   * @return
1650
   * @throws NotImplemented
1651
   * @throws NotAuthorized
1652
   * @throws ServiceFailure
1653
   * @throws InvalidRequest
1654
   * @throws NotFound
1655
   * @throws VersionMismatch
1656
   */
1657
  @Override
1658
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1659
      Replica replica, long serialVersion) 
1660
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1661
      NotFound, VersionMismatch {
1662
      
1663
      // The lock to be used for this identifier
1664
      Lock lock = null;
1665
      
1666
      // get the subject
1667
      Subject subject = session.getSubject();
1668
      
1669
      // are we allowed to do this?
1670
      try {
1671

    
1672
          // what is the controlling permission?
1673
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1674
              throw new NotAuthorized("4851", "not allowed by "
1675
                      + subject.getValue() + " on " + pid.getValue());
1676
          }
1677

    
1678
        
1679
      } catch (InvalidToken e) {
1680
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1681
                  " on " + pid.getValue());  
1682
          
1683
      }
1684

    
1685
      SystemMetadata systemMetadata = null;
1686
      try {
1687
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1688
          lock.lock();
1689
          logMetacat.debug("Locked identifier " + pid.getValue());
1690

    
1691
          try {      
1692
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1693

    
1694
              // does the request have the most current system metadata?
1695
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1696
                 String msg = "The requested system metadata version number " + 
1697
                     serialVersion + " differs from the current version at " +
1698
                     systemMetadata.getSerialVersion().longValue() +
1699
                     ". Please get the latest copy in order to modify it.";
1700
                 throw new VersionMismatch("4855", msg);
1701
              }
1702
              
1703
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1704
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1705
                  " : " + e.getMessage());
1706
            
1707
          }
1708
              
1709
          // set the status for the replica
1710
          List<Replica> replicas = systemMetadata.getReplicaList();
1711
          NodeReference replicaNode = replica.getReplicaMemberNode();
1712
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
1713
          int index = 0;
1714
          for (Replica listedReplica: replicas) {
1715
              
1716
              // remove the replica that we are replacing
1717
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1718
                      // don't allow status to change from COMPLETED to anything other
1719
                      // than INVALIDATED: prevents overwrites from race conditions
1720
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
1721
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
1722
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
1723
                	  throw new InvalidRequest("4853", "Status state change from " +
1724
                			  listedReplica.getReplicationStatus() + " to " +
1725
                			  replicaStatus.toString() + "is prohibited for identifier " +
1726
                			  pid.getValue() + " and target node " + 
1727
                			  listedReplica.getReplicaMemberNode().getValue());
1728

    
1729
            	  }
1730
                  replicas.remove(index);
1731
                  break;
1732
                  
1733
              }
1734
              index++;
1735
          }
1736
          
1737
          // add the new replica item
1738
          replicas.add(replica);
1739
          systemMetadata.setReplicaList(replicas);
1740
          
1741
          // update the metadata
1742
          try {
1743
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1744
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1745
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1746
              
1747
              // inform replica nodes of the change if the status is complete
1748
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
1749
            	  notifyReplicaNodes(systemMetadata);
1750
            	  
1751
              }
1752
          } catch (RuntimeException e) {
1753
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1754
              throw new ServiceFailure("4852", e.getMessage());
1755
          
1756
          }
1757
          
1758
      } catch (RuntimeException e) {
1759
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1760
          throw new ServiceFailure("4852", e.getMessage());
1761
      
1762
      } finally {
1763
          lock.unlock();
1764
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1765
          
1766
      }
1767
    
1768
      return true;
1769
      
1770
  }
1771
  
1772
  /**
1773
   * 
1774
   */
1775
  @Override
1776
  public ObjectList listObjects(Session session, Date startTime, 
1777
      Date endTime, ObjectFormatIdentifier formatid, Identifier identifier, Boolean replicaStatus,
1778
      Integer start, Integer count)
1779
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1780
      ServiceFailure {
1781
      
1782
      ObjectList objectList = null;
1783
        try {
1784
        	if (count == null || count > MAXIMUM_DB_RECORD_COUNT) {
1785
            	count = MAXIMUM_DB_RECORD_COUNT;
1786
            }
1787
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1788
        } catch (Exception e) {
1789
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1790
        }
1791

    
1792
        return objectList;
1793
  }
1794

    
1795
  
1796
 	/**
1797
 	 * Returns a list of checksum algorithms that are supported by DataONE.
1798
 	 * @return cal  the list of checksum algorithms
1799
 	 * 
1800
 	 * @throws ServiceFailure
1801
 	 * @throws NotImplemented
1802
 	 */
1803
  @Override
1804
  public ChecksumAlgorithmList listChecksumAlgorithms()
1805
			throws ServiceFailure, NotImplemented {
1806
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1807
		cal.addAlgorithm("MD5");
1808
		cal.addAlgorithm("SHA-1");
1809
		return cal;
1810
		
1811
	}
1812

    
1813
  /**
1814
   * Notify replica Member Nodes of system metadata changes for a given pid
1815
   * 
1816
   * @param currentSystemMetadata - the up to date system metadata
1817
   */
1818
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
1819
      
1820
      Session session = null;
1821
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
1822
      MNode mn = null;
1823
      NodeReference replicaNodeRef = null;
1824
      CNode cn = null;
1825
      NodeType nodeType = null;
1826
      List<Node> nodeList = null;
1827
      
1828
      try {
1829
          cn = D1Client.getCN();
1830
          nodeList = cn.listNodes().getNodeList();
1831
          
1832
      } catch (Exception e) { // handle BaseException and other I/O issues
1833
          
1834
          // swallow errors since the call is not critical
1835
          logMetacat.error("Can't inform MNs of system metadata changes due " +
1836
              "to communication issues with the CN: " + e.getMessage());
1837
          
1838
      }
1839
      
1840
      if ( replicaList != null ) {
1841
          
1842
          // iterate through the replicas and inform  MN replica nodes
1843
          for (Replica replica : replicaList) {
1844
              
1845
              replicaNodeRef = replica.getReplicaMemberNode();
1846
              try {
1847
                  if (nodeList != null) {
1848
                      // find the node type
1849
                      for (Node node : nodeList) {
1850
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
1851
                              nodeType = node.getType();
1852
                              break;
1853
              
1854
                          }
1855
                      }
1856
                  }
1857
              
1858
                  // notify only MNs
1859
                  if (nodeType != null && nodeType == NodeType.MN) {
1860
                      mn = D1Client.getMN(replicaNodeRef);
1861
                      mn.systemMetadataChanged(session, 
1862
                          currentSystemMetadata.getIdentifier(), 
1863
                          currentSystemMetadata.getSerialVersion().longValue(),
1864
                          currentSystemMetadata.getDateSysMetadataModified());
1865
                  }
1866
              
1867
              } catch (Exception e) { // handle BaseException and other I/O issues
1868
              
1869
                  // swallow errors since the call is not critical
1870
                  logMetacat.error("Can't inform "
1871
                          + replicaNodeRef.getValue()
1872
                          + " of system metadata changes due "
1873
                          + "to communication issues with the CN: "
1874
                          + e.getMessage());
1875
              
1876
              }
1877
          }
1878
      }
1879
  }
1880

    
1881
	@Override
1882
	public QueryEngineDescription getQueryEngineDescription(Session session,
1883
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
1884
			NotImplemented, NotFound {
1885
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1886

    
1887
	}
1888
	
1889
	@Override
1890
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
1891
			ServiceFailure, NotAuthorized, NotImplemented {
1892
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1893

    
1894
	}
1895
	
1896
	@Override
1897
	public InputStream query(Session session, String queryEngine, String query)
1898
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1899
			NotImplemented, NotFound {
1900
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1901

    
1902
	}
1903
	
1904
	@Override
1905
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
1906
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
1907
	}
1908
	
1909
	
1910
}
(1-1/7)