Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.Calendar;
29
import java.util.Date;
30
import java.util.List;
31
import java.util.Set;
32

    
33
import javax.servlet.http.HttpServletRequest;
34

    
35
import org.apache.log4j.Logger;
36
import org.dataone.client.CNode;
37
import org.dataone.client.D1Client;
38
import org.dataone.service.cn.v1.CNAuthorization;
39
import org.dataone.service.cn.v1.CNCore;
40
import org.dataone.service.cn.v1.CNRead;
41
import org.dataone.service.cn.v1.CNReplication;
42
import org.dataone.service.exceptions.IdentifierNotUnique;
43
import org.dataone.service.exceptions.InsufficientResources;
44
import org.dataone.service.exceptions.InvalidRequest;
45
import org.dataone.service.exceptions.InvalidSystemMetadata;
46
import org.dataone.service.exceptions.InvalidToken;
47
import org.dataone.service.exceptions.NotAuthorized;
48
import org.dataone.service.exceptions.NotFound;
49
import org.dataone.service.exceptions.NotImplemented;
50
import org.dataone.service.exceptions.ServiceFailure;
51
import org.dataone.service.exceptions.UnsupportedType;
52
import org.dataone.service.types.v1.AccessPolicy;
53
import org.dataone.service.types.v1.Checksum;
54
import org.dataone.service.types.v1.Identifier;
55
import org.dataone.service.types.v1.Node;
56
import org.dataone.service.types.v1.NodeList;
57
import org.dataone.service.types.v1.NodeReference;
58
import org.dataone.service.types.v1.NodeType;
59
import org.dataone.service.types.v1.ObjectFormat;
60
import org.dataone.service.types.v1.ObjectFormatIdentifier;
61
import org.dataone.service.types.v1.ObjectFormatList;
62
import org.dataone.service.types.v1.ObjectList;
63
import org.dataone.service.types.v1.ObjectLocationList;
64
import org.dataone.service.types.v1.Permission;
65
import org.dataone.service.types.v1.Replica;
66
import org.dataone.service.types.v1.ReplicationPolicy;
67
import org.dataone.service.types.v1.ReplicationStatus;
68
import org.dataone.service.types.v1.Session;
69
import org.dataone.service.types.v1.Subject;
70
import org.dataone.service.types.v1.SystemMetadata;
71

    
72
import com.hazelcast.core.ILock;
73

    
74
import edu.ucsb.nceas.metacat.EventLog;
75
import edu.ucsb.nceas.metacat.IdentifierManager;
76
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
77

    
78
/**
79
 * Represents Metacat's implementation of the DataONE Coordinating Node 
80
 * service API. Methods implement the various CN* interfaces, and methods common
81
 * to both Member Node and Coordinating Node interfaces are found in the
82
 * D1NodeService super class.
83
 *
84
 */
85
public class CNodeService extends D1NodeService implements CNAuthorization,
86
    CNCore, CNRead, CNReplication {
87

    
88
  /* the logger instance */
89
  private Logger logMetacat = null;
90

    
91
  /**
92
   * singleton accessor
93
   */
94
  public static CNodeService getInstance(HttpServletRequest request) { 
95
    return new CNodeService(request);
96
  }
97
  
98
  /**
99
   * Constructor, private for singleton access
100
   */
101
  private CNodeService(HttpServletRequest request) {
102
    super(request);
103
    logMetacat = Logger.getLogger(CNodeService.class);
104
        
105
  }
106
    
107
  /**
108
   * Set the replication policy for an object given the object identifier
109
   * 
110
   * @param session - the Session object containing the credentials for the Subject
111
   * @param pid - the object identifier for the given object
112
   * @param policy - the replication policy to be applied
113
   * 
114
   * @return true or false
115
   * 
116
   * @throws NotImplemented
117
   * @throws NotAuthorized
118
   * @throws ServiceFailure
119
   * @throws InvalidRequest
120
   * 
121
   */
122
  @Override
123
  public boolean setReplicationPolicy(Session session, Identifier pid,
124
      ReplicationPolicy policy, long serialVersion) 
125
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
126
      InvalidRequest, InvalidToken {
127
      
128
      // The lock to be used for thyis identifier
129
      ILock lock = null;
130
      
131
      // get the subject
132
      Subject subject = session.getSubject();
133
      
134
      // are we allowed to do this?
135
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
136
        throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION + 
137
            " not allowed by " + subject.getValue() + " on " + pid.getValue());  
138
      }
139
      
140
      SystemMetadata systemMetadata = null;
141
      try {
142
          lock = HazelcastService.getInstance().getLock(pid.getValue());
143
          lock.lock();
144
          systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
145
        
146

    
147
          // does the request have the most current system metadata?
148
          if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
149
             String msg = "The requested system metadata version number " + 
150
                 serialVersion + " differs from the current version at " +
151
                 systemMetadata.getSerialVersion().longValue() +
152
                 ". Please get the latest copy in order to modify it.";
153
             throw new InvalidRequest("4883", msg);
154
          }
