Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.PipedInputStream;
29
import java.io.PipedOutputStream;
30
import java.lang.reflect.InvocationTargetException;
31
import java.math.BigInteger;
32
import java.sql.SQLException;
33
import java.util.ArrayList;
34
import java.util.Calendar;
35
import java.util.Date;
36
import java.util.List;
37
import java.util.concurrent.locks.Lock;
38

    
39
import javax.servlet.http.HttpServletRequest;
40

    
41
import org.apache.commons.beanutils.BeanUtils;
42
import org.apache.commons.io.IOUtils;
43
import org.apache.log4j.Logger;
44
import org.dataone.client.v2.CNode;
45
import org.dataone.client.v2.MNode;
46
import org.dataone.client.v2.itk.D1Client;
47
import org.dataone.cn.index.messaging.IndexTaskMessagingClient;
48
import org.dataone.cn.index.messaging.IndexTaskMessagingClientFactory;
49
import org.dataone.cn.index.task.IndexTask;
50
import org.dataone.cn.index.task.IndexTaskGenerator;
51
import org.dataone.exceptions.MarshallingException;
52
import org.dataone.service.cn.v2.CNAuthorization;
53
import org.dataone.service.cn.v2.CNCore;
54
import org.dataone.service.cn.v2.CNRead;
55
import org.dataone.service.cn.v2.CNReplication;
56
import org.dataone.service.cn.v2.CNView;
57
import org.dataone.service.exceptions.BaseException;
58
import org.dataone.service.exceptions.IdentifierNotUnique;
59
import org.dataone.service.exceptions.InsufficientResources;
60
import org.dataone.service.exceptions.InvalidRequest;
61
import org.dataone.service.exceptions.InvalidSystemMetadata;
62
import org.dataone.service.exceptions.InvalidToken;
63
import org.dataone.service.exceptions.NotAuthorized;
64
import org.dataone.service.exceptions.NotFound;
65
import org.dataone.service.exceptions.NotImplemented;
66
import org.dataone.service.exceptions.ServiceFailure;
67
import org.dataone.service.exceptions.UnsupportedType;
68
import org.dataone.service.exceptions.VersionMismatch;
69
import org.dataone.service.types.v1.AccessPolicy;
70
import org.dataone.service.types.v1.Checksum;
71
import org.dataone.service.types.v1.ChecksumAlgorithmList;
72
import org.dataone.service.types.v1.Event;
73
import org.dataone.service.types.v1.Identifier;
74
import org.dataone.service.types.v1.NodeReference;
75
import org.dataone.service.types.v1.NodeType;
76
import org.dataone.service.types.v1.ObjectFormatIdentifier;
77
import org.dataone.service.types.v1.ObjectList;
78
import org.dataone.service.types.v1.ObjectLocationList;
79
import org.dataone.service.types.v1.Permission;
80
import org.dataone.service.types.v1.Replica;
81
import org.dataone.service.types.v1.ReplicationPolicy;
82
import org.dataone.service.types.v1.ReplicationStatus;
83
import org.dataone.service.types.v1.Session;
84
import org.dataone.service.types.v1.Subject;
85
import org.dataone.service.types.v1_1.QueryEngineDescription;
86
import org.dataone.service.types.v1_1.QueryEngineList;
87
import org.dataone.service.types.v2.Node;
88
import org.dataone.service.types.v2.NodeList;
89
import org.dataone.service.types.v2.ObjectFormat;
90
import org.dataone.service.types.v2.ObjectFormatList;
91
import org.dataone.service.types.v2.SystemMetadata;
92
import org.dataone.service.util.TypeMarshaller;
93

    
94
import edu.ucsb.nceas.metacat.DBUtil;
95
import edu.ucsb.nceas.metacat.EventLog;
96
import edu.ucsb.nceas.metacat.IdentifierManager;
97
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
98
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
99
import edu.ucsb.nceas.metacat.properties.PropertyService;
100
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
101

    
102
/**
103
 * Represents Metacat's implementation of the DataONE Coordinating Node 
104
 * service API. Methods implement the various CN* interfaces, and methods common
105
 * to both Member Node and Coordinating Node interfaces are found in the
106
 * D1NodeService super class.
107
 *
108
 */
109
public class CNodeService extends D1NodeService implements CNAuthorization,
110
    CNCore, CNRead, CNReplication, CNView {
111

    
112
  /* the logger instance */
113
  private Logger logMetacat = null;
114
  public final static String V2V1MISSMATCH = "The Coordinating Node is not authorized to make systemMetadata changes on this object. Please make changes directly on the authoritative Member Node.";
115

    
116
  private IndexTaskMessagingClient indexTaskClient = null;
117
  /**
118
   * singleton accessor
119
   */
120
  public static CNodeService getInstance(HttpServletRequest request) { 
121
    return new CNodeService(request);
122
  }
123
  
124
  /**
125
   * Constructor, private for singleton access
126
   */
127
  private CNodeService(HttpServletRequest request) {
128
    super(request);
129
    logMetacat = Logger.getLogger(CNodeService.class);
130
    try {
131
        indexTaskClient = IndexTaskMessagingClientManager.getInstance().getMessagingClient();
132
    } catch (Exception e) {
133
        logMetacat.warn("CNodeService.constructorr - the client for sumbitting the index tasks couldn't be initialized since "+e.getMessage(), e);
134
    }
135
  }
136
  
137
 
138
  
139
  /*
140
   * Submit a index task to the message broker.
141
   */
142
  private void submitAddIndexTask(SystemMetadata sysmeta, String methodName, boolean isData)  {
143
      try {
144
          if(indexTaskClient == null) {
145
              indexTaskClient = IndexTaskMessagingClientManager.getInstance().getMessagingClient();
146
          }
147
          String objectURI = null;
148
          if(!isData) {
149
              String localId = IdentifierManager.getInstance().getLocalId(sysmeta.getIdentifier().getValue());
150
              objectURI = IdentifierManager.getInstance().getObjectFilePath(localId, isScienceMetadata(sysmeta));
151
          }
152
          IndexTaskGenerator generator = new IndexTaskGenerator();
153
          //The objectURI will be null for data file
154
          IndexTask task = generator.generateAddTask(sysmeta, objectURI);
155
          indexTaskClient.submit(task);
156
          logMetacat.info("CNodeService."+methodName+" - Metacat successfully submitted the index task for the object "+sysmeta.getIdentifier().getValue()+" to the message broker. But it is NOT guarranteed that the broker can get it.");
157
      } catch (Exception e) {
158
          logMetacat.warn("CNodeService."+methodName+" - the index task for the object "+sysmeta.getIdentifier().getValue()+" failed to be submitted to the message broker since "+e.getMessage(), e);
159
      }
160
  }
161
  
162
  /*
163
   * Submit an update index task to the message broker.
164
   */
165
  private void submitUpdateIndexTask(SystemMetadata sysmeta, String methodName) {
166
      try {
167
          if(indexTaskClient == null) {
168
              indexTaskClient = IndexTaskMessagingClientManager.getInstance().getMessagingClient();
169
          }
170
          String localId = IdentifierManager.getInstance().getLocalId(sysmeta.getIdentifier().getValue());
171
          String objectURI = IdentifierManager.getInstance().getObjectFilePath(localId, isScienceMetadata(sysmeta));
172
          IndexTaskGenerator generator = new IndexTaskGenerator();
173
          IndexTask task = generator.generateUpdateTask(sysmeta, objectURI);
174
          indexTaskClient.submit(task);
175
          logMetacat.info("CNodeService."+methodName+" - Metacat successfully submitted the index task for the object "+sysmeta.getIdentifier().getValue()+" to the message broker. But it is NOT guarranteed that the broker can get it.");
176
      } catch (Exception e) {
177
          logMetacat.warn("CNodeService."+methodName+" - the index task for the object "+sysmeta.getIdentifier().getValue()+" failed to be submitted to the message broker since "+e.getMessage(), e);
178
      }
179
  }
180
  
181
  /*
182
   * Submit a delete index task to the message broker.
183
   */
184
  private void submitDeleteIndexTask(SystemMetadata sysmeta, String methodName) {
185
      try {
186
          if(indexTaskClient == null) {
187
              indexTaskClient = IndexTaskMessagingClientManager.getInstance().getMessagingClient();
188
          }
189
          IndexTaskGenerator generator = new IndexTaskGenerator();
190
          IndexTask task = generator.generateDeleteTask(sysmeta);
191
          indexTaskClient.submit(task);
192
          logMetacat.info("CNodeService."+methodName+" - Metacat successfully submitted the index task for the object "+sysmeta.getIdentifier().getValue()+" to the message broker. But it is NOT guarranteed that the broker can get it.");
193
      } catch (Exception e) {
194
          logMetacat.warn("CNodeService."+methodName+" - the index task for the object "+sysmeta.getIdentifier().getValue()+" failed to be submitted to the message broker since "+e.getMessage(), e);
195
      }
196
  }
197
    
198
  /**
199
   * Set the replication policy for an object given the object identifier
200
   * It only is applied to objects whose authoritative mn is a v1 node.
201
   * @param session - the Session object containing the credentials for the Subject
202
   * @param pid - the object identifier for the given object
203
   * @param policy - the replication policy to be applied
204
   * 
205
   * @return true or false
206
   * 
207
   * @throws NotImplemented
208
   * @throws NotAuthorized
209
   * @throws ServiceFailure
210
   * @throws InvalidRequest
211
   * @throws VersionMismatch
212
   * 
213
   */
214
  @Override
215
  public boolean setReplicationPolicy(Session session, Identifier pid,
216
      ReplicationPolicy policy, long serialVersion) 
217
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
218
      InvalidRequest, InvalidToken, VersionMismatch {
219
      
220
      // do we have a valid pid?
221
      if (pid == null || pid.getValue().trim().equals("")) {
222
          throw new InvalidRequest("4883", "The provided identifier was invalid.");
223
          
224
      }
225
      
226
      //only allow pid to be passed
227
      String serviceFailure = "4882";
228
      String notFound = "4884";
229
      checkV1SystemMetaPidExist(pid, serviceFailure, "The object for given PID "+pid.getValue()+" couldn't be identified if it exists",  notFound, 
230
              "No object could be found for given PID: "+pid.getValue());
231
      
232
      /*String serviceFailureCode = "4882";
233
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
234
      if(sid != null) {
235
          pid = sid;
236
      }*/
237
      // The lock to be used for this identifier
238
      Lock lock = null;
239
      
240
      // get the subject
241
      Subject subject = session.getSubject();
242
      
243
      // are we allowed to do this?
244
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
245
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
246
                  + " not allowed by " + subject.getValue() + " on "
247
                  + pid.getValue());
248
          
249
      }
250
      
251
      SystemMetadata systemMetadata = null;
