Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.ArrayList;
29
import java.util.Calendar;
30
import java.util.Date;
31
import java.util.List;
32
import java.util.concurrent.locks.Lock;
33

    
34
import javax.servlet.http.HttpServletRequest;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.client.v2.CNode;
38
import org.dataone.client.v2.itk.D1Client;
39
import org.dataone.client.v2.MNode;
40
import org.dataone.service.cn.v2.CNAuthorization;
41
import org.dataone.service.cn.v2.CNCore;
42
import org.dataone.service.cn.v2.CNRead;
43
import org.dataone.service.cn.v2.CNReplication;
44
import org.dataone.service.exceptions.BaseException;
45
import org.dataone.service.exceptions.IdentifierNotUnique;
46
import org.dataone.service.exceptions.InsufficientResources;
47
import org.dataone.service.exceptions.InvalidRequest;
48
import org.dataone.service.exceptions.InvalidSystemMetadata;
49
import org.dataone.service.exceptions.InvalidToken;
50
import org.dataone.service.exceptions.NotAuthorized;
51
import org.dataone.service.exceptions.NotFound;
52
import org.dataone.service.exceptions.NotImplemented;
53
import org.dataone.service.exceptions.ServiceFailure;
54
import org.dataone.service.exceptions.UnsupportedType;
55
import org.dataone.service.exceptions.VersionMismatch;
56
import org.dataone.service.types.v1.AccessPolicy;
57
import org.dataone.service.types.v1.Checksum;
58
import org.dataone.service.types.v1.ChecksumAlgorithmList;
59
import org.dataone.service.types.v1.DescribeResponse;
60
import org.dataone.service.types.v1.Event;
61
import org.dataone.service.types.v1.Identifier;
62
import org.dataone.service.types.v2.Log;
63
import org.dataone.service.types.v2.Node;
64
import org.dataone.service.types.v2.NodeList;
65
import org.dataone.service.types.v1.NodeReference;
66
import org.dataone.service.types.v1.NodeType;
67
import org.dataone.service.types.v2.ObjectFormat;
68
import org.dataone.service.types.v1.ObjectFormatIdentifier;
69
import org.dataone.service.types.v2.ObjectFormatList;
70
import org.dataone.service.types.v1.ObjectList;
71
import org.dataone.service.types.v1.ObjectLocationList;
72
import org.dataone.service.types.v1.Permission;
73
import org.dataone.service.types.v1.Replica;
74
import org.dataone.service.types.v1.ReplicationPolicy;
75
import org.dataone.service.types.v1.ReplicationStatus;
76
import org.dataone.service.types.v1.Session;
77
import org.dataone.service.types.v1.Subject;
78
import org.dataone.service.types.v2.SystemMetadata;
79
import org.dataone.service.types.v2.util.ServiceMethodRestrictionUtil;
80
import org.dataone.service.types.v1_1.QueryEngineDescription;
81
import org.dataone.service.types.v1_1.QueryEngineList;
82

    
83
import edu.ucsb.nceas.metacat.EventLog;
84
import edu.ucsb.nceas.metacat.IdentifierManager;
85
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
86
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
87
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
88

    
89
/**
90
 * Represents Metacat's implementation of the DataONE Coordinating Node 
91
 * service API. Methods implement the various CN* interfaces, and methods common
92
 * to both Member Node and Coordinating Node interfaces are found in the
93
 * D1NodeService super class.
94
 *
95
 */
96
public class CNodeService extends D1NodeService implements CNAuthorization,
97
    CNCore, CNRead, CNReplication {
98

    
99
  /* the logger instance */
100
  private Logger logMetacat = null;
101

    
102
  /**
103
   * singleton accessor
104
   */
105
  public static CNodeService getInstance(HttpServletRequest request) { 
106
    return new CNodeService(request);
107
  }
108
  
109
  /**
110
   * Constructor, private for singleton access
111
   */
112
  private CNodeService(HttpServletRequest request) {
113
    super(request);
114
    logMetacat = Logger.getLogger(CNodeService.class);
115
        
116
  }
117
    
118
  /**
119
   * Set the replication policy for an object given the object identifier
120
   * 
121
   * @param session - the Session object containing the credentials for the Subject
122
   * @param pid - the object identifier for the given object
123
   * @param policy - the replication policy to be applied
124
   * 
125
   * @return true or false
126
   * 
127
   * @throws NotImplemented
128
   * @throws NotAuthorized
129
   * @throws ServiceFailure
130
   * @throws InvalidRequest
131
   * @throws VersionMismatch
132
   * 
133
   */
134
  @Override
135
  public boolean setReplicationPolicy(Session session, Identifier pid,
136
      ReplicationPolicy policy, long serialVersion) 
137
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
138
      InvalidRequest, InvalidToken, VersionMismatch {
139
      
140
      // The lock to be used for this identifier
141
      Lock lock = null;
142
      
143
      // get the subject
144
      Subject subject = session.getSubject();
145
      
146
      // are we allowed to do this?
147
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
148
          throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
149
                  + " not allowed by " + subject.getValue() + " on "
150
                  + pid.getValue());
151
          
152
      }
153
      
154
      SystemMetadata systemMetadata = null;
155
      try {
156
          lock = HazelcastService.getInstance().getLock(pid.getValue());
157
          lock.lock();
158
          logMetacat.debug("Locked identifier " + pid.getValue());
159

    
160
          try {
161
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
162
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
163
                  
164
              }
165
              
166
              // did we get it correctly?
167
              if ( systemMetadata == null ) {
168
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
169
                  
170
              }
171

    
172
              // does the request have the most current system metadata?
173
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
174
                 String msg = "The requested system metadata version number " + 
175
                     serialVersion + " differs from the current version at " +
176
                     systemMetadata.getSerialVersion().longValue() +
177
                     ". Please get the latest copy in order to modify it.";
178
                 throw new VersionMismatch("4886", msg);
179
                 
180
              }
181
              
182
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
183
              throw new NotFound("4884", "No record found for: " + pid.getValue());
184
            
185
          }
186
          
187
          // set the new policy
188
          systemMetadata.setReplicationPolicy(policy);
189
          
190
          // update the metadata
191
          try {
192
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
193
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
194
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
195
              notifyReplicaNodes(systemMetadata);
196
              
197
          } catch (RuntimeException e) {
198
              throw new ServiceFailure("4882", e.getMessage());
199
          
200
          }
201
          
202
      } catch (RuntimeException e) {
203
          throw new ServiceFailure("4882", e.getMessage());
204
          
205
      } finally {
206
          lock.unlock();
207
          logMetacat.debug("Unlocked identifier " + pid.getValue());
208
          
209
      }
210
    
211
      return true;
212
  }
213

    
214
  /**
215
   * Deletes the replica from the given Member Node
216
   * NOTE: MN.delete() may be an "archive" operation. TBD.
217
   * @param session
218
   * @param pid
219
   * @param nodeId
220
   * @param serialVersion
221
   * @return
222
   * @throws InvalidToken
223
   * @throws ServiceFailure
224
   * @throws NotAuthorized
225
   * @throws NotFound
226
   * @throws NotImplemented
227
   * @throws VersionMismatch
228
   */
229
  @Override
230
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
231
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
232
	  
233
	  	// The lock to be used for this identifier
234
		Lock lock = null;
235

    
236
		// get the subject
237
		Subject subject = session.getSubject();
238

    
239
		// are we allowed to do this?
240
		boolean isAuthorized = false;
241
		try {
242
			isAuthorized = isAuthorized(session, pid, Permission.WRITE);
243
		} catch (InvalidRequest e) {
244
			throw new ServiceFailure("4882", e.getDescription());
245
		}
246
		if (!isAuthorized) {
247
			throw new NotAuthorized("4881", Permission.WRITE
248
					+ " not allowed by " + subject.getValue() + " on "
249
					+ pid.getValue());
250

    
251
		}
252

    
253
		SystemMetadata systemMetadata = null;
