Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.ArrayList;
29
import java.util.Calendar;
30
import java.util.Date;
31
import java.util.List;
32
import java.util.concurrent.locks.Lock;
33

    
34
import javax.servlet.http.HttpServletRequest;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.client.CNode;
38
import org.dataone.client.D1Client;
39
import org.dataone.service.cn.v1.CNAuthorization;
40
import org.dataone.service.cn.v1.CNCore;
41
import org.dataone.service.cn.v1.CNRead;
42
import org.dataone.service.cn.v1.CNReplication;
43
import org.dataone.service.exceptions.BaseException;
44
import org.dataone.service.exceptions.IdentifierNotUnique;
45
import org.dataone.service.exceptions.InsufficientResources;
46
import org.dataone.service.exceptions.InvalidRequest;
47
import org.dataone.service.exceptions.InvalidSystemMetadata;
48
import org.dataone.service.exceptions.InvalidToken;
49
import org.dataone.service.exceptions.NotAuthorized;
50
import org.dataone.service.exceptions.NotFound;
51
import org.dataone.service.exceptions.NotImplemented;
52
import org.dataone.service.exceptions.ServiceFailure;
53
import org.dataone.service.exceptions.UnsupportedType;
54
import org.dataone.service.exceptions.VersionMismatch;
55
import org.dataone.service.types.v1.AccessPolicy;
56
import org.dataone.service.types.v1.Checksum;
57
import org.dataone.service.types.v1.ChecksumAlgorithmList;
58
import org.dataone.service.types.v1.Identifier;
59
import org.dataone.service.types.v1.Node;
60
import org.dataone.service.types.v1.NodeList;
61
import org.dataone.service.types.v1.NodeReference;
62
import org.dataone.service.types.v1.NodeType;
63
import org.dataone.service.types.v1.ObjectFormat;
64
import org.dataone.service.types.v1.ObjectFormatIdentifier;
65
import org.dataone.service.types.v1.ObjectFormatList;
66
import org.dataone.service.types.v1.ObjectList;
67
import org.dataone.service.types.v1.ObjectLocationList;
68
import org.dataone.service.types.v1.Permission;
69
import org.dataone.service.types.v1.Replica;
70
import org.dataone.service.types.v1.ReplicationPolicy;
71
import org.dataone.service.types.v1.ReplicationStatus;
72
import org.dataone.service.types.v1.Session;
73
import org.dataone.service.types.v1.Subject;
74
import org.dataone.service.types.v1.SystemMetadata;
75
import org.dataone.service.types.v1.util.ServiceMethodRestrictionUtil;
76

    
77
import edu.ucsb.nceas.metacat.EventLog;
78
import edu.ucsb.nceas.metacat.IdentifierManager;
79
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
80

    
81
/**
82
 * Represents Metacat's implementation of the DataONE Coordinating Node 
83
 * service API. Methods implement the various CN* interfaces, and methods common
84
 * to both Member Node and Coordinating Node interfaces are found in the
85
 * D1NodeService super class.
86
 *
87
 */
88
public class CNodeService extends D1NodeService implements CNAuthorization,
89
    CNCore, CNRead, CNReplication {
90

    
91
  /* the logger instance */
92
  private Logger logMetacat = null;
93

    
94
  /**
95
   * singleton accessor
96
   */
97
  public static CNodeService getInstance(HttpServletRequest request) { 
98
    return new CNodeService(request);
99
  }
100
  
101
  /**
102
   * Constructor, private for singleton access
103
   */
104
  private CNodeService(HttpServletRequest request) {
105
    super(request);
106
    logMetacat = Logger.getLogger(CNodeService.class);
107
        
108
  }
109
    
110
  /**
111
   * Set the replication policy for an object given the object identifier
112
   * 
113
   * @param session - the Session object containing the credentials for the Subject
114
   * @param pid - the object identifier for the given object
115
   * @param policy - the replication policy to be applied
116
   * 
117
   * @return true or false
118
   * 
119
   * @throws NotImplemented
120
   * @throws NotAuthorized
121
   * @throws ServiceFailure
122
   * @throws InvalidRequest
123
   * @throws VersionMismatch
124
   * 
125
   */
126
  @Override
127
  public boolean setReplicationPolicy(Session session, Identifier pid,
128
      ReplicationPolicy policy, long serialVersion) 
129
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
130
      InvalidRequest, InvalidToken, VersionMismatch {
131
      
132
      // The lock to be used for this identifier
133
      Lock lock = null;
134
      
135
      // get the subject
136
      Subject subject = session.getSubject();
137
      
138
      // are we allowed to do this?
139
      if (!isAdminAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
140
          if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
141
              throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION
142
                      + " not allowed by " + subject.getValue() + " on "
143
                      + pid.getValue());
144
              
145
          }
146
      }
147
      
148
      SystemMetadata systemMetadata = null;
149
      try {
150
          lock = HazelcastService.getInstance().getLock(pid.getValue());
151
          lock.lock();
152
          logMetacat.debug("Locked identifier " + pid.getValue());
153

    
154
          try {
155
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
156
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
157
                  
158
              }
159
              
160
              // did we get it correctly?
161
              if ( systemMetadata == null ) {
162
                  throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
163
                  
164
              }
165

    
166
              // does the request have the most current system metadata?
167
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
168
                 String msg = "The requested system metadata version number " + 
169
                     serialVersion + " differs from the current version at " +
170
                     systemMetadata.getSerialVersion().longValue() +
171
                     ". Please get the latest copy in order to modify it.";
172
                 throw new VersionMismatch("4886", msg);
173
                 
174
              }
175
              
176
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
177
              throw new NotFound("4884", "No record found for: " + pid.getValue());
178
            
179
          }
180
          
181
          // set the new policy
182
          systemMetadata.setReplicationPolicy(policy);
183
          
184
          // update the metadata
185
          try {
186
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
187
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
188
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
189
            
190
          } catch (RuntimeException e) {
191
              throw new ServiceFailure("4882", e.getMessage());
192
          
193
          }