252
      try {
253
          lock = HazelcastService.getInstance().getLock(pid.getValue());
254
          lock.lock();
255
          logMetacat.debug("Locked identifier " + pid.getValue());
256

    
257
          try {
258
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
259
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
260
                  
261
              }
262
              
263
              // did we get it correctly?
264
              if ( systemMetadata == null ) {
265
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
266
                  
267
              }
268
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
269
              String version = checker.getVersion("MNRead");
270
              if(version == null) {
271
                  throw new ServiceFailure("4882", "Couldn't determine the MNRead version of the authoritative member node for the pid "+pid.getValue());
272
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
273
                  //we don't apply this method to an object whose authoritative node is v2
274
                  throw new NotAuthorized("4881", V2V1MISSMATCH);
275
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
276
                  //we don't understand this version (it is not v1 or v2)
277
                  throw new InvalidRequest("4883", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
278
              }
279
              // does the request have the most current system metadata?
280
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
281
                 String msg = "The requested system metadata version number " + 
282
                     serialVersion + " differs from the current version at " +
283
                     systemMetadata.getSerialVersion().longValue() +
284
                     ". Please get the latest copy in order to modify it.";
285
                 throw new VersionMismatch("4886", msg);
286
                 
287
              }
288
              
289
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
290
              throw new NotFound("4884", "No record found for: " + pid.getValue());
291
            
292
          }
293
          
294
          // set the new policy
295
          systemMetadata.setReplicationPolicy(policy);
296
          
297
          // update the metadata
298
          try {
299
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
300
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
301
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
302
              notifyReplicaNodes(systemMetadata);
303
              
304
          } catch (RuntimeException e) {
305
              throw new ServiceFailure("4882", e.getMessage());
306
          
307
          }
308
          
309
      } catch (RuntimeException e) {
310
          throw new ServiceFailure("4882", e.getMessage());
311
          
312
      } finally {
313
          lock.unlock();
314
          logMetacat.debug("Unlocked identifier " + pid.getValue());
315
          
316
      }
317
    
318
      return true;
319
  }
320

    
321
  /**
322
   * Deletes the replica from the given Member Node
323
   * NOTE: MN.delete() may be an "archive" operation. TBD.
324
   * @param session
325
   * @param pid
326
   * @param nodeId
327
   * @param serialVersion
328
   * @return
329
   * @throws InvalidToken
330
   * @throws ServiceFailure
331
   * @throws NotAuthorized
332
   * @throws NotFound
333
   * @throws NotImplemented
334
   * @throws VersionMismatch
335
   */
336
  @Override
337
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
338
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
339
	  
340
	  	// The lock to be used for this identifier
341
		Lock lock = null;
342

    
343
		// get the subject
344
		Subject subject = session.getSubject();
345

    
346
		// are we allowed to do this?
347
		/*boolean isAuthorized = false;
348
		try {
349
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
350
		} catch (InvalidRequest e) {
351
			throw new ServiceFailure("4882", e.getDescription());
352
		}
353
		if (!isAuthorized) {
354
			throw new NotAuthorized("4881", Permission.WRITE
355
					+ " not allowed by " + subject.getValue() + " on "
356
					+ pid.getValue());
357

    
358
		}*/
359
		if(session == null) {
360
		    throw new NotAuthorized("4882", "Session cannot be null. It is not authorized for deleting the replication metadata of the object "+pid.getValue());
361
		} else {
362
		    if(!isCNAdmin(session)) {
363
		        throw new NotAuthorized("4882", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for deleting the replication metadata of the object "+pid.getValue());
364
		    }
365
		}
366

    
367
		SystemMetadata systemMetadata = null;
368
		try {
369
			lock = HazelcastService.getInstance().getLock(pid.getValue());
370
			lock.lock();
371
			logMetacat.debug("Locked identifier " + pid.getValue());
372

    
373
			try {
374
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
375
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
376
				}
377

    
378
				// did we get it correctly?
379
				if (systemMetadata == null) {
380
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
381
				}
382

    
383
				// does the request have the most current system metadata?
384
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
385
					String msg = "The requested system metadata version number "
386
							+ serialVersion
387
							+ " differs from the current version at "
388
							+ systemMetadata.getSerialVersion().longValue()
389
							+ ". Please get the latest copy in order to modify it.";
390
					throw new VersionMismatch("4886", msg);
391

    
392
				}
393

    
394
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
395
				throw new NotFound("4884", "No record found for: " + pid.getValue());
396

    
397
			}
398
			  
399
			// check permissions
400
			// TODO: is this necessary?
401
			/*List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
402
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
403
			if (!isAllowed) {
404
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
405
			}*/
406
			  
407
			// delete the replica from the given node
408
			// CSJ: use CN.delete() to truly delete a replica, semantically
409
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
410
			//D1Client.getMN(nodeId).delete(session, pid);
411
			  
412
			// reflect that change in the system metadata
413
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
414
			for (Replica r: systemMetadata.getReplicaList()) {
415
				  if (r.getReplicaMemberNode().equals(nodeId)) {
416
					  updatedReplicas.remove(r);
417
					  break;
418
				  }
419
			}
420
			systemMetadata.setReplicaList(updatedReplicas);
421

    
422
			// update the metadata
423
			try {
424
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
425
				//we don't need to update the modification date.
426
				//systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
427
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
428
			} catch (RuntimeException e) {
429
				throw new ServiceFailure("4882", e.getMessage());
430
			}
431

    
432
		} catch (RuntimeException e) {
433
			throw new ServiceFailure("4882", e.getMessage());
434
		} finally {
435
			lock.unlock();
436
			logMetacat.debug("Unlocked identifier " + pid.getValue());
437
		}
438

    
439
		return true;	  
440
	  
441
  }
442
  
443
  /**
444
   * Deletes an object from the Coordinating Node
445
   * 
446
   * @param session - the Session object containing the credentials for the Subject
447
   * @param pid - The object identifier to be deleted
448
   * 
449
   * @return pid - the identifier of the object used for the deletion
450
   * 
451
   * @throws InvalidToken
452
   * @throws ServiceFailure
453
   * @throws NotAuthorized
454
   * @throws NotFound
455
   * @throws NotImplemented
456
   * @throws InvalidRequest
457
   */
458
  @Override
459
  public Identifier delete(Session session, Identifier pid) 
460
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
461
      
462
      String localId = null;      // The corresponding docid for this pid
463
	  Lock lock = null;           // The lock to be used for this identifier
464
      CNode cn = null;            // a reference to the CN to get the node list    
465
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
466
      List<Node> nodeList = null; // the list of nodes in this CN environment
467

    
468
      // check for a valid session
469
      if (session == null) {
470
        	throw new InvalidToken("4963", "No session has been provided");
471
        	
472
      }
473

    
474
      // do we have a valid pid?
475
      if (pid == null || pid.getValue().trim().equals("")) {
476
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
477
          
478
      }
479
      
480
      String serviceFailureCode = "4962";
481
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
482
      if(sid != null) {
483
          pid = sid;
484
      }
485

    
486
	  // check that it is CN/admin
487
	  boolean allowed = isAdminAuthorized(session);
488
	  
489
	  // additional check if it is the authoritative node if it is not the admin
490
      if(!allowed) {
491
          allowed = isAuthoritativeMNodeAdmin(session, pid);
492
          
493
      }
494
	  
495
	  if (!allowed) {
496
		  String msg = "The subject " + session.getSubject().getValue() + 
497
			  " is not allowed to call delete() on a Coordinating Node.";
498
		  logMetacat.info(msg);
499
		  throw new NotAuthorized("4960", msg);
500
		  
501
	  }
502
	  
503
	  // Don't defer to superclass implementation without a locally registered identifier
504
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
505
      // Check for the existing identifier
506
      try {
507
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
508
          super.delete(session, pid);
509
          
510
      } catch (McdbDocNotFoundException e) {
511
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
512
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
513
    	  
514
          try {
515
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
516
  			  lock.lock();
517
  			  logMetacat.debug("Locked identifier " + pid.getValue());
518

    
519
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
520
			  if ( sysMeta != null ) {
521
				/*sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
522
				sysMeta.setArchived(true);
523
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
524
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);*/
525
			    //move the systemmetadata object from the map and delete the records in the systemmetadata database table
526
	            //since this is cn, we don't need worry about the mn solr index.
527
	            HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
528
	            HazelcastService.getInstance().getIdentifiers().remove(pid);
529
	            String username = session.getSubject().getValue();//just for logging purpose
530
                //since data objects were not registered in the identifier table, we use pid as the docid
531
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, pid.getValue(), Event.DELETE.xmlValue());
532
				
533
			  } else {
534
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
535
					  ". Couldn't obtain the system metadata record.");
536
				  
537
			  }
538
			  
539
		  } catch (RuntimeException re) {
540
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
541
				  ". The error message was: " + re.getMessage());
542
			  
543
		  } finally {
544
			  lock.unlock();
545
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
546

    
547
		  }
548

    
549
          // NOTE: cannot log the delete without localId
550
//          EventLog.getInstance().log(request.getRemoteAddr(), 
551
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
552
//                  pid.getValue(), Event.DELETE.xmlValue());
553

    
554
      } catch (SQLException e) {
555
          throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
556
                  ". The local id of the object with the identifier can't be identified since " + e.getMessage());
557
      }
558

    
559
      // get the node list
560
      try {
561
          cn = D1Client.getCN();
562
          nodeList = cn.listNodes().getNodeList();
563
          
564
      } catch (Exception e) { // handle BaseException and other I/O issues
565
          
566
          // swallow errors since the call is not critical
567
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
568
              " due to communication issues with the CN: " + e.getMessage());
569
          
570
      }
571

    
572
	  // notify the replicas
573
	  if (systemMetadata.getReplicaList() != null) {
574
		  for (Replica replica: systemMetadata.getReplicaList()) {
575
			  NodeReference replicaNode = replica.getReplicaMemberNode();
576
			  try {
577
                  if (nodeList != null) {
578
                      // find the node type
579
                      for (Node node : nodeList) {
580
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
581
                              nodeType = node.getType();
582
                              break;
583
              
584
                          }
585
                      }
586
                  }
587
                  
588
                  // only send call MN.delete() to avoid an infinite loop with the CN
589
                  if (nodeType != null && nodeType == NodeType.MN) {
590
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
591
                  }
592
                  
593
			  } catch (Exception e) {
594
				  // all we can really do is log errors and carry on with life
595
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
596
					  " from replica MN: " + replicaNode.getValue(), e);
597
			}
598
			  
599
		  }
600
	  }
601
	  
602
	  return pid;
603
      
604
  }
605
  