155
          
156
      } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
157
        throw new NotFound("4884", "No record found for: " + pid.getValue());
158
        
159
      }
160
          
161
      // set the new policy
162
      systemMetadata.setReplicationPolicy(policy);
163
      
164
      // update the metadata
165
      try {
166
        systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
167
        systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
168
        HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
169
        
170
      } catch (Exception e) {
171
          throw new ServiceFailure("4882", e.getMessage());
172
      
173
      } finally {
174
          lock.unlock();
175
          
176
      }
177
    
178
      return true;
179
  }
180

    
181
  /**
182
   * Set the replication status for an object given the object identifier
183
   * 
184
   * @param session - the Session object containing the credentials for the Subject
185
   * @param pid - the object identifier for the given object
186
   * @param status - the replication status to be applied
187
   * 
188
   * @return true or false
189
   * 
190
   * @throws NotImplemented
191
   * @throws NotAuthorized
192
   * @throws ServiceFailure
193
   * @throws InvalidRequest
194
   * @throws InvalidToken
195
   * @throws NotFound
196
   * 
197
   */
198
  @Override
199
  public boolean setReplicationStatus(Session session, Identifier pid,
200
      NodeReference targetNode, ReplicationStatus status, long serialVersion) 
201
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
202
      InvalidRequest, NotFound {
203
      
204
      // The lock to be used for this identifier
205
      ILock lock = null;
206
      
207
      boolean allowed = false;
208
      int replicaEntryIndex = -1;
209
      List<Replica> replicas = null;
210
      // get the subject
211
      Subject subject = session.getSubject();
212
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
213
          " is " + status.toString());
214
      
215
      SystemMetadata systemMetadata = null;
216
      try {      
217
          lock = HazelcastService.getInstance().getLock(pid.getValue());
218
          lock.lock();
219
          systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
220

    
221
          if ( systemMetadata == null ) {
222
              logMetacat.debug("systemMetadata is null for " + pid.getValue());
223
              
224
          }
225
          replicas = systemMetadata.getReplicaList();
226
          int count = 0;
227
          
228
          if ( replicas == null || replicas.size() < 1 ) {
229
              logMetacat.debug("no replicas to evaluate");
230
              throw new InvalidRequest("4730", "There are no replicas to update.");
231
              
232
          }
233

    
234
          // find the target replica index in the replica list
235
          for (Replica replica: replicas) {
236
              String replicaNodeStr = replica.getReplicaMemberNode().getValue();
237
              String targetNodeStr = targetNode.getValue();
238
              logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
239
              
240
              if (replicaNodeStr.equals(targetNodeStr)) {
241
                  replicaEntryIndex = count;
242
                  logMetacat.debug("replica entry index is: " + replicaEntryIndex);
243
                  break;
244
              }
245
              count++;
246
              
247
          }
248

    
249
          // are we allowed to do this? only CNs and target MNs are allowed
250
          CNode cn = D1Client.getCN();
251
          List<Node> nodes = cn.listNodes().getNodeList();
252
          
253
          // find the node in the node list
254
          for ( Node node : nodes ) {
255
              
256
              NodeReference nodeReference = node.getIdentifier();
257
              logMetacat.debug("In setReplicationStatus(), Node reference is: " + nodeReference.getValue());
258
              
259
              // allow target MN certs and CN certs
260
              if (targetNode.getValue().equals(nodeReference.getValue()) ||
261
                  node.getType() == NodeType.CN) {
262
                  List<Subject> nodeSubjects = node.getSubjectList();
263
                  
264
                  // check if the session subject is in the node subject list
265
                  for (Subject nodeSubject : nodeSubjects) {
266
                      if ( nodeSubject.equals(subject) ) {
267
                          allowed = true; // subject of session == target node subject
268
                          break;
269
                          
270
                      }
271
                  }                 
272
              }
273
          }
274

    
275
          if ( !allowed ) {
276
              String msg = "The subject identified by " + subject.getValue() +
277
                " does not have permission to set the replication status for " +
278
                "the replica identified by " + targetNode.getValue() + ".";
279
              logMetacat.info(msg);
280
              throw new NotAuthorized("4720", msg);
281
              
282
          }
283
          
284
          // does the request have the most current system metadata?
285
          if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
286
             String msg = "The requested system metadata version number " + 
287
                 serialVersion + " differs from the current version at " +
288
                 systemMetadata.getSerialVersion().longValue() +
289
                 ". Please get the latest copy in order to modify it.";
290
             throw new InvalidRequest("4730", msg);
291
          }
292
          
293
      } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
294
        throw new NotFound("4740", "No record found for: " + pid.getValue() +
295
            " : " + e.getMessage());
296
        
297
      }
298
          
299
      // set the status for the replica