194
          
195
      } catch (RuntimeException e) {
196
          throw new ServiceFailure("4882", e.getMessage());
197
          
198
      } finally {
199
          lock.unlock();
200
          logMetacat.debug("Unlocked identifier " + pid.getValue());
201
          
202
      }
203
    
204
      return true;
205
  }
206

    
207
  /**
208
   * Deletes the replica from the given Member Node
209
   * NOTE: MN.delete() may be an "archive" operation. TBD.
210
   * @param session
211
   * @param pid
212
   * @param nodeId
213
   * @param serialVersion
214
   * @return
215
   * @throws InvalidToken
216
   * @throws ServiceFailure
217
   * @throws NotAuthorized
218
   * @throws NotFound
219
   * @throws NotImplemented
220
   * @throws VersionMismatch
221
   */
222
  @Override
223
  public boolean deleteReplicationMetadata(Session session, Identifier pid, NodeReference nodeId, long serialVersion) 
224
  	throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, VersionMismatch {
225
	  
226
	  	// The lock to be used for this identifier
227
		Lock lock = null;
228

    
229
		// get the subject
230
		Subject subject = session.getSubject();
231

    
232
		// are we allowed to do this?
233
		if (!isAdminAuthorized(session, pid, Permission.WRITE)) {
234
			if (!isAuthorized(session, pid, Permission.WRITE)) {
235
				throw new NotAuthorized("4881", Permission.WRITE
236
						+ " not allowed by " + subject.getValue() + " on "
237
						+ pid.getValue());
238

    
239
			}
240
		}
241

    
242
		SystemMetadata systemMetadata = null;
243
		try {
244
			lock = HazelcastService.getInstance().getLock(pid.getValue());
245
			lock.lock();
246
			logMetacat.debug("Locked identifier " + pid.getValue());
247

    
248
			try {
249
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
250
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
251
				}
252

    
253
				// did we get it correctly?
254
				if (systemMetadata == null) {
255
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
256
				}
257

    
258
				// does the request have the most current system metadata?
259
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
260
					String msg = "The requested system metadata version number "
261
							+ serialVersion
262
							+ " differs from the current version at "
263
							+ systemMetadata.getSerialVersion().longValue()
264
							+ ". Please get the latest copy in order to modify it.";
265
					throw new VersionMismatch("4886", msg);
266

    
267
				}
268

    
269
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
270
				throw new NotFound("4884", "No record found for: " + pid.getValue());
271

    
272
			}
273
			  
274
			// check permissions
275
			// TODO: is this necessary?
276
			List<Node> nodeList = D1Client.getCN().listNodes().getNodeList();
277
			boolean isAllowed = ServiceMethodRestrictionUtil.isMethodAllowed(session.getSubject(), nodeList, "CNReplication", "deleteReplicationMetadata");
278
			if (isAllowed) {
279
				throw new NotAuthorized("4881", "Caller is not authorized to deleteReplicationMetadata");
280
			}
281
			  
282
			// delete the replica from the given node
283
			D1Client.getMN(nodeId).delete(session, pid);
284
			  
285
			// reflect that change in the system metadata
286
			List<Replica> updatedReplicas = new ArrayList<Replica>(systemMetadata.getReplicaList());
287
			for (Replica r: systemMetadata.getReplicaList()) {
288
				  if (r.getReplicaMemberNode().equals(nodeId)) {
289
					  updatedReplicas.remove(r);
290
					  break;
291
				  }
292
			}
293
			systemMetadata.setReplicaList(updatedReplicas);
294

    
295
			// update the metadata
296
			try {
297
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
298
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
299
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
300
			} catch (RuntimeException e) {
301
				throw new ServiceFailure("4882", e.getMessage());
302
			}
303

    
304
		} catch (RuntimeException e) {
305
			throw new ServiceFailure("4882", e.getMessage());
306
		} finally {
307
			lock.unlock();
308
			logMetacat.debug("Unlocked identifier " + pid.getValue());
309
		}
310

    
311
		return true;	  
312
	  
313
  }
314
  
315
  /**
316
   * Set the obsoletedBy attribute in System Metadata
317
   * @param session
318
   * @param pid
319
   * @param obsoletedByPid
320
   * @param serialVersion
321
   * @return
322
   * @throws NotImplemented
323
   * @throws NotFound
324
   * @throws NotAuthorized
325
   * @throws ServiceFailure
326
   * @throws InvalidRequest
327
   * @throws InvalidToken
328
   * @throws VersionMismatch
329
   */
330
  @Override
331
  public boolean setObsoletedBy(Session session, Identifier pid,
332
			Identifier obsoletedByPid, long serialVersion)
333
			throws NotImplemented, NotFound, NotAuthorized, ServiceFailure,
334
			InvalidRequest, InvalidToken, VersionMismatch {
335

    
336
		// The lock to be used for this identifier
337
		Lock lock = null;
338

    
339
		// get the subject
340
		Subject subject = session.getSubject();
341

    
342
		// are we allowed to do this?
343
		if (!isAdminAuthorized(session, pid, Permission.WRITE)) {
344
			if (!isAuthorized(session, pid, Permission.WRITE)) {
345
				throw new NotAuthorized("4881", Permission.WRITE
346
						+ " not allowed by " + subject.getValue() + " on "
347
						+ pid.getValue());
348

    
349
			}
350
		}
351

    
352
		SystemMetadata systemMetadata = null;
353
		try {
354
			lock = HazelcastService.getInstance().getLock(pid.getValue());
355
			lock.lock();
356
			logMetacat.debug("Locked identifier " + pid.getValue());
357

    
358
			try {
359
				if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
360
					systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
361
				}
362

    
363
				// did we get it correctly?
364
				if (systemMetadata == null) {
365
					throw new NotFound("4884", "Couldn't find an object identified by " + pid.getValue());
366
				}
367

    
368
				// does the request have the most current system metadata?
369
				if (systemMetadata.getSerialVersion().longValue() != serialVersion) {
370
					String msg = "The requested system metadata version number "
371
							+ serialVersion
372
							+ " differs from the current version at "
373
							+ systemMetadata.getSerialVersion().longValue()
374
							+ ". Please get the latest copy in order to modify it.";
375
					throw new VersionMismatch("4886", msg);
376

    
377
				}
378

    
379
			} catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
380
				throw new NotFound("4884", "No record found for: " + pid.getValue());
381

    
382
			}