254
		try {
255
			lock = HazelcastService.getInstance().getLock(pid.getValue());
256
			lock.lock();
257
			logMetacat.debug("Locked identifier " + pid.getValue());
258

    
259
			try {
260
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
261
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
262
				}
263

    
264
				// did we get it correctly?
265
				if (systemMetadata == null) {
266
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
267
				}
268

    
269
				// does the request have the most current system metadata?
270
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
271
					String msg = "The requested system metadata version number "
272
							+ serialVersion
273
							+ " differs from the current version at "
274
							+ systemMetadata.getSerialVersion().longValue()
275
							+ ". Please get the latest copy in order to modify it.";
276
					throw new VersionMismatch("4886", msg);
277

    
278
				}
279

    
280
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
281
				throw new NotFound("4884", "No record found for: " + pid.getValue());
282

    
283
			}
284
			  
285
			// check permissions
286
			// TODO: is this necessary?
287
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
288
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
289
			if (!isAllowed) {
290
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
291
			}
292
			  
293
			// delete the replica from the given node
294
			// CSJ: use CN.delete() to truly delete a replica, semantically
295
			// deleteReplicaMetadata() only modifies the sytem metadata entry.
296
			//D1Client.getMN(nodeId).delete(session, pid);
297
			  
298
			// reflect that change in the system metadata
299
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
300
			for (Replica r: systemMetadata.getReplicaList()) {
301
				  if (r.getReplicaMemberNode().equals(nodeId)) {
302
					  updatedReplicas.remove(r);
303
					  break;
304
				  }
305
			}
306
			systemMetadata.setReplicaList(updatedReplicas);
307

    
308
			// update the metadata
309
			try {
310
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
311
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
312
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
313
			} catch (RuntimeException e) {
314
				throw new ServiceFailure("4882", e.getMessage());
315
			}
316

    
317
		} catch (RuntimeException e) {
318
			throw new ServiceFailure("4882", e.getMessage());
319
		} finally {
320
			lock.unlock();
321
			logMetacat.debug("Unlocked identifier " + pid.getValue());
322
		}
323

    
324
		return true;	  
325
	  
326
  }
327
  
328
  /**
329
   * Deletes an object from the Coordinating Node
330
   * 
331
   * @param session - the Session object containing the credentials for the Subject
332
   * @param pid - The object identifier to be deleted
333
   * 
334
   * @return pid - the identifier of the object used for the deletion
335
   * 
336
   * @throws InvalidToken
337
   * @throws ServiceFailure
338
   * @throws NotAuthorized
339
   * @throws NotFound
340
   * @throws NotImplemented
341
   * @throws InvalidRequest
342
   */
343
  @Override
344
  public Identifier delete(Session session, Identifier pid) 
345
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
346
      
347
      String localId = null;      // The corresponding docid for this pid
348
	  Lock lock = null;           // The lock to be used for this identifier
349
      CNode cn = null;            // a reference to the CN to get the node list    
350
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
351
      List<Node> nodeList = null; // the list of nodes in this CN environment
352

    
353
      // check for a valid session
354
      if (session == null) {
355
        	throw new InvalidToken("4963", "No session has been provided");
356
        	
357
      }
358

    
359
      // do we have a valid pid?
360
      if (pid == null || pid.getValue().trim().equals("")) {
361
          throw new ServiceFailure("4960", "The provided identifier was invalid.");
362
          
363
      }
364

    
365
	  // check that it is CN/admin
366
	  boolean allowed = isAdminAuthorized(session);
367
	  
368
	  // additional check if it is the authoritative node if it is not the admin
369
      if(!allowed) {
370
          allowed = isAuthoritativeMNodeAdmin(session, pid);
371
          
372
      }
373
	  
374
	  if (!allowed) {
375
		  String msg = "The subject " + session.getSubject().getValue() + 
376
			  " is not allowed to call delete() on a Coordinating Node.";
377
		  logMetacat.info(msg);
378
		  throw new NotAuthorized("4960", msg);
379
		  
380
	  }
381
	  
382
	  // Don't defer to superclass implementation without a locally registered identifier
383
	  
384
      // Check for the existing identifier
385
      try {
386
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
387
          super.delete(session, pid);
388
          
389
      } catch (McdbDocNotFoundException e) {
390
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
391
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
392
    	  
393
          try {
394
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
395
  			  lock.lock();
396
  			  logMetacat.debug("Locked identifier " + pid.getValue());
397

    
398
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
399
			  if ( sysMeta != null ) {
400
				sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
401
				sysMeta.setArchived(true);
402
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
403
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
404
				
405
			  } else {
406
				  throw new ServiceFailure("4962", "Couldn't delete the object " + pid.getValue() +
407
					  ". Couldn't obtain the system metadata record.");
408
				  
409
			  }
410
			  
411
		  } catch (RuntimeException re) {
412
			  throw new ServiceFailure("4962", "Couldn't delete " + pid.getValue() + 
413
				  ". The error message was: " + re.getMessage());
414
			  
415
		  } finally {
416
			  lock.unlock();
417
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
418

    
419
		  }
420

    
421
          // NOTE: cannot log the delete without localId
422
//          EventLog.getInstance().log(request.getRemoteAddr(), 
423
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
424
//                  pid.getValue(), Event.DELETE.xmlValue());
425

    
426
      }
427

    
428
      // get the node list
429
      try {
430
          cn = D1Client.getCN();
431
          nodeList = cn.listNodes().getNodeList();
432
          
433
      } catch (Exception e) { // handle BaseException and other I/O issues
434
          
435
          // swallow errors since the call is not critical
436
          logMetacat.error("Can't inform MNs of the deletion of " + pid.getValue() + 
437
              " due to communication issues with the CN: " + e.getMessage());
438
          
439
      }
440

    
441
	  // notify the replicas
442
	  SystemMetadata systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
443
	  if (systemMetadata.getReplicaList() != null) {
444
		  for (Replica replica: systemMetadata.getReplicaList()) {
445
			  NodeReference replicaNode = replica.getReplicaMemberNode();
446
			  try {
447
                  if (nodeList != null) {
448
                      // find the node type
449
                      for (Node node : nodeList) {
450
                          if ( node.getIdentifier().getValue().equals(replicaNode.getValue()) ) {
451
                              nodeType = node.getType();
452
                              break;
453
              
454
                          }
455
                      }
456
                  }
457
                  
458
                  // only send call MN.delete() to avoid an infinite loop with the CN
459
                  if (nodeType != null && nodeType == NodeType.MN) {
460
				      Identifier mnRetId = D1Client.getMN(replicaNode).delete(null, pid);
461
                  }
462
                  
463
			  } catch (Exception e) {
464
				  // all we can really do is log errors and carry on with life
465
				  logMetacat.error("Error deleting pid: " +  pid.getValue() + 
466
					  " from replica MN: " + replicaNode.getValue(), e);
467
			}
468
			  
469
		  }
470
	  }
471
	  
472
	  return pid;
473
      
474
  }
475
  
476
  /**
477
   * Archives an object from the Coordinating Node
478
   * 
479
   * @param session - the Session object containing the credentials for the Subject
480
   * @param pid - The object identifier to be deleted
481
   * 
482
   * @return pid - the identifier of the object used for the deletion
483
   * 
484
   * @throws InvalidToken
485
   * @throws ServiceFailure
486
   * @throws NotAuthorized
487
   * @throws NotFound
488
   * @throws NotImplemented
489
   * @throws InvalidRequest
490
   */
491
  @Override
492
  public Identifier archive(Session session, Identifier pid) 
493
      throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
494

    
495
      String localId = null; // The corresponding docid for this pid
496
	  Lock lock = null;      // The lock to be used for this identifier
497
      CNode cn = null;            // a reference to the CN to get the node list    
498
      NodeType nodeType = null;   // the nodeType of the replica node being contacted
499
      List<Node> nodeList = null; // the list of nodes in this CN environment
500
      
501

    
502
      // check for a valid session
503
      if (session == null) {
504
        	throw new InvalidToken("4973", "No session has been provided");
505
        	
506
      }
507

    
508
      // do we have a valid pid?
509
      if (pid == null || pid.getValue().trim().equals("")) {
510
          throw new ServiceFailure("4972", "The provided identifier was invalid.");
511
          
512
      }
513

    
514
	  // check that it is CN/admin
515
	  boolean allowed = isAdminAuthorized(session);
516
	  
517
	  //check if it is the authoritative member node
518
	  if(!allowed) {
519
	      allowed = isAuthoritativeMNodeAdmin(session, pid);
520
	  }