300
      if ( replicaEntryIndex != -1 ) {
301
          Replica targetReplica = replicas.get(replicaEntryIndex);
302
          targetReplica.setReplicationStatus(status);
303
          logMetacat.debug("Set the replication status for " + 
304
              targetReplica.getReplicaMemberNode().getValue() + " to " +
305
              targetReplica.getReplicationStatus());
306
          
307
      } else {
308
          throw new InvalidRequest("4730", "There are no replicas to update.");
309

    
310
      }
311
      
312
      systemMetadata.setReplicaList(replicas);
313
            
314
      // update the metadata
315
      try {
316
          systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
317
          systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
318
          HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
319
        
320
      } catch (Exception e) {
321
          throw new ServiceFailure("4700", e.getMessage());
322
      
323
      } finally {
324
          lock.unlock();
325
          
326
      }
327
      
328
      return true;
329
  }
330

    
331
  /**
332
   * Test that the specified relationship between pidOfSubject and pidOfObject exists
333
   * 
334
   * @param session - the Session object containing the credentials for the Subject
335
   * @param node - the node information for the given node be modified
336
   * 
337
   * @return true if the relationship exists
338
   * 
339
   * @throws InvalidToken
340
   * @throws ServiceFailure
341
   * @throws NotAuthorized
342
   * @throws NotFound
343
   * @throws InvalidRequest
344
   * @throws NotImplemented
345
   */
346
  @Override
347
  public boolean assertRelation(Session session, Identifier pidOfSubject, 
348
    String relationship, Identifier pidOfObject) 
349
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
350
    InvalidRequest, NotImplemented {
351
    
352
    // The lock to be used for thyis identifier
353
    ILock lock = null;
354

    
355
    boolean asserted = false;
356
        
357
    // are we allowed to do this?
358
    if (!isAuthorized(session, pidOfSubject, Permission.READ)) {
359
      throw new NotAuthorized("4881", Permission.READ + " not allowed on " + pidOfSubject.getValue());  
360
    }
361
    
362
    SystemMetadata systemMetadata = null;
363
    try {
364
        lock = HazelcastService.getInstance().getLock(pidOfSubject.getValue());
365
        lock.lock();
366
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pidOfSubject);
367
        
368
        
369
        // check relationships
370
        // TODO: use ORE map
371
        if (relationship.equalsIgnoreCase("describes")) {
372
          
373
        }
374
        
375
        if (relationship.equalsIgnoreCase("describedBy")) {
376
          
377
        }
378
        
379
        if (relationship.equalsIgnoreCase("derivedFrom")) {
380
          
381
        }
382
        
383
        if (relationship.equalsIgnoreCase("obsoletes")) {
384
            Identifier pid = systemMetadata.getObsoletes();
385
            if (pid.getValue().equals(pidOfObject.getValue())) {
386
              asserted = true;
387
            
388
        }
389
          //return systemMetadata.getObsoleteList().contains(pidOfObject);
390
        }
391
        if (relationship.equalsIgnoreCase("obsoletedBy")) {
392
            Identifier pid = systemMetadata.getObsoletedBy();
393
            if (pid.getValue().equals(pidOfObject.getValue())) {
394
              asserted = true;
395
            
396
        }
397
          //return systemMetadata.getObsoletedByList().contains(pidOfObject);
398
        }
399
              
400
    } catch (Exception e) {
401
        throw new ServiceFailure("4270", "Could not assert relation for: " + 
402
            pidOfSubject.getValue() +
403
            ". The error message was: " + e.getMessage());
404
      
405
    } finally {
406
        lock.unlock();
407

    
408
    }
409
        
410
    return asserted;
411
  }
412
  
413
  /**
414
   * Return the checksum of the object given the identifier 
415
   * 
416
   * @param session - the Session object containing the credentials for the Subject
417
   * @param pid - the object identifier for the given object
418
   * 
419
   * @return checksum - the checksum of the object
420
   * 
421
   * @throws InvalidToken
422
   * @throws ServiceFailure
423
   * @throws NotAuthorized
424
   * @throws NotFound
425
   * @throws NotImplemented
426
   */
427
  @Override
428
  public Checksum getChecksum(Session session, Identifier pid)
429
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
430
    NotImplemented {
431
        
432
    // The lock to be used for thyis identifier
433
    ILock lock = null;
434
    
435
    if (!isAuthorized(session, pid, Permission.READ)) {
436
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
437
    }
438
    
439
    SystemMetadata systemMetadata = null;
440
    Checksum checksum = null;
441
    
442
    try {
443
        lock = HazelcastService.getInstance().getLock(pid.getValue());
444
        lock.lock();
445
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
446
        checksum = systemMetadata.getChecksum();
447

    
448
    } catch (Exception e) {
449
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
450
            pid.getValue() + ". The error message was: " + e.getMessage());
451
      
452
    } finally {
453
        lock.unlock();
454
        
455
    }
456
    
457
    return checksum;
458
  }
459

    
460
  /**
461
   * Resolve the location of a given object
462
   * 
463
   * @param session - the Session object containing the credentials for the Subject
464
   * @param pid - the object identifier for the given object
465
   * 
466
   * @return objectLocationList - the list of nodes known to contain the object
467
   * 
468
   * @throws InvalidToken
469
   * @throws ServiceFailure
470
   * @throws NotAuthorized
471
   * @throws NotFound
472
   * @throws NotImplemented
473
   */
