Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.Calendar;
29
import java.util.Date;
30
import java.util.List;
31
import java.util.Set;
32
import java.util.concurrent.locks.Lock;
33

    
34
import javax.servlet.http.HttpServletRequest;
35

    
36
import org.apache.log4j.Logger;
37
import org.dataone.client.CNode;
38
import org.dataone.client.D1Client;
39
import org.dataone.service.cn.v1.CNAuthorization;
40
import org.dataone.service.cn.v1.CNCore;
41
import org.dataone.service.cn.v1.CNRead;
42
import org.dataone.service.cn.v1.CNReplication;
43
import org.dataone.service.exceptions.BaseException;
44
import org.dataone.service.exceptions.IdentifierNotUnique;
45
import org.dataone.service.exceptions.InsufficientResources;
46
import org.dataone.service.exceptions.InvalidRequest;
47
import org.dataone.service.exceptions.InvalidSystemMetadata;
48
import org.dataone.service.exceptions.InvalidToken;
49
import org.dataone.service.exceptions.NotAuthorized;
50
import org.dataone.service.exceptions.NotFound;
51
import org.dataone.service.exceptions.NotImplemented;
52
import org.dataone.service.exceptions.ServiceFailure;
53
import org.dataone.service.exceptions.UnsupportedType;
54
import org.dataone.service.types.v1.AccessPolicy;
55
import org.dataone.service.types.v1.Checksum;
56
import org.dataone.service.types.v1.ChecksumAlgorithmList;
57
import org.dataone.service.types.v1.Identifier;
58
import org.dataone.service.types.v1.Node;
59
import org.dataone.service.types.v1.NodeList;
60
import org.dataone.service.types.v1.NodeReference;
61
import org.dataone.service.types.v1.NodeType;
62
import org.dataone.service.types.v1.ObjectFormat;
63
import org.dataone.service.types.v1.ObjectFormatIdentifier;
64
import org.dataone.service.types.v1.ObjectFormatList;
65
import org.dataone.service.types.v1.ObjectList;
66
import org.dataone.service.types.v1.ObjectLocationList;
67
import org.dataone.service.types.v1.Permission;
68
import org.dataone.service.types.v1.Replica;
69
import org.dataone.service.types.v1.ReplicationPolicy;
70
import org.dataone.service.types.v1.ReplicationStatus;
71
import org.dataone.service.types.v1.Session;
72
import org.dataone.service.types.v1.Subject;
73
import org.dataone.service.types.v1.SystemMetadata;
74

    
75
import edu.ucsb.nceas.metacat.EventLog;
76
import edu.ucsb.nceas.metacat.IdentifierManager;
77
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
78

    
79
/**
80
 * Represents Metacat's implementation of the DataONE Coordinating Node 
81
 * service API. Methods implement the various CN* interfaces, and methods common
82
 * to both Member Node and Coordinating Node interfaces are found in the
83
 * D1NodeService super class.
84
 *
85
 */
86
public class CNodeService extends D1NodeService implements CNAuthorization,
87
    CNCore, CNRead, CNReplication {
88

    
89
  /* the logger instance */
90
  private Logger logMetacat = null;
91

    
92
  /**
93
   * singleton accessor
94
   */
95
  public static CNodeService getInstance(HttpServletRequest request) { 
96
    return new CNodeService(request);
97
  }
98
  
99
  /**
100
   * Constructor, private for singleton access
101
   */
102
  private CNodeService(HttpServletRequest request) {
103
    super(request);
104
    logMetacat = Logger.getLogger(CNodeService.class);
105
        
106
  }
107
    
108
  /**
109
   * Set the replication policy for an object given the object identifier
110
   * 
111
   * @param session - the Session object containing the credentials for the Subject
112
   * @param pid - the object identifier for the given object
113
   * @param policy - the replication policy to be applied
114
   * 
115
   * @return true or false
116
   * 
117
   * @throws NotImplemented
118
   * @throws NotAuthorized
119
   * @throws ServiceFailure
120
   * @throws InvalidRequest
121
   * 
122
   */
123
  @Override
124
  public boolean setReplicationPolicy(Session session, Identifier pid,
125
      ReplicationPolicy policy, long serialVersion) 
126
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
127
      InvalidRequest, InvalidToken {
128
      
129
      // The lock to be used for this identifier
130
      Lock lock = null;
131
      
132
      // get the subject
133
      Subject subject = session.getSubject();
134
      
135
      // are we allowed to do this?
136
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
137
        throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION + 
138
            " not allowed by " + subject.getValue() + " on " + pid.getValue());  
139
      }
140
      
141
      SystemMetadata systemMetadata = null;
142
      try {
143
          lock = HazelcastService.getInstance().getLock(pid.getValue());
144
          lock.lock();
145

    
146
          try {
147
              if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
148
                  systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
149
                  
150
              }
151
            
152

    
153
              // does the request have the most current system metadata?
154
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
155
                 String msg = "The requested system metadata version number " + 
156
                     serialVersion + " differs from the current version at " +
157
                     systemMetadata.getSerialVersion().longValue() +
158
                     ". Please get the latest copy in order to modify it.";
159
                 throw new InvalidRequest("4883", msg);
160
              }
161
              
162
          } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
163
              throw new NotFound("4884", "No record found for: " + pid.getValue());