383

    
384
			// set the new policy
385
			systemMetadata.setObsoletedBy(obsoletedByPid);
386

    
387
			// update the metadata
388
			try {
389
				systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
390
				systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
391
				HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
392
			} catch (RuntimeException e) {
393
				throw new ServiceFailure("4882", e.getMessage());
394
			}
395

    
396
		} catch (RuntimeException e) {
397
			throw new ServiceFailure("4882", e.getMessage());
398
		} finally {
399
			lock.unlock();
400
			logMetacat.debug("Unlocked identifier " + pid.getValue());
401
		}
402

    
403
		return true;
404
	}
405
  
406
  
407
  /**
408
   * Set the replication status for an object given the object identifier
409
   * 
410
   * @param session - the Session object containing the credentials for the Subject
411
   * @param pid - the object identifier for the given object
412
   * @param status - the replication status to be applied
413
   * 
414
   * @return true or false
415
   * 
416
   * @throws NotImplemented
417
   * @throws NotAuthorized
418
   * @throws ServiceFailure
419
   * @throws InvalidRequest
420
   * @throws InvalidToken
421
   * @throws NotFound
422
   * 
423
   */
424
  @Override
425
  public boolean setReplicationStatus(Session session, Identifier pid,
426
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
427
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
428
      InvalidRequest, NotFound {
429
      
430
      // The lock to be used for this identifier
431
      Lock lock = null;
432
      
433
      boolean allowed = false;
434
      int replicaEntryIndex = -1;
435
      List<Replica> replicas = null;
436
      // get the subject
437
      Subject subject = session.getSubject();
438
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
439
          " is " + status.toString());
440
      
441
      SystemMetadata systemMetadata = null;
442

    
443
      try {
444
          lock = HazelcastService.getInstance().getLock(pid.getValue());
445
          lock.lock();
446
          logMetacat.debug("Locked identifier " + pid.getValue());
447

    
448
          try {      
449
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
450

    
451
              // did we get it correctly?
452
              if ( systemMetadata == null ) {
453
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
454
                  throw new NotFound("4740", "Couldn't find an object identified by " + pid.getValue());
455
                  
456
              }
457
              replicas = systemMetadata.getReplicaList();
458
              int count = 0;
459
              
460
              // was there a failure? log it
461
              if ( failure != null && status == ReplicationStatus.FAILED ) {
462
                 String msg = "The replication request of the object identified by " + 
463
                     pid.getValue() + " failed.  The error message was " +
464
                     failure.getMessage() + ".";
465
              }
466
              
467
              if (replicas.size() > 0 && replicas != null) {
468
                  // find the target replica index in the replica list
469
                  for (Replica replica : replicas) {
470
                      String replicaNodeStr = replica.getReplicaMemberNode()
471
                              .getValue();
472
                      String targetNodeStr = targetNode.getValue();
473
                      logMetacat.debug("Comparing " + replicaNodeStr + " to "
474
                              + targetNodeStr);
475
                  
476
                      if (replicaNodeStr.equals(targetNodeStr)) {
477
                          replicaEntryIndex = count;
478
                          logMetacat.debug("replica entry index is: "
479
                                  + replicaEntryIndex);
480
                          break;
481
                      }
482
                      count++;
483
                  
484
                  }
485
              }
486
              // are we allowed to do this? only CNs and target MNs are allowed
487
              CNode cn = D1Client.getCN();
488
              List<Node> nodes = cn.listNodes().getNodeList();
489
              
490
              // find the node in the node list
491
              for ( Node node : nodes ) {
492
                  
493
                  NodeReference nodeReference = node.getIdentifier();
494
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + nodeReference.getValue());
495
                  
496
                  // allow target MN certs and CN certs
497
                  if (targetNode.getValue().equals(nodeReference.getValue()) ||
498
                      node.getType() == NodeType.CN) {
499
                      List<Subject> nodeSubjects = node.getSubjectList();
500
                      
501
                      // check if the session subject is in the node subject list
502
                      for (Subject nodeSubject : nodeSubjects) {
503
                          if ( nodeSubject.equals(subject) ) {
504
                              allowed = true; // subject of session == target node subject
505
                              break;
506
                              
507
                          }
508
                      }                 
509
                  }
510
              }
511

    
512
              if ( !isAdminAuthorized(session, pid, Permission.WRITE) ) {
513
                  if (!allowed) {
514
                    String msg = "The subject identified by "
515
                            + subject.getValue()
516
                            + " does not have permission to set the replication status for "
517
                            + "the replica identified by "
518
                            + targetNode.getValue() + ".";
519
                    logMetacat.info(msg);
520
                    throw new NotAuthorized("4720", msg);
521
                }
522
                  
523
              }
524
              
525

    
526
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
527
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
528
                " : " + e.getMessage());
529
            
530
          }
531
          
532
          Replica targetReplica = new Replica();
533
          // set the status for the replica
534
          if ( replicaEntryIndex != -1 ) {
535
              targetReplica = replicas.get(replicaEntryIndex);
536
              targetReplica.setReplicationStatus(status);
537
              logMetacat.debug("Set the replication status for " + 
538
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
539
                  targetReplica.getReplicationStatus());
540
              
541
          } else {
542
              // this is a new entry, create it
543
              targetReplica.setReplicaMemberNode(targetNode);
544
              targetReplica.setReplicationStatus(status);
545
              targetReplica.setReplicaVerified(Calendar.getInstance().getTime());
546
              replicas.add(targetReplica);
547
              
548
          }
549
          
550
          systemMetadata.setReplicaList(replicas);