474
  @Override
475
  public ObjectLocationList resolve(Session session, Identifier pid)
476
    throws InvalidToken, ServiceFailure, NotAuthorized,
477
    NotFound, NotImplemented {
478

    
479
    throw new NotImplemented("4131", "resolve not implemented");
480

    
481
  }
482

    
483
  /**
484
   * Search the metadata catalog for identifiers that match the criteria
485
   * 
486
   * @param session - the Session object containing the credentials for the Subject
487
   * @param queryType - An identifier for the type of query expression 
488
   *                    provided in the query
489
   * @param query -  The criteria for matching the characteristics of the 
490
   *                 metadata objects of interest
491
   * 
492
   * @return objectList - the list of objects matching the criteria
493
   * 
494
   * @throws InvalidToken
495
   * @throws ServiceFailure
496
   * @throws NotAuthorized
497
   * @throws InvalidRequest
498
   * @throws NotImplemented
499
   */
500
  @Override
501
  public ObjectList search(Session session, String queryType, String query)
502
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
503
    NotImplemented {
504

    
505
    ObjectList objectList = null;
506
    try {
507
        objectList = 
508
          IdentifierManager.getInstance().querySystemMetadata(
509
              null, //startTime, 
510
              null, //endTime,
511
              null, //objectFormat, 
512
              false, //replicaStatus, 
513
              0, //start, 
514
              -1 //count
515
              );
516
        
517
    } catch (Exception e) {
518
      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
519
    }
520

    
521
      return objectList;
522
      
523
    //throw new NotImplemented("4281", "search not implemented");
524
    
525
    // the code block below is from an older implementation
526
    
527
    /*  This block commented out because of the EcoGrid circular dependency.
528
         *  For now, query will not be supported until the circularity can be
529
         *  resolved, probably by moving the ecogrid query syntax transformers
530
         *  directly into the Metacat codebase.  MBJ 2010-02-03
531
         
532
        try {
533
            EcogridQueryParser parser = new EcogridQueryParser(request
534
                    .getReader());
535
            parser.parseXML();
536
            QueryType queryType = parser.getEcogridQuery();
537
            EcogridJavaToMetacatJavaQueryTransformer queryTransformer = 
538
                new EcogridJavaToMetacatJavaQueryTransformer();
539
            QuerySpecification metacatQuery = queryTransformer
540
                    .transform(queryType);
541

    
542
            DBQuery metacat = new DBQuery();
543

    
544
            boolean useXMLIndex = (new Boolean(PropertyService
545
                    .getProperty("database.usexmlindex"))).booleanValue();
546
            String xmlquery = "query"; // we don't care the query in resultset,
547
            // the query can be anything
548
            PrintWriter out = null; // we don't want metacat result, so set out null
549

    
550
            // parameter: queryspecification, user, group, usingIndexOrNot
551
            StringBuffer result = metacat.createResultDocument(xmlquery,
552
                    metacatQuery, out, username, groupNames, useXMLIndex);
553

    
554
            // create result set transfer       
555
            String saxparser = PropertyService.getProperty("xml.saxparser");
556
            MetacatResultsetParser metacatResultsetParser = new MetacatResultsetParser(
557
                    new StringReader(result.toString()), saxparser, queryType
558
                            .getNamespace().get_value());
559
            ResultsetType records = metacatResultsetParser.getEcogridResult();
560

    
561
            System.out
562
                    .println(EcogridResultsetTransformer.toXMLString(records));
563
            response.setContentType("text/xml");
564
            out = response.getWriter();
565
            out.print(EcogridResultsetTransformer.toXMLString(records));
566

    
567
        } catch (Exception e) {
568
            e.printStackTrace();
569
        }*/
570
    
571

    
572
  }
573
  
574
  /**
575
   * Returns the object format registered in the DataONE Object Format 
576
   * Vocabulary for the given format identifier
577
   * 
578
   * @param fmtid - the identifier of the format requested
579
   * 
580
   * @return objectFormat - the object format requested
581
   * 
582
   * @throws ServiceFailure
583
   * @throws NotFound
584
   * @throws InsufficientResources
585
   * @throws NotImplemented
586
   */
587
  @Override
588
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
589
    throws ServiceFailure, NotFound, InsufficientResources,
590
    NotImplemented {
591
     
592
      return ObjectFormatService.getInstance().getFormat(fmtid);
593
      
594
  }
595

    
596
  /**
597
   * Returns a list of all object formats registered in the DataONE Object 
598
   * Format Vocabulary
599
    * 
600
   * @return objectFormatList - The list of object formats registered in 
601
   *                            the DataONE Object Format Vocabulary
602
   * 
603
   * @throws ServiceFailure
604
   * @throws NotImplemented
605
   * @throws InsufficientResources
606
   */
607
  @Override
608
  public ObjectFormatList listFormats() 