164
            
165
          }
166
          
167
          // set the new policy
168
          systemMetadata.setReplicationPolicy(policy);
169
          
170
          // update the metadata
171
          try {
172
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
173
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
174
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
175
            
176
          } catch (Exception e) {
177
              throw new ServiceFailure("4882", e.getMessage());
178
          
179
          }
180
          
181
    } catch (Exception e) {
182
        throw new ServiceFailure("4882", e.getMessage());
183
        
184
    } finally {
185
        lock.unlock();
186
        logMetacat.debug("Unlocked identifier " + pid.getValue());
187
        
188
    }
189
    
190
      return true;
191
  }
192

    
193
  /**
194
   * Set the replication status for an object given the object identifier
195
   * 
196
   * @param session - the Session object containing the credentials for the Subject
197
   * @param pid - the object identifier for the given object
198
   * @param status - the replication status to be applied
199
   * 
200
   * @return true or false
201
   * 
202
   * @throws NotImplemented
203
   * @throws NotAuthorized
204
   * @throws ServiceFailure
205
   * @throws InvalidRequest
206
   * @throws InvalidToken
207
   * @throws NotFound
208
   * 
209
   */
210
  @Override
211
  public boolean setReplicationStatus(Session session, Identifier pid,
212
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
213
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
214
      InvalidRequest, NotFound {
215
      
216
      // The lock to be used for this identifier
217
      Lock lock = null;
218
      
219
      boolean allowed = false;
220
      int replicaEntryIndex = -1;
221
      List<Replica> replicas = null;
222
      // get the subject
223
      Subject subject = session.getSubject();
224
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
225
          " is " + status.toString());
226
      
227
      SystemMetadata systemMetadata = null;
228

    
229
      try {
230
          lock = HazelcastService.getInstance().getLock(pid.getValue());
231
          lock.lock();
232
          try {      
233
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
234

    
235
              if ( systemMetadata == null ) {
236
                  logMetacat.debug("systemMetadata is null for " + pid.getValue());
237
                  
238
              }
239
              replicas = systemMetadata.getReplicaList();
240
              int count = 0;
241
              
242
              if ( replicas == null || replicas.size() < 1 ) {
243
                  logMetacat.debug("no replicas to evaluate");
244
                  throw new InvalidRequest("4730", "There are no replicas to update.");
245
                  
246
              }
247

    
248
              // find the target replica index in the replica list
249
              for (Replica replica: replicas) {
250
                  String replicaNodeStr = replica.getReplicaMemberNode().getValue();
251
                  String targetNodeStr = targetNode.getValue();
252
                  logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
253
                  
254
                  if (replicaNodeStr.equals(targetNodeStr)) {
255
                      replicaEntryIndex = count;
256
                      logMetacat.debug("replica entry index is: " + replicaEntryIndex);
257
                      break;
258
                  }
259
                  count++;
260
                  
261
              }
262

    
263
              // are we allowed to do this? only CNs and target MNs are allowed
264
              CNode cn = D1Client.getCN();
265
              List<Node> nodes = cn.listNodes().getNodeList();
266
              
267
              // find the node in the node list
268
              for ( Node node : nodes ) {
269
                  
270
                  NodeReference nodeReference = node.getIdentifier();
271
                  logMetacat.debug("In setReplicationStatus(), Node reference is: " + nodeReference.getValue());
272
                  
273
                  // allow target MN certs and CN certs
274
                  if (targetNode.getValue().equals(nodeReference.getValue()) ||
275
                      node.getType() == NodeType.CN) {
276
                      List<Subject> nodeSubjects = node.getSubjectList();
277
                      
278
                      // check if the session subject is in the node subject list
279
                      for (Subject nodeSubject : nodeSubjects) {
280
                          if ( nodeSubject.equals(subject) ) {
281
                              allowed = true; // subject of session == target node subject
282
                              break;
283
                              
284
                          }
285
                      }                 
286
                  }
287
              }
288

    
289
              if ( !allowed ) {
290
                  String msg = "The subject identified by " + subject.getValue() +
291
                    " does not have permission to set the replication status for " +
292
                    "the replica identified by " + targetNode.getValue() + ".";
293
                  logMetacat.info(msg);
294
                  throw new NotAuthorized("4720", msg);
295
                  
296
              }
297
              
298
              // was there a failure? log it
299
              if ( failure != null && status == ReplicationStatus.FAILED ) {
300
                 String msg = "The replication request of the object identified by " + 
301
                     pid.getValue() + " failed.  The error message was " +
302
                     failure.getMessage() + ".";
303
              }
304
              
305
          } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
306
            throw new NotFound("4740", "No record found for: " + pid.getValue() +
307
                " : " + e.getMessage());
308
            
309
          }
310
          
311
          // set the status for the replica
312
          if ( replicaEntryIndex != -1 ) {
313
              Replica targetReplica = replicas.get(replicaEntryIndex);
314
              targetReplica.setReplicationStatus(status);
315
              logMetacat.debug("Set the replication status for " + 
316
                  targetReplica.getReplicaMemberNode().getValue() + " to " +
317
                  targetReplica.getReplicationStatus());
318
              
319
          } else {
320
              throw new InvalidRequest("4730", "There are no replicas to update.");
321

    
322
          }
323
          
324
          systemMetadata.setReplicaList(replicas);
325
                
326
          // update the metadata
327
          try {
328
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
329
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
330
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
331
              
332
              if ( status == ReplicationStatus.FAILED && failure != null ) {
333
                  logMetacat.warn("Replication failed for identifier " + pid.getValue() +
334
                      " on target node " + targetNode + ". The exception was: " +
335
                      failure.getMessage());
336
              }
337
          } catch (Exception e) {
338
              throw new ServiceFailure("4700", e.getMessage());
339
          
340
          }
341
          
342
    } catch (Exception e) {
343
        lock.unlock();
344
        logMetacat.debug("Unlocked identifier " + pid.getValue());
345
        
346
    }
