Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.Calendar;
29
import java.util.Date;
30
import java.util.List;
31
import java.util.Set;
32
import java.util.concurrent.locks.Lock;
33

    
34
import javax.servlet.http.HttpServletRequest;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.client.CNode;
38
import org.dataone.client.D1Client;
39
import org.dataone.service.cn.v1.CNAuthorization;
40
import org.dataone.service.cn.v1.CNCore;
41
import org.dataone.service.cn.v1.CNRead;
42
import org.dataone.service.cn.v1.CNReplication;
43
import org.dataone.service.exceptions.BaseException;
44
import org.dataone.service.exceptions.IdentifierNotUnique;
45
import org.dataone.service.exceptions.InsufficientResources;
46
import org.dataone.service.exceptions.InvalidRequest;
47
import org.dataone.service.exceptions.InvalidSystemMetadata;
48
import org.dataone.service.exceptions.InvalidToken;
49
import org.dataone.service.exceptions.NotAuthorized;
50
import org.dataone.service.exceptions.NotFound;
51
import org.dataone.service.exceptions.NotImplemented;
52
import org.dataone.service.exceptions.ServiceFailure;
53
import org.dataone.service.exceptions.UnsupportedType;
54
import org.dataone.service.types.v1.AccessPolicy;
55
import org.dataone.service.types.v1.Checksum;
56
import org.dataone.service.types.v1.ChecksumAlgorithmList;
57
import org.dataone.service.types.v1.Identifier;
58
import org.dataone.service.types.v1.Node;
59
import org.dataone.service.types.v1.NodeList;
60
import org.dataone.service.types.v1.NodeReference;
61
import org.dataone.service.types.v1.NodeType;
62
import org.dataone.service.types.v1.ObjectFormat;
63
import org.dataone.service.types.v1.ObjectFormatIdentifier;
64
import org.dataone.service.types.v1.ObjectFormatList;
65
import org.dataone.service.types.v1.ObjectList;
66
import org.dataone.service.types.v1.ObjectLocationList;
67
import org.dataone.service.types.v1.Permission;
68
import org.dataone.service.types.v1.Replica;
69
import org.dataone.service.types.v1.ReplicationPolicy;
70
import org.dataone.service.types.v1.ReplicationStatus;
71
import org.dataone.service.types.v1.Session;
72
import org.dataone.service.types.v1.Subject;
73
import org.dataone.service.types.v1.SystemMetadata;
74

    
75
import edu.ucsb.nceas.metacat.EventLog;
76
import edu.ucsb.nceas.metacat.IdentifierManager;
77
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
78

    
79
/**
80
 * Represents Metacat's implementation of the DataONE Coordinating Node 
81
 * service API. Methods implement the various CN* interfaces, and methods common
82
 * to both Member Node and Coordinating Node interfaces are found in the
83
 * D1NodeService super class.
84
 *
85
 */
86
public class CNodeService extends D1NodeService implements CNAuthorization,
87
    CNCore, CNRead, CNReplication {
88

    
89
  /* the logger instance */
90
  private Logger logMetacat = null;
91

    
92
  /**
93
   * singleton accessor
94
   */
95
  public static CNodeService getInstance(HttpServletRequest request) { 
96
    return new CNodeService(request);
97
  }
98
  
99
  /**
100
   * Constructor, private for singleton access
101
   */
102
  private CNodeService(HttpServletRequest request) {
103
    super(request);
104
    logMetacat = Logger.getLogger(CNodeService.class);
105
        
106
  }
107
    
108
  /**
109
   * Set the replication policy for an object given the object identifier
110
   * 
111
   * @param session - the Session object containing the credentials for the Subject
112
   * @param pid - the object identifier for the given object
113
   * @param policy - the replication policy to be applied
114
   * 
115
   * @return true or false
116
   * 
117
   * @throws NotImplemented
118
   * @throws NotAuthorized
119
   * @throws ServiceFailure
120
   * @throws InvalidRequest
121
   * 
122
   */
123
  @Override
124
  public boolean setReplicationPolicy(Session session, Identifier pid,
125
      ReplicationPolicy policy, long serialVersion) 
126
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
127
      InvalidRequest, InvalidToken {
128
      
129
      // The lock to be used for this identifier
130
      Lock lock = null;
131
      
132
      // get the subject
133
      Subject subject = session.getSubject();
134
      
135
      // are we allowed to do this?
136
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
137
        throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION + 
138
            " not allowed by " + subject.getValue() + " on " + pid.getValue());  
139
      }
140
      
141
      SystemMetadata systemMetadata = null;
142
      try {
143
          lock = HazelcastService.getInstance().getLock(pid.getValue());
144
          lock.lock();
145

    
146
          try {
147
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
148
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
149
                  
150
              }
151
            
152

    
153
              // does the request have the most current system metadata?
154
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
155
                 String msg = "The requested system metadata version number " + 
156
                     serialVersion + " differs from the current version at " +
157
                     systemMetadata.getSerialVersion().longValue() +
158
                     ". Please get the latest copy in order to modify it.";
159
                 throw new InvalidRequest("4883", msg);
160
              }
161
              
162
          } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
163
              throw new NotFound("4884", "No record found for: " + pid.getValue());