521
	  
522
	  if (!allowed) {
523
		  String msg = "The subject " + session.getSubject().getValue() + 
524
				  " is not allowed to call archive() on a Coordinating Node.";
525
		  logMetacat.info(msg);
526
		  throw new NotAuthorized("4970", msg);
527
	  }
528
	  
529
      // Check for the existing identifier
530
      try {
531
          localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
532
          super.archive(session, pid);
533
          
534
      } catch (McdbDocNotFoundException e) {
535
          // This object is not registered in the identifier table. Assume it is of formatType DATA,
536
    	  // and set the archive flag. (i.e. the *object* doesn't exist on the CN)
537
    	  
538
          try {
539
  			  lock = HazelcastService.getInstance().getLock(pid.getValue());
540
  			  lock.lock();
541
  			  logMetacat.debug("Locked identifier " + pid.getValue());
542

    
543
			  SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
544
			  if ( sysMeta != null ) {
545
				sysMeta.setSerialVersion(sysMeta.getSerialVersion().add(BigInteger.ONE));
546
				sysMeta.setArchived(true);
547
				sysMeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
548
				HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
549
			    // notify the replicas
550
				notifyReplicaNodes(sysMeta);
551
				  
552
			  } else {
553
				  throw new ServiceFailure("4972", "Couldn't archive the object " + pid.getValue() +
554
					  ". Couldn't obtain the system metadata record.");
555
				  
556
			  }
557
			  
558
		  } catch (RuntimeException re) {
559
			  throw new ServiceFailure("4972", "Couldn't archive " + pid.getValue() + 
560
				  ". The error message was: " + re.getMessage());
561
			  
562
		  } finally {
563
			  lock.unlock();
564
			  logMetacat.debug("Unlocked identifier " + pid.getValue());
565

    
566
		  }
567

    
568
          // NOTE: cannot log the archive without localId
569
//          EventLog.getInstance().log(request.getRemoteAddr(), 
570
//                  request.getHeader("User-Agent"), session.getSubject().getValue(), 
571
//                  pid.getValue(), Event.DELETE.xmlValue());
572

    
573
      }
574

    
575
	  return pid;
576
      
577
  }
578
  
579
  /**
580
   * Set the obsoletedBy attribute in System Metadata
581
   * @param session
582
   * @param pid
583
   * @param obsoletedByPid
584
   * @param serialVersion
585
   * @return
586
   * @throws NotImplemented
587
   * @throws NotFound
588
   * @throws NotAuthorized
589
   * @throws ServiceFailure
590
   * @throws InvalidRequest
591
   * @throws InvalidToken
592
   * @throws VersionMismatch
593
   */
594
  @Override
595
  public boolean setObsoletedBy(Session session, Identifier pid,
596
			Identifier obsoletedByPid, long serialVersion)
597
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
598
			InvalidRequest, InvalidToken, VersionMismatch {
599

    
600
		// The lock to be used for this identifier
601
		Lock lock = null;
602

    
603
		// get the subject
604
		Subject subject = session.getSubject();
605

    
606
		// are we allowed to do this?
607
		if (!isAuthorized(session, pid, Permission.WRITE)) {
608
			throw new NotAuthorized("4881", Permission.WRITE
609
					+ " not allowed by " + subject.getValue() + " on "
610
					+ pid.getValue());
611

    
612
		}
613

    
614

    
615
		SystemMetadata systemMetadata = null;
616
		try {
617
			lock = HazelcastService.getInstance().getLock(pid.getValue());
618
			lock.lock();
619
			logMetacat.debug("Locked identifier " + pid.getValue());
620

    
621
			try {
622
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
623
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
624
				}
625

    
626
				// did we get it correctly?
627
				if (systemMetadata == null) {
628
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
629
				}
630

    
631
				// does the request have the most current system metadata?
632
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
633
					String msg = "The requested system metadata version number "
634
							+ serialVersion
635
							+ " differs from the current version at "
636
							+ systemMetadata.getSerialVersion().longValue()
637
							+ ". Please get the latest copy in order to modify it.";
638
					throw new VersionMismatch("4886", msg);
639

    
640
				}
641

    
642
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
643
				throw new NotFound("4884", "No record found for: " + pid.getValue());
644

    
645
			}
646

    
647
			// set the new policy
648
			systemMetadata.setObsoletedBy(obsoletedByPid);
649

    
650
			// update the metadata
651
			try {
652
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
653
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
654
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
655
			} catch (RuntimeException e) {
656
				throw new ServiceFailure("4882", e.getMessage());
657
			}
658

    
659
		} catch (RuntimeException e) {
660
			throw new ServiceFailure("4882", e.getMessage());
661
		} finally {
662
			lock.unlock();
663
			logMetacat.debug("Unlocked identifier " + pid.getValue());
664
		}
665

    
666
		return true;
667
	}
668
  
669
  
670
  /**
671
   * Set the replication status for an object given the object identifier
672
   * 
673
   * @param session - the Session object containing the credentials for the Subject
674
   * @param pid - the object identifier for the given object
675
   * @param status - the replication status to be applied
676
   * 
677
   * @return true or false
678
   * 
679
   * @throws NotImplemented
680
   * @throws NotAuthorized
681
   * @throws ServiceFailure
682
   * @throws InvalidRequest
683
   * @throws InvalidToken
684
   * @throws NotFound
685
   * 
686
   */
687
  @Override
688
  public boolean setReplicationStatus(Session session, Identifier pid,
689
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
690
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
691
      InvalidRequest, NotFound {
692
	  
693
	  // cannot be called by public
694
	  if (session == null) {
695
		  throw new NotAuthorized("4720", "Session cannot be null");
696
	  }
697
      
698
      // The lock to be used for this identifier
699
      Lock lock = null;
700
      
701
      boolean allowed = false;
702
      int replicaEntryIndex = -1;
703
      List<Replica> replicas = null;
704
      // get the subject
705
      Subject subject = session.getSubject();
706
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
707
          " is " + status.toString());
708
      
709
      SystemMetadata systemMetadata = null;
710

    
711
      try {
712
          lock = HazelcastService.getInstance().getLock(pid.getValue());
713
          lock.lock();
714
          logMetacat.debug("Locked identifier " + pid.getValue());
715

    
716
          try {      
717
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
718

    
719
              // did we get it correctly?
720
              if ( systemMetadata == null ) {
721
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
722
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
723
                  
724
              }
725
              replicas = systemMetadata.getReplicaList();
726
              int count = 0;
727
              
728
              // was there a failure? log it
729
              if ( failure != null && status.equals(ReplicationStatus.FAILED) ) {
730
                 String msg = "The replication request of the object identified by " + 
731
                     pid.getValue() + " failed.  The error message was " +
732
                     failure.getMessage() + ".";
733
              }
734
              
735
              if (replicas.size() > 0 && replicas != null) {
736
                  // find the target replica index in the replica list
737
                  for (Replica replica : replicas) {
738
                      String replicaNodeStr = replica.getReplicaMemberNode().getValue();
739
                      String targetNodeStr = targetNode.getValue();
740
                      logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
741
                  
742
                      if (replicaNodeStr.equals(targetNodeStr)) {
743
                          replicaEntryIndex = count;
744
                          logMetacat.debug("replica entry index is: "
745
                                  + replicaEntryIndex);
746
                          break;
747
                      }
748
                      count++;
749
                  
750
                  }
751
              }
752
              // are we allowed to do this? only CNs and target MNs are allowed
753
              CNode cn = D1Client.getCN();
754
              List<Node> nodes = cn.listNodes().getNodeList();
755
              
756
              // find the node in the node list
757
              for ( Node node : nodes ) {
758
                  
759
                  NodeReference nodeReference = node.getIdentifier();
760
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + 
761
                      nodeReference.getValue());
762
                  
763
                  // allow target MN certs
764
                  if ( targetNode.getValue().equals(nodeReference.getValue() ) &&
765
                      node.getType().equals(NodeType.MN)) {
766
                      List<Subject> nodeSubjects = node.getSubjectList();
767
                      
768
                      // check if the session subject is in the node subject list
769
                      for (Subject nodeSubject : nodeSubjects) {
770
                          logMetacat.debug("In setReplicationStatus(), comparing subjects: " +
771
                                  nodeSubject.getValue() + " and " + subject.getValue());
772
                          if ( nodeSubject.equals(subject) ) { // subject of session == target node subject
773
                              
774
                              // lastly limit to COMPLETED, INVALIDATED,
775
                              // and FAILED status updates from MNs only
776
                              if ( status.equals(ReplicationStatus.COMPLETED) ||
777
                                   status.equals(ReplicationStatus.INVALIDATED) ||
778
                                   status.equals(ReplicationStatus.FAILED)) {
779
                                  allowed = true;
780
                                  break;
781
                                  
782
                              }                              
783
                          }
784
                      }                 
785
                  }
786
              }
787

    
788
              if ( !allowed ) {
789
                  //check for CN admin access
790
                  allowed = isAuthorized(session, pid, Permission.WRITE);
791
                  
792
              }              
793
              
794
              if ( !allowed ) {
795
                  String msg = "The subject identified by "
796
                          + subject.getValue()
797
                          + " does not have permission to set the replication status for "
798
                          + "the replica identified by "
799
                          + targetNode.getValue() + ".";
800
                  logMetacat.info(msg);
801
                  throw new NotAuthorized("4720", msg);
802
                  
803
              }
804

    
805
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
806
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
807
                " : " + e.getMessage());
808
            
809
          }