347
      
348
      return true;
349
  }
350

    
351
  /**
352
   * Test that the specified relationship between pidOfSubject and pidOfObject exists
353
   * 
354
   * @param session - the Session object containing the credentials for the Subject
355
   * @param node - the node information for the given node be modified
356
   * 
357
   * @return true if the relationship exists
358
   * 
359
   * @throws InvalidToken
360
   * @throws ServiceFailure
361
   * @throws NotAuthorized
362
   * @throws NotFound
363
   * @throws InvalidRequest
364
   * @throws NotImplemented
365
   */
366
  @Override
367
  public boolean assertRelation(Session session, Identifier pidOfSubject, 
368
    String relationship, Identifier pidOfObject) 
369
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
370
    InvalidRequest, NotImplemented {
371
    
372
    // The lock to be used for thyis identifier
373
    Lock lock = null;
374

    
375
    boolean asserted = false;
376
        
377
    // are we allowed to do this?
378
    if (!isAuthorized(session, pidOfSubject, Permission.READ)) {
379
      throw new NotAuthorized("4881", Permission.READ + " not allowed on " + pidOfSubject.getValue());  
380
    }
381
    
382
    SystemMetadata systemMetadata = null;
383
    try {
384
        lock = HazelcastService.getInstance().getLock(pidOfSubject.getValue());
385
        lock.lock();
386
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pidOfSubject);
387
        
388
        
389
        // check relationships
390
        // TODO: use ORE map
391
        if (relationship.equalsIgnoreCase("describes")) {
392
          
393
        }
394
        
395
        if (relationship.equalsIgnoreCase("describedBy")) {
396
          
397
        }
398
        
399
        if (relationship.equalsIgnoreCase("derivedFrom")) {
400
          
401
        }
402
        
403
        if (relationship.equalsIgnoreCase("obsoletes")) {
404
            Identifier pid = systemMetadata.getObsoletes();
405
            if (pid.getValue().equals(pidOfObject.getValue())) {
406
              asserted = true;
407
            
408
        }
409
          //return systemMetadata.getObsoleteList().contains(pidOfObject);
410
        }
411
        if (relationship.equalsIgnoreCase("obsoletedBy")) {
412
            Identifier pid = systemMetadata.getObsoletedBy();
413
            if (pid.getValue().equals(pidOfObject.getValue())) {
414
              asserted = true;
415
            
416
        }
417
          //return systemMetadata.getObsoletedByList().contains(pidOfObject);
418
        }
419
              
420
    } catch (Exception e) {
421
        throw new ServiceFailure("4270", "Could not assert relation for: " + 
422
            pidOfSubject.getValue() +
423
            ". The error message was: " + e.getMessage());
424
      
425
    } finally {
426
        lock.unlock();
427
        logMetacat.debug("Unlocked identifier " + pidOfSubject.getValue());
428

    
429
    }
430
        
431
    return asserted;
432
  }
433
  
434
  /**
435
   * Return the checksum of the object given the identifier 
436
   * 
437
   * @param session - the Session object containing the credentials for the Subject
438
   * @param pid - the object identifier for the given object
439
   * 
440
   * @return checksum - the checksum of the object
441
   * 
442
   * @throws InvalidToken
443
   * @throws ServiceFailure
444
   * @throws NotAuthorized
445
   * @throws NotFound
446
   * @throws NotImplemented
447
   */
448
  @Override
449
  public Checksum getChecksum(Session session, Identifier pid)
450
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
451
    NotImplemented {
452
        
453
    // The lock to be used for thyis identifier
454
    Lock lock = null;
455
    
456
    if (!isAuthorized(session, pid, Permission.READ)) {
457
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
458
    }
459
    
460
    SystemMetadata systemMetadata = null;
461
    Checksum checksum = null;
462
    
463
    try {
464
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
465
        checksum = systemMetadata.getChecksum();
466

    
467
    } catch (Exception e) {
468
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
469
            pid.getValue() + ". The error message was: " + e.getMessage());
470
      
471
    }
472
    
473
    return checksum;
474
  }
475

    
476
  /**
477
   * Resolve the location of a given object
478
   * 
479
   * @param session - the Session object containing the credentials for the Subject
480
   * @param pid - the object identifier for the given object
481
   * 
482
   * @return objectLocationList - the list of nodes known to contain the object
483
   * 
484
   * @throws InvalidToken
485
   * @throws ServiceFailure
486
   * @throws NotAuthorized
487
   * @throws NotFound
488
   * @throws NotImplemented
489
   */
490
  @Override
491
  public ObjectLocationList resolve(Session session, Identifier pid)