164
            
165
          }
166
          
167
          // set the new policy
168
          systemMetadata.setReplicationPolicy(policy);
169
          
170
          // update the metadata
171
          try {
172
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
173
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
174
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
175
            
176
          } catch (Exception e) {
177
              throw new ServiceFailure("4882", e.getMessage());
178
          
179
          }
180
          
181
    } catch (Exception e) {
182
        throw new ServiceFailure("4882", e.getMessage());
183
        
184
    } finally {
185
        lock.unlock();
186
        logMetacat.debug("Unlocked identifier " + pid.getValue());
187
        
188
    }
189
    
190
      return true;
191
  }
192

    
193
  /**
194
   * Set the replication status for an object given the object identifier
195
   * 
196
   * @param session - the Session object containing the credentials for the Subject
197
   * @param pid - the object identifier for the given object
198
   * @param status - the replication status to be applied
199
   * 
200
   * @return true or false
201
   * 
202
   * @throws NotImplemented
203
   * @throws NotAuthorized
204
   * @throws ServiceFailure
205
   * @throws InvalidRequest
206
   * @throws InvalidToken
207
   * @throws NotFound
208
   * 
209
   */
210
  @Override
211
  public boolean setReplicationStatus(Session session, Identifier pid,
212
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
213
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
214
      InvalidRequest, NotFound {
215
      
216
      // The lock to be used for this identifier
217
      Lock lock = null;
218
      
219
      boolean allowed = false;
220
      int replicaEntryIndex = -1;
221
      List<Replica> replicas = null;
222
      // get the subject
223
      Subject subject = session.getSubject();
224
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
225
          " is " + status.toString());
226
      
227
      SystemMetadata systemMetadata = null;
228

    
229
      try {
230
          lock = HazelcastService.getInstance().getLock(pid.getValue());
231
          lock.lock();
232
          try {      
233
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
234

    
235
              if ( systemMetadata == null ) {
236
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
237
                  
238
              }
239
              replicas = systemMetadata.getReplicaList();
240
              int count = 0;
241
              
242
              if ( replicas == null || replicas.size() < 1 ) {
243
                  logMetacat.debug("no replicas to evaluate");
244
                  throw new InvalidRequest("4730", "There are no replicas to update.");
245
                  
246
              }
247

    
248
              // find the target replica index in the replica list
249
              for (Replica replica: replicas) {
250
                  String replicaNodeStr = replica.getReplicaMemberNode().getValue();
251
                  String targetNodeStr = targetNode.getValue();
252
                  logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
253
                  
254
                  if (replicaNodeStr.equals(targetNodeStr)) {
255
                      replicaEntryIndex = count;
256
                      logMetacat.debug("replica entry index is: " + replicaEntryIndex);
257
                      break;
258
                  }
259
                  count++;
260
                  
261
              }
262

    
263
              // are we allowed to do this? only CNs and target MNs are allowed
264
              CNode cn = D1Client.getCN();
265
              List<Node> nodes = cn.listNodes().getNodeList();
266
              
267
              // find the node in the node list
268
              for ( Node node : nodes ) {
269
                  
270
                  NodeReference nodeReference = node.getIdentifier();
271
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + nodeReference.getValue());
272
                  
273
                  // allow target MN certs and CN certs
274
                  if (targetNode.getValue().equals(nodeReference.getValue()) ||
275
                      node.getType() == NodeType.CN) {
276
                      List<Subject> nodeSubjects = node.getSubjectList();
277
                      
278
                      // check if the session subject is in the node subject list
279
                      for (Subject nodeSubject : nodeSubjects) {
280
                          if ( nodeSubject.equals(subject) ) {
281
                              allowed = true; // subject of session == target node subject
282
                              break;
283
                              
284
                          }
285
                      }                 
286
                  }
287
              }
288

    
289
              if ( !allowed ) {
290
                  String msg = "The subject identified by " + subject.getValue() +
291
                    " does not have permission to set the replication status for " +
292
                    "the replica identified by " + targetNode.getValue() + ".";
293
                  logMetacat.info(msg);
294
                  throw new NotAuthorized("4720", msg);
295
                  
296
              }
297
              
298
              // was there a failure? log it
299
              if ( failure != null && status == ReplicationStatus.FAILED ) {
300
                 String msg = "The replication request of the object identified by " + 
301
                     pid.getValue() + " failed.  The error message was " +
302
                     failure.getMessage() + ".";
303
              }
304
              
305
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
306
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
307
                " : " + e.getMessage());
308
            
309
          }
310
          
311
          // set the status for the replica
312
          if ( replicaEntryIndex != -1 ) {
313
              Replica targetReplica = replicas.get(replicaEntryIndex);
314
              targetReplica.setReplicationStatus(status);
315
              logMetacat.debug("Set the replication status for " + 
316
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
317
                  targetReplica.getReplicationStatus());
318
              
319
          } else {
320
              throw new InvalidRequest("4730", "There are no replicas to update.");
321

    
322
          }
323
          
324
          systemMetadata.setReplicaList(replicas);
325
                
326
          // update the metadata