606
  /**
607
   * Archives an object from the Coordinating Node
608
   * 
609
   * @param session - the Session object containing the credentials for the Subject
610
   * @param pid - The object identifier to be deleted
611
   * 
612
   * @return pid - the identifier of the object used for the deletion
613
   * 
614
   * @throws InvalidToken
615
   * @throws ServiceFailure
616
   * @throws NotAuthorized
617
   * @throws NotFound
618
   * @throws NotImplemented
619
   * @throws InvalidRequest
620
   */
621
  @Override
622
  public Identifier archive(Session session, Identifier pid) 
623
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
624

    
625
      String localId = null; // The corresponding docid for this pid
626
	  Lock lock = null;      // The lock to be used for this identifier
627
      CNode cn = null;            // a reference to the CN to get the node list    
628
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
629
      List<Node> nodeList = null; // the list of nodes in this CN environment
630
      
631

    
632
      // check for a valid session
633
      if (session == null) {
634
        	throw new InvalidToken("4973", "No session has been provided");
635
        	
636
      }
637

    
638
      // do we have a valid pid?
639
      if (pid == null || pid.getValue().trim().equals("")) {
640
          throw new InvalidToken("4973", "The provided identifier was invalid.");
641
          
642
      }
643

    
644
	  // check that it is CN/admin
645
	  boolean allowed = isAdminAuthorized(session);
646
	  
647
	  String serviceFailureCode = "4972";
648
	  Identifier sid = getPIDForSID(pid, serviceFailureCode);
649
	  if(sid != null) {
650
	        pid = sid;
651
	  }
652
	  
653
	  //check if it is the authoritative member node
654
	  if(!allowed) {
655
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
656
	  }
657
	  
658
	  //check if the session has the change permission
659
	  if(!allowed) {
660
	      try {
661
	          allowed = userHasPermission(session, pid, Permission.CHANGE_PERMISSION);
662
	      } catch (InvalidRequest e) {
663
	          throw new InvalidToken("4973","CN.archive method couldn't determine if the client has the change permssion on this pid "+pid.getValue()+ " since "+e.getMessage());
664
	      }
665
	      
666
	  }
667
	  if (!allowed) {
668
		  String msg = "The subject " + session.getSubject().getValue() + 
669
				  " doesn't have the change permission to archive the object "+pid.getValue();
670
		  logMetacat.warn(msg);
671
		  throw new NotAuthorized("4970", msg);
672
	  }
673
	  
674
      try {
675
          HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
676
          logMetacat.debug("CNodeService.archive - lock the system metadata for "+pid.getValue());
677
          SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
678
          D1NodeVersionChecker checker = new D1NodeVersionChecker(sysMeta.getAuthoritativeMemberNode());
679
          String version = checker.getVersion("MNRead");
680
          if(version == null) {
681
              throw new ServiceFailure("4972", "Couldn't determine the MNRead version of the authoritative member node for the pid "+pid.getValue());
682
          } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
683
              //we don't apply this method to an object whose authoritative node is v2
684
              throw new NotAuthorized("4970", V2V1MISSMATCH);
685
          } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
686
              //we don't understand this version (it is not v1 or v2)
687
              throw new NotImplemented("4974", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
688
          }
689
          boolean needModifyDate = true;
690
          archiveCNObjectWithNotificationReplica(session, pid, sysMeta, needModifyDate);
691
      
692
      } finally {
693
          HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
694
          logMetacat.debug("CNodeService.archive - unlock the system metadata for "+pid.getValue());
695
      }
696

    
697
	  return pid;
698
      
699
  }
700
  
701
  
702
  /**
703
   * Archive a object on cn and notify the replica. This method doesn't lock the system metadata map. The caller should lock it.
704
   * This method doesn't check the authorization; this method only accept a pid.
705
   * @param session
706
   * @param pid
707
   * @param sysMeta
708
   * @param notifyReplica
709
   * @return
710
   * @throws InvalidToken
711
   * @throws ServiceFailure
712
   * @throws NotAuthorized
713
   * @throws NotFound
714
   * @throws NotImplemented
715
   */
716
  private Identifier archiveCNObjectWithNotificationReplica(Session session, Identifier pid, SystemMetadata sysMeta, boolean needModifyDate) 
717
                  throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
718
          boolean logArchive = true;
719
          archiveCNObject(logArchive, session, pid, sysMeta, needModifyDate);
720
          // notify the replicas
721
          notifyReplicaNodes(sysMeta);
722
          return pid;
723
    }
724
  
725
  
726
  
727
  /**
728
   * Set the obsoletedBy attribute in System Metadata
729
   * @param session
730
   * @param pid
731
   * @param obsoletedByPid
732
   * @param serialVersion
733
   * @return
734
   * @throws NotImplemented
735
   * @throws NotFound
736
   * @throws NotAuthorized
737
   * @throws ServiceFailure
738
   * @throws InvalidRequest
739
   * @throws InvalidToken
740
   * @throws VersionMismatch
741
   */
742
  @Override
743
  public boolean setObsoletedBy(Session session, Identifier pid,
744
			Identifier obsoletedByPid, long serialVersion)
745
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
746
			InvalidRequest, InvalidToken, VersionMismatch {
747
      
748
      // do we have a valid pid?
749
      if (pid == null || pid.getValue().trim().equals("")) {
750
          throw new InvalidRequest("4942", "The provided identifier was invalid.");
751
          
752
      }
753
      
754
      /*String serviceFailureCode = "4941";
755
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
756
      if(sid != null) {
757
          pid = sid;
758
      }*/
759
      
760
      // do we have a valid pid?
761
      if (obsoletedByPid == null || obsoletedByPid.getValue().trim().equals("")) {
762
          throw new InvalidRequest("4942", "The provided obsoletedByPid was invalid.");
763
          
764
      }
765
      
766
      try {
767
          if(IdentifierManager.getInstance().systemMetadataSIDExists(obsoletedByPid)) {
768
              throw new InvalidRequest("4942", "The provided obsoletedByPid "+obsoletedByPid.getValue()+" is an existing SID. However, it must NOT be an SID.");
769
          }
770
      } catch (SQLException ee) {
771
          throw new ServiceFailure("4941", "Couldn't determine if the obsoletedByPid "+obsoletedByPid.getValue()+" is an SID or not. The id shouldn't be an SID.");
772
      }
773
      
774

    
775
		// The lock to be used for this identifier
776
		Lock lock = null;
777

    
778
		// get the subject
779
		Subject subject = session.getSubject();
780

    
781
		// are we allowed to do this?
782
		if (!isAuthorized(session, pid, Permission.WRITE)) {
783
			throw new NotAuthorized("4881", Permission.WRITE
784
					+ " not allowed by " + subject.getValue() + " on "
785
					+ pid.getValue());
786

    
787
		}
788

    
789

    
790
		SystemMetadata systemMetadata = null;
791
		try {
792
			lock = HazelcastService.getInstance().getLock(pid.getValue());
793
			lock.lock();
794
			logMetacat.debug("Locked identifier " + pid.getValue());
795

    
796
			try {
797
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
798
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
799
				}
800

    
801
				// did we get it correctly?
802
				if (systemMetadata == null) {
803
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
804
				}
805

    
806
				// does the request have the most current system metadata?
807
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
808
					String msg = "The requested system metadata version number "
809
							+ serialVersion
810
							+ " differs from the current version at "
811
							+ systemMetadata.getSerialVersion().longValue()
812
							+ ". Please get the latest copy in order to modify it.";
813
					throw new VersionMismatch("4886", msg);
814

    
815
				}
816
				
817
				 //only apply to the object whose authoritative member node is v1.
818
	              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
819
	              String version = checker.getVersion("MNRead");
820
	              if(version == null) {
821
	                  throw new ServiceFailure("4941", "Couldn't determine the MNRead version of the authoritative member node for the pid "+pid.getValue());
822
	              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
823
	                  //we don't apply this method to an object whose authoritative node is v2
824
	                  throw new NotAuthorized("4945", V2V1MISSMATCH);
825
	              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
826
	                  //we don't understand this version (it is not v1 or v2)
827
	                  throw new InvalidRequest("4942", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
828
	              }
829

    
830
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
831
				throw new NotFound("4884", "No record found for: " + pid.getValue());
832

    
833
			}
834

    
835
			// set the new policy
836
			systemMetadata.setObsoletedBy(obsoletedByPid);
837

    
838
			// update the metadata
839
			try {
840
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
841
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
842
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
843
			} catch (RuntimeException e) {
844
				throw new ServiceFailure("4882", e.getMessage());
845
			}
846
			
847

    
848
		} catch (RuntimeException e) {
849
			throw new ServiceFailure("4882", e.getMessage());
850
		} finally {
851
			lock.unlock();
852
			logMetacat.debug("Unlocked identifier " + pid.getValue());
853
		}
854

    
855
		return true;
856
	}
857
  
858
  
859
  /**
860
   * Set the replication status for an object given the object identifier
861
   * 
862
   * @param session - the Session object containing the credentials for the Subject
863
   * @param pid - the object identifier for the given object
864
   * @param status - the replication status to be applied
865
   * 
866
   * @return true or false
867
   * 
868
   * @throws NotImplemented
869
   * @throws NotAuthorized
870
   * @throws ServiceFailure
871
   * @throws InvalidRequest
872
   * @throws InvalidToken
873
   * @throws NotFound
874
   * 
875
   */
876
  @Override