810
          
811
          Replica targetReplica = new Replica();
812
          // set the status for the replica
813
          if ( replicaEntryIndex != -1 ) {
814
              targetReplica = replicas.get(replicaEntryIndex);
815
              
816
              // don't allow status to change from COMPLETED to anything other
817
              // than INVALIDATED: prevents overwrites from race conditions
818
              if ( targetReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
819
            	   !status.equals(ReplicationStatus.INVALIDATED)) {
820
            	  throw new InvalidRequest("4730", "Status state change from " +
821
            			  targetReplica.getReplicationStatus() + " to " +
822
            			  status.toString() + "is prohibited for identifier " +
823
            			  pid.getValue() + " and target node " + 
824
            			  targetReplica.getReplicaMemberNode().getValue());
825
              }
826
              
827
              targetReplica.setReplicationStatus(status);
828
              
829
              logMetacat.debug("Set the replication status for " + 
830
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
831
                  targetReplica.getReplicationStatus() + " for identifier " +
832
                  pid.getValue());
833
              
834
          } else {
835
              // this is a new entry, create it
836
              targetReplica.setReplicaMemberNode(targetNode);
837
              targetReplica.setReplicationStatus(status);
838
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
839
              replicas.add(targetReplica);
840
              
841
          }
842
          
843
          systemMetadata.setReplicaList(replicas);
844
                
845
          // update the metadata
846
          try {
847
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
848
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
849
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
850

    
851
              if ( !status.equals(ReplicationStatus.QUEUED) && 
852
            	   !status.equals(ReplicationStatus.REQUESTED)) {
853
                  
854
                logMetacat.trace("METRICS:\tREPLICATION:\tEND REQUEST:\tPID:\t" + pid.getValue() + 
855
                          "\tNODE:\t" + targetNode.getValue() + 
856
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
857
                
858
                logMetacat.trace("METRICS:\tREPLICATION:\t" + status.toString().toUpperCase() +
859
                          "\tPID:\t"  + pid.getValue() + 
860
                          "\tNODE:\t" + targetNode.getValue() + 
861
                          "\tSIZE:\t" + systemMetadata.getSize().intValue());
862
              }
863

    
864
              if ( status.equals(ReplicationStatus.FAILED) && failure != null ) {
865
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
866
                      " on target node " + targetNode + ". The exception was: " +
867
                      failure.getMessage());
868
              }
869
              
870
			  // update the replica nodes about the completed replica when complete
871
              if (status.equals(ReplicationStatus.COMPLETED)) {
872
				notifyReplicaNodes(systemMetadata);
873
			}
874
          
875
          } catch (RuntimeException e) {
876
              throw new ServiceFailure("4700", e.getMessage());
877
          
878
          }
879
          
880
    } catch (RuntimeException e) {
881
        String msg = "There was a RuntimeException getting the lock for " +
882
            pid.getValue();
883
        logMetacat.info(msg);
884
        
885
    } finally {
886
        lock.unlock();
887
        logMetacat.debug("Unlocked identifier " + pid.getValue());
888
        
889
    }
890
      
891
      return true;
892
  }
893
  
894
/**
895
   * Return the checksum of the object given the identifier 
896
   * 
897
   * @param session - the Session object containing the credentials for the Subject
898
   * @param pid - the object identifier for the given object
899
   * 
900
   * @return checksum - the checksum of the object
901
   * 
902
   * @throws InvalidToken
903
   * @throws ServiceFailure
904
   * @throws NotAuthorized
905
   * @throws NotFound
906
   * @throws NotImplemented
907
   */
908
  @Override
909
  public Checksum getChecksum(Session session, Identifier pid)
910
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
911
    NotImplemented {
912
    
913
	boolean isAuthorized = false;
914
	try {
915
		isAuthorized = isAuthorized(session, pid, Permission.READ);
916
	} catch (InvalidRequest e) {
917
		throw new ServiceFailure("1410", e.getDescription());
918
	}  
919
    if (!isAuthorized) {
920
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
921
    }
922
    
923
    SystemMetadata systemMetadata = null;
924
    Checksum checksum = null;
925
    
926
    try {
927
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
928

    
929
        if (systemMetadata == null ) {
930
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue());
931
        }
932
        checksum = systemMetadata.getChecksum();
933
        
934
    } catch (RuntimeException e) {
935
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
936
            pid.getValue() + ". The error message was: " + e.getMessage());
937
      
938
    }
939
    
940
    return checksum;
941
  }
942

    
943
  /**
944
   * Resolve the location of a given object
945
   * 
946
   * @param session - the Session object containing the credentials for the Subject
947
   * @param pid - the object identifier for the given object
948
   * 
949
   * @return objectLocationList - the list of nodes known to contain the object
950
   * 
951
   * @throws InvalidToken
952
   * @throws ServiceFailure
953
   * @throws NotAuthorized
954
   * @throws NotFound
955
   * @throws NotImplemented
956
   */
957
  @Override
958
  public ObjectLocationList resolve(Session session, Identifier pid)
959
    throws InvalidToken, ServiceFailure, NotAuthorized,
960
    NotFound, NotImplemented {
961

    
962
    throw new NotImplemented("4131", "resolve not implemented");
963

    
964
  }
965

    
966
  /**
967
   * Metacat does not implement this method at the CN level
968
   */
969
  @Override
970
  public ObjectList search(Session session, String queryType, String query)
971
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
972
    NotImplemented {
973

    
974
		  throw new NotImplemented("4281", "Metacat does not implement CN.search");
975
	  
976
//    ObjectList objectList = null;
977
//    try {
978
//        objectList = 
979
//          IdentifierManager.getInstance().querySystemMetadata(
980
//              null, //startTime, 
981
//              null, //endTime,
982
//              null, //objectFormat, 
983
//              false, //replicaStatus, 
984
//              0, //start, 
985
//              1000 //count
986
//              );
987
//        
988
//    } catch (Exception e) {
989
//      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
990
//    }
991
//
992
//      return objectList;
993
		  
994
  }
995
  
996
  /**
997
   * Returns the object format registered in the DataONE Object Format 
998
   * Vocabulary for the given format identifier
999
   * 
1000
   * @param fmtid - the identifier of the format requested
1001
   * 
1002
   * @return objectFormat - the object format requested
1003
   * 
1004
   * @throws ServiceFailure
1005
   * @throws NotFound
1006
   * @throws InsufficientResources
1007
   * @throws NotImplemented
1008
   */
1009
  @Override
1010
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
1011
    throws ServiceFailure, NotFound, NotImplemented {
1012
     
1013
      return ObjectFormatService.getInstance().getFormat(fmtid);
1014
      
1015
  }
1016

    
1017
  /**
1018
   * Returns a list of all object formats registered in the DataONE Object 
1019
   * Format Vocabulary
1020
    * 
1021
   * @return objectFormatList - The list of object formats registered in 
1022
   *                            the DataONE Object Format Vocabulary
1023
   * 
1024
   * @throws ServiceFailure
1025
   * @throws NotImplemented
1026
   * @throws InsufficientResources
1027
   */