327
          try {
328
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
329
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
330
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
331
              
332
              if ( status == ReplicationStatus.FAILED && failure != null ) {
333
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
334
                      " on target node " + targetNode + ". The exception was: " +
335
                      failure.getMessage());
336
              }
337
          } catch (Exception e) {
338
              throw new ServiceFailure("4700", e.getMessage());
339
          
340
          }
341
          
342
    } catch (Exception e) {
343
        lock.unlock();
344
        logMetacat.debug("Unlocked identifier " + pid.getValue());
345
        
346
    }
347
      
348
      return true;
349
  }
350

    
351
  /**
352
   * Test that the specified relationship between pidOfSubject and pidOfObject exists
353
   * 
354
   * @param session - the Session object containing the credentials for the Subject
355
   * @param node - the node information for the given node be modified
356
   * 
357
   * @return true if the relationship exists
358
   * 
359
   * @throws InvalidToken
360
   * @throws ServiceFailure
361
   * @throws NotAuthorized
362
   * @throws NotFound
363
   * @throws InvalidRequest
364
   * @throws NotImplemented
365
   */
366
  @Override
367
  public boolean assertRelation(Session session, Identifier pidOfSubject, 
368
    String relationship, Identifier pidOfObject) 
369
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
370
    InvalidRequest, NotImplemented {
371
    
372
    // The lock to be used for thyis identifier
373
    Lock lock = null;
374

    
375
    boolean asserted = false;
376
        
377
    // are we allowed to do this?
378
    if (!isAuthorized(session, pidOfSubject, Permission.READ)) {
379
      throw new NotAuthorized("4881", Permission.READ + " not allowed on " + pidOfSubject.getValue());  
380
    }
381
    
382
    SystemMetadata systemMetadata = null;
383
    try {
384
        lock = HazelcastService.getInstance().getLock(pidOfSubject.getValue());
385
        lock.lock();
386
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pidOfSubject);
387
        
388
        
389
        // check relationships
390
        // TODO: use ORE map
391
        if (relationship.equalsIgnoreCase("describes")) {
392
          
393
        }
394
        
395
        if (relationship.equalsIgnoreCase("describedBy")) {
396
          
397
        }
398
        
399
        if (relationship.equalsIgnoreCase("derivedFrom")) {
400
          
401
        }
402
        
403
        if (relationship.equalsIgnoreCase("obsoletes")) {
404
            Identifier pid = systemMetadata.getObsoletes();
405
            if (pid.getValue().equals(pidOfObject.getValue())) {
406
              asserted = true;
407
            
408
        }
409
          //return systemMetadata.getObsoleteList().contains(pidOfObject);
410
        }
411
        if (relationship.equalsIgnoreCase("obsoletedBy")) {
412
            Identifier pid = systemMetadata.getObsoletedBy();
413
            if (pid.getValue().equals(pidOfObject.getValue())) {
414
              asserted = true;
415
            
416
        }
417
          //return systemMetadata.getObsoletedByList().contains(pidOfObject);
418
        }
419
              
420
    } catch (Exception e) {
421
        throw new ServiceFailure("4270", "Could not assert relation for: " + 
422
            pidOfSubject.getValue() +
423
            ". The error message was: " + e.getMessage());
424
      
425
    } finally {
426
        lock.unlock();
427
        logMetacat.debug("Unlocked identifier " + pidOfSubject.getValue());
428

    
429
    }
430
        
431
    return asserted;
432
  }
433
  
434
  /**
435
   * Return the checksum of the object given the identifier 
436
   * 
437
   * @param session - the Session object containing the credentials for the Subject
438
   * @param pid - the object identifier for the given object
439
   * 
440
   * @return checksum - the checksum of the object
441
   * 
442
   * @throws InvalidToken
443
   * @throws ServiceFailure
444
   * @throws NotAuthorized
445
   * @throws NotFound
446
   * @throws NotImplemented
447
   */
448
  @Override
449
  public Checksum getChecksum(Session session, Identifier pid)
450
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
451
    NotImplemented {
452
        
453
    // The lock to be used for thyis identifier
454
    Lock lock = null;
455
    
456
    if (!isAuthorized(session, pid, Permission.READ)) {
457
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
458
    }
459
    
460
    SystemMetadata systemMetadata = null;
461
    Checksum checksum = null;
462
    
463
    try {
464
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
465
        checksum = systemMetadata.getChecksum();
466

    
467
    } catch (Exception e) {
468
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
469
            pid.getValue() + ". The error message was: " + e.getMessage());
470
      
471
    }
472
    
473
    return checksum;
474
  }
475

    
476
  /**
477
   * Resolve the location of a given object
478
   * 
479
   * @param session - the Session object containing the credentials for the Subject
480
   * @param pid - the object identifier for the given object
481
   * 
482
   * @return objectLocationList - the list of nodes known to contain the object
483
   * 
484
   * @throws InvalidToken
485
   * @throws ServiceFailure
486
   * @throws NotAuthorized
487
   * @throws NotFound
488
   * @throws NotImplemented
489
   */
490
  @Override
491
  public ObjectLocationList resolve(Session session, Identifier pid)
492
    throws InvalidToken, ServiceFailure, NotAuthorized,
493
    NotFound, NotImplemented {
494

    
495
    throw new NotImplemented("4131", "resolve not implemented");
496

    
497
  }