551
                
552
          // update the metadata
553
          try {
554
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
555
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
556
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
557
              
558
              if ( status == ReplicationStatus.FAILED && failure != null ) {
559
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
560
                      " on target node " + targetNode + ". The exception was: " +
561
                      failure.getMessage());
562
              }
563
          } catch (RuntimeException e) {
564
              throw new ServiceFailure("4700", e.getMessage());
565
          
566
          }
567
          
568
    } catch (RuntimeException e) {
569
        String msg = "There was a RuntimeException getting the lock for " +
570
            pid.getValue();
571
        logMetacat.info(msg);
572
        
573
    } finally {
574
        lock.unlock();
575
        logMetacat.debug("Unlocked identifier " + pid.getValue());
576
        
577
    }
578
      
579
      return true;
580
  }
581
  
582
  /**
583
   * Return the checksum of the object given the identifier 
584
   * 
585
   * @param session - the Session object containing the credentials for the Subject
586
   * @param pid - the object identifier for the given object
587
   * 
588
   * @return checksum - the checksum of the object
589
   * 
590
   * @throws InvalidToken
591
   * @throws ServiceFailure
592
   * @throws NotAuthorized
593
   * @throws NotFound
594
   * @throws NotImplemented
595
   */
596
  @Override
597
  public Checksum getChecksum(Session session, Identifier pid)
598
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
599
    NotImplemented {
600
            
601
    if (!isAuthorized(session, pid, Permission.READ)) {
602
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
603
    }
604
    
605
    SystemMetadata systemMetadata = null;
606
    Checksum checksum = null;
607
    
608
    try {
609
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);        
610

    
611
        if (systemMetadata == null ) {
612
            throw new NotFound("1420", "Couldn't find an object identified by " + pid.getValue());
613
        }
614
        checksum = systemMetadata.getChecksum();
615
        
616
    } catch (RuntimeException e) {
617
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
618
            pid.getValue() + ". The error message was: " + e.getMessage());
619
      
620
    }
621
    
622
    return checksum;
623
  }
624

    
625
  /**
626
   * Resolve the location of a given object
627
   * 
628
   * @param session - the Session object containing the credentials for the Subject
629
   * @param pid - the object identifier for the given object
630
   * 
631
   * @return objectLocationList - the list of nodes known to contain the object
632
   * 
633
   * @throws InvalidToken
634
   * @throws ServiceFailure
635
   * @throws NotAuthorized
636
   * @throws NotFound
637
   * @throws NotImplemented
638
   */
639
  @Override
640
  public ObjectLocationList resolve(Session session, Identifier pid)
641
    throws InvalidToken, ServiceFailure, NotAuthorized,
642
    NotFound, NotImplemented {
643

    
644
    throw new NotImplemented("4131", "resolve not implemented");
645

    
646
  }
647

    
648
  /**
649
   * Search the metadata catalog for identifiers that match the criteria
650
   * 
651
   * @param session - the Session object containing the credentials for the Subject
652
   * @param queryType - An identifier for the type of query expression 
653
   *                    provided in the query
654
   * @param query -  The criteria for matching the characteristics of the 
655
   *                 metadata objects of interest
656
   * 
657
   * @return objectList - the list of objects matching the criteria
658
   * 
659
   * @throws InvalidToken
660
   * @throws ServiceFailure
661
   * @throws NotAuthorized
662
   * @throws InvalidRequest
663
   * @throws NotImplemented
664
   */
665
  @Override
666
  public ObjectList search(Session session, String queryType, String query)
667
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
668
    NotImplemented {
669

    
670
    ObjectList objectList = null;
671
    try {
672
        objectList = 
673
          IdentifierManager.getInstance().querySystemMetadata(
674
              null, //startTime, 
675
              null, //endTime,
676
              null, //objectFormat, 
677
              false, //replicaStatus, 
678
              0, //start, 
679
              -1 //count
680
              );
681
        
682
    } catch (Exception e) {
683
      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
684
    }
685

    
686
      return objectList;
687
      
688
    //throw new NotImplemented("4281", "search not implemented");
689
    
690
    // the code block below is from an older implementation
691
    
692
    /*  This block commented out because of the EcoGrid circular dependency.
693
         *  For now, query will not be supported until the circularity can be
694
         *  resolved, probably by moving the ecogrid query syntax transformers
695
         *  directly into the Metacat codebase.  MBJ 2010-02-03
696
         
697
        try {
698
            EcogridQueryParser parser = new EcogridQueryParser(request
699
                    .getReader());
700
            parser.parseXML();
701
            QueryType queryType = parser.getEcogridQuery();
702
            EcogridJavaToMetacatJavaQueryTransformer queryTransformer = 
703
                new EcogridJavaToMetacatJavaQueryTransformer();
704
            QuerySpecification metacatQuery = queryTransformer
705
                    .transform(queryType);
706

    
707
            DBQuery metacat = new DBQuery();
708

    
709
            boolean useXMLIndex = (new Boolean(PropertyService
710
                    .getProperty("database.usexmlindex"))).booleanValue();
711
            String xmlquery = "query"; // we don't care the query in resultset,
712
            // the query can be anything
713
            PrintWriter out = null; // we don't want metacat result, so set out null
714

    
715
            // parameter: queryspecification, user, group, usingIndexOrNot
716
            StringBuffer result = metacat.createResultDocument(xmlquery,
717
                    metacatQuery, out, username, groupNames, useXMLIndex);
718

    
719
            // create result set transfer       
720
            String saxparser = PropertyService.getProperty("xml.saxparser");
721
            MetacatResultsetParser metacatResultsetParser = new MetacatResultsetParser(
722
                    new StringReader(result.toString()), saxparser, queryType
723
                            .getNamespace().get_value());
724
            ResultsetType records = metacatResultsetParser.getEcogridResult();
725

    
726
            System.out
727
                    .println(EcogridResultsetTransformer.toXMLString(records));
728
            response.setContentType("text/xml");
729
            out = response.getWriter();
730
            out.print(EcogridResultsetTransformer.toXMLString(records));
731

    
732
        } catch (Exception e) {
733
            e.printStackTrace();
734
        }*/
735
    
736

    
737
  }