492
    throws InvalidToken, ServiceFailure, NotAuthorized,
493
    NotFound, NotImplemented {
494

    
495
    throw new NotImplemented("4131", "resolve not implemented");
496

    
497
  }
498

    
499
  /**
500
   * Search the metadata catalog for identifiers that match the criteria
501
   * 
502
   * @param session - the Session object containing the credentials for the Subject
503
   * @param queryType - An identifier for the type of query expression 
504
   *                    provided in the query
505
   * @param query -  The criteria for matching the characteristics of the 
506
   *                 metadata objects of interest
507
   * 
508
   * @return objectList - the list of objects matching the criteria
509
   * 
510
   * @throws InvalidToken
511
   * @throws ServiceFailure
512
   * @throws NotAuthorized
513
   * @throws InvalidRequest
514
   * @throws NotImplemented
515
   */
516
  @Override
517
  public ObjectList search(Session session, String queryType, String query)
518
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
519
    NotImplemented {
520

    
521
    ObjectList objectList = null;
522
    try {
523
        objectList = 
524
          IdentifierManager.getInstance().querySystemMetadata(
525
              null, //startTime, 
526
              null, //endTime,
527
              null, //objectFormat, 
528
              false, //replicaStatus, 
529
              0, //start, 
530
              -1 //count
531
              );
532
        
533
    } catch (Exception e) {
534
      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
535
    }
536

    
537
      return objectList;
538
      
539
    //throw new NotImplemented("4281", "search not implemented");
540
    
541
    // the code block below is from an older implementation
542
    
543
    /*  This block commented out because of the EcoGrid circular dependency.
544
         *  For now, query will not be supported until the circularity can be
545
         *  resolved, probably by moving the ecogrid query syntax transformers
546
         *  directly into the Metacat codebase.  MBJ 2010-02-03
547
         
548
        try {
549
            EcogridQueryParser parser = new EcogridQueryParser(request
550
                    .getReader());
551
            parser.parseXML();
552
            QueryType queryType = parser.getEcogridQuery();
553
            EcogridJavaToMetacatJavaQueryTransformer queryTransformer = 
554
                new EcogridJavaToMetacatJavaQueryTransformer();
555
            QuerySpecification metacatQuery = queryTransformer
556
                    .transform(queryType);
557

    
558
            DBQuery metacat = new DBQuery();
559

    
560
            boolean useXMLIndex = (new Boolean(PropertyService
561
                    .getProperty("database.usexmlindex"))).booleanValue();
562
            String xmlquery = "query"; // we don't care the query in resultset,
563
            // the query can be anything
564
            PrintWriter out = null; // we don't want metacat result, so set out null
565

    
566
            // parameter: queryspecification, user, group, usingIndexOrNot
567
            StringBuffer result = metacat.createResultDocument(xmlquery,
568
                    metacatQuery, out, username, groupNames, useXMLIndex);
569

    
570
            // create result set transfer       
571
            String saxparser = PropertyService.getProperty("xml.saxparser");
572
            MetacatResultsetParser metacatResultsetParser = new MetacatResultsetParser(
573
                    new StringReader(result.toString()), saxparser, queryType
574
                            .getNamespace().get_value());
575
            ResultsetType records = metacatResultsetParser.getEcogridResult();
576

    
577
            System.out
578
                    .println(EcogridResultsetTransformer.toXMLString(records));
579
            response.setContentType("text/xml");
580
            out = response.getWriter();
581
            out.print(EcogridResultsetTransformer.toXMLString(records));
582

    
583
        } catch (Exception e) {
584
            e.printStackTrace();
585
        }*/
586
    
587

    
588
  }
589
  
590
  /**
591
   * Returns the object format registered in the DataONE Object Format 
592
   * Vocabulary for the given format identifier
593
   * 
594
   * @param fmtid - the identifier of the format requested
595
   * 
596
   * @return objectFormat - the object format requested
597
   * 
598
   * @throws ServiceFailure
599
   * @throws NotFound
600
   * @throws InsufficientResources
601
   * @throws NotImplemented
602
   */
603
  @Override
604
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
605
    throws ServiceFailure, NotFound, NotImplemented {
606
     
607
      return ObjectFormatService.getInstance().getFormat(fmtid);
608
      
609
  }
610

    
611
  /**
612
   * Returns a list of all object formats registered in the DataONE Object 
613
   * Format Vocabulary
614
    * 
615
   * @return objectFormatList - The list of object formats registered in 
616
   *                            the DataONE Object Format Vocabulary
617
   * 
618
   * @throws ServiceFailure
619
   * @throws NotImplemented
620
   * @throws InsufficientResources
621
   */
622
  @Override
623
  public ObjectFormatList listFormats() 
624
    throws ServiceFailure, NotImplemented {
625

    
626
    return ObjectFormatService.getInstance().listFormats();
627
  }
628

    
629
  /**
630
   * Returns a list of nodes that have been registered with the DataONE infrastructure
631
    * 
632
   * @return nodeList - List of nodes from the registry
633
   * 
634
   * @throws ServiceFailure
635
   * @throws NotImplemented
636
   */
637
  @Override
638
  public NodeList listNodes() 
639
    throws NotImplemented, ServiceFailure {
640

    
641
    throw new NotImplemented("4800", "listNodes not implemented");
642
  }