498

    
499
  /**
500
   * Search the metadata catalog for identifiers that match the criteria
501
   * 
502
   * @param session - the Session object containing the credentials for the Subject
503
   * @param queryType - An identifier for the type of query expression 
504
   *                    provided in the query
505
   * @param query -  The criteria for matching the characteristics of the 
506
   *                 metadata objects of interest
507
   * 
508
   * @return objectList - the list of objects matching the criteria
509
   * 
510
   * @throws InvalidToken
511
   * @throws ServiceFailure
512
   * @throws NotAuthorized
513
   * @throws InvalidRequest
514
   * @throws NotImplemented
515
   */
516
  @Override
517
  public ObjectList search(Session session, String queryType, String query)
518
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
519
    NotImplemented {
520

    
521
    ObjectList objectList = null;
522
    try {
523
        objectList = 
524
          IdentifierManager.getInstance().querySystemMetadata(
525
              null, //startTime, 
526
              null, //endTime,
527
              null, //objectFormat, 
528
              false, //replicaStatus, 
529
              0, //start, 
530
              -1 //count
531
              );
532
        
533
    } catch (Exception e) {
534
      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
535
    }
536

    
537
      return objectList;
538
      
539
    //throw new NotImplemented("4281", "search not implemented");
540
    
541
    // the code block below is from an older implementation
542
    
543
    /*  This block commented out because of the EcoGrid circular dependency.
544
         *  For now, query will not be supported until the circularity can be
545
         *  resolved, probably by moving the ecogrid query syntax transformers
546
         *  directly into the Metacat codebase.  MBJ 2010-02-03
547
         
548
        try {
549
            EcogridQueryParser parser = new EcogridQueryParser(request
550
                    .getReader());
551
            parser.parseXML();
552
            QueryType queryType = parser.getEcogridQuery();
553
            EcogridJavaToMetacatJavaQueryTransformer queryTransformer = 
554
                new EcogridJavaToMetacatJavaQueryTransformer();
555
            QuerySpecification metacatQuery = queryTransformer
556
                    .transform(queryType);
557

    
558
            DBQuery metacat = new DBQuery();
559

    
560
            boolean useXMLIndex = (new Boolean(PropertyService
561
                    .getProperty("database.usexmlindex"))).booleanValue();
562
            String xmlquery = "query"; // we don't care the query in resultset,
563
            // the query can be anything
564
            PrintWriter out = null; // we don't want metacat result, so set out null
565

    
566
            // parameter: queryspecification, user, group, usingIndexOrNot
567
            StringBuffer result = metacat.createResultDocument(xmlquery,
568
                    metacatQuery, out, username, groupNames, useXMLIndex);
569

    
570
            // create result set transfer       
571
            String saxparser = PropertyService.getProperty("xml.saxparser");
572
            MetacatResultsetParser metacatResultsetParser = new MetacatResultsetParser(
573
                    new StringReader(result.toString()), saxparser, queryType
574
                            .getNamespace().get_value());
575
            ResultsetType records = metacatResultsetParser.getEcogridResult();
576

    
577
            System.out
578
                    .println(EcogridResultsetTransformer.toXMLString(records));
579
            response.setContentType("text/xml");
580
            out = response.getWriter();
581
            out.print(EcogridResultsetTransformer.toXMLString(records));
582

    
583
        } catch (Exception e) {
584
            e.printStackTrace();
585
        }*/
586
    
587

    
588
  }
589
  
590
  /**
591
   * Returns the object format registered in the DataONE Object Format 
592
   * Vocabulary for the given format identifier
593
   * 
594
   * @param fmtid - the identifier of the format requested
595
   * 
596
   * @return objectFormat - the object format requested
597
   * 
598
   * @throws ServiceFailure
599
   * @throws NotFound
600
   * @throws InsufficientResources
601
   * @throws NotImplemented
602
   */
603
  @Override
604
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
605
    throws ServiceFailure, NotFound, NotImplemented {
606
     
607
      return ObjectFormatService.getInstance().getFormat(fmtid);
608
      
609
  }
610

    
611
  /**
612
   * Returns a list of all object formats registered in the DataONE Object 
613
   * Format Vocabulary
614
    * 
615
   * @return objectFormatList - The list of object formats registered in 
616
   *                            the DataONE Object Format Vocabulary
617
   * 
618
   * @throws ServiceFailure
619
   * @throws NotImplemented
620
   * @throws InsufficientResources
621
   */
622
  @Override
623
  public ObjectFormatList listFormats() 
624
    throws ServiceFailure, NotImplemented {
625

    
626
    return ObjectFormatService.getInstance().listFormats();
627
  }
628

    
629
  /**
630
   * Returns a list of nodes that have been registered with the DataONE infrastructure
631
    * 
632
   * @return nodeList - List of nodes from the registry
633
   * 
634
   * @throws ServiceFailure
635
   * @throws NotImplemented
636
   */
637
  @Override
638
  public NodeList listNodes() 
639
    throws NotImplemented, ServiceFailure {
640

    
641
    throw new NotImplemented("4800", "listNodes not implemented");
642
  }
