Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.InputStream;
27
import java.math.BigInteger;
28
import java.util.Calendar;
29
import java.util.Date;
30
import java.util.List;
31
import java.util.Set;
32

    
33
import javax.servlet.http.HttpServletRequest;
34

    
35
import org.apache.log4j.Logger;
36
import org.dataone.client.CNode;
37
import org.dataone.client.D1Client;
38
import org.dataone.service.cn.v1.CNAuthorization;
39
import org.dataone.service.cn.v1.CNCore;
40
import org.dataone.service.cn.v1.CNRead;
41
import org.dataone.service.cn.v1.CNReplication;
42
import org.dataone.service.exceptions.BaseException;
43
import org.dataone.service.exceptions.IdentifierNotUnique;
44
import org.dataone.service.exceptions.InsufficientResources;
45
import org.dataone.service.exceptions.InvalidRequest;
46
import org.dataone.service.exceptions.InvalidSystemMetadata;
47
import org.dataone.service.exceptions.InvalidToken;
48
import org.dataone.service.exceptions.NotAuthorized;
49
import org.dataone.service.exceptions.NotFound;
50
import org.dataone.service.exceptions.NotImplemented;
51
import org.dataone.service.exceptions.ServiceFailure;
52
import org.dataone.service.exceptions.UnsupportedType;
53
import org.dataone.service.types.v1.AccessPolicy;
54
import org.dataone.service.types.v1.Checksum;
55
import org.dataone.service.types.v1.ChecksumAlgorithmList;
56
import org.dataone.service.types.v1.Identifier;
57
import org.dataone.service.types.v1.Node;
58
import org.dataone.service.types.v1.NodeList;
59
import org.dataone.service.types.v1.NodeReference;
60
import org.dataone.service.types.v1.NodeType;
61
import org.dataone.service.types.v1.ObjectFormat;
62
import org.dataone.service.types.v1.ObjectFormatIdentifier;
63
import org.dataone.service.types.v1.ObjectFormatList;
64
import org.dataone.service.types.v1.ObjectList;
65
import org.dataone.service.types.v1.ObjectLocationList;
66
import org.dataone.service.types.v1.Permission;
67
import org.dataone.service.types.v1.Replica;
68
import org.dataone.service.types.v1.ReplicationPolicy;
69
import org.dataone.service.types.v1.ReplicationStatus;
70
import org.dataone.service.types.v1.Session;
71
import org.dataone.service.types.v1.Subject;
72
import org.dataone.service.types.v1.SystemMetadata;
73

    
74
import com.hazelcast.core.ILock;
75

    
76
import edu.ucsb.nceas.metacat.EventLog;
77
import edu.ucsb.nceas.metacat.IdentifierManager;
78
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
79

    
80
/**
81
 * Represents Metacat's implementation of the DataONE Coordinating Node 
82
 * service API. Methods implement the various CN* interfaces, and methods common
83
 * to both Member Node and Coordinating Node interfaces are found in the
84
 * D1NodeService super class.
85
 *
86
 */
87
public class CNodeService extends D1NodeService implements CNAuthorization,
88
    CNCore, CNRead, CNReplication {
89

    
90
  /* the logger instance */
91
  private Logger logMetacat = null;
92

    
93
  /**
94
   * singleton accessor
95
   */
96
  public static CNodeService getInstance(HttpServletRequest request) { 
97
    return new CNodeService(request);
98
  }
99
  
100
  /**
101
   * Constructor, private for singleton access
102
   */
103
  private CNodeService(HttpServletRequest request) {
104
    super(request);
105
    logMetacat = Logger.getLogger(CNodeService.class);
106
        
107
  }
108
    
109
  /**
110
   * Set the replication policy for an object given the object identifier
111
   * 
112
   * @param session - the Session object containing the credentials for the Subject
113
   * @param pid - the object identifier for the given object
114
   * @param policy - the replication policy to be applied
115
   * 
116
   * @return true or false
117
   * 
118
   * @throws NotImplemented
119
   * @throws NotAuthorized
120
   * @throws ServiceFailure
121
   * @throws InvalidRequest
122
   * 
123
   */
124
  @Override
125
  public boolean setReplicationPolicy(Session session, Identifier pid,
126
      ReplicationPolicy policy, long serialVersion) 
127
      throws NotImplemented, NotFound, NotAuthorized, ServiceFailure, 
128
      InvalidRequest, InvalidToken {
129
      
130
      // The lock to be used for this identifier
131
      ILock lock = null;
132
      
133
      // get the subject
134
      Subject subject = session.getSubject();
135
      
136
      // are we allowed to do this?
137
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
138
        throw new NotAuthorized("4881", Permission.CHANGE_PERMISSION + 
139
            " not allowed by " + subject.getValue() + " on " + pid.getValue());  
140
      }
141
      
142
      SystemMetadata systemMetadata = null;
143
      try {
144
          if ( HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid) ) {
145
              lock = HazelcastService.getInstance().getLock(pid.getValue());
146
              lock.lock();
147
              systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
148
              
149
          }
150
        
151

    
152
          // does the request have the most current system metadata?
153
          if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
154
             String msg = "The requested system metadata version number " + 
155
                 serialVersion + " differs from the current version at " +
156
                 systemMetadata.getSerialVersion().longValue() +
157
                 ". Please get the latest copy in order to modify it.";
158
             throw new InvalidRequest("4883", msg);
159
          }
160
          
161
      } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
162
        throw new NotFound("4884", "No record found for: " + pid.getValue());
163
        
164
      }
165
          
166
      // set the new policy
167
      systemMetadata.setReplicationPolicy(policy);
168
      
169
      // update the metadata