643

    
644
  /**
645
   * Provides a mechanism for adding system metadata independently of its 
646
   * associated object, such as when adding system metadata for data objects.
647
    * 
648
   * @param session - the Session object containing the credentials for the Subject
649
   * @param pid - The identifier of the object to register the system metadata against
650
   * @param sysmeta - The system metadata to be registered
651
   * 
652
   * @return true if the registration succeeds
653
   * 
654
   * @throws NotImplemented
655
   * @throws NotAuthorized
656
   * @throws ServiceFailure
657
   * @throws InvalidRequest
658
   * @throws InvalidSystemMetadata
659
   */
660
  @Override
661
  public Identifier registerSystemMetadata(Session session, Identifier pid,
662
      SystemMetadata sysmeta) 
663
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
664
      InvalidSystemMetadata {
665

    
666
      // The lock to be used for this identifier
667
      Lock lock = null;
668

    
669
      // TODO: control who can call this?
670
      if (session == null) {
671
          //TODO: many of the thrown exceptions do not use the correct error codes
672
          //check these against the docs and correct them
673
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
674
                  "  If you are not logged in, please do so and retry the request.");
675
      }
676
      
677
      // verify that guid == SystemMetadata.getIdentifier()
678
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
679
          "|" + sysmeta.getIdentifier().getValue());
680
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
681
          throw new InvalidRequest("4863", 
682
              "The identifier in method call (" + pid.getValue() + 
683
              ") does not match identifier in system metadata (" +
684
              sysmeta.getIdentifier().getValue() + ").");
685
      }
686

    
687
      try {
688
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
689
          lock.lock();
690
          logMetacat.debug("Checking if identifier exists...");
691
          // Check that the identifier does not already exist
692
          if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
693
              throw new InvalidRequest("4863", 
694
                  "The identifier is already in use by an existing object.");
695
          
696
          }
697
          
698
          // insert the system metadata into the object store
699
          logMetacat.debug("Starting to insert SystemMetadata...");
700
          try {
701
              sysmeta.setSerialVersion(BigInteger.ONE);
702
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
703
              HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
704
              
705
          } catch (Exception e) {
706
            logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
707
              throw new ServiceFailure("4862", "Error inserting system metadata: " + 
708
                  e.getClass() + ": " + e.getMessage());
709
              
710
          }
711
          
712
      } catch (Exception e) {
713
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
714
                  e.getClass() + ": " + e.getMessage());
715
          
716
      }  finally {
717
          lock.unlock();
718
          logMetacat.debug("Unlocked identifier " + pid.getValue());
719
          
720
      }
721

    
722
      
723
      logMetacat.debug("Returning from registerSystemMetadata");
724
      EventLog.getInstance().log(request.getRemoteAddr(), 
725
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
726
          pid.getValue(), "registerSystemMetadata");
727
      return pid;
728
  }
729
  
730
  /**
731
   * Given an optional scope and format, reserves and returns an identifier 
732
   * within that scope and format that is unique and will not be 
733
   * used by any other sessions. 
734
    * 
735
   * @param session - the Session object containing the credentials for the Subject
736
   * @param pid - The identifier of the object to register the system metadata against
737
   * @param scope - An optional string to be used to qualify the scope of 
738
   *                the identifier namespace, which is applied differently 
739
   *                depending on the format requested. If scope is not 
740
   *                supplied, a default scope will be used.
741
   * @param format - The optional name of the identifier format to be used, 
742
   *                  drawn from a DataONE-specific vocabulary of identifier 
743
   *                 format names, including several common syntaxes such 
744
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
745
   *                 format is not supplied by the caller, the CN service 
746
   *                 will use a default identifier format, which may change 
747
   *                 over time.
748
   * 
749
   * @return true if the registration succeeds
750
   * 
751
   * @throws InvalidToken
752
   * @throws ServiceFailure
753
   * @throws NotAuthorized
754
   * @throws IdentifierNotUnique
755
   * @throws NotImplemented
756
   */
757
  @Override
758
  public Identifier reserveIdentifier(Session session, Identifier pid)
759
  throws InvalidToken, ServiceFailure,
760
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
761

    
762
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
763
  }
764
  
765
  @Override
766
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
767
  throws InvalidToken, ServiceFailure,
768
        NotAuthorized, NotImplemented, InvalidRequest {
769
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
770
  }
771
  
772
  /**
773
    * Checks whether the pid is reserved by the subject in the session param
774
    * If the reservation is held on the pid by the subject, we return true.
775
    * 
776
   * @param session - the Session object containing the Subject
777
   * @param pid - The identifier to check
778
   * 
779
   * @return true if the reservation exists for the subject/pid
780
   * 
781
   * @throws InvalidToken
782
   * @throws ServiceFailure
783
   * @throws NotFound - when the pid is not found (in use or in reservation)
784
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
785
   * @throws IdentifierNotUnique - when the pid is in use
786
   * @throws NotImplemented
787
   */
788

    
789
  @Override
790
  public boolean hasReservation(Session session, Identifier pid) 
791
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
792
      NotImplemented, InvalidRequest {
793
  
794
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
795
  }