1028
  @Override
1029
  public ObjectFormatList listFormats() 
1030
    throws ServiceFailure, NotImplemented {
1031

    
1032
    return ObjectFormatService.getInstance().listFormats();
1033
  }
1034

    
1035
  /**
1036
   * Returns a list of nodes that have been registered with the DataONE infrastructure
1037
    * 
1038
   * @return nodeList - List of nodes from the registry
1039
   * 
1040
   * @throws ServiceFailure
1041
   * @throws NotImplemented
1042
   */
1043
  @Override
1044
  public NodeList listNodes() 
1045
    throws NotImplemented, ServiceFailure {
1046

    
1047
    throw new NotImplemented("4800", "listNodes not implemented");
1048
  }
1049

    
1050
  /**
1051
   * Provides a mechanism for adding system metadata independently of its 
1052
   * associated object, such as when adding system metadata for data objects.
1053
    * 
1054
   * @param session - the Session object containing the credentials for the Subject
1055
   * @param pid - The identifier of the object to register the system metadata against
1056
   * @param sysmeta - The system metadata to be registered
1057
   * 
1058
   * @return true if the registration succeeds
1059
   * 
1060
   * @throws NotImplemented
1061
   * @throws NotAuthorized
1062
   * @throws ServiceFailure
1063
   * @throws InvalidRequest
1064
   * @throws InvalidSystemMetadata
1065
   */
1066
  @Override
1067
  public Identifier registerSystemMetadata(Session session, Identifier pid,
1068
      SystemMetadata sysmeta) 
1069
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
1070
      InvalidSystemMetadata {
1071

    
1072
      // The lock to be used for this identifier
1073
      Lock lock = null;
1074

    
1075
      // TODO: control who can call this?
1076
      if (session == null) {
1077
          //TODO: many of the thrown exceptions do not use the correct error codes
1078
          //check these against the docs and correct them
1079
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
1080
                  "  If you are not logged in, please do so and retry the request.");
1081
      }
1082
      
1083
      // verify that guid == SystemMetadata.getIdentifier()
1084
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
1085
          "|" + sysmeta.getIdentifier().getValue());
1086
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
1087
          throw new InvalidRequest("4863", 
1088
              "The identifier in method call (" + pid.getValue() + 
1089
              ") does not match identifier in system metadata (" +
1090
              sysmeta.getIdentifier().getValue() + ").");
1091
      }
1092

    
1093
      try {
1094
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
1095
          lock.lock();
1096
          logMetacat.debug("Locked identifier " + pid.getValue());
1097
          logMetacat.debug("Checking if identifier exists...");
1098
          // Check that the identifier does not already exist
1099
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
1100
              throw new InvalidRequest("4863", 
1101
                  "The identifier is already in use by an existing object.");
1102
          
1103
          }
1104
          
1105
          // insert the system metadata into the object store
1106
          logMetacat.debug("Starting to insert SystemMetadata...");
1107
          try {
1108
              sysmeta.setSerialVersion(BigInteger.ONE);
1109
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1110
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
1111
              
1112
          } catch (RuntimeException e) {
1113
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
1114
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1115
                  e.getClass() + ": " + e.getMessage());
1116
              
1117
          }
1118
          
1119
      } catch (RuntimeException e) {
1120
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
1121
                  e.getClass() + ": " + e.getMessage());
1122
          
1123
      }  finally {
1124
          lock.unlock();
1125
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1126
          
1127
      }
1128

    
1129
      
1130
      logMetacat.debug("Returning from registerSystemMetadata");
1131
      
1132
      try {
1133
    	  String localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1134
    	  EventLog.getInstance().log(request.getRemoteAddr(), 
1135
    	          request.getHeader("User-Agent"), session.getSubject().getValue(), 
1136
    	          localId, "registerSystemMetadata");
1137
      } catch (McdbDocNotFoundException e) {
1138
    	  // do nothing, no localId to log with
1139
    	  logMetacat.warn("Could not log 'registerSystemMetadata' event because no localId was found for pid: " + pid.getValue());
1140
      }
1141
      
1142
      
1143
      return pid;
1144
  }
1145
  
1146
  /**
1147
   * Given an optional scope and format, reserves and returns an identifier 
1148
   * within that scope and format that is unique and will not be 
1149
   * used by any other sessions. 
1150
    * 
1151
   * @param session - the Session object containing the credentials for the Subject
1152
   * @param pid - The identifier of the object to register the system metadata against
1153
   * @param scope - An optional string to be used to qualify the scope of 
1154
   *                the identifier namespace, which is applied differently 
1155
   *                depending on the format requested. If scope is not 
1156
   *                supplied, a default scope will be used.
1157
   * @param format - The optional name of the identifier format to be used, 
1158
   *                  drawn from a DataONE-specific vocabulary of identifier 
1159
   *                 format names, including several common syntaxes such 
1160
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
1161
   *                 format is not supplied by the caller, the CN service 
1162
   *                 will use a default identifier format, which may change 
1163
   *                 over time.
1164
   * 
1165
   * @return true if the registration succeeds
1166
   * 
1167
   * @throws InvalidToken
1168
   * @throws ServiceFailure
1169
   * @throws NotAuthorized
1170
   * @throws IdentifierNotUnique
1171
   * @throws NotImplemented
1172
   */
1173
  @Override
1174
  public Identifier reserveIdentifier(Session session, Identifier pid)
1175
  throws InvalidToken, ServiceFailure,
1176
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
1177

    
1178
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
1179
  }
1180
  
1181
  @Override
1182
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
1183
  throws InvalidToken, ServiceFailure,
1184
        NotAuthorized, NotImplemented, InvalidRequest {
1185
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
1186
  }
1187
  
1188
  /**
1189
    * Checks whether the pid is reserved by the subject in the session param
1190
    * If the reservation is held on the pid by the subject, we return true.
1191
    * 
1192
   * @param session - the Session object containing the Subject
1193
   * @param pid - The identifier to check
1194
   * 
1195
   * @return true if the reservation exists for the subject/pid
1196
   * 
1197
   * @throws InvalidToken
1198
   * @throws ServiceFailure
1199
   * @throws NotFound - when the pid is not found (in use or in reservation)
1200
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
1201
   * @throws IdentifierNotUnique - when the pid is in use
1202
   * @throws NotImplemented
1203
   */
1204

    
1205
  @Override
1206
  public boolean hasReservation(Session session, Subject subject, Identifier pid) 
1207
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
1208
      NotImplemented, InvalidRequest {
1209
  
1210
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
1211
  }
1212

    
1213
  /**
1214
   * Changes ownership (RightsHolder) of the specified object to the 
1215
   * subject specified by userId
1216
    * 
1217
   * @param session - the Session object containing the credentials for the Subject
1218
   * @param pid - Identifier of the object to be modified
1219
   * @param userId - The subject that will be taking ownership of the specified object.
1220
   *
1221
   * @return pid - the identifier of the modified object
1222
   * 
1223
   * @throws ServiceFailure
1224
   * @throws InvalidToken
1225
   * @throws NotFound
1226
   * @throws NotAuthorized
1227
   * @throws NotImplemented
1228
   * @throws InvalidRequest
1229
   */  
1230
  @Override
1231
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
1232
      long serialVersion)
1233
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
1234
      NotImplemented, InvalidRequest, VersionMismatch {
1235
      
1236
      // The lock to be used for this identifier
1237
      Lock lock = null;
1238

    
1239
      // get the subject
1240
      Subject subject = session.getSubject();
1241
      
1242
      // are we allowed to do this?
1243
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1244
          throw new NotAuthorized("4440", "not allowed by "
1245
                  + subject.getValue() + " on " + pid.getValue());
1246
          
1247
      }
1248
      
1249
      SystemMetadata systemMetadata = null;