170
      try {
171
        systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
172
        systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
173
        HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
174
        
175
      } catch (Exception e) {
176
          throw new ServiceFailure("4882", e.getMessage());
177
      
178
      } finally {
179
          lock.unlock();
180
          logMetacat.debug("Unlocked identifier " + pid.getValue());
181

    
182
          
183
      }
184
    
185
      return true;
186
  }
187

    
188
  /**
189
   * Set the replication status for an object given the object identifier
190
   * 
191
   * @param session - the Session object containing the credentials for the Subject
192
   * @param pid - the object identifier for the given object
193
   * @param status - the replication status to be applied
194
   * 
195
   * @return true or false
196
   * 
197
   * @throws NotImplemented
198
   * @throws NotAuthorized
199
   * @throws ServiceFailure
200
   * @throws InvalidRequest
201
   * @throws InvalidToken
202
   * @throws NotFound
203
   * 
204
   */
205
  @Override
206
  public boolean setReplicationStatus(Session session, Identifier pid,
207
      NodeReference targetNode, ReplicationStatus status, BaseException failure) 
208
      throws ServiceFailure, NotImplemented, InvalidToken, NotAuthorized, 
209
      InvalidRequest, NotFound {
210
      
211
      // The lock to be used for this identifier
212
      ILock lock = null;
213
      
214
      boolean allowed = false;
215
      int replicaEntryIndex = -1;
216
      List<Replica> replicas = null;
217
      // get the subject
218
      Subject subject = session.getSubject();
219
      logMetacat.debug("ReplicationStatus for identifier " + pid.getValue() +
220
          " is " + status.toString());
221
      
222
      SystemMetadata systemMetadata = null;
223
      try {      
224
          lock = HazelcastService.getInstance().getLock(pid.getValue());
225
          lock.lock();
226
          systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
227

    
228
          if ( systemMetadata == null ) {
229
              logMetacat.debug("systemMetadata is null for " + pid.getValue());
230
              
231
          }
232
          replicas = systemMetadata.getReplicaList();
233
          int count = 0;
234
          
235
          if ( replicas == null || replicas.size() < 1 ) {
236
              logMetacat.debug("no replicas to evaluate");
237
              throw new InvalidRequest("4730", "There are no replicas to update.");
238
              
239
          }
240

    
241
          // find the target replica index in the replica list
242
          for (Replica replica: replicas) {
243
              String replicaNodeStr = replica.getReplicaMemberNode().getValue();
244
              String targetNodeStr = targetNode.getValue();
245
              logMetacat.debug("Comparing " + replicaNodeStr + " to " + targetNodeStr);
246
              
247
              if (replicaNodeStr.equals(targetNodeStr)) {
248
                  replicaEntryIndex = count;
249
                  logMetacat.debug("replica entry index is: " + replicaEntryIndex);
250
                  break;
251
              }
252
              count++;
253
              
254
          }
255

    
256
          // are we allowed to do this? only CNs and target MNs are allowed
257
          CNode cn = D1Client.getCN();
258
          List<Node> nodes = cn.listNodes().getNodeList();
259
          
260
          // find the node in the node list
261
          for ( Node node : nodes ) {
262
              
263
              NodeReference nodeReference = node.getIdentifier();
264
              logMetacat.debug("In setReplicationStatus(), Node reference is: " + nodeReference.getValue());
265
              
266
              // allow target MN certs and CN certs
267
              if (targetNode.getValue().equals(nodeReference.getValue()) ||
268
                  node.getType() == NodeType.CN) {
269
                  List<Subject> nodeSubjects = node.getSubjectList();
270
                  
271
                  // check if the session subject is in the node subject list
272
                  for (Subject nodeSubject : nodeSubjects) {
273
                      if ( nodeSubject.equals(subject) ) {
274
                          allowed = true; // subject of session == target node subject
275
                          break;
276
                          
277
                      }
278
                  }                 
279
              }
280
          }
281

    
282
          if ( !allowed ) {
283
              String msg = "The subject identified by " + subject.getValue() +
284
                " does not have permission to set the replication status for " +
285
                "the replica identified by " + targetNode.getValue() + ".";
286
              logMetacat.info(msg);
287
              throw new NotAuthorized("4720", msg);
288
              
289
          }
290
          
291
          // was there a failure? log it
292
          if ( failure != null && status == ReplicationStatus.FAILED ) {
293
             String msg = "The replication request of the object identified by " + 
294
                 pid.getValue() + " failed.  The error message was " +
295
                 failure.getMessage() + ".";
296
          }
297
          
298
      } catch (RuntimeException e) { // Catch is generic since HZ throws RuntimeException
299
        throw new NotFound("4740", "No record found for: " + pid.getValue() +
300
            " : " + e.getMessage());
301
        
302
      }
303
          
304
      // set the status for the replica
305
      if ( replicaEntryIndex != -1 ) {
306
          Replica targetReplica = replicas.get(replicaEntryIndex);
307
          targetReplica.setReplicationStatus(status);
308
          logMetacat.debug("Set the replication status for " + 
309
              targetReplica.getReplicaMemberNode().getValue() + " to " +
310
              targetReplica.getReplicationStatus());
311
          
312
      } else {
313
          throw new InvalidRequest("4730", "There are no replicas to update.");
314

    
315
      }
316
      
317
      systemMetadata.setReplicaList(replicas);
318
            
319
      // update the metadata
320
      try {
321
          systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
322
          systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
323
          HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
324
          
325
          if ( status == ReplicationStatus.FAILED && failure != null ) {
326
              logMetacat.warn("Replication failed for identifier " + pid.getValue() +
327
                  " on target node " + targetNode + ". The exception was: " +
328
                  failure.getMessage());
329
          }
330
      } catch (Exception e) {
331
          throw new ServiceFailure("4700", e.getMessage());
332
      
333
      } finally {
334
          lock.unlock();
335
          logMetacat.debug("Unlocked identifier " + pid.getValue());
336
         
337
      }
338
      
339
      return true;
340
  }