877
  public boolean setReplicationStatus(Session session, Identifier pid,
878
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
879
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
880
      InvalidRequest, NotFound {
881
	  
882
	  // cannot be called by public
883
	  if (session == null) {
884
		  throw new NotAuthorized("4720", "Session cannot be null");
885
	  } 
886
	  
887
	  /*else {
888
	      if(!isCNAdmin(session)) {
889
              throw new NotAuthorized("4720", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for setting the replication status of the object "+pid.getValue());
890
        }
891
	  }*/
892
	  
893
	// do we have a valid pid?
894
      if (pid == null || pid.getValue().trim().equals("")) {
895
          throw new InvalidRequest("4730", "The provided identifier was invalid.");
896
          
897
      }
898
      
899
      /*String serviceFailureCode = "4700";
900
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
901
      if(sid != null) {
902
          pid = sid;
903
      }*/
904
      
905
      // The lock to be used for this identifier
906
      Lock lock = null;
907
      
908
      boolean allowed = false;
909
      int replicaEntryIndex = -1;
910
      List<Replica> replicas = null;
911
      // get the subject
912
      Subject subject = session.getSubject();
913
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
914
          " is " + status.toString());
915
      
916
      SystemMetadata systemMetadata = null;
917

    
918
      try {
919
          lock = HazelcastService.getInstance().getLock(pid.getValue());
920
          lock.lock();
921
          logMetacat.debug("Locked identifier " + pid.getValue());
922

    
923
          try {      
924
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
925

    
926
              // did we get it correctly?
927
              if ( systemMetadata == null ) {
928
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
929
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
930
                  
931
              }
932
              replicas = systemMetadata.getReplicaList();
933
              int count = 0;
934
              
935
              // was there a failure? log it
936
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
937
                 String msg = "The replication request of the object identified by " + 
938
                     pid.getValue() + " failed.  The error message was " +
939
                     failure.getMessage() + ".";
940
                 logMetacat.error(msg);
941
              }
942
              
943
              if (replicas.size() > 0 && replicas != null) {
944
                  // find the target replica index in the replica list
945
                  for (Replica replica : replicas) {
946
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
947
                      String targetNodeStr = targetNode.getValue();
948
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
949
                  
950
                      if (replicaNodeStr.equals(targetNodeStr)) {
951
                          replicaEntryIndex = count;
952
                          logMetacat.debug("replica entry index is: "
953
                                  + replicaEntryIndex);
954
                          break;
955
                      }
956
                      count++;
957
                  
958
                  }
959
              }
960
              // are we allowed to do this? only CNs and target MNs are allowed
961
              CNode cn = D1Client.getCN();
962
              List<Node> nodes = cn.listNodes().getNodeList();
963
              
964
              // find the node in the node list
965
              for ( Node node : nodes ) {
966
                  
967
                  NodeReference nodeReference = node.getIdentifier();
968
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
969
                      nodeReference.getValue());
970
                  
971
                  // allow target MN certs
972
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
973
                      node.getType().equals(NodeType.MN)) {
974
                      List<Subject> nodeSubjects = node.getSubjectList();
975
                      
976
                      // check if the session subject is in the node subject list
977
                      for (Subject nodeSubject : nodeSubjects) {
978
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
979
                                  nodeSubject.getValue() + " and " + subject.getValue());
980
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
981
                              
982
                              // lastly limit to COMPLETED, INVALIDATED,
983
                              // and FAILED status updates from MNs only
984
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
985
                                   status.equals(ReplicationStatus.INVALIDATED) ||
986
                                   status.equals(ReplicationStatus.FAILED)) {
987
                                  allowed = true;
988
                                  break;
989
                                  
990
                              }                              
991
                          }
992
                      }                 
993
                  }
994
              }
995

    
996
              if ( !allowed ) {
997
                  //check for CN admin access
998
                  //allowed = isAuthorized(session, pid, Permission.WRITE);
999
                  allowed = isCNAdmin(session);
1000
                  
1001
              }              
1002
              
1003
              if ( !allowed ) {
1004
                  String msg = "The subject identified by "
1005
                          + subject.getValue()
1006
                          + " is not a CN or MN, and does not have permission to set the replication status for "
1007
                          + "the replica identified by "
1008
                          + targetNode.getValue() + ".";
1009
                  logMetacat.info(msg);
1010
                  throw new NotAuthorized("4720", msg);
1011
                  
1012
              }
1013

    
1014
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1015
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
1016
                " : " + e.getMessage());
1017
            
1018
          }
1019
          
1020
          Replica targetReplica = new Replica();
1021
          // set the status for the replica
1022
          if ( replicaEntryIndex != -1 ) {
1023
              targetReplica = replicas.get(replicaEntryIndex);
1024
              
1025
              // don't allow status to change from COMPLETED to anything other
1026
              // than INVALIDATED: prevents overwrites from race conditions
1027
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
1028
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
1029
            	  throw new InvalidRequest("4730", "Status state change from " +
1030
            			  targetReplica.getReplicationStatus() + " to " +
1031
            			  status.toString() + "is prohibited for identifier " +
1032
            			  pid.getValue() + " and target node " + 
1033
            			  targetReplica.getReplicaMemberNode().getValue());
1034
              }
1035
              
1036
              if(targetReplica.getReplicationStatus().equals(status)) {
1037
                  //There is no change in the status, we do nothing.
1038
                  return true;
1039
              }
1040
              
1041
              targetReplica.setReplicationStatus(status);
1042
              
1043
              logMetacat.debug("Set the replication status for " + 
1044
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
1045
                  targetReplica.getReplicationStatus() + " for identifier " +
1046
                  pid.getValue());
1047
              
1048
          } else {
1049
              // this is a new entry, create it
1050
              targetReplica.setReplicaMemberNode(targetNode);
1051
              targetReplica.setReplicationStatus(status);
1052
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
1053
              replicas.add(targetReplica);
1054
              
1055
          }
1056
          
1057
          systemMetadata.setReplicaList(replicas);
1058
                
1059
          // update the metadata
1060
          try {
1061
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1062
              // Based on CN behavior discussion 9/16/15, we no longer want to 
1063
              // update the modified date for changes to the replica list
1064
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1065
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1066

    
1067
              if ( !status.equals(ReplicationStatus.QUEUED) && 
1068
            	   !status.equals(ReplicationStatus.REQUESTED)) {
1069
                  
1070
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
1071
                          "\tNODE:\t" + targetNode.getValue() + 
1072
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
1073
                
1074
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
1075
                          "\tPID:\t"  + pid.getValue() + 
1076
                          "\tNODE:\t" + targetNode.getValue() + 
1077
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
1078
              }
1079

    
1080
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
1081
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
1082
                      " on target node " + targetNode + ". The exception was: " +
1083
                      failure.getMessage());
1084
              }
1085
              
1086
			  // update the replica nodes about the completed replica when complete, failed or invalid
1087
              if (status.equals(ReplicationStatus.COMPLETED) || status.equals(ReplicationStatus.FAILED) ||
1088
                      status.equals(ReplicationStatus.INVALIDATED)) {
1089
				notifyReplicaNodes(systemMetadata);
1090
			}
1091
          
1092
          } catch (RuntimeException e) {
1093
              throw new ServiceFailure("4700", e.getMessage());
1094
          
1095
          }
1096
          
1097
    } catch (RuntimeException e) {
1098
        String msg = "There was a RuntimeException getting the lock for " +
1099
            pid.getValue();
1100
        logMetacat.info(msg);
1101
        
1102
    } finally {
1103
        lock.unlock();
1104
        logMetacat.debug("Unlocked identifier " + pid.getValue());
1105
        
1106
    }
1107
      
1108
      return true;
1109
  }
1110
  
1111
/**
1112
   * Return the checksum of the object given the identifier 
1113
   * 
1114
   * @param session - the Session object containing the credentials for the Subject
1115
   * @param pid - the object identifier for the given object
1116
   * 
1117
   * @return checksum - the checksum of the object
1118
   * 
1119
   * @throws InvalidToken
1120
   * @throws ServiceFailure
1121
   * @throws NotAuthorized
1122
   * @throws NotFound
1123
   * @throws NotImplemented
1124
   */
1125
  @Override
1126
  public Checksum getChecksum(Session session, Identifier pid)
1127
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
1128
    NotImplemented {
1129
    
1130
	boolean isAuthorized = false;
1131
	try {
1132
		isAuthorized = isAuthorized(session, pid, Permission.READ);
1133
	} catch (InvalidRequest e) {
1134
		throw new ServiceFailure("1410", e.getDescription());
1135
	}  
1136
    if (!isAuthorized) {
1137
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
1138
    }
1139
    
1140
    SystemMetadata systemMetadata = null;
1141
    Checksum checksum = null;
1142
    
1143
    try {
1144
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
1145

    
1146
        if (systemMetadata == null ) {
1147
            String error ="";
1148
            String localId = null;
1149
            try {
1150
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1151
              
1152
             } catch (Exception e) {
1153
                logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1154
            }
1155
            
1156
            if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1157
                error = DELETEDMESSAGE;
1158
            } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1159
                error = DELETEDMESSAGE;
1160
            }
1161
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1162
        }
1163
        checksum = systemMetadata.getChecksum();
1164
        
1165
    } catch (RuntimeException e) {
1166
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
1167
            pid.getValue() + ". The error message was: " + e.getMessage());
1168
      
1169
    }
1170
    
1171
    return checksum;
1172
  }
1173

    
1174
  /**
1175
   * Resolve the location of a given object
1176
   * 
1177
   * @param session - the Session object containing the credentials for the Subject
1178
   * @param pid - the object identifier for the given object
1179
   * 
1180
   * @return objectLocationList - the list of nodes known to contain the object
1181
   * 
1182
   * @throws InvalidToken
1183
   * @throws ServiceFailure
1184
   * @throws NotAuthorized
1185
   * @throws NotFound
1186
   * @throws NotImplemented
1187
   */
1188
  @Override
1189
  public ObjectLocationList resolve(Session session, Identifier pid)
1190
    throws InvalidToken, ServiceFailure, NotAuthorized,
1191
    NotFound, NotImplemented {
1192

    
1193
    throw new NotImplemented("4131", "resolve not implemented");
1194

    
1195
  }
1196

    
1197
  /**
1198
   * Metacat does not implement this method at the CN level
1199
   */
1200
  @Override
1201
  public ObjectList search(Session session, String queryType, String query)
1202
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1203
    NotImplemented {
1204

    
1205
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
1206
	  
1207
//    ObjectList objectList = null;
1208
//    try {
1209
//        objectList = 
1210
//          IdentifierManager.getInstance().querySystemMetadata(
1211
//              null, //startTime, 
1212
//              null, //endTime,
1213
//              null, //objectFormat, 
1214
//              false, //replicaStatus, 
1215
//              0, //start, 
1216
//              1000 //count
1217
//              );
1218
//        
1219
//    } catch (Exception e) {
1220
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
1221
//    }
1222
//
1223
//      return objectList;
1224
		  
1225
  }
1226
  
1227
  /**
1228
   * Returns the object format registered in the DataONE Object Format 
1229
   * Vocabulary for the given format identifier
1230
   * 
1231
   * @param fmtid - the identifier of the format requested
1232
   * 
1233
   * @return objectFormat - the object format requested
1234
   * 
1235
   * @throws ServiceFailure
1236
   * @throws NotFound
1237
   * @throws InsufficientResources
1238
   * @throws NotImplemented
1239
   */
1240
  @Override
1241
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1242
    throws ServiceFailure, NotFound, NotImplemented {
1243
     
1244
      return ObjectFormatService.getInstance().getFormat(fmtid);
1245
      
1246
  }
1247

    
1248
    @Override
1249
    public ObjectFormatIdentifier addFormat(Session session, ObjectFormatIdentifier formatId, ObjectFormat format)