609
    throws ServiceFailure, InsufficientResources, 
610
    NotImplemented {
611

    
612
    return ObjectFormatService.getInstance().listFormats();
613
  }
614

    
615
  /**
616
   * Returns a list of nodes that have been registered with the DataONE infrastructure
617
    * 
618
   * @return nodeList - List of nodes from the registry
619
   * 
620
   * @throws ServiceFailure
621
   * @throws NotImplemented
622
   */
623
  @Override
624
  public NodeList listNodes() 
625
    throws NotImplemented, ServiceFailure {
626

    
627
    throw new NotImplemented("4800", "listNodes not implemented");
628
  }
629

    
630
  /**
631
   * Provides a mechanism for adding system metadata independently of its 
632
   * associated object, such as when adding system metadata for data objects.
633
    * 
634
   * @param session - the Session object containing the credentials for the Subject
635
   * @param pid - The identifier of the object to register the system metadata against
636
   * @param sysmeta - The system metadata to be registered
637
   * 
638
   * @return true if the registration succeeds
639
   * 
640
   * @throws NotImplemented
641
   * @throws NotAuthorized
642
   * @throws ServiceFailure
643
   * @throws InvalidRequest
644
   * @throws InvalidSystemMetadata
645
   */
646
  @Override
647
  public Identifier registerSystemMetadata(Session session, Identifier pid,
648
      SystemMetadata sysmeta) 
649
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
650
      InvalidSystemMetadata {
651

    
652
      // The lock to be used for this identifier
653
      ILock lock = null;
654

    
655
      // TODO: control who can call this?
656
      if (session == null) {
657
          //TODO: many of the thrown exceptions do not use the correct error codes
658
          //check these against the docs and correct them
659
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
660
                  "  If you are not logged in, please do so and retry the request.");
661
      }
662
      
663
      // verify that guid == SystemMetadata.getIdentifier()
664
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
665
          "|" + sysmeta.getIdentifier().getValue());
666
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
667
          throw new InvalidRequest("4863", 
668
              "The identifier in method call (" + pid.getValue() + 
669
              ") does not match identifier in system metadata (" +
670
              sysmeta.getIdentifier().getValue() + ").");
671
      }
672

    
673
      logMetacat.debug("Checking if identifier exists...");
674
      // Check that the identifier does not already exist
675
      if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
676
          throw new InvalidRequest("4863", 
677
              "The identifier is already in use by an existing object.");
678
      
679
      }
680
      
681
      // insert the system metadata into the object store
682
      logMetacat.debug("Starting to insert SystemMetadata...");
683
      try {
684
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
685
          sysmeta.setSerialVersion(BigInteger.ONE);
686
          sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
687
          HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
688
          
689
      } catch (Exception e) {
690
        logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
691
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
692
              e.getClass() + ": " + e.getMessage());
693
          
694
      } finally {
695
        lock.unlock();
696

    
697
      }
698
      
699
      logMetacat.debug("Returning from registerSystemMetadata");
700
      EventLog.getInstance().log(request.getRemoteAddr(), 
701
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
702
          pid.getValue(), "registerSystemMetadata");
703
      return pid;
704
  }
705
  
706
  /**
707
   * Given an optional scope and format, reserves and returns an identifier 
708
   * within that scope and format that is unique and will not be 
709
   * used by any other sessions. 
710
    * 
711
   * @param session - the Session object containing the credentials for the Subject
712
   * @param pid - The identifier of the object to register the system metadata against
713
   * @param scope - An optional string to be used to qualify the scope of 
714
   *                the identifier namespace, which is applied differently 
715
   *                depending on the format requested. If scope is not 
716
   *                supplied, a default scope will be used.
717
   * @param format - The optional name of the identifier format to be used, 
718
   *                  drawn from a DataONE-specific vocabulary of identifier 
719
   *                 format names, including several common syntaxes such 
720
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
721
   *                 format is not supplied by the caller, the CN service 
722
   *                 will use a default identifier format, which may change 
723
   *                 over time.
724
   * 
725
   * @return true if the registration succeeds
726
   * 
727
   * @throws InvalidToken
728
   * @throws ServiceFailure
729
   * @throws NotAuthorized
730
   * @throws IdentifierNotUnique
731
   * @throws NotImplemented
732
   */
733
  @Override
734
  public Identifier reserveIdentifier(Session session, Identifier pid)
735
  throws InvalidToken, ServiceFailure,
736
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
737

    
738
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
739
  }
740
  
741
  @Override
742
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
743
  throws InvalidToken, ServiceFailure,
744
        NotAuthorized, NotImplemented, InvalidRequest {
745
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
746
  }
747
  
748
  /**
749
    * Checks whether the pid is reserved by the subject in the session param
750
    * If the reservation is held on the pid by the subject, we return true.
751
    * 
752
   * @param session - the Session object containing the Subject
753
   * @param pid - The identifier to check
754
   * 
755
   * @return true if the reservation exists for the subject/pid
756
   * 
757
   * @throws InvalidToken
758
   * @throws ServiceFailure
759
   * @throws NotFound - when the pid is not found (in use or in reservation)
760
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
761
   * @throws IdentifierNotUnique - when the pid is in use
762
   * @throws NotImplemented
763
   */