738
  
739
  /**
740
   * Returns the object format registered in the DataONE Object Format 
741
   * Vocabulary for the given format identifier
742
   * 
743
   * @param fmtid - the identifier of the format requested
744
   * 
745
   * @return objectFormat - the object format requested
746
   * 
747
   * @throws ServiceFailure
748
   * @throws NotFound
749
   * @throws InsufficientResources
750
   * @throws NotImplemented
751
   */
752
  @Override
753
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
754
    throws ServiceFailure, NotFound, NotImplemented {
755
     
756
      return ObjectFormatService.getInstance().getFormat(fmtid);
757
      
758
  }
759

    
760
  /**
761
   * Returns a list of all object formats registered in the DataONE Object 
762
   * Format Vocabulary
763
    * 
764
   * @return objectFormatList - The list of object formats registered in 
765
   *                            the DataONE Object Format Vocabulary
766
   * 
767
   * @throws ServiceFailure
768
   * @throws NotImplemented
769
   * @throws InsufficientResources
770
   */
771
  @Override
772
  public ObjectFormatList listFormats() 
773
    throws ServiceFailure, NotImplemented {
774

    
775
    return ObjectFormatService.getInstance().listFormats();
776
  }
777

    
778
  /**
779
   * Returns a list of nodes that have been registered with the DataONE infrastructure
780
    * 
781
   * @return nodeList - List of nodes from the registry
782
   * 
783
   * @throws ServiceFailure
784
   * @throws NotImplemented
785
   */
786
  @Override
787
  public NodeList listNodes() 
788
    throws NotImplemented, ServiceFailure {
789

    
790
    throw new NotImplemented("4800", "listNodes not implemented");
791
  }
792

    
793
  /**
794
   * Provides a mechanism for adding system metadata independently of its 
795
   * associated object, such as when adding system metadata for data objects.
796
    * 
797
   * @param session - the Session object containing the credentials for the Subject
798
   * @param pid - The identifier of the object to register the system metadata against
799
   * @param sysmeta - The system metadata to be registered
800
   * 
801
   * @return true if the registration succeeds
802
   * 
803
   * @throws NotImplemented
804
   * @throws NotAuthorized
805
   * @throws ServiceFailure
806
   * @throws InvalidRequest
807
   * @throws InvalidSystemMetadata
808
   */
809
  @Override
810
  public Identifier registerSystemMetadata(Session session, Identifier pid,
811
      SystemMetadata sysmeta) 
812
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
813
      InvalidSystemMetadata {
814

    
815
      // The lock to be used for this identifier
816
      Lock lock = null;
817

    
818
      // TODO: control who can call this?
819
      if (session == null) {
820
          //TODO: many of the thrown exceptions do not use the correct error codes
821
          //check these against the docs and correct them
822
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
823
                  "  If you are not logged in, please do so and retry the request.");
824
      }
825
      
826
      // verify that guid == SystemMetadata.getIdentifier()
827
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
828
          "|" + sysmeta.getIdentifier().getValue());
829
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
830
          throw new InvalidRequest("4863", 
831
              "The identifier in method call (" + pid.getValue() + 
832
              ") does not match identifier in system metadata (" +
833
              sysmeta.getIdentifier().getValue() + ").");
834
      }
835

    
836
      try {
837
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
838
          lock.lock();
839
          logMetacat.debug("Locked identifier " + pid.getValue());
840
          logMetacat.debug("Checking if identifier exists...");
841
          // Check that the identifier does not already exist
842
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
843
              throw new InvalidRequest("4863", 
844
                  "The identifier is already in use by an existing object.");
845
          
846
          }
847
          
848
          // insert the system metadata into the object store
849
          logMetacat.debug("Starting to insert SystemMetadata...");
850
          try {
851
              sysmeta.setSerialVersion(BigInteger.ONE);
852
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
853
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
854
              
855
          } catch (RuntimeException e) {
856
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
857
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
858
                  e.getClass() + ": " + e.getMessage());
859
              
860
          }
861
          
862
      } catch (RuntimeException e) {
863
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
864
                  e.getClass() + ": " + e.getMessage());
865
          
866
      }  finally {
867
          lock.unlock();
868
          logMetacat.debug("Unlocked identifier " + pid.getValue());
869
          
870
      }
871

    
872
      
873
      logMetacat.debug("Returning from registerSystemMetadata");
874
      EventLog.getInstance().log(request.getRemoteAddr(), 
875
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
876
          pid.getValue(), "registerSystemMetadata");
877
      return pid;
878
  }
879
  
880
  /**
881
   * Given an optional scope and format, reserves and returns an identifier 
882
   * within that scope and format that is unique and will not be 
883
   * used by any other sessions. 
884
    * 
885
   * @param session - the Session object containing the credentials for the Subject
886
   * @param pid - The identifier of the object to register the system metadata against
887
   * @param scope - An optional string to be used to qualify the scope of 
888
   *                the identifier namespace, which is applied differently 
889
   *                depending on the format requested. If scope is not 
890
   *                supplied, a default scope will be used.
891
   * @param format - The optional name of the identifier format to be used, 
892
   *                  drawn from a DataONE-specific vocabulary of identifier 
893
   *                 format names, including several common syntaxes such 
894
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
895
   *                 format is not supplied by the caller, the CN service 
896
   *                 will use a default identifier format, which may change 
897
   *                 over time.
898
   * 
899
   * @return true if the registration succeeds
900
   * 
901
   * @throws InvalidToken
902
   * @throws ServiceFailure
903
   * @throws NotAuthorized
904
   * @throws IdentifierNotUnique
905
   * @throws NotImplemented
906
   */
907
  @Override
908
  public Identifier reserveIdentifier(Session session, Identifier pid)
909
  throws InvalidToken, ServiceFailure,