643

    
644
  /**
645
   * Provides a mechanism for adding system metadata independently of its 
646
   * associated object, such as when adding system metadata for data objects.
647
    * 
648
   * @param session - the Session object containing the credentials for the Subject
649
   * @param pid - The identifier of the object to register the system metadata against
650
   * @param sysmeta - The system metadata to be registered
651
   * 
652
   * @return true if the registration succeeds
653
   * 
654
   * @throws NotImplemented
655
   * @throws NotAuthorized
656
   * @throws ServiceFailure
657
   * @throws InvalidRequest
658
   * @throws InvalidSystemMetadata
659
   */
660
  @Override
661
  public Identifier registerSystemMetadata(Session session, Identifier pid,
662
      SystemMetadata sysmeta) 
663
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
664
      InvalidSystemMetadata {
665

    
666
      // The lock to be used for this identifier
667
      Lock lock = null;
668

    
669
      // TODO: control who can call this?
670
      if (session == null) {
671
          //TODO: many of the thrown exceptions do not use the correct error codes
672
          //check these against the docs and correct them
673
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
674
                  "  If you are not logged in, please do so and retry the request.");
675
      }
676
      
677
      // verify that guid == SystemMetadata.getIdentifier()
678
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
679
          "|" + sysmeta.getIdentifier().getValue());
680
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
681
          throw new InvalidRequest("4863", 
682
              "The identifier in method call (" + pid.getValue() + 
683
              ") does not match identifier in system metadata (" +
684
              sysmeta.getIdentifier().getValue() + ").");
685
      }
686

    
687
      logMetacat.debug("Checking if identifier exists...");
688
      // Check that the identifier does not already exist
689
      if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
690
          throw new InvalidRequest("4863", 
691
              "The identifier is already in use by an existing object.");
692
      
693
      }
694
      
695
      // insert the system metadata into the object store
696
      logMetacat.debug("Starting to insert SystemMetadata...");
697
      try {
698
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
699
          sysmeta.setSerialVersion(BigInteger.ONE);
700
          sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
701
          HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
702
          
703
      } catch (Exception e) {
704
        logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
705
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
706
              e.getClass() + ": " + e.getMessage());
707
          
708
      } finally {
709
        lock.unlock();
710
        logMetacat.debug("Unlocked identifier " + pid.getValue());
711

    
712
      }
713
      
714
      logMetacat.debug("Returning from registerSystemMetadata");
715
      EventLog.getInstance().log(request.getRemoteAddr(), 
716
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
717
          pid.getValue(), "registerSystemMetadata");
718
      return pid;
719
  }
720
  
721
  /**
722
   * Given an optional scope and format, reserves and returns an identifier 
723
   * within that scope and format that is unique and will not be 
724
   * used by any other sessions. 
725
    * 
726
   * @param session - the Session object containing the credentials for the Subject
727
   * @param pid - The identifier of the object to register the system metadata against
728
   * @param scope - An optional string to be used to qualify the scope of 
729
   *                the identifier namespace, which is applied differently 
730
   *                depending on the format requested. If scope is not 
731
   *                supplied, a default scope will be used.
732
   * @param format - The optional name of the identifier format to be used, 
733
   *                  drawn from a DataONE-specific vocabulary of identifier 
734
   *                 format names, including several common syntaxes such 
735
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
736
   *                 format is not supplied by the caller, the CN service 
737
   *                 will use a default identifier format, which may change 
738
   *                 over time.
739
   * 
740
   * @return true if the registration succeeds
741
   * 
742
   * @throws InvalidToken
743
   * @throws ServiceFailure
744
   * @throws NotAuthorized
745
   * @throws IdentifierNotUnique
746
   * @throws NotImplemented
747
   */
748
  @Override
749
  public Identifier reserveIdentifier(Session session, Identifier pid)
750
  throws InvalidToken, ServiceFailure,
751
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
752

    
753
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
754
  }
755
  
756
  @Override
757
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
758
  throws InvalidToken, ServiceFailure,
759
        NotAuthorized, NotImplemented, InvalidRequest {
760
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
761
  }
762
  
763
  /**
764
    * Checks whether the pid is reserved by the subject in the session param
765
    * If the reservation is held on the pid by the subject, we return true.
766
    * 
767
   * @param session - the Session object containing the Subject
768
   * @param pid - The identifier to check
769
   * 
770
   * @return true if the reservation exists for the subject/pid
771
   * 
772
   * @throws InvalidToken
773
   * @throws ServiceFailure
774
   * @throws NotFound - when the pid is not found (in use or in reservation)
775
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
776
   * @throws IdentifierNotUnique - when the pid is in use
777
   * @throws NotImplemented
778
   */
779

    
780
  @Override
781
  public boolean hasReservation(Session session, Identifier pid) 
782
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
783
      NotImplemented, InvalidRequest {
784
  
785
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
786
  }
787

    
788
  /**
789
   * Changes ownership (RightsHolder) of the specified object to the 
790
   * subject specified by userId
791
    * 
792
   * @param session - the Session object containing the credentials for the Subject
793
   * @param pid - Identifier of the object to be modified
794
   * @param userId - The subject that will be taking ownership of the specified object.
795
   *
796
   * @return pid - the identifier of the modified object
797
   * 
798
   * @throws ServiceFailure
799
   * @throws InvalidToken
800
   * @throws NotFound
801
   * @throws NotAuthorized
802
   * @throws NotImplemented
803
   * @throws InvalidRequest
804
   */  