341

    
342
  /**
343
   * Test that the specified relationship between pidOfSubject and pidOfObject exists
344
   * 
345
   * @param session - the Session object containing the credentials for the Subject
346
   * @param node - the node information for the given node be modified
347
   * 
348
   * @return true if the relationship exists
349
   * 
350
   * @throws InvalidToken
351
   * @throws ServiceFailure
352
   * @throws NotAuthorized
353
   * @throws NotFound
354
   * @throws InvalidRequest
355
   * @throws NotImplemented
356
   */
357
  @Override
358
  public boolean assertRelation(Session session, Identifier pidOfSubject, 
359
    String relationship, Identifier pidOfObject) 
360
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
361
    InvalidRequest, NotImplemented {
362
    
363
    // The lock to be used for thyis identifier
364
    ILock lock = null;
365

    
366
    boolean asserted = false;
367
        
368
    // are we allowed to do this?
369
    if (!isAuthorized(session, pidOfSubject, Permission.READ)) {
370
      throw new NotAuthorized("4881", Permission.READ + " not allowed on " + pidOfSubject.getValue());  
371
    }
372
    
373
    SystemMetadata systemMetadata = null;
374
    try {
375
        lock = HazelcastService.getInstance().getLock(pidOfSubject.getValue());
376
        lock.lock();
377
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pidOfSubject);
378
        
379
        
380
        // check relationships
381
        // TODO: use ORE map
382
        if (relationship.equalsIgnoreCase("describes")) {
383
          
384
        }
385
        
386
        if (relationship.equalsIgnoreCase("describedBy")) {
387
          
388
        }
389
        
390
        if (relationship.equalsIgnoreCase("derivedFrom")) {
391
          
392
        }
393
        
394
        if (relationship.equalsIgnoreCase("obsoletes")) {
395
            Identifier pid = systemMetadata.getObsoletes();
396
            if (pid.getValue().equals(pidOfObject.getValue())) {
397
              asserted = true;
398
            
399
        }
400
          //return systemMetadata.getObsoleteList().contains(pidOfObject);
401
        }
402
        if (relationship.equalsIgnoreCase("obsoletedBy")) {
403
            Identifier pid = systemMetadata.getObsoletedBy();
404
            if (pid.getValue().equals(pidOfObject.getValue())) {
405
              asserted = true;
406
            
407
        }
408
          //return systemMetadata.getObsoletedByList().contains(pidOfObject);
409
        }
410
              
411
    } catch (Exception e) {
412
        throw new ServiceFailure("4270", "Could not assert relation for: " + 
413
            pidOfSubject.getValue() +
414
            ". The error message was: " + e.getMessage());
415
      
416
    } finally {
417
        lock.unlock();
418
        logMetacat.debug("Unlocked identifier " + pidOfSubject.getValue());
419

    
420
    }
421
        
422
    return asserted;
423
  }
424
  
425
  /**
426
   * Return the checksum of the object given the identifier 
427
   * 
428
   * @param session - the Session object containing the credentials for the Subject
429
   * @param pid - the object identifier for the given object
430
   * 
431
   * @return checksum - the checksum of the object
432
   * 
433
   * @throws InvalidToken
434
   * @throws ServiceFailure
435
   * @throws NotAuthorized
436
   * @throws NotFound
437
   * @throws NotImplemented
438
   */
439
  @Override
440
  public Checksum getChecksum(Session session, Identifier pid)
441
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, 
442
    NotImplemented {
443
        
444
    // The lock to be used for thyis identifier
445
    ILock lock = null;
446
    
447
    if (!isAuthorized(session, pid, Permission.READ)) {
448
        throw new NotAuthorized("1400", Permission.READ + " not allowed on " + pid.getValue());  
449
    }
450
    
451
    SystemMetadata systemMetadata = null;
452
    Checksum checksum = null;
453
    
454
    try {
455
        lock = HazelcastService.getInstance().getLock(pid.getValue());
456
        lock.lock();
457
        systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
458
        checksum = systemMetadata.getChecksum();
459

    
460
    } catch (Exception e) {
461
        throw new ServiceFailure("1410", "An error occurred getting the checksum for " + 
462
            pid.getValue() + ". The error message was: " + e.getMessage());
463
      
464
    } finally {
465
        lock.unlock();
466
        logMetacat.debug("Unlocked identifier " + pid.getValue());
467
        
468
    }
469
    
470
    return checksum;
471
  }
472

    
473
  /**
474
   * Resolve the location of a given object
475
   * 
476
   * @param session - the Session object containing the credentials for the Subject
477
   * @param pid - the object identifier for the given object
478
   * 
479
   * @return objectLocationList - the list of nodes known to contain the object
480
   * 
481
   * @throws InvalidToken
482
   * @throws ServiceFailure
483
   * @throws NotAuthorized
484
   * @throws NotFound
485
   * @throws NotImplemented
486
   */
487
  @Override
488
  public ObjectLocationList resolve(Session session, Identifier pid)
489
    throws InvalidToken, ServiceFailure, NotAuthorized,
490
    NotFound, NotImplemented {
491

    
492
    throw new NotImplemented("4131", "resolve not implemented");
493

    
494
  }