1250
            throws ServiceFailure, NotFound, NotImplemented, NotAuthorized, InvalidToken {
1251

    
1252
        logMetacat.debug("CNodeService.addFormat() called.\n" + 
1253
                "format ID: " + format.getFormatId() + "\n" + 
1254
                "format name: " + format.getFormatName() + "\n" + 
1255
                "format type: " + format.getFormatType() );
1256
        
1257
        // FIXME remove:
1258
        if (true)
1259
            throw new NotImplemented("0000", "Implementation underway... Will need testing too...");
1260
        
1261
        if (!isAdminAuthorized(session))
1262
            throw new NotAuthorized("0000", "Not authorized to call addFormat()");
1263

    
1264
        String separator = ".";
1265
        try {
1266
            separator = PropertyService.getProperty("document.accNumSeparator");
1267
        } catch (PropertyNotFoundException e) {
1268
            logMetacat.warn("Unable to find property \"document.accNumSeparator\"\n" + e.getMessage());
1269
        }
1270

    
1271
        // find pids of last and next ObjectFormatList
1272
        String OBJECT_FORMAT_DOCID = ObjectFormatService.OBJECT_FORMAT_DOCID;
1273
        int lastRev = -1;
1274
        try {
1275
            lastRev = DBUtil.getLatestRevisionInDocumentTable(OBJECT_FORMAT_DOCID);
1276
        } catch (SQLException e) {
1277
            throw new ServiceFailure("0000", "Unable to locate last revision of the object format list.\n" + e.getMessage());
1278
        }
1279
        int nextRev = lastRev + 1;
1280
        String lastDocID = OBJECT_FORMAT_DOCID + separator + lastRev;
1281
        String nextDocID = OBJECT_FORMAT_DOCID + separator + nextRev;
1282
        
1283
        Identifier lastPid = new Identifier();
1284
        lastPid.setValue(lastDocID);
1285
        Identifier nextPid = new Identifier();
1286
        nextPid.setValue(nextDocID);
1287
        
1288
        logMetacat.debug("Last ObjectFormatList document ID: " + lastDocID + "\n" 
1289
                + "Next ObjectFormatList document ID: " + nextDocID);
1290
        
1291
        // add new format to the current ObjectFormatList
1292
        ObjectFormatList objectFormatList = ObjectFormatService.getInstance().listFormats();
1293
        List<ObjectFormat> innerList = objectFormatList.getObjectFormatList();
1294
        innerList.add(format);
1295

    
1296
        // get existing (last) sysmeta and make a copy
1297
        SystemMetadata lastSysmeta = getSystemMetadata(session, lastPid);
1298
        SystemMetadata nextSysmeta = new SystemMetadata();
1299
        try {
1300
            BeanUtils.copyProperties(nextSysmeta, lastSysmeta);
1301
        } catch (IllegalAccessException | InvocationTargetException e) {
1302
            throw new ServiceFailure("0000", "Unable to create system metadata for updated object format list.\n" + e.getMessage());
1303
        }
1304
        
1305
        // create the new object format list, and update the old sysmeta with obsoletedBy
1306
        createNewObjectFormatList(session, lastPid, nextPid, objectFormatList, nextSysmeta);
1307
        updateOldObjectFormatList(session, lastPid, nextPid, lastSysmeta);
1308
        
1309
        // TODO add to ObjectFormatService local cache?
1310
        
1311
        return formatId;
1312
    }
1313

    
1314
    /**
1315
     * Creates the object for the next / updated version of the ObjectFormatList.
1316
     * 
1317
     * @param session
1318
     * @param lastPid
1319
     * @param nextPid
1320
     * @param objectFormatList
1321
     * @param lastSysmeta
1322
     */
1323
    private void createNewObjectFormatList(Session session, Identifier lastPid, Identifier nextPid,
1324
            ObjectFormatList objectFormatList, SystemMetadata lastSysmeta) 
1325
                    throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented {
1326
        
1327
        PipedInputStream is = new PipedInputStream();
1328
        PipedOutputStream os = null;
1329
        
1330
        try {
1331
            os = new PipedOutputStream(is);
1332
            TypeMarshaller.marshalTypeToOutputStream(objectFormatList, os);
1333
        } catch (MarshallingException | IOException e) {
1334
            throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + e.getMessage());
1335
        } finally {
1336
            try {
1337
                os.flush();
1338
                os.close();
1339
            } catch (IOException ioe) {
1340
                throw new ServiceFailure("0000", "Unable to marshal object format list.\n" + ioe.getMessage());
1341
            }
1342
        }
1343
        
1344
        BigInteger docSize = lastSysmeta.getSize();
1345
        try {
1346
            docSize = BigInteger.valueOf(is.available());
1347
        } catch (IOException e) {
1348
            logMetacat.warn("Unable to set an accurate size for the new object format list.", e);
1349
        }
1350
        
1351
        lastSysmeta.setIdentifier(nextPid);
1352
        lastSysmeta.setObsoletes(lastPid);
1353
        lastSysmeta.setSize(docSize); 
1354
        lastSysmeta.setSubmitter(session.getSubject());
1355
        lastSysmeta.setDateUploaded(new Date());
1356
        
1357
        // create new object format list
1358
        try {
1359
            create(session, nextPid, is, lastSysmeta);
1360
        } catch (IdentifierNotUnique | UnsupportedType | InsufficientResources
1361
                | InvalidSystemMetadata | InvalidRequest e) {
1362
            throw new ServiceFailure("0000", "Unable to create() new object format list" + e.getMessage());
1363
        }
1364
    }
1365
  
1366
    /**
1367
     * Updates the SystemMetadata for the old version of the ObjectFormatList
1368
     * by setting the obsoletedBy value to the pid of the new version of the 
1369
     * ObjectFormatList.
1370
     * 
1371
     * @param session
1372
     * @param lastPid
1373
     * @param obsoletedByPid
1374
     * @param lastSysmeta
1375
     * @throws ServiceFailure
1376
     */
1377
    private void updateOldObjectFormatList(Session session, Identifier lastPid, Identifier obsoletedByPid, SystemMetadata lastSysmeta) 
1378
            throws ServiceFailure {
1379
        
1380
        lastSysmeta.setObsoletedBy(obsoletedByPid);
1381
        
1382
        try {
1383
            this.updateSystemMetadata(session, lastPid, lastSysmeta);
1384
        } catch (NotImplemented | NotAuthorized | ServiceFailure | InvalidRequest
1385
                | InvalidSystemMetadata | InvalidToken e) {
1386
            throw new ServiceFailure("0000", "Unable to update metadata of old object format list.\n" + e.getMessage());
1387
        }
1388
    }
1389
  /**
1390
   * Returns a list of all object formats registered in the DataONE Object 
1391
   * Format Vocabulary
1392
    * 
1393
   * @return objectFormatList - The list of object formats registered in 
1394
   *                            the DataONE Object Format Vocabulary
1395
   * 
1396
   * @throws ServiceFailure
1397
   * @throws NotImplemented
1398
   * @throws InsufficientResources
1399
   */
1400
  @Override
1401
  public ObjectFormatList listFormats() 
1402
    throws ServiceFailure, NotImplemented {
1403

    
1404
    return ObjectFormatService.getInstance().listFormats();
1405
  }
1406

    
1407
  /**
1408
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1409
    * 
1410
   * @return nodeList - List of nodes from the registry
1411
   * 
1412
   * @throws ServiceFailure
1413
   * @throws NotImplemented
1414
   */
1415
  @Override
1416
  public NodeList listNodes() 
1417
    throws NotImplemented, ServiceFailure {
1418

    
1419
    throw new NotImplemented("4800", "listNodes not implemented");
1420
  }
1421

    
1422
  /**
1423
   * Provides a mechanism for adding system metadata independently of its 
1424
   * associated object, such as when adding system metadata for data objects.
1425
    * 
1426
   * @param session - the Session object containing the credentials for the Subject
1427
   * @param pid - The identifier of the object to register the system metadata against
1428
   * @param sysmeta - The system metadata to be registered
1429
   * 
1430
   * @return true if the registration succeeds
1431
   * 
1432
   * @throws NotImplemented
1433
   * @throws NotAuthorized
1434
   * @throws ServiceFailure
1435
   * @throws InvalidRequest
1436
   * @throws InvalidSystemMetadata
1437
   */
1438
  @Override
1439
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1440
      SystemMetadata sysmeta) 
1441
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1442
      InvalidSystemMetadata {
1443

    
1444
      // The lock to be used for this identifier
1445
      Lock lock = null;
1446

    
1447
      // TODO: control who can call this?
1448
      if (session == null) {
1449
          //TODO: many of the thrown exceptions do not use the correct error codes
1450
          //check these against the docs and correct them
1451
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1452
                  "  If you are not logged in, please do so and retry the request.");
1453
      } else {
1454
          //only CN is allwoed
1455
          if(!isCNAdmin(session)) {
1456
                throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for registering the system metadata of the object "+pid.getValue());
1457
          }
1458
      }
1459
      // the identifier can't be an SID
1460
      try {
1461
          if(IdentifierManager.getInstance().systemMetadataSIDExists(pid)) {
1462
              throw new InvalidRequest("4863", "The provided identifier "+pid.getValue()+" is a series id which is not allowed.");
1463
          }
1464
      } catch (SQLException sqle) {
1465
          throw new ServiceFailure("4862", "Couldn't determine if the pid "+pid.getValue()+" is a series id since "+sqle.getMessage());
1466
      }
1467
      
1468
      // verify that guid == SystemMetadata.getIdentifier()
1469
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1470
          "|" + sysmeta.getIdentifier().getValue());
1471
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1472
          throw new InvalidRequest("4863", 
1473
              "The identifier in method call (" + pid.getValue() + 
1474
              ") does not match identifier in system metadata (" +
1475
              sysmeta.getIdentifier().getValue() + ").");
1476
      }
1477
      
1478
      //check if the sid is legitimate in the system metadata
1479
      //checkSidInModifyingSystemMetadata(sysmeta, "4864", "4862");
1480
      Identifier sid = sysmeta.getSeriesId();
1481
      if(sid != null) {
1482
          if (!isValidIdentifier(sid)) {
1483
              throw new InvalidRequest("4863", "The series id in the system metadata is invalid in the request.");
1484
          }
1485
      }
1486

    
1487
      try {
1488
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1489
          lock.lock();
1490
          logMetacat.debug("Locked identifier " + pid.getValue());
1491
          logMetacat.debug("Checking if identifier exists...");
1492
          // Check that the identifier does not already exist
1493
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1494
              throw new InvalidRequest("4863", 
1495
                  "The identifier is already in use by an existing object.");
1496
          
1497
          }
1498
          
1499
          // insert the system metadata into the object store
1500
          logMetacat.debug("Starting to insert SystemMetadata...");
1501
          try {
1502
              //for the object whose authoriative mn is v1. we need reset the modification date.
1503
              //d1-sync already set the serial version. so we don't need do again.
1504
              D1NodeVersionChecker checker = new D1NodeVersionChecker(sysmeta.getAuthoritativeMemberNode());
1505
              String version = checker.getVersion("MNRead");
1506
              if(version != null && version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1507
                  sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1508
              }
1509
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1510
              
1511
          } catch (RuntimeException e) {
1512
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1513
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1514
                  e.getClass() + ": " + e.getMessage());
1515
              
1516
          }
1517
          