910
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
911

    
912
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
913
  }
914
  
915
  @Override
916
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
917
  throws InvalidToken, ServiceFailure,
918
        NotAuthorized, NotImplemented, InvalidRequest {
919
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
920
  }
921
  
922
  /**
923
    * Checks whether the pid is reserved by the subject in the session param
924
    * If the reservation is held on the pid by the subject, we return true.
925
    * 
926
   * @param session - the Session object containing the Subject
927
   * @param pid - The identifier to check
928
   * 
929
   * @return true if the reservation exists for the subject/pid
930
   * 
931
   * @throws InvalidToken
932
   * @throws ServiceFailure
933
   * @throws NotFound - when the pid is not found (in use or in reservation)
934
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
935
   * @throws IdentifierNotUnique - when the pid is in use
936
   * @throws NotImplemented
937
   */
938

    
939
  @Override
940
  public boolean hasReservation(Session session, Identifier pid) 
941
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
942
      NotImplemented, InvalidRequest {
943
  
944
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
945
  }
946

    
947
  /**
948
   * Changes ownership (RightsHolder) of the specified object to the 
949
   * subject specified by userId
950
    * 
951
   * @param session - the Session object containing the credentials for the Subject
952
   * @param pid - Identifier of the object to be modified
953
   * @param userId - The subject that will be taking ownership of the specified object.
954
   *
955
   * @return pid - the identifier of the modified object
956
   * 
957
   * @throws ServiceFailure
958
   * @throws InvalidToken
959
   * @throws NotFound
960
   * @throws NotAuthorized
961
   * @throws NotImplemented
962
   * @throws InvalidRequest
963
   */  
964
  @Override
965
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
966
      long serialVersion)
967
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
968
      NotImplemented, InvalidRequest, VersionMismatch {
969
      
970
      // The lock to be used for this identifier
971
      Lock lock = null;
972

    
973
      // get the subject
974
      Subject subject = session.getSubject();
975
      
976
      // are we allowed to do this?
977
      if (!isAdminAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
978
          if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
979
              throw new NotAuthorized("4440", "not allowed by "
980
                      + subject.getValue() + " on " + pid.getValue());
981
              
982
          }
983
      }
984
      
985
      SystemMetadata systemMetadata = null;
986
      try {
987
          lock = HazelcastService.getInstance().getLock(pid.getValue());
988
          logMetacat.debug("Locked identifier " + pid.getValue());
989

    
990
          try {
991
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
992
              
993
              // does the request have the most current system metadata?
994
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
995
                 String msg = "The requested system metadata version number " + 
996
                     serialVersion + " differs from the current version at " +
997
                     systemMetadata.getSerialVersion().longValue() +
998
                     ". Please get the latest copy in order to modify it.";
999
                 throw new VersionMismatch("4443", msg);
1000
              }
1001
              
1002
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1003
              throw new NotFound("4460", "No record found for: " + pid.getValue());
1004
              
1005
          }
1006
              
1007
          // set the new rights holder
1008
          systemMetadata.setRightsHolder(userId);
1009
          
1010
          // update the metadata
1011
          try {
1012
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1013
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1014
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
1015
              
1016
          } catch (RuntimeException e) {
1017
              throw new ServiceFailure("4490", e.getMessage());
1018
          
1019
          }
1020
          
1021
      } catch (RuntimeException e) {
1022
          throw new ServiceFailure("4490", e.getMessage());
1023
          
1024
      } finally {
1025
          lock.unlock();
1026
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1027
      
1028
      }
1029
      
1030
      return pid;
1031
  }
1032

    
1033
  /**
1034
   * Verify that a replication task is authorized by comparing the target node's
1035
   * Subject (from the X.509 certificate-derived Session) with the list of 
1036
   * subjects in the known, pending replication tasks map.
1037
   * 
1038
   * @param originatingNodeSession - Session information that contains the 
1039
   *                                 identity of the calling user
1040
   * @param targetNodeSubject - Subject identifying the target node
1041
   * @param pid - the identifier of the object to be replicated
1042
   * @param replicatePermission - the execute permission to be granted
1043
   * 
1044
   * @throws ServiceFailure
1045
   * @throws NotImplemented
1046
   * @throws InvalidToken
1047
   * @throws NotAuthorized
1048
   * @throws InvalidRequest
1049
   * @throws NotFound
1050
   */
1051
  @Override
1052
  public boolean isNodeAuthorized(Session originatingNodeSession, 
1053
    Subject targetNodeSubject, Identifier pid) 
1054
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
1055
    NotFound, InvalidRequest {
1056
    
1057
    boolean isAllowed = false;
1058
    SystemMetadata sysmeta = null;
1059
    NodeReference targetNode = null;
1060
    
1061
    try {
1062
      // get the target node reference from the nodes list
1063
      CNode cn = D1Client.getCN();
1064
      List<Node> nodes = cn.listNodes().getNodeList();
1065
      
1066
      if ( nodes != null ) {
1067
        for (Node node : nodes) {
1068
            
1069
            for (Subject nodeSubject : node.getSubjectList()) {
1070
                
1071
                if ( nodeSubject.equals(targetNodeSubject) ) {
1072
                    targetNode = node.getIdentifier();
1073
                    logMetacat.debug("targetNode is : " + targetNode.getValue());
1074
                    break;
1075
                }
1076
            }
1077
            
1078
            if ( targetNode != null) { break; }
1079
        }
1080
        
1081
      } else {
1082
          String msg = "Couldn't get the node list from the CN";
1083
          logMetacat.debug(msg);
1084
          throw new ServiceFailure("4872", msg);
1085
          
1086
      }
1087
      
1088
      // can't find a node listed with the given subject
1089
      if ( targetNode == null ) {
1090
          String msg = "There is no Member Node registered with a node subject " +
1091
              "matching " + targetNodeSubject.getValue();
1092
          logMetacat.info(msg);
1093
          throw new ServiceFailure("4872", msg);
1094
          
1095
      }
1096
      
1097
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
1098
      
1099
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1100

    
1101
      if ( sysmeta != null ) {
1102
          
1103
          List<Replica> replicaList = sysmeta.getReplicaList();
1104
          
1105
          if ( replicaList != null ) {
1106
              
1107
              // find the replica with the status set to 'requested'
1108
              for (Replica replica : replicaList) {
1109
                  ReplicationStatus status = replica.getReplicationStatus();
1110
                  NodeReference listedNode = replica.getReplicaMemberNode();
1111
                  if ( listedNode != null && targetNode != null ) {
1112
                      logMetacat.debug("Comparing " + listedNode.getValue()
1113
                              + " to " + targetNode.getValue());
1114
                      
1115
                      if (listedNode.getValue().equals(targetNode.getValue())
1116
                              && status.equals(ReplicationStatus.REQUESTED)) {
1117
                          isAllowed = true;
1118
                          break;
1119

    
1120
                      }
1121
                  }
1122
              }
1123
          }
1124
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
1125
              "to replicate: " + isAllowed + " for " + pid.getValue());
1126

    
1127
          
1128
      } else {
1129
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
1130
          " is null.");          
1131
          throw new NotFound("4874", "Couldn't find an object identified by " + pid.getValue());
1132
          
1133
      }