495

    
496
  /**
497
   * Search the metadata catalog for identifiers that match the criteria
498
   * 
499
   * @param session - the Session object containing the credentials for the Subject
500
   * @param queryType - An identifier for the type of query expression 
501
   *                    provided in the query
502
   * @param query -  The criteria for matching the characteristics of the 
503
   *                 metadata objects of interest
504
   * 
505
   * @return objectList - the list of objects matching the criteria
506
   * 
507
   * @throws InvalidToken
508
   * @throws ServiceFailure
509
   * @throws NotAuthorized
510
   * @throws InvalidRequest
511
   * @throws NotImplemented
512
   */
513
  @Override
514
  public ObjectList search(Session session, String queryType, String query)
515
    throws InvalidToken, ServiceFailure, NotAuthorized, InvalidRequest,
516
    NotImplemented {
517

    
518
    ObjectList objectList = null;
519
    try {
520
        objectList = 
521
          IdentifierManager.getInstance().querySystemMetadata(
522
              null, //startTime, 
523
              null, //endTime,
524
              null, //objectFormat, 
525
              false, //replicaStatus, 
526
              0, //start, 
527
              -1 //count
528
              );
529
        
530
    } catch (Exception e) {
531
      throw new ServiceFailure("4310", "Error querying system metadata: " + e.getMessage());
532
    }
533

    
534
      return objectList;
535
      
536
    //throw new NotImplemented("4281", "search not implemented");
537
    
538
    // the code block below is from an older implementation
539
    
540
    /*  This block commented out because of the EcoGrid circular dependency.
541
         *  For now, query will not be supported until the circularity can be
542
         *  resolved, probably by moving the ecogrid query syntax transformers
543
         *  directly into the Metacat codebase.  MBJ 2010-02-03
544
         
545
        try {
546
            EcogridQueryParser parser = new EcogridQueryParser(request
547
                    .getReader());
548
            parser.parseXML();
549
            QueryType queryType = parser.getEcogridQuery();
550
            EcogridJavaToMetacatJavaQueryTransformer queryTransformer = 
551
                new EcogridJavaToMetacatJavaQueryTransformer();
552
            QuerySpecification metacatQuery = queryTransformer
553
                    .transform(queryType);
554

    
555
            DBQuery metacat = new DBQuery();
556

    
557
            boolean useXMLIndex = (new Boolean(PropertyService
558
                    .getProperty("database.usexmlindex"))).booleanValue();
559
            String xmlquery = "query"; // we don't care the query in resultset,
560
            // the query can be anything
561
            PrintWriter out = null; // we don't want metacat result, so set out null
562

    
563
            // parameter: queryspecification, user, group, usingIndexOrNot
564
            StringBuffer result = metacat.createResultDocument(xmlquery,
565
                    metacatQuery, out, username, groupNames, useXMLIndex);
566

    
567
            // create result set transfer       
568
            String saxparser = PropertyService.getProperty("xml.saxparser");
569
            MetacatResultsetParser metacatResultsetParser = new MetacatResultsetParser(
570
                    new StringReader(result.toString()), saxparser, queryType
571
                            .getNamespace().get_value());
572
            ResultsetType records = metacatResultsetParser.getEcogridResult();
573

    
574
            System.out
575
                    .println(EcogridResultsetTransformer.toXMLString(records));
576
            response.setContentType("text/xml");
577
            out = response.getWriter();
578
            out.print(EcogridResultsetTransformer.toXMLString(records));
579

    
580
        } catch (Exception e) {
581
            e.printStackTrace();
582
        }*/
583
    
584

    
585
  }
586
  
587
  /**
588
   * Returns the object format registered in the DataONE Object Format 
589
   * Vocabulary for the given format identifier
590
   * 
591
   * @param fmtid - the identifier of the format requested
592
   * 
593
   * @return objectFormat - the object format requested
594
   * 
595
   * @throws ServiceFailure
596
   * @throws NotFound
597
   * @throws InsufficientResources
598
   * @throws NotImplemented
599
   */
600
  @Override
601
  public ObjectFormat getFormat(ObjectFormatIdentifier fmtid)
602
    throws ServiceFailure, NotFound, NotImplemented {
603
     
604
      return ObjectFormatService.getInstance().getFormat(fmtid);
605
      
606
  }
607

    
608
  /**
609
   * Returns a list of all object formats registered in the DataONE Object 
610
   * Format Vocabulary
611
    * 
612
   * @return objectFormatList - The list of object formats registered in 
613
   *                            the DataONE Object Format Vocabulary
614
   * 
615
   * @throws ServiceFailure
616
   * @throws NotImplemented
617
   * @throws InsufficientResources
618
   */
619
  @Override
620
  public ObjectFormatList listFormats() 
621
    throws ServiceFailure, NotImplemented {
622

    
623
    return ObjectFormatService.getInstance().listFormats();
624
  }
625

    
626
  /**
627
   * Returns a list of nodes that have been registered with the DataONE infrastructure
628
    * 
629
   * @return nodeList - List of nodes from the registry
630
   * 
631
   * @throws ServiceFailure
632
   * @throws NotImplemented
633
   */
634
  @Override
635
  public NodeList listNodes() 
636
    throws NotImplemented, ServiceFailure {
637

    
638
    throw new NotImplemented("4800", "listNodes not implemented");
639
  }
640

    
641
  /**
642
   * Provides a mechanism for adding system metadata independently of its 
643
   * associated object, such as when adding system metadata for data objects.
644
    * 
645
   * @param session - the Session object containing the credentials for the Subject
646
   * @param pid - The identifier of the object to register the system metadata against
647
   * @param sysmeta - The system metadata to be registered
648
   * 
649
   * @return true if the registration succeeds
650
   * 
651
   * @throws NotImplemented
652
   * @throws NotAuthorized
653
   * @throws ServiceFailure
654
   * @throws InvalidRequest
655
   * @throws InvalidSystemMetadata
656
   */
657
  @Override
658
  public Identifier registerSystemMetadata(Session session, Identifier pid,
659
      SystemMetadata sysmeta) 