764

    
765
  @Override
766
  public boolean hasReservation(Session session, Identifier pid) 
767
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
768
      NotImplemented, InvalidRequest {
769
  
770
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
771
  }
772

    
773
  /**
774
   * Changes ownership (RightsHolder) of the specified object to the 
775
   * subject specified by userId
776
    * 
777
   * @param session - the Session object containing the credentials for the Subject
778
   * @param pid - Identifier of the object to be modified
779
   * @param userId - The subject that will be taking ownership of the specified object.
780
   *
781
   * @return pid - the identifier of the modified object
782
   * 
783
   * @throws ServiceFailure
784
   * @throws InvalidToken
785
   * @throws NotFound
786
   * @throws NotAuthorized
787
   * @throws NotImplemented
788
   * @throws InvalidRequest
789
   */  
790
  @Override
791
  public Identifier setOwner(Session session, Identifier pid, Subject userId,
792
      long serialVersion)
793
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
794
      NotImplemented, InvalidRequest {
795
      
796
      // The lock to be used for this identifier
797
      ILock lock = null;
798

    
799
      // get the subject
800
      Subject subject = session.getSubject();
801
      
802
      // are we allowed to do this?
803
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
804
        throw new NotAuthorized("4440", "not allowed by " + subject.getValue() + " on " + pid.getValue());  
805
      }
806
      
807
      SystemMetadata systemMetadata = null;
808
      try {
809
          lock = HazelcastService.getInstance().getLock(pid.getValue());
810
          systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
811
          
812
          // does the request have the most current system metadata?
813
          if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
814
             String msg = "The requested system metadata version number " + 
815
                 serialVersion + " differs from the current version at " +
816
                 systemMetadata.getSerialVersion().longValue() +
817
                 ". Please get the latest copy in order to modify it.";
818
             throw new InvalidRequest("4442", msg);
819
          }
820
          
821
      } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
822
          throw new NotFound("4460", "No record found for: " + pid.getValue());
823
          
824
      }
825
          
826
      // set the new rights holder
827
      systemMetadata.setRightsHolder(userId);
828
      
829
      // update the metadata
830
      try {
831
          systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
832
          systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
833
          HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
834
          
835
      } catch (Exception e) {
836
      throw new ServiceFailure("4490", e.getMessage());
837
      
838
      } finally {
839
          lock.unlock();
840
      }
841
      
842
      return pid;
843
  }
844

    
845
  /**
846
   * Verify that a replication task is authorized by comparing the target node's
847
   * Subject (from the X.509 certificate-derived Session) with the list of 
848
   * subjects in the known, pending replication tasks map.
849
   * 
850
   * @param originatingNodeSession - Session information that contains the 
851
   *                                 identity of the calling user
852
   * @param targetNodeSubject - Subject identifying the target node
853
   * @param pid - the identifier of the object to be replicated
854
   * @param replicatePermission - the execute permission to be granted
855
   * 
856
   * @throws ServiceFailure
857
   * @throws NotImplemented
858
   * @throws InvalidToken
859
   * @throws NotAuthorized
860
   * @throws InvalidRequest
861
   * @throws NotFound
862
   */
863
  @Override
864
  public boolean isNodeAuthorized(Session originatingNodeSession, 
865
    Subject targetNodeSubject, Identifier pid, Permission replicatePermission) 
866
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
867
    NotFound, InvalidRequest {
868

    
869
    // The lock to be used for this identifier
870
    ILock lock = null;
871
    
872
    boolean isAllowed = false;
873
    SystemMetadata sysmeta = null;
874
    NodeReference targetNode = null;
875
    
876
    try {
877
      // get the target node reference from the nodes list
878
      CNode cn = D1Client.getCN();
879
      List<Node> nodes = cn.listNodes().getNodeList();
880
      
881
      if ( nodes != null ) {
882
        for (Node node : nodes) {
883
            
884
            for (Subject nodeSubject : node.getSubjectList()) {
885
                
886
                if ( nodeSubject.equals(targetNodeSubject) ) {
887
                    targetNode = node.getIdentifier();
888
                    logMetacat.debug("targetNode is : " + targetNode.getValue());
889
                    break;
890
                }
891
            }
892
            
893
            if ( targetNode != null) { break; }
894
        }
895
        
896
      } else {
897
          String msg = "Couldn't get the node list from the CN";
898
          logMetacat.debug(msg);
899
          throw new ServiceFailure("4872", msg);
900
          
901
      }
902
      //lock, get, and unlock the pid
903
      lock = HazelcastService.getInstance().getLock(pid.getValue());
904
      lock.lock();
905
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
906
      
907
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
908

    
909
      if ( sysmeta != null ) {
910
          
911
          List<Replica> replicaList = sysmeta.getReplicaList();
912
          
913
          if ( replicaList != null ) {
914
              
915
              // find the replica with the status set to 'requested'
916
              for (Replica replica : replicaList) {
917
                  ReplicationStatus status = replica.getReplicationStatus();
918
                  NodeReference listedNode = replica.getReplicaMemberNode();
919
                  logMetacat.debug("Comparing " + listedNode.getValue() + " to " + 
920
                      targetNode.getValue());
921
                  if (listedNode.getValue().equals(targetNode.getValue())
922
                          && status.equals(ReplicationStatus.REQUESTED)) {
923
                      isAllowed = true;
924
                      break;
925

    
926
                  }
927
              }
928
          }
929
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
930
              "to replicate: " + isAllowed + " for " + pid.getValue());
931

    
932
          
933
      } else {
934
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
935
          " is null.");          
936
          
937
      }