1518
      } catch (RuntimeException e) {
1519
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1520
                  e.getClass() + ": " + e.getMessage());
1521
          
1522
      }  finally {
1523
          lock.unlock();
1524
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1525
          
1526
      }
1527

    
1528
      
1529
      logMetacat.debug("Returning from registerSystemMetadata");
1530
      
1531
      try {
1532
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1533
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1534
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1535
    	          localId, "registerSystemMetadata");
1536
      } catch (McdbDocNotFoundException e) {
1537
    	  // do nothing, no localId to log with
1538
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1539
      } catch (SQLException ee) {
1540
          // do nothing, no localId to log with
1541
          logMetacat.warn("Could not log 'registerSystemMetadata' event because the localId couldn't be identified for pid: " + pid.getValue());
1542
      }
1543
      
1544
      
1545
      return pid;
1546
  }
1547
  
1548
  /**
1549
   * Given an optional scope and format, reserves and returns an identifier 
1550
   * within that scope and format that is unique and will not be 
1551
   * used by any other sessions. 
1552
    * 
1553
   * @param session - the Session object containing the credentials for the Subject
1554
   * @param pid - The identifier of the object to register the system metadata against
1555
   * @param scope - An optional string to be used to qualify the scope of 
1556
   *                the identifier namespace, which is applied differently 
1557
   *                depending on the format requested. If scope is not 
1558
   *                supplied, a default scope will be used.
1559
   * @param format - The optional name of the identifier format to be used, 
1560
   *                  drawn from a DataONE-specific vocabulary of identifier 
1561
   *                 format names, including several common syntaxes such 
1562
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1563
   *                 format is not supplied by the caller, the CN service 
1564
   *                 will use a default identifier format, which may change 
1565
   *                 over time.
1566
   * 
1567
   * @return true if the registration succeeds
1568
   * 
1569
   * @throws InvalidToken
1570
   * @throws ServiceFailure
1571
   * @throws NotAuthorized
1572
   * @throws IdentifierNotUnique
1573
   * @throws NotImplemented
1574
   */
1575
  @Override
1576
  public Identifier reserveIdentifier(Session session, Identifier pid)
1577
  throws InvalidToken, ServiceFailure,
1578
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1579

    
1580
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1581
  }
1582
  
1583
  @Override
1584
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1585
  throws InvalidToken, ServiceFailure,
1586
        NotAuthorized, NotImplemented, InvalidRequest {
1587
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1588
  }
1589
  
1590
  /**
1591
    * Checks whether the pid is reserved by the subject in the session param
1592
    * If the reservation is held on the pid by the subject, we return true.
1593
    * 
1594
   * @param session - the Session object containing the Subject
1595
   * @param pid - The identifier to check
1596
   * 
1597
   * @return true if the reservation exists for the subject/pid
1598
   * 
1599
   * @throws InvalidToken
1600
   * @throws ServiceFailure
1601
   * @throws NotFound - when the pid is not found (in use or in reservation)
1602
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1603
   * @throws IdentifierNotUnique - when the pid is in use
1604
   * @throws NotImplemented
1605
   */
1606

    
1607
  @Override
1608
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1609
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1610
      NotImplemented, InvalidRequest {
1611
  
1612
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1613
  }
1614

    
1615
  /**
1616
   * Changes ownership (RightsHolder) of the specified object to the 
1617
   * subject specified by userId
1618
    * 
1619
   * @param session - the Session object containing the credentials for the Subject
1620
   * @param pid - Identifier of the object to be modified
1621
   * @param userId - The subject that will be taking ownership of the specified object.
1622
   *
1623
   * @return pid - the identifier of the modified object
1624
   * 
1625
   * @throws ServiceFailure
1626
   * @throws InvalidToken
1627
   * @throws NotFound
1628
   * @throws NotAuthorized
1629
   * @throws NotImplemented
1630
   * @throws InvalidRequest
1631
   */  
1632
  @Override
1633
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1634
      long serialVersion)
1635
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1636
      NotImplemented, InvalidRequest, VersionMismatch {
1637
      
1638
      // The lock to be used for this identifier
1639
      Lock lock = null;
1640
      
1641
      // do we have a valid pid?
1642
      if (pid == null || pid.getValue().trim().equals("")) {
1643
          throw new InvalidRequest("4442", "The provided identifier was invalid.");
1644
          
1645
      }
1646

    
1647
      // get the subject
1648
      Subject subject = session.getSubject();
1649
      
1650
      String serviceFailureCode = "4490";
1651
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
1652
      if(sid != null) {
1653
          pid = sid;
1654
      }
1655
      
1656
      // are we allowed to do this?
1657
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1658
          throw new NotAuthorized("4440", "not allowed by "
1659
                  + subject.getValue() + " on " + pid.getValue());
1660
          
1661
      }
1662
      
1663
      SystemMetadata systemMetadata = null;
1664
      try {
1665
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1666
          lock.lock();
1667
          logMetacat.debug("Locked identifier " + pid.getValue());
1668

    
1669
          try {
1670
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1671
              
1672
              if(systemMetadata == null) {
1673
                  throw new NotFound("4460", "The object "+pid.getValue()+" doesn't exist in the node.");
1674
              }
1675
              // does the request have the most current system metadata?
1676
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1677
                 String msg = "The requested system metadata version number " + 
1678
                     serialVersion + " differs from the current version at " +
1679
                     systemMetadata.getSerialVersion().longValue() +
1680
                     ". Please get the latest copy in order to modify it.";
1681
                 throw new VersionMismatch("4443", msg);
1682
              }
1683
              
1684
              //only apply to the object whose authoritative member node is v1.
1685
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
1686
              String version = checker.getVersion("MNRead");
1687
              if(version == null) {
1688
                  throw new ServiceFailure("4490", "Couldn't determine the MNRead version of the authoritative member node storage version for the pid "+pid.getValue());
1689
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
1690
                  //we don't apply this method to an object whose authoritative node is v2
1691
                  throw new NotAuthorized("4440", V2V1MISSMATCH);
1692
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1693
                  //we don't understand this version (it is not v1 or v2)
1694
                  throw new InvalidRequest("4442", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
1695
              }
1696
              
1697
              
1698
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1699
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1700
              
1701
          }
1702
              
1703
          // set the new rights holder
1704
          systemMetadata.setRightsHolder(userId);
1705
          
1706
          // update the metadata
1707
          try {
1708
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1709
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1710
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1711
              notifyReplicaNodes(systemMetadata);
1712
              
1713
          } catch (RuntimeException e) {
1714
              throw new ServiceFailure("4490", e.getMessage());
1715
          
1716
          }
1717
          
1718
      } catch (RuntimeException e) {
1719
          throw new ServiceFailure("4490", e.getMessage());
1720
          
1721
      } finally {
1722
          lock.unlock();
1723
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1724
      
1725
      }
1726
      
1727
      return pid;
1728
  }
1729

    
1730
  /**
1731
   * Verify that a replication task is authorized by comparing the target node's
1732
   * Subject (from the X.509 certificate-derived Session) with the list of 
1733
   * subjects in the known, pending replication tasks map.
1734
   * 
1735
   * @param originatingNodeSession - Session information that contains the 
1736
   *                                 identity of the calling user
1737
   * @param targetNodeSubject - Subject identifying the target node
1738
   * @param pid - the identifier of the object to be replicated
1739
   * @param replicatePermission - the execute permission to be granted
1740
   * 
1741
   * @throws ServiceFailure
1742
   * @throws NotImplemented
1743
   * @throws InvalidToken
1744
   * @throws NotAuthorized
1745
   * @throws InvalidRequest
1746
   * @throws NotFound
1747
   */
1748
  @Override
1749
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1750
    Subject targetNodeSubject, Identifier pid) 
1751
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1752
    NotFound, InvalidRequest {
1753
    
1754
    boolean isAllowed = false;
1755
    SystemMetadata sysmeta = null;
1756
    NodeReference targetNode = null;
1757
    
1758
    try {
1759
      // get the target node reference from the nodes list
1760
      CNode cn = D1Client.getCN();
1761
      List<Node> nodes = cn.listNodes().getNodeList();
1762
      
1763
      if ( nodes != null ) {
1764
        for (Node node : nodes) {
1765
            
1766
        	if (node.getSubjectList() != null) {
1767
        		
1768
	            for (Subject nodeSubject : node.getSubjectList()) {
1769
	            	
1770
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1771
	                    targetNode = node.getIdentifier();
1772
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1773
	                    break;
1774
	                }
1775
	            }
1776
        	}
1777
            
1778
            if ( targetNode != null) { break; }
1779
        }
1780
        
1781
      } else {
1782
          String msg = "Couldn't get the node list from the CN";
1783
          logMetacat.debug(msg);
1784
          throw new ServiceFailure("4872", msg);
1785
          
1786
      }
1787
      
1788
      // can't find a node listed with the given subject
1789
      if ( targetNode == null ) {
1790
          String msg = "There is no Member Node registered with a node subject " +
1791
              "matching " + targetNodeSubject.getValue();
1792
          logMetacat.info(msg);
1793
          throw new NotAuthorized("4871", msg);
1794
          
1795
      }
1796
      
1797
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1798
      
1799
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1800

    
1801
      if ( sysmeta != null ) {
1802
          
1803
          List<Replica> replicaList = sysmeta.getReplicaList();
1804
          
1805
          if ( replicaList != null ) {
1806
              
1807
              // find the replica with the status set to 'requested'
1808
              for (Replica replica : replicaList) {
1809
                  ReplicationStatus status = replica.getReplicationStatus();
1810
                  NodeReference listedNode = replica.getReplicaMemberNode();
1811
                  if ( listedNode != null && targetNode != null ) {
1812
                      logMetacat.debug("Comparing " + listedNode.getValue()
1813
                              + " to " + targetNode.getValue());
1814
                      
1815
                      if (listedNode.getValue().equals(targetNode.getValue())
1816
                              && status.equals(ReplicationStatus.REQUESTED)) {
1817
                          isAllowed = true;
1818
                          break;
1819

    
1820
                      }
1821
                  }
1822
              }
1823
          }
1824
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1825
              "to replicate: " + isAllowed + " for " + pid.getValue());
1826

    
1827
          
1828
      } else {
1829
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1830
          " is null.");
1831
          String error ="";
1832
          String localId = null;
1833
          try {
1834
              localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1835
            
1836
           } catch (Exception e) {
1837
              logMetacat.warn("Couldn't find the local id for the pid "+pid.getValue());
1838
          }
1839
          
1840
          if(localId != null && EventLog.getInstance().isDeleted(localId)) {
1841
              error = DELETEDMESSAGE;
1842
          } else if (localId == null && EventLog.getInstance().isDeleted(pid.getValue())) {
1843
              error = DELETEDMESSAGE;
1844
          }
1845
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue()+". "+error);
1846
          
1847
      }