660
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest, 
661
      InvalidSystemMetadata {
662

    
663
      // The lock to be used for this identifier
664
      ILock lock = null;
665

    
666
      // TODO: control who can call this?
667
      if (session == null) {
668
          //TODO: many of the thrown exceptions do not use the correct error codes
669
          //check these against the docs and correct them
670
          throw new NotAuthorized("4861", "No Session - could not authorize for registration." +
671
                  "  If you are not logged in, please do so and retry the request.");
672
      }
673
      
674
      // verify that guid == SystemMetadata.getIdentifier()
675
      logMetacat.debug("Comparing guid|sysmeta_guid: " + pid.getValue() + 
676
          "|" + sysmeta.getIdentifier().getValue());
677
      if (!pid.getValue().equals(sysmeta.getIdentifier().getValue())) {
678
          throw new InvalidRequest("4863", 
679
              "The identifier in method call (" + pid.getValue() + 
680
              ") does not match identifier in system metadata (" +
681
              sysmeta.getIdentifier().getValue() + ").");
682
      }
683

    
684
      logMetacat.debug("Checking if identifier exists...");
685
      // Check that the identifier does not already exist
686
      if (HazelcastService.getInstance().getSystemMetadataMap().containsKey(pid)) {
687
          throw new InvalidRequest("4863", 
688
              "The identifier is already in use by an existing object.");
689
      
690
      }
691
      
692
      // insert the system metadata into the object store
693
      logMetacat.debug("Starting to insert SystemMetadata...");
694
      try {
695
          lock = HazelcastService.getInstance().getLock(sysmeta.getIdentifier().getValue());
696
          sysmeta.setSerialVersion(BigInteger.ONE);
697
          sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
698
          HazelcastService.getInstance().getSystemMetadataMap().put(sysmeta.getIdentifier(), sysmeta);
699
          
700
      } catch (Exception e) {
701
        logMetacat.error("Problem registering system metadata: " + pid.getValue(), e);
702
          throw new ServiceFailure("4862", "Error inserting system metadata: " + 
703
              e.getClass() + ": " + e.getMessage());
704
          
705
      } finally {
706
        lock.unlock();
707
        logMetacat.debug("Unlocked identifier " + pid.getValue());
708

    
709
      }
710
      
711
      logMetacat.debug("Returning from registerSystemMetadata");
712
      EventLog.getInstance().log(request.getRemoteAddr(), 
713
          request.getHeader("User-Agent"), session.getSubject().getValue(), 
714
          pid.getValue(), "registerSystemMetadata");
715
      return pid;
716
  }
717
  
718
  /**
719
   * Given an optional scope and format, reserves and returns an identifier 
720
   * within that scope and format that is unique and will not be 
721
   * used by any other sessions. 
722
    * 
723
   * @param session - the Session object containing the credentials for the Subject
724
   * @param pid - The identifier of the object to register the system metadata against
725
   * @param scope - An optional string to be used to qualify the scope of 
726
   *                the identifier namespace, which is applied differently 
727
   *                depending on the format requested. If scope is not 
728
   *                supplied, a default scope will be used.
729
   * @param format - The optional name of the identifier format to be used, 
730
   *                  drawn from a DataONE-specific vocabulary of identifier 
731
   *                 format names, including several common syntaxes such 
732
   *                 as DOI, LSID, UUID, and LSRN, among others. If the 
733
   *                 format is not supplied by the caller, the CN service 
734
   *                 will use a default identifier format, which may change 
735
   *                 over time.
736
   * 
737
   * @return true if the registration succeeds
738
   * 
739
   * @throws InvalidToken
740
   * @throws ServiceFailure
741
   * @throws NotAuthorized
742
   * @throws IdentifierNotUnique
743
   * @throws NotImplemented
744
   */
745
  @Override
746
  public Identifier reserveIdentifier(Session session, Identifier pid)
747
  throws InvalidToken, ServiceFailure,
748
        NotAuthorized, IdentifierNotUnique, NotImplemented, InvalidRequest {
749

    
750
    throw new NotImplemented("4191", "reserveIdentifier not implemented on this node");
751
  }
752
  
753
  @Override
754
  public Identifier generateIdentifier(Session session, String scheme, String fragment)
755
  throws InvalidToken, ServiceFailure,
756
        NotAuthorized, NotImplemented, InvalidRequest {
757
    throw new NotImplemented("4191", "generateIdentifier not implemented on this node");
758
  }
759
  
760
  /**
761
    * Checks whether the pid is reserved by the subject in the session param
762
    * If the reservation is held on the pid by the subject, we return true.
763
    * 
764
   * @param session - the Session object containing the Subject
765
   * @param pid - The identifier to check
766
   * 
767
   * @return true if the reservation exists for the subject/pid
768
   * 
769
   * @throws InvalidToken
770
   * @throws ServiceFailure
771
   * @throws NotFound - when the pid is not found (in use or in reservation)
772
   * @throws NotAuthorized - when the subject does not hold a reservation on the pid
773
   * @throws IdentifierNotUnique - when the pid is in use
774
   * @throws NotImplemented
775
   */
776

    
777
  @Override
778
  public boolean hasReservation(Session session, Identifier pid) 
779
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, IdentifierNotUnique, 
780
      NotImplemented, InvalidRequest {
781
  
782
      throw new NotImplemented("4191", "hasReservation not implemented on this node");
783
  }