796

    
797
  /**
798
   * Changes ownership (RightsHolder) of the specified object to the 
799
   * subject specified by userId
800
    * 
801
   * @param session - the Session object containing the credentials for the Subject
802
   * @param pid - Identifier of the object to be modified
803
   * @param userId - The subject that will be taking ownership of the specified object.
804
   *
805
   * @return pid - the identifier of the modified object
806
   * 
807
   * @throws ServiceFailure
808
   * @throws InvalidToken
809
   * @throws NotFound
810
   * @throws NotAuthorized
811
   * @throws NotImplemented
812
   * @throws InvalidRequest
813
   */  
814
  @Override
815
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
816
      long serialVersion)
817
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
818
      NotImplemented, InvalidRequest {
819
      
820
      // The lock to be used for this identifier
821
      Lock lock = null;
822

    
823
      // get the subject
824
      Subject subject = session.getSubject();
825
      
826
      // are we allowed to do this?
827
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
828
        throw new NotAuthorized("4440", "not allowed by " + subject.getValue() + " on " + pid.getValue());  
829
      }
830
      
831
      SystemMetadata systemMetadata = null;
832
      try {
833
          lock = HazelcastService.getInstance().getLock(pid.getValue());
834
          
835
          try {
836
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
837
              
838
              // does the request have the most current system metadata?
839
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
840
                 String msg = "The requested system metadata version number " + 
841
                     serialVersion + " differs from the current version at " +
842
                     systemMetadata.getSerialVersion().longValue() +
843
                     ". Please get the latest copy in order to modify it.";
844
                 throw new InvalidRequest("4442", msg);
845
              }
846
              
847
          } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
848
              throw new NotFound("4460", "No record found for: " + pid.getValue());
849
              
850
          }
851
              
852
          // set the new rights holder
853
          systemMetadata.setRightsHolder(userId);
854
          
855
          // update the metadata
856
          try {
857
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
858
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
859
              HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
860
              
861
          } catch (Exception e) {
862
          throw new ServiceFailure("4490", e.getMessage());
863
          
864
          }
865
          
866
      } catch (Exception e) {
867
          throw new ServiceFailure("4490", e.getMessage());
868
          
869
      } finally {
870
          lock.unlock();
871
          logMetacat.debug("Unlocked identifier " + pid.getValue());
872
      
873
      }
874
      
875
    return pid;
876
  }
877

    
878
  /**
879
   * Verify that a replication task is authorized by comparing the target node's
880
   * Subject (from the X.509 certificate-derived Session) with the list of 
881
   * subjects in the known, pending replication tasks map.
882
   * 
883
   * @param originatingNodeSession - Session information that contains the 
884
   *                                 identity of the calling user
885
   * @param targetNodeSubject - Subject identifying the target node
886
   * @param pid - the identifier of the object to be replicated
887
   * @param replicatePermission - the execute permission to be granted
888
   * 
889
   * @throws ServiceFailure
890
   * @throws NotImplemented
891
   * @throws InvalidToken
892
   * @throws NotAuthorized
893
   * @throws InvalidRequest
894
   * @throws NotFound
895
   */
896
  @Override
897
  public boolean isNodeAuthorized(Session originatingNodeSession, 
898
    Subject targetNodeSubject, Identifier pid) 
899
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
900
    NotFound, InvalidRequest {
901

    
902
    // The lock to be used for this identifier
903
    Lock lock = null;
904
    
905
    boolean isAllowed = false;
906
    SystemMetadata sysmeta = null;
907
    NodeReference targetNode = null;
908
    
909
    try {
910
      // get the target node reference from the nodes list
911
      CNode cn = D1Client.getCN();
912
      List<Node> nodes = cn.listNodes().getNodeList();
913
      
914
      if ( nodes != null ) {
915
        for (Node node : nodes) {
916
            
917
            for (Subject nodeSubject : node.getSubjectList()) {
918
                
919
                if ( nodeSubject.equals(targetNodeSubject) ) {
920
                    targetNode = node.getIdentifier();
921
                    logMetacat.debug("targetNode is : " + targetNode.getValue());
922
                    break;
923
                }
924
            }
925
            
926
            if ( targetNode != null) { break; }
927
        }
928
        
929
      } else {
930
          String msg = "Couldn't get the node list from the CN";
931
          logMetacat.debug(msg);
932
          throw new ServiceFailure("4872", msg);
933
          
934
      }
935
      
936
      // can't find a node listed with the given subject
937
      if ( targetNode == null ) {
938
          String msg = "There is no Member Node registered with a node subject " +
939
              "matching " + targetNodeSubject.getValue();
940
          logMetacat.info(msg);
941
          throw new ServiceFailure("4872", msg);
942
          
943
      }
944
      
945
      //lock, get, and unlock the pid
946
      lock = HazelcastService.getInstance().getLock(pid.getValue());
947
      lock.lock();
948
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
949
      
950
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
951

    
952
      if ( sysmeta != null ) {
953
          
954
          List<Replica> replicaList = sysmeta.getReplicaList();
955
          
956
          if ( replicaList != null ) {
957
              
958
              // find the replica with the status set to 'requested'
959
              for (Replica replica : replicaList) {
960
                  ReplicationStatus status = replica.getReplicationStatus();
961
                  NodeReference listedNode = replica.getReplicaMemberNode();
962
                  if ( listedNode != null && targetNode != null ) {
963
                      logMetacat.debug("Comparing " + listedNode.getValue()
964
                              + " to " + targetNode.getValue());
965
                      
966
                      if (listedNode.getValue().equals(targetNode.getValue())
967
                              && status.equals(ReplicationStatus.REQUESTED)) {
968
                          isAllowed = true;
969
                          break;
970

    
971
                      }
972
                  }
973
              }
974
          }
975
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
976
              "to replicate: " + isAllowed + " for " + pid.getValue());
977

    
978
          
979
      } else {
980
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
981
          " is null.");          
982
          
983
      }