1134

    
1135
    } catch (RuntimeException e) {
1136
    	  ServiceFailure sf = new ServiceFailure("4872", 
1137
                "Runtime Exception: Couldn't determine if node is allowed: " + 
1138
                e.getCause().getMessage());
1139
    	  sf.initCause(e);
1140
        throw sf;
1141
        
1142
    }
1143
      
1144
    return isAllowed;
1145
    
1146
  }
1147

    
1148
  /**
1149
   * Adds a new object to the Node, where the object is a science metadata object.
1150
   * 
1151
   * @param session - the Session object containing the credentials for the Subject
1152
   * @param pid - The object identifier to be created
1153
   * @param object - the object bytes
1154
   * @param sysmeta - the system metadata that describes the object  
1155
   * 
1156
   * @return pid - the object identifier created
1157
   * 
1158
   * @throws InvalidToken
1159
   * @throws ServiceFailure
1160
   * @throws NotAuthorized
1161
   * @throws IdentifierNotUnique
1162
   * @throws UnsupportedType
1163
   * @throws InsufficientResources
1164
   * @throws InvalidSystemMetadata
1165
   * @throws NotImplemented
1166
   * @throws InvalidRequest
1167
   */
1168
  public Identifier create(Session session, Identifier pid, InputStream object,
1169
    SystemMetadata sysmeta) 
1170
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1171
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1172
    NotImplemented, InvalidRequest {
1173
      
1174
      
1175
      // The lock to be used for this identifier
1176
      Lock lock = null;
1177
      
1178
      try {
1179
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1180
          // are we allowed?
1181
          boolean isAllowed = false;
1182
          CNode cn = D1Client.getCN();
1183
          NodeList nodeList = cn.listNodes();
1184
          
1185
          for (Node node : nodeList.getNodeList()) {
1186
              if ( node.getType().equals(NodeType.CN) ) {
1187
                  
1188
                  List<Subject> subjects = node.getSubjectList();
1189
                  for (Subject subject : subjects) {
1190
                     if (subject.equals(session.getSubject())) {
1191
                         isAllowed = true;
1192
                         break;
1193
                     }
1194
                  }
1195
              }
1196
          }
1197

    
1198
          // proceed if we're called by a CN
1199
          if ( isAllowed ) {
1200
              // create the coordinating node version of the document      
1201
              lock.lock();
1202
              logMetacat.debug("Locked identifier " + pid.getValue());
1203
              sysmeta.setSerialVersion(BigInteger.ONE);
1204
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1205
              pid = super.create(session, pid, object, sysmeta);
1206

    
1207
          } else {
1208
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1209
                  " isn't allowed to call create() on a Coordinating Node.";
1210
              logMetacat.info(msg);
1211
              throw new NotAuthorized("1100", msg);
1212
          }
1213
          
1214
      } catch (RuntimeException e) {
1215
          // Convert Hazelcast runtime exceptions to service failures
1216
          String msg = "There was a problem creating the object identified by " +
1217
              pid.getValue() + ". There error message was: " + e.getMessage();
1218
          throw new ServiceFailure("4893", msg);
1219
          
1220
      } finally {
1221
    	  if (lock != null) {
1222
	          lock.unlock();
1223
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1224
    	  }
1225
      }
1226
      
1227
      return pid;
1228

    
1229
  }
1230

    
1231
  /**
1232
   * Set access for a given object using the object identifier and a Subject
1233
   * under a given Session.
1234
   * 
1235
   * @param session - the Session object containing the credentials for the Subject
1236
   * @param pid - the object identifier for the given object to apply the policy
1237
   * @param policy - the access policy to be applied
1238
   * 
1239
   * @return true if the application of the policy succeeds
1240
   * @throws InvalidToken
1241
   * @throws ServiceFailure
1242
   * @throws NotFound
1243
   * @throws NotAuthorized
1244
   * @throws NotImplemented
1245
   * @throws InvalidRequest
1246
   */
1247
  public boolean setAccessPolicy(Session session, Identifier pid, 
1248
      AccessPolicy accessPolicy, long serialVersion) 
1249
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1250
      NotImplemented, InvalidRequest, VersionMismatch {
1251
      
1252
      // The lock to be used for this identifier
1253
      Lock lock = null;
1254
      SystemMetadata systemMetadata = null;
1255
      
1256
      boolean success = false;
1257
      
1258
      // get the subject
1259
      Subject subject = session.getSubject();
1260
      
1261
      if (!isAdminAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1262
          // are we allowed to do this?
1263
          if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1264
              throw new NotAuthorized("4420", "not allowed by "
1265
                      + subject.getValue() + " on " + pid.getValue());
1266
          }
1267
      }
1268
      