784

    
785
  /**
786
   * Changes ownership (RightsHolder) of the specified object to the 
787
   * subject specified by userId
788
    * 
789
   * @param session - the Session object containing the credentials for the Subject
790
   * @param pid - Identifier of the object to be modified
791
   * @param userId - The subject that will be taking ownership of the specified object.
792
   *
793
   * @return pid - the identifier of the modified object
794
   * 
795
   * @throws ServiceFailure
796
   * @throws InvalidToken
797
   * @throws NotFound
798
   * @throws NotAuthorized
799
   * @throws NotImplemented
800
   * @throws InvalidRequest
801
   */  
802
  @Override
803
  public Identifier setRightsHolder(Session session, Identifier pid, Subject userId,
804
      long serialVersion)
805
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized,
806
      NotImplemented, InvalidRequest {
807
      
808
      // The lock to be used for this identifier
809
      ILock lock = null;
810

    
811
      // get the subject
812
      Subject subject = session.getSubject();
813
      
814
      // are we allowed to do this?
815
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
816
        throw new NotAuthorized("4440", "not allowed by " + subject.getValue() + " on " + pid.getValue());  
817
      }
818
      
819
      SystemMetadata systemMetadata = null;
820
      try {
821
          lock = HazelcastService.getInstance().getLock(pid.getValue());
822
          systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
823
          
824
          // does the request have the most current system metadata?
825
          if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
826
             String msg = "The requested system metadata version number " + 
827
                 serialVersion + " differs from the current version at " +
828
                 systemMetadata.getSerialVersion().longValue() +
829
                 ". Please get the latest copy in order to modify it.";
830
             throw new InvalidRequest("4442", msg);
831
          }
832
          
833
      } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
834
          throw new NotFound("4460", "No record found for: " + pid.getValue());
835
          
836
      }
837
          
838
      // set the new rights holder
839
      systemMetadata.setRightsHolder(userId);
840
      
841
      // update the metadata
842
      try {
843
          systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
844
          systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
845
          HazelcastService.getInstance().getSystemMetadataMap().put(pid, systemMetadata);
846
          
847
      } catch (Exception e) {
848
      throw new ServiceFailure("4490", e.getMessage());
849
      
850
      } finally {
851
          lock.unlock();
852
          logMetacat.debug("Unlocked identifier " + pid.getValue());
853

    
854
      }
855
      
856
      return pid;
857
  }
858

    
859
  /**
860
   * Verify that a replication task is authorized by comparing the target node's
861
   * Subject (from the X.509 certificate-derived Session) with the list of 
862
   * subjects in the known, pending replication tasks map.
863
   * 
864
   * @param originatingNodeSession - Session information that contains the 
865
   *                                 identity of the calling user
866
   * @param targetNodeSubject - Subject identifying the target node
867
   * @param pid - the identifier of the object to be replicated
868
   * @param replicatePermission - the execute permission to be granted
869
   * 
870
   * @throws ServiceFailure
871
   * @throws NotImplemented
872
   * @throws InvalidToken
873
   * @throws NotAuthorized
874
   * @throws InvalidRequest
875
   * @throws NotFound
876
   */
877
  @Override
878
  public boolean isNodeAuthorized(Session originatingNodeSession, 
879
    Subject targetNodeSubject, Identifier pid) 
880
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
881
    NotFound, InvalidRequest {
882

    
883
    // The lock to be used for this identifier
884
    ILock lock = null;
885
    
886
    boolean isAllowed = false;
887
    SystemMetadata sysmeta = null;
888
    NodeReference targetNode = null;
889
    
890
    try {
891
      // get the target node reference from the nodes list
892
      CNode cn = D1Client.getCN();
893
      List<Node> nodes = cn.listNodes().getNodeList();
894
      
895
      if ( nodes != null ) {
896
        for (Node node : nodes) {
897
            
898
            for (Subject nodeSubject : node.getSubjectList()) {
899
                
900
                if ( nodeSubject.equals(targetNodeSubject) ) {
901
                    targetNode = node.getIdentifier();
902
                    logMetacat.debug("targetNode is : " + targetNode.getValue());
903
                    break;
904
                }
905
            }
906
            
907
            if ( targetNode != null) { break; }
908
        }
909
        
910
      } else {
911
          String msg = "Couldn't get the node list from the CN";
912
          logMetacat.debug(msg);
913
          throw new ServiceFailure("4872", msg);
914
          
915
      }
916
      
917
      // can't find a node listed with the given subject
918
      if ( targetNode == null ) {
919
          String msg = "There is no Member Node registered with a node subject " +
920
              "matching " + targetNodeSubject.getValue();
921
          logMetacat.info(msg);
922
          throw new ServiceFailure("4872", msg);
923
          
924
      }
925
      
926
      //lock, get, and unlock the pid
927
      lock = HazelcastService.getInstance().getLock(pid.getValue());
928
      lock.lock();
929
      logMetacat.debug("Getting system metadata for identifier " + pid.getValue());
930
      
931
      sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
932

    
933
      if ( sysmeta != null ) {
934
          
935
          List<Replica> replicaList = sysmeta.getReplicaList();
936
          
937
          if ( replicaList != null ) {
938
              
939
              // find the replica with the status set to 'requested'
940
              for (Replica replica : replicaList) {
941
                  ReplicationStatus status = replica.getReplicationStatus();
942
                  NodeReference listedNode = replica.getReplicaMemberNode();
943
                  if ( listedNode != null && targetNode != null ) {
944
                      logMetacat.debug("Comparing " + listedNode.getValue()
945
                              + " to " + targetNode.getValue());
946
                      
947
                      if (listedNode.getValue().equals(targetNode.getValue())
948
                              && status.equals(ReplicationStatus.REQUESTED)) {
949
                          isAllowed = true;
950
                          break;
951

    
952
                      }
953
                  }
954
              }
955
          }
956
          logMetacat.debug("The " + targetNode.getValue() + " is allowed " +
957
              "to replicate: " + isAllowed + " for " + pid.getValue());
958

    
959
          
960
      } else {
961
          logMetacat.debug("System metadata for identifier " + pid.getValue() +
962
          " is null.");          
963
          
964
      }
965

    
966
    } catch (RuntimeException e) {
967
    	  ServiceFailure sf = new ServiceFailure("4872", 
968
                "Runtime Exception: Couldn't determine if node is allowed: " + 
969
                e.getCause().getMessage());
970
    	  sf.initCause(e);
971
        throw sf;
972
        
973
    } finally {
974
      // always unlock the pid
975
      lock.unlock();
976
      logMetacat.debug("Unlocked identifier " + pid.getValue());
977

    
978
    }
979
      
980
    return isAllowed;
981
    
982
  }