1250
      try {
1251
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1252
          lock.lock();
1253
          logMetacat.debug("Locked identifier " + pid.getValue());
1254

    
1255
          try {
1256
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1257
              
1258
              // does the request have the most current system metadata?
1259
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1260
                 String msg = "The requested system metadata version number " + 
1261
                     serialVersion + " differs from the current version at " +
1262
                     systemMetadata.getSerialVersion().longValue() +
1263
                     ". Please get the latest copy in order to modify it.";
1264
                 throw new VersionMismatch("4443", msg);
1265
              }
1266
              
1267
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1268
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1269
              
1270
          }
1271
              
1272
          // set the new rights holder
1273
          systemMetadata.setRightsHolder(userId);
1274
          
1275
          // update the metadata
1276
          try {
1277
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1278
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1279
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1280
              notifyReplicaNodes(systemMetadata);
1281
              
1282
          } catch (RuntimeException e) {
1283
              throw new ServiceFailure("4490", e.getMessage());
1284
          
1285
          }
1286
          
1287
      } catch (RuntimeException e) {
1288
          throw new ServiceFailure("4490", e.getMessage());
1289
          
1290
      } finally {
1291
          lock.unlock();
1292
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1293
      
1294
      }
1295
      
1296
      return pid;
1297
  }
1298

    
1299
  /**
1300
   * Verify that a replication task is authorized by comparing the target node's
1301
   * Subject (from the X.509 certificate-derived Session) with the list of 
1302
   * subjects in the known, pending replication tasks map.
1303
   * 
1304
   * @param originatingNodeSession - Session information that contains the 
1305
   *                                 identity of the calling user
1306
   * @param targetNodeSubject - Subject identifying the target node
1307
   * @param pid - the identifier of the object to be replicated
1308
   * @param replicatePermission - the execute permission to be granted
1309
   * 
1310
   * @throws ServiceFailure
1311
   * @throws NotImplemented
1312
   * @throws InvalidToken
1313
   * @throws NotAuthorized
1314
   * @throws InvalidRequest
1315
   * @throws NotFound
1316
   */
1317
  @Override
1318
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1319
    Subject targetNodeSubject, Identifier pid) 
1320
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1321
    NotFound, InvalidRequest {
1322
    
1323
    boolean isAllowed = false;
1324
    SystemMetadata sysmeta = null;
1325
    NodeReference targetNode = null;
1326
    
1327
    try {
1328
      // get the target node reference from the nodes list
1329
      CNode cn = D1Client.getCN();
1330
      List<Node> nodes = cn.listNodes().getNodeList();
1331
      
1332
      if ( nodes != null ) {
1333
        for (Node node : nodes) {
1334
            
1335
        	if (node.getSubjectList() != null) {
1336
        		
1337
	            for (Subject nodeSubject : node.getSubjectList()) {
1338
	            	
1339
	                if ( nodeSubject.equals(targetNodeSubject) ) {
1340
	                    targetNode = node.getIdentifier();
1341
	                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1342
	                    break;
1343
	                }
1344
	            }
1345
        	}
1346
            
1347
            if ( targetNode != null) { break; }
1348
        }
1349
        
1350
      } else {
1351
          String msg = "Couldn't get the node list from the CN";
1352
          logMetacat.debug(msg);
1353
          throw new ServiceFailure("4872", msg);
1354
          
1355
      }
1356
      
1357
      // can't find a node listed with the given subject
1358
      if ( targetNode == null ) {
1359
          String msg = "There is no Member Node registered with a node subject " +
1360
              "matching " + targetNodeSubject.getValue();
1361
          logMetacat.info(msg);
1362
          throw new NotAuthorized("4871", msg);
1363
          
1364
      }
1365
      
1366
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1367
      
1368
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1369

    
1370
      if ( sysmeta != null ) {
1371
          
1372
          List<Replica> replicaList = sysmeta.getReplicaList();
1373
          
1374
          if ( replicaList != null ) {
1375
              
1376
              // find the replica with the status set to 'requested'
1377
              for (Replica replica : replicaList) {
1378
                  ReplicationStatus status = replica.getReplicationStatus();
1379
                  NodeReference listedNode = replica.getReplicaMemberNode();
1380
                  if ( listedNode != null && targetNode != null ) {
1381
                      logMetacat.debug("Comparing " + listedNode.getValue()
1382
                              + " to " + targetNode.getValue());
1383
                      
1384
                      if (listedNode.getValue().equals(targetNode.getValue())
1385
                              && status.equals(ReplicationStatus.REQUESTED)) {
1386
                          isAllowed = true;
1387
                          break;
1388

    
1389
                      }
1390
                  }
1391
              }
1392
          }
1393
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1394
              "to replicate: " + isAllowed + " for " + pid.getValue());
1395

    
1396
          
1397
      } else {
1398
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1399
          " is null.");          
1400
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue());
1401
          
1402
      }
1403

    
1404
    } catch (RuntimeException e) {
1405
    	  ServiceFailure sf = new ServiceFailure("4872", 
1406
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1407
                e.getMessage());
1408
    	  sf.initCause(e);
1409
        throw sf;
1410
        
1411
    }
1412
      
1413
    return isAllowed;
1414
    
1415
  }
1416

    
1417
  /**
1418
   * Adds a new object to the Node, where the object is a science metadata object.
1419
   * 
1420
   * @param session - the Session object containing the credentials for the Subject
1421
   * @param pid - The object identifier to be created
1422
   * @param object - the object bytes
1423
   * @param sysmeta - the system metadata that describes the object  
1424
   * 
1425
   * @return pid - the object identifier created
1426
   * 
1427
   * @throws InvalidToken
1428
   * @throws ServiceFailure
1429
   * @throws NotAuthorized
1430
   * @throws IdentifierNotUnique
1431
   * @throws UnsupportedType
1432
   * @throws InsufficientResources
1433
   * @throws InvalidSystemMetadata
1434
   * @throws NotImplemented
1435
   * @throws InvalidRequest
1436
   */
1437
  public Identifier create(Session session, Identifier pid, InputStream object,
1438
    SystemMetadata sysmeta) 
1439
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1440
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1441
    NotImplemented, InvalidRequest {
1442
                  
1443
      // The lock to be used for this identifier
1444
      Lock lock = null;
1445

    
1446
      try {
1447
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1448
          // are we allowed?
1449
          boolean isAllowed = false;
1450
          isAllowed = isAdminAuthorized(session);
1451
          
1452
          // additional check if it is the authoritative node if it is not the admin
1453
          if(!isAllowed) {
1454
              isAllowed = isAuthoritativeMNodeAdmin(session, pid);
1455
          }
1456

    
1457
          // proceed if we're called by a CN
1458
          if ( isAllowed ) {
1459
              // create the coordinating node version of the document      
1460
              lock.lock();
1461
              logMetacat.debug("Locked identifier " + pid.getValue());
1462
              sysmeta.setSerialVersion(BigInteger.ONE);
1463
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1464
              //sysmeta.setArchived(false); // this is a create op, not update
1465
              
1466
              // the CN should have set the origin and authoritative member node fields
1467
              try {
1468
                  sysmeta.getOriginMemberNode().getValue();
1469
                  sysmeta.getAuthoritativeMemberNode().getValue();
1470
                  
1471
              } catch (NullPointerException npe) {
1472
                  throw new InvalidSystemMetadata("4896", 
1473
                      "Both the origin and authoritative member node identifiers need to be set.");
1474
                  
1475
              }
1476
              pid = super.create(session, pid, object, sysmeta);
1477

    
1478
          } else {
1479
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1480
                  " isn't allowed to call create() on a Coordinating Node.";
1481
              logMetacat.info(msg);
1482
              throw new NotAuthorized("1100", msg);
1483
          }
1484
          
1485
      } catch (RuntimeException e) {
1486
          // Convert Hazelcast runtime exceptions to service failures
1487
          String msg = "There was a problem creating the object identified by " +
1488
              pid.getValue() + ". There error message was: " + e.getMessage();
1489
          throw new ServiceFailure("4893", msg);
1490
          
1491
      } finally {
1492
    	  if (lock != null) {
1493
	          lock.unlock();
1494
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1495
    	  }
1496
      }
1497
      
1498
      return pid;
1499

    
1500
  }