805
  @Override
806
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
807
      long serialVersion)
808
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
809
      NotImplemented, InvalidRequest {
810
      
811
      // The lock to be used for this identifier
812
      Lock lock = null;
813

    
814
      // get the subject
815
      Subject subject = session.getSubject();
816
      
817
      // are we allowed to do this?
818
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
819
        throw new NotAuthorized("4440", "not allowed by " + subject.getValue() + " on " + pid.getValue());  
820
      }
821
      
822
      SystemMetadata systemMetadata = null;
823
      try {
824
          lock = HazelcastService.getInstance().getLock(pid.getValue());
825
          
826
          try {
827
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
828
              
829
              // does the request have the most current system metadata?
830
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
831
                 String msg = "The requested system metadata version number " + 
832
                     serialVersion + " differs from the current version at " +
833
                     systemMetadata.getSerialVersion().longValue() +
834
                     ". Please get the latest copy in order to modify it.";
835
                 throw new InvalidRequest("4442", msg);
836
              }
837
              
838
          } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
839
              throw new NotFound("4460", "No record found for: " + pid.getValue());
840
              
841
          }
842
              
843
          // set the new rights holder
844
          systemMetadata.setRightsHolder(userId);
845
          
846
          // update the metadata
847
          try {
848
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
849
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
850
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
851
              
852
          } catch (Exception e) {
853
          throw new ServiceFailure("4490", e.getMessage());
854
          
855
          }
856
          
857
      } catch (Exception e) {
858
          throw new ServiceFailure("4490", e.getMessage());
859
          
860
      } finally {
861
          lock.unlock();
862
          logMetacat.debug("Unlocked identifier " + pid.getValue());
863
      
864
      }
865
      
866
    return pid;
867
  }
868

    
869
  /**
870
   * Verify that a replication task is authorized by comparing the target node's
871
   * Subject (from the X.509 certificate-derived Session) with the list of 
872
   * subjects in the known, pending replication tasks map.
873
   * 
874
   * @param originatingNodeSession - Session information that contains the 
875
   *                                 identity of the calling user
876
   * @param targetNodeSubject - Subject identifying the target node
877
   * @param pid - the identifier of the object to be replicated
878
   * @param replicatePermission - the execute permission to be granted
879
   * 
880
   * @throws ServiceFailure
881
   * @throws NotImplemented
882
   * @throws InvalidToken
883
   * @throws NotAuthorized
884
   * @throws InvalidRequest
885
   * @throws NotFound
886
   */
887
  @Override
888
  public boolean isNodeAuthorized(Session originatingNodeSession, 
889
    Subject targetNodeSubject, Identifier pid) 
890
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
891
    NotFound, InvalidRequest {
892

    
893
    // The lock to be used for this identifier
894
    Lock lock = null;
895
    
896
    boolean isAllowed = false;
897
    SystemMetadata sysmeta = null;
898
    NodeReference targetNode = null;
899
    
900
    try {
901
      // get the target node reference from the nodes list
902
      CNode cn = D1Client.getCN();
903
      List<Node> nodes = cn.listNodes().getNodeList();
904
      
905
      if ( nodes != null ) {
906
        for (Node node : nodes) {
907
            
908
            for (Subject nodeSubject : node.getSubjectList()) {
909
                
910
                if ( nodeSubject.equals(targetNodeSubject) ) {
911
                    targetNode = node.getIdentifier();
912
                    logMetacat.debug("targetNode is : " + targetNode.getValue());
913
                    break;
914
                }
915
            }
916
            
917
            if ( targetNode != null) { break; }
918
        }
919
        
920
      } else {
921
          String msg = "Couldn't get the node list from the CN";
922
          logMetacat.debug(msg);
923
          throw new ServiceFailure("4872", msg);
924
          
925
      }
926
      
927
      // can't find a node listed with the given subject
928
      if ( targetNode == null ) {
929
          String msg = "There is no Member Node registered with a node subject " +
930
              "matching " + targetNodeSubject.getValue();
931
          logMetacat.info(msg);
932
          throw new ServiceFailure("4872", msg);
933
          
934
      }
935
      
936
      //lock, get, and unlock the pid
937
      lock = HazelcastService.getInstance().getLock(pid.getValue());
938
      lock.lock();
939
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
940
      
941
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
942

    
943
      if ( sysmeta != null ) {
944
          
945
          List<Replica> replicaList = sysmeta.getReplicaList();
946
          
947
          if ( replicaList != null ) {
948
              
949
              // find the replica with the status set to 'requested'
950
              for (Replica replica : replicaList) {
951
                  ReplicationStatus status = replica.getReplicationStatus();
952
                  NodeReference listedNode = replica.getReplicaMemberNode();
953
                  if ( listedNode != null && targetNode != null ) {
954
                      logMetacat.debug("Comparing " + listedNode.getValue()
955
                              + " to " + targetNode.getValue());
956
                      
957
                      if (listedNode.getValue().equals(targetNode.getValue())
958
                              && status.equals(ReplicationStatus.REQUESTED)) {
959
                          isAllowed = true;
960
                          break;
961

    
962
                      }
963
                  }
964
              }
965
          }
966
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
967
              "to replicate: " + isAllowed + " for " + pid.getValue());
968

    
969
          
970
      } else {
971
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
972
          " is null.");          
973
          
974
      }