983

    
984
  /**
985
   * Adds a new object to the Node, where the object is a science metadata object.
986
   * 
987
   * @param session - the Session object containing the credentials for the Subject
988
   * @param pid - The object identifier to be created
989
   * @param object - the object bytes
990
   * @param sysmeta - the system metadata that describes the object  
991
   * 
992
   * @return pid - the object identifier created
993
   * 
994
   * @throws InvalidToken
995
   * @throws ServiceFailure
996
   * @throws NotAuthorized
997
   * @throws IdentifierNotUnique
998
   * @throws UnsupportedType
999
   * @throws InsufficientResources
1000
   * @throws InvalidSystemMetadata
1001
   * @throws NotImplemented
1002
   * @throws InvalidRequest
1003
   */
1004
  public Identifier create(Session session, Identifier pid, InputStream object,
1005
    SystemMetadata sysmeta) 
1006
    throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
1007
    UnsupportedType, InsufficientResources, InvalidSystemMetadata, 
1008
    NotImplemented, InvalidRequest {
1009
      
1010
      
1011
      // The lock to be used for this identifier
1012
      ILock lock = null;
1013
      
1014
      try {
1015
        // are we allowed?
1016
          boolean isAllowed = false;
1017
          CNode cn = D1Client.getCN();
1018
          NodeList nodeList = cn.listNodes();
1019
          
1020
          for (Node node : nodeList.getNodeList()) {
1021
              if ( node.getType().equals(NodeType.CN) ) {
1022
                  
1023
                  List<Subject> subjects = node.getSubjectList();
1024
                  for (Subject subject : subjects) {
1025
                     if (subject.equals(session.getSubject())) {
1026
                         isAllowed = true;
1027
                         break;
1028
                     }
1029
                  }
1030
              }
1031
          }
1032

    
1033
          // proceed if we're called by a CN
1034
          if ( isAllowed ) {
1035
              // create the coordinating node version of the document      
1036
              lock = HazelcastService.getInstance().getLock(pid.getValue());
1037
              lock.lock();
1038
              sysmeta.setSerialVersion(BigInteger.ONE);
1039
              sysmeta.setDateSysMetadataModified(Calendar.getInstance().getTime());
1040
              pid = super.create(session, pid, object, sysmeta);
1041

    
1042
          } else {
1043
              String msg = "The subject listed as " + session.getSubject().getValue() + 
1044
                  " isn't allowed to call create() on a Coordinating Node.";
1045
              logMetacat.info(msg);
1046
              throw new NotAuthorized("1100", msg);
1047
          }
1048
          
1049
      } catch (RuntimeException e) {
1050
          // Convert Hazelcast runtime exceptions to service failures
1051
          String msg = "There was a problem creating the object identified by " +
1052
              pid.getValue() + ". There error message was: " + e.getMessage();
1053
          throw new ServiceFailure("4893", msg);
1054
          
1055
      } finally {
1056
    	  if (lock != null) {
1057
	          lock.unlock();
1058
	          logMetacat.debug("Unlocked identifier " + pid.getValue());
1059
    	  }
1060
      }
1061
      
1062
      return pid;
1063

    
1064
  }
1065

    
1066
  /**
1067
   * Set access for a given object using the object identifier and a Subject
1068
   * under a given Session.
1069
   * 
1070
   * @param session - the Session object containing the credentials for the Subject
1071
   * @param pid - the object identifier for the given object to apply the policy
1072
   * @param policy - the access policy to be applied
1073
   * 
1074
   * @return true if the application of the policy succeeds
1075
   * @throws InvalidToken
1076
   * @throws ServiceFailure
1077
   * @throws NotFound
1078
   * @throws NotAuthorized
1079
   * @throws NotImplemented
1080
   * @throws InvalidRequest
1081
   */
1082
  public boolean setAccessPolicy(Session session, Identifier pid, 
1083
      AccessPolicy accessPolicy, long serialVersion) 
1084
      throws InvalidToken, ServiceFailure, NotFound, NotAuthorized, 
1085
      NotImplemented, InvalidRequest {
1086
      
1087
      // The lock to be used for this identifier
1088
      ILock lock = null;
1089
      
1090
      boolean success = false;
1091
      
1092
      // get the subject
1093
      Subject subject = session.getSubject();
1094
      
1095
      // are we allowed to do this?
1096
      if (!isAuthorized(session, pid, Permission.CHANGE_PERMISSION)) {
1097
          throw new NotAuthorized("4420", "not allowed by " + subject.getValue() + 
1098
          " on " + pid.getValue());  
1099
      }
1100
      
1101
      SystemMetadata systemMetadata = null;
1102
      try {
1103
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1104
          lock.lock();
1105
          systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1106

    
1107
          // does the request have the most current system metadata?
1108
          if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1109
             String msg = "The requested system metadata version number " + 
1110
                 serialVersion + " differs from the current version at " +
1111
                 systemMetadata.getSerialVersion().longValue() +
1112
                 ". Please get the latest copy in order to modify it.";
1113
             throw new InvalidRequest("4402", msg);
1114
          }
1115
          
1116
      } catch (Exception e) {
1117
          // convert Hazelcast RuntimeException to NotFound
1118
          throw new NotFound("4400", "No record found for: " + pid);
1119
        
1120
      }