938

    
939
    } catch (RuntimeException e) {
940
    	  ServiceFailure sf = new ServiceFailure("4872", 
941
                "Runtime Exception: Couldn't determine if node is allowed: " + 
942
                e.getMessage());
943
    	  sf.initCause(e);
944
        throw sf;
945
        
946
    } finally {
947
      // always unlock the pid
948
      lock.unlock();
949

    
950
    }
951
      
952
    return isAllowed;
953
    
954
  }
955

    
956
  /**
957
   * Adds a new object to the Node, where the object is a science metadata object.
958
   * 
959
   * @param session - the Session object containing the credentials for the Subject
960
   * @param pid - The object identifier to be created
961
   * @param object - the object bytes
962
   * @param sysmeta - the system metadata that describes the object  
963
   * 
964
   * @return pid - the object identifier created
965
   * 
966
   * @throws InvalidToken
967
   * @throws ServiceFailure
968
   * @throws NotAuthorized
969
   * @throws IdentifierNotUnique
970
   * @throws UnsupportedType
971
   * @throws InsufficientResources
972
   * @throws InvalidSystemMetadata
973
   * @throws NotImplemented
974
   * @throws InvalidRequest
975
   */
976
  public Identifier create(Session session, Identifier pid, InputStream object,
977
    SystemMetadata sysmeta) 
978
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
979
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
980
    NotImplemented, InvalidRequest {
981
      
982
      
983
      // The lock to be used for this identifier
984
      ILock lock = null;
985
      
986
      try {
987
        // are we allowed?
988
          boolean isAllowed = false;
989
          CNode cn = D1Client.getCN();
990
          NodeList nodeList = cn.listNodes();
991
          
992
          for (Node node : nodeList.getNodeList()) {
993
              if ( node.getType().equals(NodeType.CN) ) {
994
                  
995
                  List<Subject> subjects = node.getSubjectList();
996
                  for (Subject subject : subjects) {
997
                     if (subject.equals(session.getSubject())) {
998
                         isAllowed = true;
999
                         break;
1000
                     }
1001
                  }
1002
              }
1003
          }
1004

    
1005
          // proceed if we're called by a CN
1006
          if ( isAllowed ) {
1007
              // create the coordinating node version of the document      
1008
              lock = HazelcastService.getInstance().getLock(pid.getValue());
1009
              lock.lock();
1010
              sysmeta.setSerialVersion(BigInteger.ONE);
1011
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1012
              pid = super.create(session, pid, object, sysmeta);
1013

    
1014
          } else {
1015
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1016
                  " isn't allowed to call create() on a Coordinating Node.";
1017
              logMetacat.info(msg);
1018
              throw new NotAuthorized("1100", msg);
1019
          }
1020
          
1021
      } catch (RuntimeException e) {
1022
          // Convert Hazelcast runtime exceptions to service failures
1023
          String msg = "There was a problem creating the object identified by " +
1024
              pid.getValue() + ". There error message was: " + e.getMessage();
1025
          throw new ServiceFailure("4893", msg);
1026
          
1027
      } finally {
1028
          lock.unlock();
1029
      
1030
      }
1031
      
1032
      return pid;
1033

    
1034
  }
1035

    
1036
  /**
1037
   * Set access for a given object using the object identifier and a Subject
1038
   * under a given Session.
1039
   * 
1040
   * @param session - the Session object containing the credentials for the Subject
1041
   * @param pid - the object identifier for the given object to apply the policy
1042
   * @param policy - the access policy to be applied
1043
   * 
1044
   * @return true if the application of the policy succeeds
1045
   * @throws InvalidToken
1046
   * @throws ServiceFailure
1047
   * @throws NotFound
1048
   * @throws NotAuthorized
1049
   * @throws NotImplemented
1050
   * @throws InvalidRequest
1051
   */
1052
  public boolean setAccessPolicy(Session session, Identifier pid, 
1053
      AccessPolicy accessPolicy, long serialVersion) 