975

    
976
    } catch (RuntimeException e) {
977
    	  ServiceFailure sf = new ServiceFailure("4872", 
978
                "Runtime Exception: Couldn't determine if node is allowed: " + 
979
                e.getCause().getMessage());
980
    	  sf.initCause(e);
981
        throw sf;
982
        
983
    } finally {
984
      // always unlock the pid
985
      lock.unlock();
986
      logMetacat.debug("Unlocked identifier " + pid.getValue());
987

    
988
    }
989
      
990
    return isAllowed;
991
    
992
  }
993

    
994
  /**
995
   * Adds a new object to the Node, where the object is a science metadata object.
996
   * 
997
   * @param session - the Session object containing the credentials for the Subject
998
   * @param pid - The object identifier to be created
999
   * @param object - the object bytes
1000
   * @param sysmeta - the system metadata that describes the object  
1001
   * 
1002
   * @return pid - the object identifier created
1003
   * 
1004
   * @throws InvalidToken
1005
   * @throws ServiceFailure
1006
   * @throws NotAuthorized
1007
   * @throws IdentifierNotUnique
1008
   * @throws UnsupportedType
1009
   * @throws InsufficientResources
1010
   * @throws InvalidSystemMetadata
1011
   * @throws NotImplemented
1012
   * @throws InvalidRequest
1013
   */
1014
  public Identifier create(Session session, Identifier pid, InputStream object,
1015
    SystemMetadata sysmeta) 
1016
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1017
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1018
    NotImplemented, InvalidRequest {
1019
      
1020
      
1021
      // The lock to be used for this identifier
1022
      Lock lock = null;
1023
      
1024
      try {
1025
        // are we allowed?
1026
          boolean isAllowed = false;
1027
          CNode cn = D1Client.getCN();
1028
          NodeList nodeList = cn.listNodes();
1029
          
1030
          for (Node node : nodeList.getNodeList()) {
1031
              if ( node.getType().equals(NodeType.CN) ) {
1032
                  
1033
                  List<Subject> subjects = node.getSubjectList();
1034
                  for (Subject subject : subjects) {
1035
                     if (subject.equals(session.getSubject())) {
1036
                         isAllowed = true;
1037
                         break;
1038
                     }
1039
                  }
1040
              }
1041
          }
1042

    
1043
          // proceed if we're called by a CN
1044
          if ( isAllowed ) {
1045
              // create the coordinating node version of the document      
1046
              lock = HazelcastService.getInstance().getLock(pid.getValue());
1047
              lock.lock();
1048
              sysmeta.setSerialVersion(BigInteger.ONE);
1049
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1050
              pid = super.create(session, pid, object, sysmeta);
1051

    
1052
          } else {
1053
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1054
                  " isn't allowed to call create() on a Coordinating Node.";
1055
              logMetacat.info(msg);
1056
              throw new NotAuthorized("1100", msg);
1057
          }
1058
          
1059
      } catch (RuntimeException e) {
1060
          // Convert Hazelcast runtime exceptions to service failures
1061
          String msg = "There was a problem creating the object identified by " +
1062
              pid.getValue() + ". There error message was: " + e.getMessage();
1063
          throw new ServiceFailure("4893", msg);
1064
          
1065
      } finally {
1066
    	  if (lock != null) {
1067
	          lock.unlock();
1068
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1069
    	  }
1070
      }
1071
      
1072
      return pid;
1073

    
1074
  }
1075

    
1076
  /**
1077
   * Set access for a given object using the object identifier and a Subject
1078
   * under a given Session.
1079
   * 
1080
   * @param session - the Session object containing the credentials for the Subject
1081
   * @param pid - the object identifier for the given object to apply the policy
1082
   * @param policy - the access policy to be applied
1083
   * 
1084
   * @return true if the application of the policy succeeds
1085
   * @throws InvalidToken
1086
   * @throws ServiceFailure
1087
   * @throws NotFound
1088
   * @throws NotAuthorized
1089
   * @throws NotImplemented
1090
   * @throws InvalidRequest
1091
   */
1092
  public boolean setAccessPolicy(Session session, Identifier pid, 
1093
      AccessPolicy accessPolicy, long serialVersion) 
1094
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1095
      NotImplemented, InvalidRequest {
1096
      
1097
      // The lock to be used for this identifier
1098
      Lock lock = null;
1099
      
1100
      boolean success = false;
1101
      
1102
      // get the subject
1103
      Subject subject = session.getSubject();
1104
      
1105
      // are we allowed to do this?
1106
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1107
          throw new NotAuthorized("4420", "not allowed by " + subject.getValue() + 
1108
          " on " + pid.getValue());  
1109
      }
1110
      
1111
      SystemMetadata systemMetadata = null;