984

    
985
    } catch (RuntimeException e) {
986
    	  ServiceFailure sf = new ServiceFailure("4872", 
987
                "Runtime Exception: Couldn't determine if node is allowed: " + 
988
                e.getCause().getMessage());
989
    	  sf.initCause(e);
990
        throw sf;
991
        
992
    } finally {
993
      // always unlock the pid
994
      lock.unlock();
995
      logMetacat.debug("Unlocked identifier " + pid.getValue());
996

    
997
    }
998
      
999
    return isAllowed;
1000
    
1001
  }
1002

    
1003
  /**
1004
   * Adds a new object to the Node, where the object is a science metadata object.
1005
   * 
1006
   * @param session - the Session object containing the credentials for the Subject
1007
   * @param pid - The object identifier to be created
1008
   * @param object - the object bytes
1009
   * @param sysmeta - the system metadata that describes the object  
1010
   * 
1011
   * @return pid - the object identifier created
1012
   * 
1013
   * @throws InvalidToken
1014
   * @throws ServiceFailure
1015
   * @throws NotAuthorized
1016
   * @throws IdentifierNotUnique
1017
   * @throws UnsupportedType
1018
   * @throws InsufficientResources
1019
   * @throws InvalidSystemMetadata
1020
   * @throws NotImplemented
1021
   * @throws InvalidRequest
1022
   */
1023
  public Identifier create(Session session, Identifier pid, InputStream object,
1024
    SystemMetadata sysmeta) 
1025
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1026
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1027
    NotImplemented, InvalidRequest {
1028
      
1029
      
1030
      // The lock to be used for this identifier
1031
      Lock lock = null;
1032
      
1033
      try {
1034
        // are we allowed?
1035
          boolean isAllowed = false;
1036
          CNode cn = D1Client.getCN();
1037
          NodeList nodeList = cn.listNodes();
1038
          
1039
          for (Node node : nodeList.getNodeList()) {
1040
              if ( node.getType().equals(NodeType.CN) ) {
1041
                  
1042
                  List<Subject> subjects = node.getSubjectList();
1043
                  for (Subject subject : subjects) {
1044
                     if (subject.equals(session.getSubject())) {
1045
                         isAllowed = true;
1046
                         break;
1047
                     }
1048
                  }
1049
              }
1050
          }
1051

    
1052
          // proceed if we're called by a CN
1053
          if ( isAllowed ) {
1054
              // create the coordinating node version of the document      
1055
              lock = HazelcastService.getInstance().getLock(pid.getValue());
1056
              lock.lock();
1057
              sysmeta.setSerialVersion(BigInteger.ONE);
1058
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1059
              pid = super.create(session, pid, object, sysmeta);
1060

    
1061
          } else {
1062
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1063
                  " isn't allowed to call create() on a Coordinating Node.";
1064
              logMetacat.info(msg);
1065
              throw new NotAuthorized("1100", msg);
1066
          }
1067
          
1068
      } catch (RuntimeException e) {
1069
          // Convert Hazelcast runtime exceptions to service failures
1070
          String msg = "There was a problem creating the object identified by " +
1071
              pid.getValue() + ". There error message was: " + e.getMessage();
1072
          throw new ServiceFailure("4893", msg);
1073
          
1074
      } finally {
1075
    	  if (lock != null) {
1076
	          lock.unlock();
1077
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1078
    	  }
1079
      }
1080
      
1081
      return pid;
1082

    
1083
  }
1084

    
1085
  /**
1086
   * Set access for a given object using the object identifier and a Subject
1087
   * under a given Session.
1088
   * 
1089
   * @param session - the Session object containing the credentials for the Subject
1090
   * @param pid - the object identifier for the given object to apply the policy
1091
   * @param policy - the access policy to be applied
1092
   * 
1093
   * @return true if the application of the policy succeeds
1094
   * @throws InvalidToken
1095
   * @throws ServiceFailure
1096
   * @throws NotFound
1097
   * @throws NotAuthorized
1098
   * @throws NotImplemented
1099
   * @throws InvalidRequest
1100
   */
1101
  public boolean setAccessPolicy(Session session, Identifier pid, 
1102
      AccessPolicy accessPolicy, long serialVersion) 
1103
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1104
      NotImplemented, InvalidRequest {
1105
      
1106
      // The lock to be used for this identifier
1107
      Lock lock = null;
1108
      
1109
      boolean success = false;
1110
      
1111
      // get the subject
1112
      Subject subject = session.getSubject();
1113
      
1114
      // are we allowed to do this?
1115
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1116
          throw new NotAuthorized("4420", "not allowed by " + subject.getValue() + 
1117
          " on " + pid.getValue());  
1118
      }
1119
      
1120
      SystemMetadata systemMetadata = null;