1848

    
1849
    } catch (RuntimeException e) {
1850
    	  ServiceFailure sf = new ServiceFailure("4872", 
1851
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1852
                e.getMessage());
1853
    	  sf.initCause(e);
1854
        throw sf;
1855
        
1856
    }
1857
      
1858
    return isAllowed;
1859
    
1860
  }
1861

    
1862
  /**
1863
   * Adds a new object to the Node, where the object is a science metadata object.
1864
   * 
1865
   * @param session - the Session object containing the credentials for the Subject
1866
   * @param pid - The object identifier to be created
1867
   * @param object - the object bytes
1868
   * @param sysmeta - the system metadata that describes the object  
1869
   * 
1870
   * @return pid - the object identifier created
1871
   * 
1872
   * @throws InvalidToken
1873
   * @throws ServiceFailure
1874
   * @throws NotAuthorized
1875
   * @throws IdentifierNotUnique
1876
   * @throws UnsupportedType
1877
   * @throws InsufficientResources
1878
   * @throws InvalidSystemMetadata
1879
   * @throws NotImplemented
1880
   * @throws InvalidRequest
1881
   */
1882
  public Identifier create(Session session, Identifier pid, InputStream object,
1883
    SystemMetadata sysmeta) 
1884
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1885
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1886
    NotImplemented, InvalidRequest {
1887
    
1888
    try {
1889
      // verify the pid is valid format
1890
      if (!isValidIdentifier(pid)) {
1891
          throw new InvalidRequest("4891", "The provided identifier is invalid.");
1892
      }
1893
      logMetacat.debug("CN.create -start to create the object with pid "+pid.getValue());
1894
      // The lock to be used for this identifier
1895
      Lock lock = null;
1896

    
1897
      try {
1898
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1899
          lock.lock();
1900
          // are we allowed?
1901
          boolean isAllowed = false;
1902
          isAllowed = isAdminAuthorized(session);
1903
          
1904
          // additional check if it is the authoritative node if it is not the admin
1905
          if(!isAllowed) {
1906
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1907
          }
1908

    
1909
          // proceed if we're called by a CN
1910
          if ( isAllowed ) {
1911
              //check if the series id is legitimate. It uses the same rules of the method registerSystemMetadata
1912
              //checkSidInModifyingSystemMetadata(sysmeta, "4896", "4893");
1913
              Identifier sid = sysmeta.getSeriesId();
1914
              if(sid != null) {
1915
                  if (!isValidIdentifier(sid)) {
1916
                      throw new InvalidRequest("4891", "The series id in the system metadata is invalid in the request.");
1917
                  }
1918
              }
1919
              // create the coordinating node version of the document      
1920
              logMetacat.debug("CN.create - after locking identifier, passing authorization check, continue to create the object " + pid.getValue());
1921
              sysmeta.setSerialVersion(BigInteger.ONE);
1922
              //for the object whose authoritative mn is v1. we need reset the modification date.
1923
              //for the object whose authoritative mn is v2. we just accept the modification date.
1924
              D1NodeVersionChecker checker = new D1NodeVersionChecker(sysmeta.getAuthoritativeMemberNode());
1925
              String version = checker.getVersion("MNRead");
1926
              if(version != null && version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
1927
                  sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1928
              }
1929
              //sysmeta.setArchived(false); // this is a create op, not update
1930
              
1931
              // the CN should have set the origin and authoritative member node fields
1932
              try {
1933
                  sysmeta.getOriginMemberNode().getValue();
1934
                  sysmeta.getAuthoritativeMemberNode().getValue();
1935
                  
1936
              } catch (NullPointerException npe) {
1937
                  throw new InvalidSystemMetadata("4896", 
1938
                      "Both the origin and authoritative member node identifiers need to be set.");
1939
                  
1940
              }
1941
              pid = super.create(session, pid, object, sysmeta);
1942
              // submit the index task to the message broker.
1943
              String methodName="create";
1944
              boolean isData = false;
1945
              submitAddIndexTask(sysmeta, methodName, isData);
1946
          } else {
1947
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1948
                  " isn't allowed to call create() on a Coordinating Node for pid "+pid.getValue();
1949
              logMetacat.error(msg);
1950
              throw new NotAuthorized("1100", msg);
1951
          }
1952
          
1953
      } catch (RuntimeException e) {
1954
          // Convert Hazelcast runtime exceptions to service failures
1955
          String msg = "There was a problem creating the object identified by " +
1956
              pid.getValue() + ". There error message was: " + e.getMessage();
1957
          throw new ServiceFailure("4893", msg);
1958
          
1959
      } finally {
1960
    	  if (lock != null) {
1961
	          lock.unlock();
1962
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1963
    	  }
1964
      }
1965
    } finally {
1966
        IOUtils.closeQuietly(object);
1967
    }
1968
    return pid;
1969

    
1970
  }
1971

    
1972
  /**
1973
   * Set access for a given object using the object identifier and a Subject
1974
   * under a given Session.
1975
   * This method only applies the objects whose authoritative mn is a v1 node.
1976
   * @param session - the Session object containing the credentials for the Subject
1977
   * @param pid - the object identifier for the given object to apply the policy
1978
   * @param policy - the access policy to be applied
1979
   * 
1980
   * @return true if the application of the policy succeeds
1981
   * @throws InvalidToken
1982
   * @throws ServiceFailure
1983
   * @throws NotFound
1984
   * @throws NotAuthorized
1985
   * @throws NotImplemented
1986
   * @throws InvalidRequest
1987
   */
1988
  public boolean setAccessPolicy(Session session, Identifier pid, 
1989
      AccessPolicy accessPolicy, long serialVersion) 
1990
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1991
      NotImplemented, InvalidRequest, VersionMismatch {
1992
      
1993
   // do we have a valid pid?
1994
      if (pid == null || pid.getValue().trim().equals("")) {
1995
          throw new InvalidRequest("4402", "The provided identifier was invalid.");
1996
          
1997
      }
1998
      
1999
      String serviceFailureCode = "4430";
2000
      Identifier sid = getPIDForSID(pid, serviceFailureCode);
2001
      if(sid != null) {
2002
          pid = sid;
2003
      }
2004
      // The lock to be used for this identifier
2005
      Lock lock = null;
2006
      SystemMetadata systemMetadata = null;
2007
      
2008
      boolean success = false;
2009
      
2010
      // get the subject
2011
      Subject subject = session.getSubject();
2012
      
2013
      // are we allowed to do this?
2014
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
2015
          throw new NotAuthorized("4420", "not allowed by "
2016
                  + subject.getValue() + " on " + pid.getValue());
2017
      }
2018
      
2019
      try {
2020
          lock = HazelcastService.getInstance().getLock(pid.getValue());
2021
          lock.lock();
2022
          logMetacat.debug("Locked identifier " + pid.getValue());
2023

    
2024
          try {
2025
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2026

    
2027
              if ( systemMetadata == null ) {
2028
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
2029
                  
2030
              }
2031
              // does the request have the most current system metadata?
2032
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
2033
                 String msg = "The requested system metadata version number " + 
2034
                     serialVersion + " differs from the current version at " +
2035
                     systemMetadata.getSerialVersion().longValue() +
2036
                     ". Please get the latest copy in order to modify it.";
2037
                 throw new VersionMismatch("4402", msg);
2038
                 
2039
              }
2040
              
2041
              D1NodeVersionChecker checker = new D1NodeVersionChecker(systemMetadata.getAuthoritativeMemberNode());
2042
              String version = checker.getVersion("MNRead");
2043
              if(version == null) {
2044
                  throw new ServiceFailure("4430", "Couldn't determine the version of MNRead of the authoritative member node for the pid "+pid.getValue());
2045
              } else if (version.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
2046
                  //we don't apply this method to an object whose authoritative node is v2
2047
                  throw new NotAuthorized("4420", V2V1MISSMATCH);
2048
              } else if (!version.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
2049
                  //we don't understand this version (it is not v1 or v2)
2050
                  throw new InvalidRequest("4402", "The version of the MNRead is "+version+" for the authoritative member node of the object "+pid.getValue()+". We don't support it.");
2051
              }
2052
              
2053
          } catch (RuntimeException e) {
2054
              // convert Hazelcast RuntimeException to NotFound
2055
              throw new NotFound("4400", "No record found for: " + pid);
2056
            
2057
          }
2058
              
2059
          // set the access policy
2060
          systemMetadata.setAccessPolicy(accessPolicy);
2061
          
2062
          // update the system metadata
2063
          try {
2064
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
2065
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
2066
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
2067
              notifyReplicaNodes(systemMetadata);
2068
              
2069
          } catch (RuntimeException e) {
2070
              // convert Hazelcast RuntimeException to ServiceFailure
2071
              throw new ServiceFailure("4430", e.getMessage());
2072
            
2073
          }
2074
          
2075
      } catch (RuntimeException e) {
2076
          throw new ServiceFailure("4430", e.getMessage());
2077
          
2078
      } finally {
2079
          lock.unlock();
2080
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2081
        
2082
      }
2083

    
2084
    
2085
    // TODO: how do we know if the map was persisted?
2086
    success = true;
2087
    
2088
    return success;
2089
  }
2090

    
2091
  /**
2092
   * Full replacement of replication metadata in the system metadata for the 
2093
   * specified object, changes date system metadata modified
2094
   * 
2095
   * @param session - the Session object containing the credentials for the Subject
2096
   * @param pid - the object identifier for the given object to apply the policy
2097
   * @param replica - the replica to be updated
2098
   * @return
2099
   * @throws NotImplemented
2100
   * @throws NotAuthorized
2101
   * @throws ServiceFailure
2102
   * @throws InvalidRequest
2103
   * @throws NotFound
2104
   * @throws VersionMismatch
2105
   */
2106
  @Override
2107
  public boolean updateReplicationMetadata(Session session, Identifier pid,
2108
      Replica replica, long serialVersion) 