1501

    
1502
  /**
1503
   * Set access for a given object using the object identifier and a Subject
1504
   * under a given Session.
1505
   * 
1506
   * @param session - the Session object containing the credentials for the Subject
1507
   * @param pid - the object identifier for the given object to apply the policy
1508
   * @param policy - the access policy to be applied
1509
   * 
1510
   * @return true if the application of the policy succeeds
1511
   * @throws InvalidToken
1512
   * @throws ServiceFailure
1513
   * @throws NotFound
1514
   * @throws NotAuthorized
1515
   * @throws NotImplemented
1516
   * @throws InvalidRequest
1517
   */
1518
  public boolean setAccessPolicy(Session session, Identifier pid, 
1519
      AccessPolicy accessPolicy, long serialVersion) 
1520
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1521
      NotImplemented, InvalidRequest, VersionMismatch {
1522
      
1523
      // The lock to be used for this identifier
1524
      Lock lock = null;
1525
      SystemMetadata systemMetadata = null;
1526
      
1527
      boolean success = false;
1528
      
1529
      // get the subject
1530
      Subject subject = session.getSubject();
1531
      
1532
      // are we allowed to do this?
1533
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1534
          throw new NotAuthorized("4420", "not allowed by "
1535
                  + subject.getValue() + " on " + pid.getValue());
1536
      }
1537
      
1538
      try {
1539
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1540
          lock.lock();
1541
          logMetacat.debug("Locked identifier " + pid.getValue());
1542

    
1543
          try {
1544
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1545

    
1546
              if ( systemMetadata == null ) {
1547
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1548
                  
1549
              }
1550
              // does the request have the most current system metadata?
1551
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1552
                 String msg = "The requested system metadata version number " + 
1553
                     serialVersion + " differs from the current version at " +
1554
                     systemMetadata.getSerialVersion().longValue() +
1555
                     ". Please get the latest copy in order to modify it.";
1556
                 throw new VersionMismatch("4402", msg);
1557
                 
1558
              }
1559
              
1560
          } catch (RuntimeException e) {
1561
              // convert Hazelcast RuntimeException to NotFound
1562
              throw new NotFound("4400", "No record found for: " + pid);
1563
            
1564
          }
1565
              
1566
          // set the access policy
1567
          systemMetadata.setAccessPolicy(accessPolicy);
1568
          
1569
          // update the system metadata
1570
          try {
1571
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1572
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1573
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1574
              notifyReplicaNodes(systemMetadata);
1575
              
1576
          } catch (RuntimeException e) {
1577
              // convert Hazelcast RuntimeException to ServiceFailure
1578
              throw new ServiceFailure("4430", e.getMessage());
1579
            
1580
          }
1581
          
1582
      } catch (RuntimeException e) {
1583
          throw new ServiceFailure("4430", e.getMessage());
1584
          
1585
      } finally {
1586
          lock.unlock();
1587
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1588
        
1589
      }
1590

    
1591
    
1592
    // TODO: how do we know if the map was persisted?
1593
    success = true;
1594
    
1595
    return success;
1596
  }
1597

    
1598
  /**
1599
   * Full replacement of replication metadata in the system metadata for the 
1600
   * specified object, changes date system metadata modified
1601
   * 
1602
   * @param session - the Session object containing the credentials for the Subject
1603
   * @param pid - the object identifier for the given object to apply the policy
1604
   * @param replica - the replica to be updated
1605
   * @return
1606
   * @throws NotImplemented
1607
   * @throws NotAuthorized
1608
   * @throws ServiceFailure
1609
   * @throws InvalidRequest
1610
   * @throws NotFound
1611
   * @throws VersionMismatch
1612
   */
1613
  @Override
1614
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1615
      Replica replica, long serialVersion) 
1616
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1617
      NotFound, VersionMismatch {
1618
      
1619
      // The lock to be used for this identifier
1620
      Lock lock = null;
1621
      
1622
      // get the subject
1623
      Subject subject = session.getSubject();
1624
      
1625
      // are we allowed to do this?
1626
      try {
1627

    
1628
          // what is the controlling permission?
1629
          if (!isAuthorized(session, pid, Permission.WRITE)) {
1630
              throw new NotAuthorized("4851", "not allowed by "
1631
                      + subject.getValue() + " on " + pid.getValue());
1632
          }
1633

    
1634
        
1635
      } catch (InvalidToken e) {
1636
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1637
                  " on " + pid.getValue());  
1638
          
1639
      }
1640

    
1641
      SystemMetadata systemMetadata = null;
1642
      try {
1643
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1644
          lock.lock();
1645
          logMetacat.debug("Locked identifier " + pid.getValue());
1646

    
1647
          try {      
1648
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1649

    
1650
              // does the request have the most current system metadata?
1651
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1652
                 String msg = "The requested system metadata version number " + 
1653
                     serialVersion + " differs from the current version at " +
1654
                     systemMetadata.getSerialVersion().longValue() +
1655
                     ". Please get the latest copy in order to modify it.";
1656
                 throw new VersionMismatch("4855", msg);
1657
              }
1658
              
1659
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1660
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1661
                  " : " + e.getMessage());
1662
            
1663
          }
1664
              
1665
          // set the status for the replica
1666
          List<Replica> replicas = systemMetadata.getReplicaList();
1667
          NodeReference replicaNode = replica.getReplicaMemberNode();
1668
          ReplicationStatus replicaStatus = replica.getReplicationStatus();
1669
          int index = 0;
1670
          for (Replica listedReplica: replicas) {
1671
              
1672
              // remove the replica that we are replacing
1673
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1674
                      // don't allow status to change from COMPLETED to anything other
1675
                      // than INVALIDATED: prevents overwrites from race conditions
1676
                	  if ( !listedReplica.getReplicationStatus().equals(replicaStatus) && 
1677
                	       listedReplica.getReplicationStatus().equals(ReplicationStatus.COMPLETED) &&
1678
            		       !replicaStatus.equals(ReplicationStatus.INVALIDATED) ) {
1679
                	  throw new InvalidRequest("4853", "Status state change from " +
1680
                			  listedReplica.getReplicationStatus() + " to " +
1681
                			  replicaStatus.toString() + "is prohibited for identifier " +
1682
                			  pid.getValue() + " and target node " + 
1683
                			  listedReplica.getReplicaMemberNode().getValue());
1684

    
1685
            	  }
1686
                  replicas.remove(index);
1687
                  break;
1688
                  
1689
              }
1690
              index++;
1691
          }
1692
          
1693
          // add the new replica item
1694
          replicas.add(replica);
1695
          systemMetadata.setReplicaList(replicas);
1696
          
1697
          // update the metadata
1698
          try {
1699
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1700
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1701
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1702
              
1703
              // inform replica nodes of the change if the status is complete
1704
              if ( replicaStatus.equals(ReplicationStatus.COMPLETED) ) {
1705
            	  notifyReplicaNodes(systemMetadata);
1706
            	  
1707
              }
1708
          } catch (RuntimeException e) {
1709
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1710
              throw new ServiceFailure("4852", e.getMessage());
1711
          
1712
          }
1713
          
1714
      } catch (RuntimeException e) {
1715
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1716
          throw new ServiceFailure("4852", e.getMessage());
1717
      
1718
      } finally {
1719
          lock.unlock();
1720
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1721
          
1722
      }
1723
    
1724
      return true;
1725
      
1726
  }
1727
  
1728
  /**
1729
   * 
1730
   */
1731
  @Override
1732
  public ObjectList listObjects(Session session, Date startTime, 
1733
      Date endTime, ObjectFormatIdentifier formatid, Identifier identifier, Boolean replicaStatus,
1734
      Integer start, Integer count)
1735
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1736
      ServiceFailure {
1737
      
1738
      ObjectList objectList = null;
1739
        try {
1740
        	if (count == null || count > MAXIMUM_DB_RECORD_COUNT) {
1741
            	count = MAXIMUM_DB_RECORD_COUNT;
1742
            }
1743
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1744
        } catch (Exception e) {
1745
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1746
        }
1747

    
1748
        return objectList;
1749
  }
1750

    
1751
  
1752
 	/**
1753
 	 * Returns a list of checksum algorithms that are supported by DataONE.
1754
 	 * @return cal  the list of checksum algorithms
1755
 	 * 
1756
 	 * @throws ServiceFailure
1757
 	 * @throws NotImplemented
1758
 	 */
1759
  @Override
1760
  public ChecksumAlgorithmList listChecksumAlgorithms()