1121
          
1122
      // set the access policy
1123
      systemMetadata.setAccessPolicy(accessPolicy);
1124
      
1125
      // update the system metadata
1126
      try {
1127
          systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1128
          systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1129
          HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1130
        
1131
      } catch (Exception e) {
1132
          // convert Hazelcast RuntimeException to ServiceFailure
1133
          throw new ServiceFailure("4430", e.getMessage());
1134
        
1135
      } finally {
1136
          lock.unlock();
1137
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1138
        
1139
      }
1140
    
1141
    // TODO: how do we know if the map was persisted?
1142
    success = true;
1143
    
1144
    return success;
1145
  }
1146

    
1147
  /**
1148
   * Full replacement of replication metadata in the system metadata for the 
1149
   * specified object, changes date system metadata modified
1150
   * 
1151
   * @param session - the Session object containing the credentials for the Subject
1152
   * @param pid - the object identifier for the given object to apply the policy
1153
   * @param replica - the replica to be updated
1154
   * @return
1155
   * @throws NotImplemented
1156
   * @throws NotAuthorized
1157
   * @throws ServiceFailure
1158
   * @throws InvalidRequest
1159
   * @throws NotFound
1160
   */
1161
  public boolean updateReplicationMetadata(Session session, Identifier pid,
1162
      Replica replica, long serialVersion) 
1163
      throws NotImplemented, NotAuthorized, ServiceFailure, InvalidRequest,
1164
      NotFound {
1165
      
1166
      // The lock to be used for this identifier
1167
      ILock lock = null;
1168
      
1169
      // get the subject
1170
      Subject subject = session.getSubject();
1171
      
1172
      // are we allowed to do this?
1173
      try {
1174
        // what is the controlling permission?
1175
        if (!isAuthorized(session, pid, Permission.WRITE)) {
1176
            throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1177
            " on " + pid.getValue());  
1178
        }
1179
        
1180
      } catch (InvalidToken e) {
1181
          throw new NotAuthorized("4851", "not allowed by " + subject.getValue() + 
1182
                  " on " + pid.getValue());  
1183
          
1184
      }
1185

    
1186
      SystemMetadata systemMetadata = null;
1187
      try {      
1188
          lock = HazelcastService.getInstance().getLock(pid.getValue());
1189
          lock.lock();
1190
          systemMetadata = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1191

    
1192
          // does the request have the most current system metadata?
1193
          if ( systemMetadata.getSerialVersion().longValue() != serialVersion ) {
1194
             String msg = "The requested system metadata version number " + 
1195
                 serialVersion + " differs from the current version at " +
1196
                 systemMetadata.getSerialVersion().longValue() +
1197
                 ". Please get the latest copy in order to modify it.";
1198
             throw new InvalidRequest("4853", msg);
1199
          }
1200
          
1201
      } catch (Exception e) { // Catch is generic since HZ throws RuntimeException
1202
        throw new NotFound("4854", "No record found for: " + pid.getValue() +
1203
            " : " + e.getMessage());
1204
        
1205
      }
1206
          
1207
      // set the status for the replica
1208
      List<Replica> replicas = systemMetadata.getReplicaList();
1209
      NodeReference replicaNode = replica.getReplicaMemberNode();
1210
      int index = 0;
1211
      for (Replica listedReplica: replicas) {
1212
          
1213
          // remove the replica that we are replacing
1214
          if ( replicaNode.getValue().equals(listedReplica.getReplicaMemberNode().getValue())) {
1215
              replicas.remove(index);
1216
              break;
1217
              
1218
          }
1219
          index++;
1220
      }
1221
      
1222
      // add the new replica item
1223
      replicas.add(replica);
1224
      systemMetadata.setReplicaList(replicas);
1225
      
1226
      // update the metadata
1227
      try {
1228
          systemMetadata.setSerialVersion(systemMetadata.getSerialVersion().add(BigInteger.ONE));
1229
          systemMetadata.setDateSysMetadataModified(Calendar.getInstance().getTime());
1230
          HazelcastService.getInstance().getSystemMetadataMap().put(systemMetadata.getIdentifier(), systemMetadata);
1231
        
1232
      } catch (Exception e) {
1233
          throw new ServiceFailure("4852", e.getMessage());
1234
      
1235
      } finally {
1236
          lock.unlock();
1237
          logMetacat.debug("Unlocked identifier " + pid.getValue());
1238
          
1239
      }
1240
    
1241
      return true;
1242
      
1243
  }
1244
  
1245
    @Override
1246
    public ObjectList listObjects(Session session, Date startTime, 
1247
            Date endTime, ObjectFormatIdentifier formatid, Boolean replicaStatus,
1248
            Integer start, Integer count)
1249
      throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1250
      ServiceFailure {
1251
      
1252
      ObjectList objectList = null;
1253
        try {
1254
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, formatid, replicaStatus, start, count);
1255
        } catch (Exception e) {
1256
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
1257
        }
1258

    
1259
        return objectList;
1260
  }
1261

    
1262
	@Override
1263
	public ChecksumAlgorithmList listChecksumAlgorithms()
1264
			throws ServiceFailure, NotImplemented {
1265
		ChecksumAlgorithmList cal = new ChecksumAlgorithmList();
1266
		cal.addAlgorithm("MD5");
1267
		cal.addAlgorithm("SHA-1");
1268
		return null;
1269
	}
1270
    
1271
}
(1-1/5)