1121
      try {
1122
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1123
          lock.lock();
1124
          
1125
          try {
1126
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1127

    
1128
              // does the request have the most current system metadata?
1129
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1130
                 String msg = "The requested system metadata version number " + 
1131
                     serialVersion + " differs from the current version at " +
1132
                     systemMetadata.getSerialVersion().longValue() +
1133
                     ". Please get the latest copy in order to modify it.";
1134
                 throw new InvalidRequest("4402", msg);
1135
              }
1136
              
1137
          } catch (Exception e) {
1138
              // convert Hazelcast RuntimeException to NotFound
1139
              throw new NotFound("4400", "No record found for: " + pid);
1140
            
1141
          }
1142
              
1143
          // set the access policy
1144
          systemMetadata.setAccessPolicy(accessPolicy);
1145
          
1146
          // update the system metadata
1147
          try {
1148
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1149
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1150
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1151
            
1152
          } catch (Exception e) {
1153
              // convert Hazelcast RuntimeException to ServiceFailure
1154
              throw new ServiceFailure("4430", e.getMessage());
1155
            
1156
          }
1157
          
1158
      } catch (Exception e) {
1159
          throw new ServiceFailure("4430", e.getMessage());
1160
          
1161
      } finally {
1162
          lock.unlock();
1163
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1164
        
1165
      }
1166

    
1167
    
1168
    // TODO: how do we know if the map was persisted?
1169
    success = true;
1170
    
1171
    return success;
1172
  }
1173

    
1174
  /**
1175
   * Full replacement of replication metadata in the system metadata for the 
1176
   * specified object, changes date system metadata modified
1177
   * 
1178
   * @param session - the Session object containing the credentials for the Subject
1179
   * @param pid - the object identifier for the given object to apply the policy
1180
   * @param replica - the replica to be updated
1181
   * @return
1182
   * @throws NotImplemented
1183
   * @throws NotAuthorized
1184
   * @throws ServiceFailure
1185
   * @throws InvalidRequest
1186
   * @throws NotFound
1187
   */
1188
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1189
      Replica replica, long serialVersion) 
1190
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1191
      NotFound {
1192
      
1193
      // The lock to be used for this identifier
1194
      Lock lock = null;
1195
      
1196
      // get the subject
1197
      Subject subject = session.getSubject();
1198
      
1199
      // are we allowed to do this?
1200
      try {
1201
        // what is the controlling permission?
1202
        if (!isAuthorized(session, pid, Permission.WRITE)) {
1203
            throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1204
            " on " + pid.getValue());  
1205
        }
1206
        
1207
      } catch (InvalidToken e) {
1208
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1209
                  " on " + pid.getValue());  
1210
          
1211
      }
1212

    
1213
      SystemMetadata systemMetadata = null;
1214
      try {
1215
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1216
          lock.lock();
1217

    
1218
          try {      
1219
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1220

    
1221
              // does the request have the most current system metadata?
1222
              if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1223
                 String msg = "The requested system metadata version number " + 
1224
                     serialVersion + " differs from the current version at " +
1225
                     systemMetadata.getSerialVersion().longValue() +
1226
                     ". Please get the latest copy in order to modify it.";
1227
                 throw new InvalidRequest("4853", msg);
1228
              }
1229
              
1230
          } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
1231
            throw new NotFound("4854", "No record found for: " + pid.getValue() +
1232
                " : " + e.getMessage());
1233
            
1234
          }
1235
              
1236
          // set the status for the replica
1237
          List<Replica> replicas = systemMetadata.getReplicaList();
1238
          NodeReference replicaNode = replica.getReplicaMemberNode();
1239
          int index = 0;
1240
          for (Replica listedReplica: replicas) {
1241
              
1242
              // remove the replica that we are replacing
1243
              if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1244
                  replicas.remove(index);
1245
                  break;
1246
                  
1247
              }
1248
              index++;
1249
          }
1250
          
1251
          // add the new replica item
1252
          replicas.add(replica);
1253
          systemMetadata.setReplicaList(replicas);
1254
          
1255
          // update the metadata
1256
          try {
1257
              systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1258
              systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1259
              HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1260
            
1261
          } catch (Exception e) {
1262
              throw new ServiceFailure("4852", e.getMessage());
1263
          
1264
          }
1265
          
1266
    } catch (Exception e) {
1267
        throw new ServiceFailure("4852", e.getMessage());
1268

    
1269
    } finally {
1270
        lock.unlock();
1271
        logMetacat.debug("Unlocked identifier " + pid.getValue());
1272
        
1273
    }
1274
    
1275
      return true;
1276
      
1277
  }
1278
  
1279
    @Override
1280
  public ObjectList listObjects(Session session, Date startTime, 
1281
      Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1282
      Integer start, Integer count)
1283
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1284
      ServiceFailure {
1285
      
1286
      ObjectList objectList = null;
1287
        try {
1288
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1289
        } catch (Exception e) {
1290
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1291
        }
1292

    
1293
        return objectList;
1294
  }
1295

    
1296
	@Override
1297
	public ChecksumAlgorithmList listChecksumAlgorithms()
1298
			throws ServiceFailure, NotImplemented {
1299
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1300
		cal.addAlgorithm("MD5");
1301
		cal.addAlgorithm("SHA-1");
1302
		return null;
1303
	}
1304
    
1305
}
(1-1/5)