1054
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1055
      NotImplemented, InvalidRequest {
1056
      
1057
      // The lock to be used for this identifier
1058
      ILock lock = null;
1059
      
1060
      boolean success = false;
1061
      
1062
      // get the subject
1063
      Subject subject = session.getSubject();
1064
      
1065
      // are we allowed to do this?
1066
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1067
          throw new NotAuthorized("4420", "not allowed by " + subject.getValue() + 
1068
          " on " + pid.getValue());  
1069
      }
1070
      
1071
      SystemMetadata systemMetadata = null;
1072
      try {
1073
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1074
          lock.lock();
1075
          systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1076

    
1077
          // does the request have the most current system metadata?
1078
          if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1079
             String msg = "The requested system metadata version number " + 
1080
                 serialVersion + " differs from the current version at " +
1081
                 systemMetadata.getSerialVersion().longValue() +
1082
                 ". Please get the latest copy in order to modify it.";
1083
             throw new InvalidRequest("4402", msg);
1084
          }
1085
          
1086
      } catch (Exception e) {
1087
          // convert Hazelcast RuntimeException to NotFound
1088
          throw new NotFound("4400", "No record found for: " + pid);
1089
        
1090
      }
1091
          
1092
      // set the access policy
1093
      systemMetadata.setAccessPolicy(accessPolicy);
1094
      
1095
      // update the system metadata
1096
      try {
1097
          systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1098
          systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1099
          HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1100
        
1101
      } catch (Exception e) {
1102
          // convert Hazelcast RuntimeException to ServiceFailure
1103
          throw new ServiceFailure("4430", e.getMessage());
1104
        
1105
      } finally {
1106
          lock.unlock();
1107
        
1108
      }
1109
    
1110
    // TODO: how do we know if the map was persisted?
1111
    success = true;
1112
    
1113
    return success;
1114
  }
1115

    
1116
  /**
1117
   * Full replacement of replication metadata in the system metadata for the 
1118
   * specified object, changes date system metadata modified
1119
   * 
1120
   * @param session - the Session object containing the credentials for the Subject
1121
   * @param pid - the object identifier for the given object to apply the policy
1122
   * @param replica - the replica to be updated
1123
   * @return
1124
   * @throws NotImplemented
1125
   * @throws NotAuthorized
1126
   * @throws ServiceFailure
1127
   * @throws InvalidRequest
1128
   * @throws NotFound
1129
   */
1130
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1131
      Replica replica, long serialVersion) 
1132
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1133
      NotFound {
1134
      
1135
      // The lock to be used for this identifier
1136
      ILock lock = null;
1137
      
1138
      // get the subject
1139
      Subject subject = session.getSubject();
1140
      
1141
      // are we allowed to do this?
1142
      try {
1143
        // what is the controlling permission?
1144
        if (!isAuthorized(session, pid, Permission.WRITE)) {
1145
            throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1146
            " on " + pid.getValue());  
1147
        }
1148
        
1149
      } catch (InvalidToken e) {
1150
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1151
                  " on " + pid.getValue());  
1152
          
1153
      }
1154

    
1155
      SystemMetadata systemMetadata = null;
1156
      try {      
1157
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1158
          lock.lock();
1159
          systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1160

    
1161
          // does the request have the most current system metadata?
1162
          if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1163
             String msg = "The requested system metadata version number " + 
1164
                 serialVersion + " differs from the current version at " +
1165
                 systemMetadata.getSerialVersion().longValue() +
1166
                 ". Please get the latest copy in order to modify it.";
1167
             throw new InvalidRequest("4853", msg);
1168
          }
1169
          
1170
      } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
1171
        throw new NotFound("4854", "No record found for: " + pid.getValue() +
1172
            " : " + e.getMessage());
1173
        
1174
      }
1175
          
1176
      // set the status for the replica
1177
      List<Replica> replicas = systemMetadata.getReplicaList();
1178
      NodeReference replicaNode = replica.getReplicaMemberNode();
1179
      int index = 0;
1180
      for (Replica listedReplica: replicas) {
1181
          
1182
          // remove the replica that we are replacing
1183
          if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1184
              replicas.remove(index);
1185
              break;
1186
              
1187
          }
1188
          index++;
1189
      }
1190
      
1191
      // add the new replica item
1192
      replicas.add(replica);
1193
      systemMetadata.setReplicaList(replicas);
1194
      
1195
      // update the metadata
1196
      try {
1197
          systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1198
          systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1199
          HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1200
        
1201
      } catch (Exception e) {
1202
          throw new ServiceFailure("4852", e.getMessage());
1203
      
1204
      } finally {
1205
          lock.unlock();
1206
          
1207
      }
1208
    
1209
      return true;
1210
      
1211
  }
1212
  
1213
    @Override
1214
    public ObjectList listObjects(Session session, Date startTime, 
1215
            Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1216
            Integer start, Integer count)
1217
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1218
      ServiceFailure {
1219
      
1220
      ObjectList objectList = null;
1221
        try {
1222
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1223
        } catch (Exception e) {
1224
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1225
        }
1226

    
1227
        return objectList;
1228
  }
1229
    
1230
}
(1-1/5)