2109
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
2110
      NotFound, VersionMismatch {
2111
      
2112
      // The lock to be used for this identifier
2113
      Lock lock = null;
2114
      
2115
      // get the subject
2116
      Subject subject = session.getSubject();
2117
      
2118
      // are we allowed to do this?
2119
      if(session == null) {
2120
          throw new NotAuthorized("4851", "Session cannot be null. It is not authorized for updating the replication metadata of the object "+pid.getValue());
2121
      } else {
2122
          if(!isCNAdmin(session)) {
2123
              throw new NotAuthorized("4851", "The client -"+ session.getSubject().getValue()+ "is not a CN and is not authorized for updating the replication metadata of the object "+pid.getValue());
2124
        }
2125
      }
2126
      /*try {
2127

    
2128
          // what is the controlling permission?
2129
          if (!isAuthorized(session, pid, Permission.WRITE)) {
2130
              throw new NotAuthorized("4851", "not allowed by "
2131
                      + subject.getValue() + " on " + pid.getValue());
2132
          }
2133

    
2134
        
2135
      } catch (InvalidToken e) {
2136
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
2137
                  " on " + pid.getValue());  
2138
          
2139
      }*/
2140

    
2141
      SystemMetadata systemMetadata = null;
2142
      try {
2143
          lock = HazelcastService.getInstance().getLock(pid.getValue());
2144
          lock.lock();
2145
          logMetacat.debug("Locked identifier " + pid.getValue());
2146

    
2147
          try {      
2148
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2149

    
2150
              // does the request have the most current system metadata?
2151
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
2152
                 String msg = "The requested system metadata version number " + 
2153
                     serialVersion + " differs from the current version at " +
2154
                     systemMetadata.getSerialVersion().longValue() +
2155
                     ". Please get the latest copy in order to modify it.";
2156
                 throw new VersionMismatch("4855", msg);
2157
              }
2158
              
2159
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
2160
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
2161
                  " : " + e.getMessage());
2162
            
2163
          }
2164
              
2165
          // set the status for the replica
2166
          List<Replica> replicas = systemMetadata.getReplicaList();
2167
          NodeReference replicaNode = replica.getReplicaMemberNode();
2168
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
2169
          int index = 0;
2170
          for (Replica listedReplica: replicas) {
2171
              
2172
              // remove the replica that we are replacing
2173
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
2174
                      // don't allow status to change from COMPLETED to anything other
2175
                      // than INVALIDATED: prevents overwrites from race conditions
2176
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
2177
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
2178
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
2179
                	  throw new InvalidRequest("4853", "Status state change from " +
2180
                			  listedReplica.getReplicationStatus() + " to " +
2181
                			  replicaStatus.toString() + "is prohibited for identifier " +
2182
                			  pid.getValue() + " and target node " + 
2183
                			  listedReplica.getReplicaMemberNode().getValue());
2184

    
2185
            	  }
2186
                  replicas.remove(index);
2187
                  break;
2188
                  
2189
              }
2190
              index++;
2191
          }
2192
          
2193
          // add the new replica item
2194
          replicas.add(replica);
2195
          systemMetadata.setReplicaList(replicas);
2196
          
2197
          // update the metadata
2198
          try {
2199
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
2200
              // Based on CN behavior discussion 9/16/15, we no longer want to 
2201
              // update the modified date for changes to the replica list
2202
              //systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
2203
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
2204
              
2205
              // inform replica nodes of the change if the status is complete
2206
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
2207
            	  notifyReplicaNodes(systemMetadata);
2208
            	  
2209
              }
2210
          } catch (RuntimeException e) {
2211
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2212
              throw new ServiceFailure("4852", e.getMessage());
2213
          
2214
          }
2215
          
2216
      } catch (RuntimeException e) {
2217
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
2218
          throw new ServiceFailure("4852", e.getMessage());
2219
      
2220
      } finally {
2221
          lock.unlock();
2222
          logMetacat.debug("Unlocked identifier " + pid.getValue());
2223
          
2224
      }
2225
    
2226
      return true;
2227
      
2228
  }
2229
  
2230
  /**
2231
   * 
2232
   */
2233
  @Override
2234
  public ObjectList listObjects(Session session, Date startTime, 
2235
      Date endTime, ObjectFormatIdentifier formatid, NodeReference nodeId,Identifier identifier,
2236
      Integer start, Integer count)
2237
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
2238
      ServiceFailure {
2239

    
2240
      return super.listObjects(session, startTime, endTime, formatid, identifier, nodeId, start, count);
2241
  }
2242

    
2243
  
2244
 	/**
2245
 	 * Returns a list of checksum algorithms that are supported by DataONE.
2246
 	 * @return cal  the list of checksum algorithms
2247
 	 * 
2248
 	 * @throws ServiceFailure
2249
 	 * @throws NotImplemented
2250
 	 */
2251
  @Override
2252
  public ChecksumAlgorithmList listChecksumAlgorithms()
2253
			throws ServiceFailure, NotImplemented {
2254
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
2255
		cal.addAlgorithm("MD5");
2256
		cal.addAlgorithm("SHA-1");
2257
		return cal;
2258
		
2259
	}
2260

    
2261
  /**
2262
   * Notify replica Member Nodes of system metadata changes for a given pid
2263
   * 
2264
   * @param currentSystemMetadata - the up to date system metadata
2265
   */
2266
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
2267
      
2268
      Session session = null;
2269
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
2270
      //MNode mn = null;
2271
      NodeReference replicaNodeRef = null;
2272
      CNode cn = null;
2273
      NodeType nodeType = null;
2274
      List<Node> nodeList = null;
2275
      
2276
      try {
2277
          cn = D1Client.getCN();
2278
          nodeList = cn.listNodes().getNodeList();
2279
          
2280
      } catch (Exception e) { // handle BaseException and other I/O issues
2281
          
2282
          // swallow errors since the call is not critical
2283
          logMetacat.error("Can't inform MNs of system metadata changes due " +
2284
              "to communication issues with the CN: " + e.getMessage());
2285
          
2286
      }
2287
      
2288
      if ( replicaList != null ) {
2289
          
2290
          // iterate through the replicas and inform  MN replica nodes
2291
          for (Replica replica : replicaList) {
2292
              String replicationVersion = null;
2293
              replicaNodeRef = replica.getReplicaMemberNode();
2294
              try {
2295
                  if (nodeList != null) {
2296
                      // find the node type
2297
                      for (Node node : nodeList) {
2298
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
2299
                              nodeType = node.getType();
2300
                              D1NodeVersionChecker checker = new D1NodeVersionChecker(replicaNodeRef);
2301
                              replicationVersion = checker.getVersion("MNRead");
2302
                              break;
2303
              
2304
                          }
2305
                      }
2306
                  }
2307
              
2308
                  // notify only MNs
2309
                  if (replicationVersion != null && nodeType != null && nodeType == NodeType.MN) {
2310
                      if(replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V2)) {
2311
                          //connect to a v2 mn
2312
                          MNode mn = D1Client.getMN(replicaNodeRef);
2313
                          mn.systemMetadataChanged(session, 
2314
                              currentSystemMetadata.getIdentifier(), 
2315
                              currentSystemMetadata.getSerialVersion().longValue(),
2316
                              currentSystemMetadata.getDateSysMetadataModified());
2317
                      } else if (replicationVersion.equalsIgnoreCase(D1NodeVersionChecker.V1)) {
2318
                          //connect to a v1 mn
2319
                          org.dataone.client.v1.MNode mn = org.dataone.client.v1.itk.D1Client.getMN(replicaNodeRef);
2320
                          mn.systemMetadataChanged(session, 
2321
                                  currentSystemMetadata.getIdentifier(), 
2322
                                  currentSystemMetadata.getSerialVersion().longValue(),
2323
                                  currentSystemMetadata.getDateSysMetadataModified());
2324
                      }
2325
                      
2326
                  }
2327
              
2328
              } catch (Exception e) { // handle BaseException and other I/O issues
2329
              
2330
                  // swallow errors since the call is not critical
2331
                  logMetacat.error("Can't inform "
2332
                          + replicaNodeRef.getValue()
2333
                          + " of system metadata changes due "
2334
                          + "to communication issues with the CN: "
2335
                          + e.getMessage());
2336
              
2337
              }
2338
          }
2339
      }
2340
  }
2341
  
2342
  /**
2343
   * Update the system metadata of the specified pid.
2344
   * Note: the serial version and the replica list in the new system metadata will be ignored and the old values will be kept.
2345
   */
2346
  @Override
2347
  public boolean updateSystemMetadata(Session session, Identifier pid,
2348
          SystemMetadata sysmeta) throws NotImplemented, NotAuthorized,
2349
          ServiceFailure, InvalidRequest, InvalidSystemMetadata, InvalidToken {
2350
   if(sysmeta == null) {
2351
       throw  new InvalidRequest("4863", "The system metadata object should NOT be null in the updateSystemMetadata request.");
2352
   }
2353
   if(pid == null || pid.getValue() == null) {
2354
       throw new InvalidRequest("4863", "Please specify the id in the updateSystemMetadata request ") ;
2355
   }
2356

    
2357
   if (session == null) {
2358
       //TODO: many of the thrown exceptions do not use the correct error codes
2359
       //check these against the docs and correct them
2360
       throw new NotAuthorized("4861", "No Session - could not authorize for updating system metadata." +
2361
               "  If you are not logged in, please do so and retry the request.");
2362
   } else {
2363
         //only CN is allwoed
2364
         if(!isCNAdmin(session)) {
2365
               throw new NotAuthorized("4861", "The client -"+ session.getSubject().getValue()+ "is not authorized for updating the system metadata of the object "+pid.getValue());
2366
         }
2367
   }
2368

    
2369
    //update the system metadata locally
2370
    boolean success = false;
2371
    try {
2372
        HazelcastService.getInstance().getSystemMetadataMap().lock(pid);
2373
        SystemMetadata currentSysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
2374
       
2375
        if(currentSysmeta == null) {
2376
            throw  new InvalidRequest("4863", "We can't find the current system metadata on the member node for the id "+pid.getValue());
2377
        }
2378
        // CN will ignore the comming serial version and replica list fields from the mn node. 
2379
        BigInteger currentSerialVersion = currentSysmeta.getSerialVersion();
2380
        sysmeta.setSerialVersion(currentSerialVersion);
2381
        List<Replica> replicas = currentSysmeta.getReplicaList();
2382
        sysmeta.setReplicaList(replicas);
2383
        boolean needUpdateModificationDate = false;//cn doesn't need to change the modification date.
2384
        boolean fromCN = true;
2385
        success = updateSystemMetadata(session, pid, sysmeta, needUpdateModificationDate, currentSysmeta, fromCN);
2386
    } finally {
2387
        HazelcastService.getInstance().getSystemMetadataMap().unlock(pid);
2388
    }
2389
    return success;
2390
  }
2391
  
2392
    @Override
2393
    public boolean synchronize(Session session, Identifier pid) throws NotAuthorized, InvalidRequest, NotImplemented{
2394
        throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2395

    
2396
    }
2397

    
2398
	@Override
2399
	public QueryEngineDescription getQueryEngineDescription(Session session,
2400
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
2401
			NotImplemented, NotFound {
2402
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2403

    
2404
	}
2405
	
2406
	@Override
2407
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
2408
			ServiceFailure, NotAuthorized, NotImplemented {
2409
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2410

    
2411
	}
2412
	
2413
	@Override
2414
	public InputStream query(Session session, String queryEngine, String query)
2415
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
2416
			NotImplemented, NotFound {
2417
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
2418

    
2419
	}
2420
	
2421
	@Override
2422
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
2423
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
2424
	}
2425

    
2426
}
(1-1/9)