1112
      try {
1113
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1114
          lock.lock();
1115
          
1116
          try {
1117
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1118

    
1119
              // does the request have the most current system metadata?
1120
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1121
                 String msg = "The requested system metadata version number " + 
1122
                     serialVersion + " differs from the current version at " +
1123
                     systemMetadata.getSerialVersion().longValue() +
1124
                     ". Please get the latest copy in order to modify it.";
1125
                 throw new InvalidRequest("4402", msg);
1126
              }
1127
              
1128
          } catch (Exception e) {
1129
              // convert Hazelcast RuntimeException to NotFound
1130
              throw new NotFound("4400", "No record found for: " + pid);
1131
            
1132
          }
1133
              
1134
          // set the access policy
1135
          systemMetadata.setAccessPolicy(accessPolicy);
1136
          
1137
          // update the system metadata
1138
          try {
1139
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1140
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1141
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1142
            
1143
          } catch (Exception e) {
1144
              // convert Hazelcast RuntimeException to ServiceFailure
1145
              throw new ServiceFailure("4430", e.getMessage());
1146
            
1147
          }
1148
          
1149
      } catch (Exception e) {
1150
          throw new ServiceFailure("4430", e.getMessage());
1151
          
1152
      } finally {
1153
          lock.unlock();
1154
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1155
        
1156
      }
1157

    
1158
    
1159
    // TODO: how do we know if the map was persisted?
1160
    success = true;
1161
    
1162
    return success;
1163
  }
1164

    
1165
  /**
1166
   * Full replacement of replication metadata in the system metadata for the 
1167
   * specified object, changes date system metadata modified
1168
   * 
1169
   * @param session - the Session object containing the credentials for the Subject
1170
   * @param pid - the object identifier for the given object to apply the policy
1171
   * @param replica - the replica to be updated
1172
   * @return
1173
   * @throws NotImplemented
1174
   * @throws NotAuthorized
1175
   * @throws ServiceFailure
1176
   * @throws InvalidRequest
1177
   * @throws NotFound
1178
   */
1179
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1180
      Replica replica, long serialVersion) 
1181
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1182
      NotFound {
1183
      
1184
      // The lock to be used for this identifier
1185
      Lock lock = null;
1186
      
1187
      // get the subject
1188
      Subject subject = session.getSubject();
1189
      
1190
      // are we allowed to do this?
1191
      try {
1192
        // what is the controlling permission?
1193
        if (!isAuthorized(session, pid, Permission.WRITE)) {
1194
            throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1195
            " on " + pid.getValue());  
1196
        }
1197
        
1198
      } catch (InvalidToken e) {
1199
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1200
                  " on " + pid.getValue());  
1201
          
1202
      }
1203

    
1204
      SystemMetadata systemMetadata = null;
1205
      try {
1206
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1207
          lock.lock();
1208

    
1209
          try {      
1210
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1211

    
1212
              // does the request have the most current system metadata?
1213
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1214
                 String msg = "The requested system metadata version number " + 
1215
                     serialVersion + " differs from the current version at " +
1216
                     systemMetadata.getSerialVersion().longValue() +
1217
                     ". Please get the latest copy in order to modify it.";
1218
                 throw new InvalidRequest("4853", msg);
1219
              }
1220
              
1221
          } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
1222
            throw new NotFound("4854", "No record found for: " + pid.getValue() +
1223
                " : " + e.getMessage());
1224
            
1225
          }
1226
              
1227
          // set the status for the replica
1228
          List<Replica> replicas = systemMetadata.getReplicaList();
1229
          NodeReference replicaNode = replica.getReplicaMemberNode();
1230
          int index = 0;
1231
          for (Replica listedReplica: replicas) {
1232
              
1233
              // remove the replica that we are replacing
1234
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1235
                  replicas.remove(index);
1236
                  break;
1237
                  
1238
              }
1239
              index++;
1240
          }
1241
          
1242
          // add the new replica item
1243
          replicas.add(replica);
1244
          systemMetadata.setReplicaList(replicas);
1245
          
1246
          // update the metadata
1247
          try {
1248
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1249
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1250
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1251
            
1252
          } catch (Exception e) {
1253
              throw new ServiceFailure("4852", e.getMessage());
1254
          
1255
          }
1256
          
1257
    } catch (Exception e) {
1258
        throw new ServiceFailure("4852", e.getMessage());
1259

    
1260
    } finally {
1261
        lock.unlock();
1262
        logMetacat.debug("Unlocked identifier " + pid.getValue());
1263
        
1264
    }
1265
    
1266
      return true;
1267
      
1268
  }
1269
  
1270
    @Override
1271
  public ObjectList listObjects(Session session, Date startTime, 
1272
      Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1273
      Integer start, Integer count)
1274
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1275
      ServiceFailure {
1276
      
1277
      ObjectList objectList = null;
1278
        try {
1279
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1280
        } catch (Exception e) {
1281
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1282
        }
1283

    
1284
        return objectList;
1285
  }
1286

    
1287
	@Override
1288
	public ChecksumAlgorithmList listChecksumAlgorithms()
1289
			throws ServiceFailure, NotImplemented {
1290
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1291
		cal.addAlgorithm("MD5");
1292
		cal.addAlgorithm("SHA-1");
1293
		return null;
1294
	}
1295
    
1296
}
(1-1/5)