1269
      try {
1270
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1271
          lock.lock();
1272
          logMetacat.debug("Locked identifier " + pid.getValue());
1273

    
1274
          try {
1275
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1276

    
1277
              if ( systemMetadata == null ) {
1278
                  throw new NotFound("4400", "Couldn't find an object identified by " + pid.getValue());
1279
                  
1280
              }
1281
              // does the request have the most current system metadata?
1282
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1283
                 String msg = "The requested system metadata version number " + 
1284
                     serialVersion + " differs from the current version at " +
1285
                     systemMetadata.getSerialVersion().longValue() +
1286
                     ". Please get the latest copy in order to modify it.";
1287
                 throw new VersionMismatch("4402", msg);
1288
                 
1289
              }
1290
              
1291
          } catch (RuntimeException e) {
1292
              // convert Hazelcast RuntimeException to NotFound
1293
              throw new NotFound("4400", "No record found for: " + pid);
1294
            
1295
          }
1296
              
1297
          // set the access policy
1298
          systemMetadata.setAccessPolicy(accessPolicy);
1299
          
1300
          // update the system metadata
1301
          try {
1302
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1303
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1304
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1305
            
1306
          } catch (RuntimeException e) {
1307
              // convert Hazelcast RuntimeException to ServiceFailure
1308
              throw new ServiceFailure("4430", e.getMessage());
1309
            
1310
          }
1311
          
1312
      } catch (RuntimeException e) {
1313
          throw new ServiceFailure("4430", e.getMessage());
1314
          
1315
      } finally {
1316
          lock.unlock();
1317
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1318
        
1319
      }
1320

    
1321
    
1322
    // TODO: how do we know if the map was persisted?
1323
    success = true;
1324
    
1325
    return success;
1326
  }
1327

    
1328
  /**
1329
   * Full replacement of replication metadata in the system metadata for the 
1330
   * specified object, changes date system metadata modified
1331
   * 
1332
   * @param session - the Session object containing the credentials for the Subject
1333
   * @param pid - the object identifier for the given object to apply the policy
1334
   * @param replica - the replica to be updated
1335
   * @return
1336
   * @throws NotImplemented
1337
   * @throws NotAuthorized
1338
   * @throws ServiceFailure
1339
   * @throws InvalidRequest
1340
   * @throws NotFound
1341
   * @throws VersionMismatch
1342
   */
1343
  @Override
1344
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1345
      Replica replica, long serialVersion) 
1346
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1347
      NotFound, VersionMismatch {
1348
      
1349
      // The lock to be used for this identifier
1350
      Lock lock = null;
1351
      
1352
      // get the subject
1353
      Subject subject = session.getSubject();
1354
      
1355
      // are we allowed to do this?
1356
      try {
1357
        if (!isAdminAuthorized(session, pid, Permission.WRITE)) {
1358
            // what is the controlling permission?
1359
            if (!isAuthorized(session, pid, Permission.WRITE)) {
1360
                throw new NotAuthorized("4851", "not allowed by "
1361
                        + subject.getValue() + " on " + pid.getValue());
1362
            }
1363
        }
1364
        
1365
      } catch (InvalidToken e) {
1366
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1367
                  " on " + pid.getValue());  
1368
          
1369
      }
1370

    
1371
      SystemMetadata systemMetadata = null;
1372
      try {
1373
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1374
          lock.lock();
1375
          logMetacat.debug("Locked identifier " + pid.getValue());
1376

    
1377
          try {      
1378
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1379

    
1380
              // does the request have the most current system metadata?
1381
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1382
                 String msg = "The requested system metadata version number " + 
1383
                     serialVersion + " differs from the current version at " +
1384
                     systemMetadata.getSerialVersion().longValue() +
1385
                     ". Please get the latest copy in order to modify it.";
1386
                 throw new VersionMismatch("4855", msg);
1387
              }
1388
              
1389
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
1390
              throw new NotFound("4854", "No record found for: " + pid.getValue() +
1391
                  " : " + e.getMessage());
1392
            
1393
          }
1394
              
1395
          // set the status for the replica
1396
          List<Replica> replicas = systemMetadata.getReplicaList();
1397
          NodeReference replicaNode = replica.getReplicaMemberNode();
1398
          int index = 0;
1399
          for (Replica listedReplica: replicas) {
1400
              
1401
              // remove the replica that we are replacing
1402
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1403
                  replicas.remove(index);
1404
                  break;
1405
                  
1406
              }
1407
              index++;
1408
          }
1409
          
1410
          // add the new replica item
1411
          replicas.add(replica);
1412
          systemMetadata.setReplicaList(replicas);
1413
          
1414
          // update the metadata
1415
          try {
1416
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1417
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1418
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1419
            
1420
          } catch (RuntimeException e) {
1421
              logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1422
              throw new ServiceFailure("4852", e.getMessage());
1423
          
1424
          }
1425
          
1426
      } catch (RuntimeException e) {
1427
          logMetacat.info("Unknown RuntimeException thrown: " + e.getCause().getMessage());
1428
          throw new ServiceFailure("4852", e.getMessage());
1429
      
1430
      } finally {
1431
          lock.unlock();
1432
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1433
          
1434
      }
1435
    
1436
      return true;
1437
      
1438
  }
1439
  
1440
  /**
1441
   * 
1442
   */
1443
  @Override
1444
  public ObjectList listObjects(Session session, Date startTime, 
1445
      Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1446
      Integer start, Integer count)
1447
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1448
      ServiceFailure {
1449
      
1450
      ObjectList objectList = null;
1451
        try {
1452
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1453
        } catch (Exception e) {
1454
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1455
        }
1456

    
1457
        return objectList;
1458
  }
1459

    
1460
	/**
1461
	 * 
1462
	 */
1463
  @Override
1464
	public ChecksumAlgorithmList listChecksumAlgorithms()
1465
			throws ServiceFailure, NotImplemented {
1466
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1467
		cal.addAlgorithm("MD5");
1468
		cal.addAlgorithm("SHA-1");
1469
		return null;
1470
	}
1471
    
1472
}
(1-1/5)