1761
			throws ServiceFailure, NotImplemented {
1762
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1763
		cal.addAlgorithm("MD5");
1764
		cal.addAlgorithm("SHA-1");
1765
		return cal;
1766
		
1767
	}
1768

    
1769
  /**
1770
   * Notify replica Member Nodes of system metadata changes for a given pid
1771
   * 
1772
   * @param currentSystemMetadata - the up to date system metadata
1773
   */
1774
  public void notifyReplicaNodes(SystemMetadata currentSystemMetadata) {
1775
      
1776
      Session session = null;
1777
      List<Replica> replicaList = currentSystemMetadata.getReplicaList();
1778
      MNode mn = null;
1779
      NodeReference replicaNodeRef = null;
1780
      CNode cn = null;
1781
      NodeType nodeType = null;
1782
      List<Node> nodeList = null;
1783
      
1784
      try {
1785
          cn = D1Client.getCN();
1786
          nodeList = cn.listNodes().getNodeList();
1787
          
1788
      } catch (Exception e) { // handle BaseException and other I/O issues
1789
          
1790
          // swallow errors since the call is not critical
1791
          logMetacat.error("Can't inform MNs of system metadata changes due " +
1792
              "to communication issues with the CN: " + e.getMessage());
1793
          
1794
      }
1795
      
1796
      if ( replicaList != null ) {
1797
          
1798
          // iterate through the replicas and inform  MN replica nodes
1799
          for (Replica replica : replicaList) {
1800
              
1801
              replicaNodeRef = replica.getReplicaMemberNode();
1802
              try {
1803
                  if (nodeList != null) {
1804
                      // find the node type
1805
                      for (Node node : nodeList) {
1806
                          if ( node.getIdentifier().getValue().equals(replicaNodeRef.getValue()) ) {
1807
                              nodeType = node.getType();
1808
                              break;
1809
              
1810
                          }
1811
                      }
1812
                  }
1813
              
1814
                  // notify only MNs
1815
                  if (nodeType != null && nodeType == NodeType.MN) {
1816
                      mn = D1Client.getMN(replicaNodeRef);
1817
                      mn.systemMetadataChanged(session, 
1818
                          currentSystemMetadata.getIdentifier(), 
1819
                          currentSystemMetadata.getSerialVersion().longValue(),
1820
                          currentSystemMetadata.getDateSysMetadataModified());
1821
                  }
1822
              
1823
              } catch (Exception e) { // handle BaseException and other I/O issues
1824
              
1825
                  // swallow errors since the call is not critical
1826
                  logMetacat.error("Can't inform "
1827
                          + replicaNodeRef.getValue()
1828
                          + " of system metadata changes due "
1829
                          + "to communication issues with the CN: "
1830
                          + e.getMessage());
1831
              
1832
              }
1833
          }
1834
      }
1835
  }
1836

    
1837
	@Override
1838
	public QueryEngineDescription getQueryEngineDescription(Session session,
1839
			String queryEngine) throws InvalidToken, ServiceFailure, NotAuthorized,
1840
			NotImplemented, NotFound {
1841
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1842

    
1843
	}
1844
	
1845
	@Override
1846
	public QueryEngineList listQueryEngines(Session session) throws InvalidToken,
1847
			ServiceFailure, NotAuthorized, NotImplemented {
1848
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1849

    
1850
	}
1851
	
1852
	@Override
1853
	public InputStream query(Session session, String queryEngine, String query)
1854
			throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
1855
			NotImplemented, NotFound {
1856
		throw new NotImplemented("0000", "CN query services are not implemented in Metacat.");
1857

    
1858
	}
1859
	
1860
	@Override
1861
	public Node getCapabilities() throws NotImplemented, ServiceFailure {
1862
		throw new NotImplemented("0000", "The CN capabilities are not stored in Metacat.");
1863
	}
1864
	
1865
	/**
1866
     * A method to notify the Coordinating Node that the authoritative copy of 
1867
     * system metadata on the Authoritative Member Node has changed.
1868
     * 
1869
     * @param session   Session information that contains the identity of the 
1870
     *                  calling user as retrieved from the X.509 certificate 
1871
     *                  which must be traceable to the CILogon service.
1872
     * @param serialVersion   The serialVersion of the system metadata
1873
     * @param dateSysMetaLastModified  The time stamp for when the system metadata was changed
1874
     * @throws NotImplemented
1875
     * @throws ServiceFailure
1876
     * @throws NotAuthorized
1877
     * @throws InvalidRequest
1878
     * @throws InvalidToken
1879
     */
1880
	@Override
1881
    public boolean systemMetadataChanged(Session session, Identifier pid,
1882
        long serialVersion, Date dateSysMetaLastModified) 
1883
        throws NotImplemented, ServiceFailure, NotAuthorized, InvalidRequest,
1884
        InvalidToken {
1885
        
1886
        // cannot be called by public
1887
        if (session == null) {
1888
        	throw new InvalidToken("2183", "No session was provided.");
1889
        }
1890

    
1891
        SystemMetadata currentLocalSysMeta = null;
1892
        SystemMetadata newSysMeta = null;
1893
        boolean allowed = false;
1894
        
1895
        // are we allowed to call this?
1896
        allowed = super.isAuthoritativeMNodeAdmin(session, pid);
1897
        
1898
        if (!allowed ) {
1899
            String msg = "The subject identified by " + session.getSubject().getValue() +
1900
              " is not authorized to call this service.";
1901
            throw new NotAuthorized("1331", msg);
1902
            
1903
        }
1904
        
1905
        // compare what we have locally to what is sent in the change notification
1906
        try {
1907
            currentLocalSysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1908
             
1909
        } catch (RuntimeException e) {
1910
            String msg = "SystemMetadata for pid " + pid.getValue() +
1911
              " couldn't be updated because it couldn't be found locally: " +
1912
              e.getMessage();
1913
            logMetacat.error(msg);
1914
            ServiceFailure sf = new ServiceFailure("1333", msg);
1915
            sf.initCause(e);
1916
            throw sf; 
1917
        }
1918
        
1919
        if (currentLocalSysMeta.getSerialVersion().longValue() < serialVersion ) {
1920
            try {
1921
            	
1922
                MNode mn = D1Client.getMN(currentLocalSysMeta.getAuthoritativeMemberNode());
1923
				newSysMeta = mn .getSystemMetadata(null, pid);
1924
            } catch (NotFound e) {
1925
                // huh? you just said you had it
1926
            	String msg = "On updating the local copy of system metadata " + 
1927
                "for pid " + pid.getValue() +", the AuthMN reports it is not found." +
1928
                " The error message was: " + e.getMessage();
1929
                logMetacat.error(msg);
1930
                ServiceFailure sf = new ServiceFailure("1333", msg);
1931
                sf.initCause(e);
1932
                throw sf;
1933
            }
1934
            
1935
            // update the local copy of system metadata for the pid
1936
            try {
1937
                HazelcastService.getInstance().getSystemMetadataMap().put(newSysMeta.getIdentifier(), newSysMeta);
1938
                logMetacat.info("Updated local copy of system metadata for pid " +
1939
                    pid.getValue() + " after change notification from the CN.");
1940
                
1941
                // TODO: consider inspecting the change for archive
1942
                // see: https://projects.ecoinformatics.org/ecoinfo/issues/6417
1943

    
1944
            } catch (RuntimeException e) {
1945
                String msg = "SystemMetadata for pid " + pid.getValue() +
1946
                  " couldn't be updated: " +
1947
                  e.getMessage();
1948
                logMetacat.error(msg);
1949
                ServiceFailure sf = new ServiceFailure("1333", msg);
1950
                sf.initCause(e);
1951
                throw sf;
1952
            }
1953
            
1954
            // submit for indexing
1955
            try {
1956
				MetacatSolrIndex.getInstance().submit(newSysMeta.getIdentifier(), newSysMeta, null, true);
1957
			} catch (Exception e) {
1958
                logMetacat.error("Could not submit changed systemMetadata for indexing, pid: " + newSysMeta.getIdentifier().getValue(), e);
1959
			}
1960
        }
1961
        
1962
        return true;
1963
        
1964
    }
1965
	
1966
}
(1-1/7)