Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.math.BigInteger;
29
import java.security.NoSuchAlgorithmException;
30
import java.sql.SQLException;
31
import java.util.Calendar;
32
import java.util.Date;
33
import java.util.List;
34
import java.util.Timer;
35

    
36
import javax.servlet.http.HttpServletRequest;
37

    
38
import org.apache.commons.io.IOUtils;
39
import org.apache.log4j.Logger;
40
import org.dataone.client.CNode;
41
import org.dataone.client.D1Client;
42
import org.dataone.client.MNode;
43
import org.dataone.client.auth.CertificateManager;
44
import org.dataone.configuration.Settings;
45
import org.dataone.service.exceptions.BaseException;
46
import org.dataone.service.exceptions.IdentifierNotUnique;
47
import org.dataone.service.exceptions.InsufficientResources;
48
import org.dataone.service.exceptions.InvalidRequest;
49
import org.dataone.service.exceptions.InvalidSystemMetadata;
50
import org.dataone.service.exceptions.InvalidToken;
51
import org.dataone.service.exceptions.NotAuthorized;
52
import org.dataone.service.exceptions.NotFound;
53
import org.dataone.service.exceptions.NotImplemented;
54
import org.dataone.service.exceptions.ServiceFailure;
55
import org.dataone.service.exceptions.SynchronizationFailed;
56
import org.dataone.service.exceptions.UnsupportedType;
57
import org.dataone.service.mn.tier1.v1.MNCore;
58
import org.dataone.service.mn.tier1.v1.MNRead;
59
import org.dataone.service.mn.tier2.v1.MNAuthorization;
60
import org.dataone.service.mn.tier3.v1.MNStorage;
61
import org.dataone.service.mn.tier4.v1.MNReplication;
62
import org.dataone.service.types.v1.Checksum;
63
import org.dataone.service.types.v1.Event;
64
import org.dataone.service.types.v1.Group;
65
import org.dataone.service.types.v1.Identifier;
66
import org.dataone.service.types.v1.Log;
67
import org.dataone.service.types.v1.LogEntry;
68
import org.dataone.service.types.v1.MonitorInfo;
69
import org.dataone.service.types.v1.MonitorList;
70
import org.dataone.service.types.v1.Node;
71
import org.dataone.service.types.v1.NodeList;
72
import org.dataone.service.types.v1.NodeReference;
73
import org.dataone.service.types.v1.NodeState;
74
import org.dataone.service.types.v1.NodeType;
75
import org.dataone.service.types.v1.ObjectFormatIdentifier;
76
import org.dataone.service.types.v1.ObjectList;
77
import org.dataone.service.types.v1.Permission;
78
import org.dataone.service.types.v1.Ping;
79
import org.dataone.service.types.v1.ReplicationStatus;
80
import org.dataone.service.types.v1.Schedule;
81
import org.dataone.service.types.v1.Service;
82
import org.dataone.service.types.v1.Services;
83
import org.dataone.service.types.v1.Session;
84
import org.dataone.service.types.v1.Subject;
85
import org.dataone.service.types.v1.Synchronization;
86
import org.dataone.service.types.v1.SystemMetadata;
87
import org.dataone.service.types.v1.util.ChecksumUtil;
88
import org.dataone.service.util.Constants;
89

    
90
import edu.ucsb.nceas.metacat.DocumentImpl;
91
import edu.ucsb.nceas.metacat.EventLog;
92
import edu.ucsb.nceas.metacat.IdentifierManager;
93
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
94
import edu.ucsb.nceas.metacat.MetacatHandler;
95
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
96
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
97
import edu.ucsb.nceas.metacat.properties.PropertyService;
98
import edu.ucsb.nceas.metacat.util.SystemUtil;
99
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
100

    
101
/**
102
 * Represents Metacat's implementation of the DataONE Member Node 
103
 * service API. Methods implement the various MN* interfaces, and methods common
104
 * to both Member Node and Coordinating Node interfaces are found in the
105
 * D1NodeService base class.
106
 * 
107
 * Implements:
108
 * MNCore.ping()
109
 * MNCore.getLogRecords()
110
 * MNCore.getObjectStatistics()
111
 * MNCore.getOperationStatistics()
112
 * MNCore.getStatus()
113
 * MNCore.getCapabilities()
114
 * MNRead.get()
115
 * MNRead.getSystemMetadata()
116
 * MNRead.describe()
117
 * MNRead.getChecksum()
118
 * MNRead.listObjects()
119
 * MNRead.synchronizationFailed()
120
 * MNAuthorization.isAuthorized()
121
 * MNAuthorization.setAccessPolicy()
122
 * MNStorage.create()
123
 * MNStorage.update()
124
 * MNStorage.delete()
125
 * MNReplication.replicate()
126
 * 
127
 */
128
public class MNodeService extends D1NodeService 
129
    implements MNAuthorization, MNCore, MNRead, MNReplication, MNStorage {
130

    
131
    /* the logger instance */
132
    private Logger logMetacat = null;
133
    
134
    /* A reference to a remote Memeber Node */
135
    private MNode mn;
136
    
137
    /* A reference to a Coordinating Node */
138
    private CNode cn;
139

    
140

    
141
    /**
142
     * Singleton accessor to get an instance of MNodeService.
143
     * 
144
     * @return instance - the instance of MNodeService
145
     */
146
    public static MNodeService getInstance(HttpServletRequest request) {
147
        return new MNodeService(request);
148
    }
149

    
150
    /**
151
     * Constructor, private for singleton access
152
     */
153
    private MNodeService(HttpServletRequest request) {
154
        super(request);
155
        logMetacat = Logger.getLogger(MNodeService.class);
156
        
157
        // set the Member Node certificate file location
158
        CertificateManager.getInstance().setCertificateLocation(Settings.getConfiguration().getString("D1Client.certificate.file"));
159
    }
160

    
161
    /**
162
     * Deletes an object from the Member Node, where the object is either a 
163
     * data object or a science metadata object.
164
     * 
165
     * @param session - the Session object containing the credentials for the Subject
166
     * @param pid - The object identifier to be deleted
167
     * 
168
     * @return pid - the identifier of the object used for the deletion
169
     * 
170
     * @throws InvalidToken
171
     * @throws ServiceFailure
172
     * @throws NotAuthorized
173
     * @throws NotFound
174
     * @throws NotImplemented
175
     * @throws InvalidRequest
176
     */
177
    @Override
178
    public Identifier delete(Session session, Identifier pid) 
179
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
180

    
181
        String localId = null;
182
        boolean allowed = false;
183
        String username = Constants.SUBJECT_PUBLIC;
184
        String[] groupnames = null;
185
        if (session == null) {
186
        	throw new InvalidToken("1330", "No session has been provided");
187
        } else {
188
            username = session.getSubject().getValue();
189
            if (session.getSubjectInfo() != null) {
190
                List<Group> groupList = session.getSubjectInfo().getGroupList();
191
                if (groupList != null) {
192
                    groupnames = new String[groupList.size()];
193
                    for (int i = 0; i > groupList.size(); i++) {
194
                        groupnames[i] = groupList.get(i).getGroupName();
195
                    }
196
                }
197
            }
198
        }
199

    
200
        // do we have a valid pid?
201
        if (pid == null || pid.getValue().trim().equals("")) {
202
            throw new ServiceFailure("1350", "The provided identifier was invalid.");
203
        }
204

    
205
        // check for the existing identifier
206
        try {
207
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
208
        } catch (McdbDocNotFoundException e) {
209
            throw new NotFound("1340", "The object with the provided " + "identifier was not found.");
210
        }
211

    
212
        // does the subject have DELETE (a D1 CHANGE_PERMISSION level) priveleges on the pid?
213
        try {
214
			allowed = isAuthorized(session, pid, Permission.CHANGE_PERMISSION);
215
		} catch (InvalidRequest e) {
216
            throw new ServiceFailure("1350", e.getDescription());
217
		}
218
            
219

    
220
        if (allowed) {
221
            try {
222
                // delete the document
223
                DocumentImpl.delete(localId, username, groupnames, null);
224
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, localId, Event.DELETE.xmlValue());
225

    
226
                // archive it
227
                SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
228
                sysMeta.setArchived(true);
229
                HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
230
                
231
                // remove the system metadata for it
232
                //HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
233
                
234
            } catch (McdbDocNotFoundException e) {
235
                throw new NotFound("1340", "The provided identifier was invalid.");
236

    
237
            } catch (SQLException e) {
238
                throw new ServiceFailure("1350", "There was a problem deleting the object." + "The error message was: " + e.getMessage());
239

    
240
            } catch (InsufficientKarmaException e) {
241
                throw new NotAuthorized("1320", "The provided identity does not have " + "permission to DELETE objects on the Member Node.");
242

    
243
            } catch (Exception e) { // for some reason DocumentImpl throws a general Exception
244
                throw new ServiceFailure("1350", "There was a problem deleting the object." + "The error message was: " + e.getMessage());
245
            }
246

    
247
        } else {
248
            throw new NotAuthorized("1320", "The provided identity does not have " + "permission to DELETE objects on the Member Node.");
249
        }
250

    
251
        return pid;
252
    }
253

    
254
    /**
255
     * Updates an existing object by creating a new object identified by 
256
     * newPid on the Member Node which explicitly obsoletes the object 
257
     * identified by pid through appropriate changes to the SystemMetadata 
258
     * of pid and newPid
259
     * 
260
     * @param session - the Session object containing the credentials for the Subject
261
     * @param pid - The identifier of the object to be updated
262
     * @param object - the new object bytes
263
     * @param sysmeta - the new system metadata describing the object
264
     * 
265
     * @return newPid - the identifier of the new object
266
     * 
267
     * @throws InvalidToken
268
     * @throws ServiceFailure
269
     * @throws NotAuthorized
270
     * @throws NotFound
271
     * @throws NotImplemented
272
     * @throws IdentifierNotUnique
273
     * @throws UnsupportedType
274
     * @throws InsufficientResources
275
     * @throws InvalidSystemMetadata
276
     * @throws InvalidRequest
277
     */
278
    @Override
279
    public Identifier update(Session session, Identifier pid, InputStream object, 
280
        Identifier newPid, SystemMetadata sysmeta) 
281
        throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
282
        UnsupportedType, InsufficientResources, NotFound, 
283
        InvalidSystemMetadata, NotImplemented, InvalidRequest {
284

    
285
        String localId = null;
286
        boolean allowed = false;
287
        boolean isScienceMetadata = false;
288
        
289
        if (session == null) {
290
        	throw new InvalidToken("1210", "No session has been provided");
291
        }
292
        Subject subject = session.getSubject();
293

    
294
        // do we have a valid pid?
295
        if (pid == null || pid.getValue().trim().equals("")) {
296
            throw new InvalidRequest("1202", "The provided identifier was invalid.");
297
            
298
        }
299

    
300
        // check for the existing identifier
301
        try {
302
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
303
            
304
        } catch (McdbDocNotFoundException e) {
305
            throw new InvalidRequest("1202", "The object with the provided " + 
306
                "identifier was not found.");
307
            
308
        }
309
        
310
        // set the originating node
311
        NodeReference originMemberNode = this.getCapabilities().getIdentifier();
312
        sysmeta.setOriginMemberNode(originMemberNode);
313
        
314
        // set the submitter to match the certificate
315
        sysmeta.setSubmitter(subject);
316
        // set the dates
317
        Date now = Calendar.getInstance().getTime();
318
        sysmeta.setDateSysMetadataModified(now);
319
        sysmeta.setDateUploaded(now);
320

    
321
        // does the subject have WRITE ( == update) priveleges on the pid?
322
        allowed = isAuthorized(session, pid, Permission.WRITE);
323

    
324
        if (allowed) {
325
        	
326
        	// check quality of SM
327
        	if (sysmeta.getObsoletedBy() != null) {
328
        		throw new InvalidSystemMetadata("1300", "Cannot include obsoletedBy when updating object");
329
        	}
330
        	if (sysmeta.getObsoletes() != null && !sysmeta.getObsoletes().getValue().equals(pid.getValue())) {
331
        		throw new InvalidSystemMetadata("1300", "The identifier provided in obsoletes does not match old Identifier");
332
        	}
333

    
334
            // get the existing system metadata for the object
335
            SystemMetadata existingSysMeta = getSystemMetadata(session, pid);
336

    
337
            // add the newPid to the obsoletedBy list for the existing sysmeta
338
            existingSysMeta.setObsoletedBy(newPid);
339

    
340
            // then update the existing system metadata
341
            updateSystemMetadata(existingSysMeta);
342

    
343
            // prep the new system metadata, add pid to the affected lists
344
            sysmeta.setObsoletes(pid);
345
            //sysmeta.addDerivedFrom(pid);
346

    
347
            isScienceMetadata = isScienceMetadata(sysmeta);
348

    
349
            // do we have XML metadata or a data object?
350
            if (isScienceMetadata) {
351

    
352
                // update the science metadata XML document
353
                // TODO: handle non-XML metadata/data documents (like netCDF)
354
                // TODO: don't put objects into memory using stream to string
355
                String objectAsXML = "";
356
                try {
357
                    objectAsXML = IOUtils.toString(object, "UTF-8");
358
                    localId = insertOrUpdateDocument(objectAsXML, newPid, session, "update");
359
                    // register the newPid and the generated localId
360
                    if (newPid != null) {
361
                        IdentifierManager.getInstance().createMapping(newPid.getValue(), localId);
362

    
363
                    }
364

    
365
                } catch (IOException e) {
366
                    String msg = "The Node is unable to create the object. " + "There was a problem converting the object to XML";
367
                    logMetacat.info(msg);
368
                    throw new ServiceFailure("1310", msg + ": " + e.getMessage());
369

    
370
                }
371

    
372
            } else {
373

    
374
                // update the data object
375
                localId = insertDataObject(object, newPid, session);
376

    
377
            }
378

    
379
            // and insert the new system metadata
380
            insertSystemMetadata(sysmeta);
381

    
382
            // log the update event
383
            EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), subject.getValue(), localId, Event.UPDATE.toString());
384

    
385
        } else {
386
            throw new NotAuthorized("1200", "The provided identity does not have " + "permission to UPDATE the object identified by " + pid.getValue()
387
                    + " on the Member Node.");
388
        }
389

    
390
        return newPid;
391
    }
392

    
393
    public Identifier create(Session session, Identifier pid, InputStream object, SystemMetadata sysmeta) throws InvalidToken, ServiceFailure, NotAuthorized,
394
            IdentifierNotUnique, UnsupportedType, InsufficientResources, InvalidSystemMetadata, NotImplemented, InvalidRequest {
395

    
396
        // check for null session
397
        if (session == null) {
398
          throw new InvalidToken("1110", "Session is required to WRITE to the Node.");
399
        }
400
        // set the submitter to match the certificate
401
        sysmeta.setSubmitter(session.getSubject());
402
        // set the originating node
403
        NodeReference originMemberNode = this.getCapabilities().getIdentifier();
404
        sysmeta.setOriginMemberNode(originMemberNode);
405
        sysmeta.setArchived(false);
406

    
407
        // set the dates
408
        Date now = Calendar.getInstance().getTime();
409
        sysmeta.setDateSysMetadataModified(now);
410
        sysmeta.setDateUploaded(now);
411
        
412
        // set the serial version
413
        sysmeta.setSerialVersion(BigInteger.ZERO);
414
        
415
        // call the shared impl
416
        return super.create(session, pid, object, sysmeta);
417
    }
418

    
419
    /**
420
     * Called by a Coordinating Node to request that the Member Node create a 
421
     * copy of the specified object by retrieving it from another Member 
422
     * Node and storing it locally so that it can be made accessible to 
423
     * the DataONE system.
424
     * 
425
     * @param session - the Session object containing the credentials for the Subject
426
     * @param sysmeta - Copy of the CN held system metadata for the object
427
     * @param sourceNode - A reference to node from which the content should be 
428
     *                     retrieved. The reference should be resolved by 
429
     *                     checking the CN node registry.
430
     * 
431
     * @return true if the replication succeeds
432
     * 
433
     * @throws ServiceFailure
434
     * @throws NotAuthorized
435
     * @throws NotImplemented
436
     * @throws UnsupportedType
437
     * @throws InsufficientResources
438
     * @throws InvalidRequest
439
     */
440
    @Override
441
    public boolean replicate(Session session, SystemMetadata sysmeta,
442
            NodeReference sourceNode) throws NotImplemented, ServiceFailure,
443
            NotAuthorized, InvalidRequest, InsufficientResources,
444
            UnsupportedType {
445

    
446
        if (session != null && sysmeta != null && sourceNode != null) {
447
            logMetacat.info("MNodeService.replicate() called with parameters: \n" +
448
                            "\tSession.Subject      = "                           +
449
                            session.getSubject().getValue() + "\n"                +
450
                            "\tSystemMetadata       = " + sysmeta.toString()      +
451
                            "\n" + "\tSource NodeReference ="                     +
452
                            sourceNode.getValue());
453
        }
454
        boolean result = false;
455
        String nodeIdStr = null;
456
        NodeReference nodeId = null;
457

    
458
        // get the referenced object
459
        Identifier pid = sysmeta.getIdentifier();
460

    
461
        // get from the membernode
462
        // TODO: switch credentials for the server retrieval?
463
        this.mn = D1Client.getMN(sourceNode);
464
        this.cn = D1Client.getCN();
465
        InputStream object = null;
466
        Session thisNodeSession = null;
467
        SystemMetadata localSystemMetadata = null;
468
        BaseException failure = null;
469
        String localId = null;
470
        
471
        // TODO: check credentials
472
        // cannot be called by public
473
        if (session.getSubject() == null) {
474
            String msg = "No session was provided.";
475
            failure = new NotAuthorized("2152", msg);
476
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
477
            logMetacat.info(msg);
478
            return true;
479
        }
480

    
481

    
482
        // get the local node id
483
        try {
484
            nodeIdStr = PropertyService.getProperty("dataone.memberNodeId");
485
            nodeId = new NodeReference();
486
            nodeId.setValue(nodeIdStr);
487

    
488
        } catch (PropertyNotFoundException e1) {
489
            String msg = "Couldn't get dataone.memberNodeId property: " + e1.getMessage();
490
            failure = new ServiceFailure("2151", msg);
491
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
492
            logMetacat.error(msg);
493
            return true;
494

    
495
        }
496
        
497

    
498
        try {
499
            // do we already have a replica?
500
            try {
501
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
502

    
503
                String msg = "Can't read the object bytes properly, replica is invalid.";
504
                ServiceFailure serviceFailure = new ServiceFailure("2151", msg);
505
                
506
                // if we have a local id, get the local object
507
                try {
508
                    object = MetacatHandler.read(localId);
509
                    
510
                } catch (Exception e) {
511
                    // let the CN know that the replication failed
512
                    setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, serviceFailure);  
513
                    throw serviceFailure;
514
                    
515
                }
516

    
517
            } catch (McdbDocNotFoundException e) {
518
                logMetacat.info("No replica found. Continuing.");
519
                
520
            }
521
            
522
            // no local replica, get a replica
523
            if ( object == null ) {
524
                // session should be null to use the default certificate
525
                // location set in the Certificate manager
526
                object = mn.getReplica(thisNodeSession, pid);
527
                logMetacat.info("MNodeService.replicate() called for identifier "
528
                                + pid.getValue());
529

    
530
            }
531

    
532
        } catch (InvalidToken e) {            
533
            String msg = "Could not retrieve object to replicate (InvalidToken): "+ e.getMessage();
534
            failure = new ServiceFailure("2151", msg);
535
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
536
            logMetacat.error(msg);
537
            throw new ServiceFailure("2151", msg);
538

    
539
        } catch (NotFound e) {
540
            String msg = "Could not retrieve object to replicate (NotFound): "+ e.getMessage();
541
            failure = new ServiceFailure("2151", msg);
542
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
543
            logMetacat.error(msg);
544
            throw new ServiceFailure("2151", msg);
545

    
546
        }
547

    
548
        // verify checksum on the object, if supported
549
        if (object.markSupported()) {
550
            Checksum givenChecksum = sysmeta.getChecksum();
551
            Checksum computedChecksum = null;
552
            try {
553
                computedChecksum = ChecksumUtil.checksum(object,
554
                        givenChecksum.getAlgorithm());
555
                object.reset();
556

    
557
            } catch (Exception e) {
558
                String msg = "Error computing checksum on replica: "
559
                        + e.getMessage();
560
                ServiceFailure sf = new ServiceFailure("2151", msg);
561
                sf.initCause(e);
562
                throw sf;
563
            }
564
            if (!givenChecksum.getValue().equals(computedChecksum.getValue())) {
565
                logMetacat.debug("Given    checksum for " + pid.getValue() + 
566
                    "is " + givenChecksum.getValue());
567
                logMetacat.debug("Computed checksum for " + pid.getValue() + 
568
                    "is " + computedChecksum.getValue());
569
                throw new ServiceFailure("2151",
570
                        "Computed checksum does not match declared checksum");
571
            }
572
        }
573

    
574
        // add it to local store
575
        Identifier retPid;
576
        try {
577
            // skip the MN.create -- this mutates the system metadata and we
578
            // dont want it to
579
            if ( localId == null ) {
580
                
581
                retPid = super.create(session, pid, object, sysmeta);
582
                result = (retPid.getValue().equals(pid.getValue()));
583
            }
584
            
585
        } catch (InvalidToken e) {
586
            String msg = "Could not save object to local store (InvalidToken): " + e.getMessage();
587
            failure = new ServiceFailure("2151", msg);
588
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
589
            logMetacat.error(msg);
590
            throw new ServiceFailure("2151", msg);
591
        
592
        } catch (IdentifierNotUnique e) {
593
            String msg = "Could not save object to local store (IdentifierNotUnique): " + e.getMessage();
594
            failure = new ServiceFailure("2151", msg);
595
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
596
            logMetacat.error(msg);
597
            throw new ServiceFailure("2151", msg);
598
        
599
        } catch (InvalidSystemMetadata e) {
600
            String msg = "Could not save object to local store (InvalidSystemMetadata): " + e.getMessage();
601
            failure = new ServiceFailure("2151", msg);
602
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
603
            logMetacat.error(msg);
604
            throw new ServiceFailure("2151", msg);
605
            
606
        }
607

    
608
        // finish by setting the replication status
609
        setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.COMPLETED, null);
610
        return result;
611

    
612
    }
613

    
614
    /**
615
     * Return the object identified by the given object identifier
616
     * 
617
     * @param session - the Session object containing the credentials for the Subject
618
     * @param pid - the object identifier for the given object
619
     * 
620
     * @return inputStream - the input stream of the given object
621
     * 
622
     * @throws InvalidToken
623
     * @throws ServiceFailure
624
     * @throws NotAuthorized
625
     * @throws InvalidRequest
626
     * @throws NotImplemented
627
     */
628
    @Override
629
    public InputStream get(Session session, Identifier pid) 
630
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
631

    
632
        return super.get(session, pid);
633

    
634
    }
635

    
636
    /**
637
     * Returns a Checksum for the specified object using an accepted hashing algorithm
638
     * 
639
     * @param session - the Session object containing the credentials for the Subject
640
     * @param pid - the object identifier for the given object
641
     * @param algorithm -  the name of an algorithm that will be used to compute 
642
     *                     a checksum of the bytes of the object
643
     * 
644
     * @return checksum - the checksum of the given object
645
     * 
646
     * @throws InvalidToken
647
     * @throws ServiceFailure
648
     * @throws NotAuthorized
649
     * @throws NotFound
650
     * @throws InvalidRequest
651
     * @throws NotImplemented
652
     */
653
    @Override
654
    public Checksum getChecksum(Session session, Identifier pid, String algorithm) 
655
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound,
656
        InvalidRequest, NotImplemented {
657

    
658
        Checksum checksum = null;
659

    
660
        InputStream inputStream = get(session, pid);
661

    
662
        try {
663
            checksum = ChecksumUtil.checksum(inputStream, algorithm);
664

    
665
        } catch (NoSuchAlgorithmException e) {
666
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned due to an internal error: "
667
                    + e.getMessage());
668
        } catch (IOException e) {
669
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned due to an internal error: "
670
                    + e.getMessage());
671
        }
672

    
673
        if (checksum == null) {
674
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned.");
675
        }
676

    
677
        return checksum;
678
    }
679

    
680
    /**
681
     * Return the system metadata for a given object
682
     * 
683
     * @param session - the Session object containing the credentials for the Subject
684
     * @param pid - the object identifier for the given object
685
     * 
686
     * @return inputStream - the input stream of the given system metadata object
687
     * 
688
     * @throws InvalidToken
689
     * @throws ServiceFailure
690
     * @throws NotAuthorized
691
     * @throws NotFound
692
     * @throws InvalidRequest
693
     * @throws NotImplemented
694
     */
695
    @Override
696
    public SystemMetadata getSystemMetadata(Session session, Identifier pid) 
697
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound,
698
        NotImplemented {
699

    
700
        return super.getSystemMetadata(session, pid);
701
    }
702

    
703
    /**
704
     * Retrieve the list of objects present on the MN that match the calling parameters
705
     * 
706
     * @param session - the Session object containing the credentials for the Subject
707
     * @param startTime - Specifies the beginning of the time range from which 
708
     *                    to return object (>=)
709
     * @param endTime - Specifies the beginning of the time range from which 
710
     *                  to return object (>=)
711
     * @param objectFormat - Restrict results to the specified object format
712
     * @param replicaStatus - Indicates if replicated objects should be returned in the list
713
     * @param start - The zero-based index of the first value, relative to the 
714
     *                first record of the resultset that matches the parameters.
715
     * @param count - The maximum number of entries that should be returned in 
716
     *                the response. The Member Node may return less entries 
717
     *                than specified in this value.
718
     * 
719
     * @return objectList - the list of objects matching the criteria
720
     * 
721
     * @throws InvalidToken
722
     * @throws ServiceFailure
723
     * @throws NotAuthorized
724
     * @throws InvalidRequest
725
     * @throws NotImplemented
726
     */
727
    @Override
728
    public ObjectList listObjects(Session session, Date startTime, Date endTime, ObjectFormatIdentifier objectFormatId, Boolean replicaStatus, Integer start,
729
            Integer count) throws NotAuthorized, InvalidRequest, NotImplemented, ServiceFailure, InvalidToken {
730

    
731
        ObjectList objectList = null;
732

    
733
        try {
734
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, objectFormatId, replicaStatus, start, count);
735
        } catch (Exception e) {
736
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
737
        }
738

    
739
        return objectList;
740
    }
741

    
742
    /**
743
     * Return a description of the node's capabilities and services.
744
     * 
745
     * @return node - the technical capabilities of the Member Node
746
     * 
747
     * @throws ServiceFailure
748
     * @throws NotAuthorized
749
     * @throws InvalidRequest
750
     * @throws NotImplemented
751
     */
752
    @Override
753
    public Node getCapabilities() 
754
        throws NotImplemented, ServiceFailure {
755

    
756
        String nodeName = null;
757
        String nodeId = null;
758
        String subject = null;
759
        String contactSubject = null;
760
        String nodeDesc = null;
761
        String nodeTypeString = null;
762
        NodeType nodeType = null;
763
        String mnCoreServiceVersion = null;
764
        String mnReadServiceVersion = null;
765
        String mnAuthorizationServiceVersion = null;
766
        String mnStorageServiceVersion = null;
767
        String mnReplicationServiceVersion = null;
768

    
769
        boolean nodeSynchronize = false;
770
        boolean nodeReplicate = false;
771
        boolean mnCoreServiceAvailable = false;
772
        boolean mnReadServiceAvailable = false;
773
        boolean mnAuthorizationServiceAvailable = false;
774
        boolean mnStorageServiceAvailable = false;
775
        boolean mnReplicationServiceAvailable = false;
776

    
777
        try {
778
            // get the properties of the node based on configuration information
779
            nodeName = PropertyService.getProperty("dataone.nodeName");
780
            nodeId = PropertyService.getProperty("dataone.memberNodeId");
781
            subject = PropertyService.getProperty("dataone.subject");
782
            contactSubject = PropertyService.getProperty("dataone.contactSubject");
783
            nodeDesc = PropertyService.getProperty("dataone.nodeDescription");
784
            nodeTypeString = PropertyService.getProperty("dataone.nodeType");
785
            nodeType = NodeType.convert(nodeTypeString);
786
            nodeSynchronize = new Boolean(PropertyService.getProperty("dataone.nodeSynchronize")).booleanValue();
787
            nodeReplicate = new Boolean(PropertyService.getProperty("dataone.nodeReplicate")).booleanValue();
788

    
789
            mnCoreServiceVersion = PropertyService.getProperty("dataone.mnCore.serviceVersion");
790
            mnReadServiceVersion = PropertyService.getProperty("dataone.mnRead.serviceVersion");
791
            mnAuthorizationServiceVersion = PropertyService.getProperty("dataone.mnAuthorization.serviceVersion");
792
            mnStorageServiceVersion = PropertyService.getProperty("dataone.mnStorage.serviceVersion");
793
            mnReplicationServiceVersion = PropertyService.getProperty("dataone.mnReplication.serviceVersion");
794

    
795
            mnCoreServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnCore.serviceAvailable")).booleanValue();
796
            mnReadServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnRead.serviceAvailable")).booleanValue();
797
            mnAuthorizationServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnAuthorization.serviceAvailable")).booleanValue();
798
            mnStorageServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnStorage.serviceAvailable")).booleanValue();
799
            mnReplicationServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnReplication.serviceAvailable")).booleanValue();
800

    
801
            // Set the properties of the node based on configuration information and
802
            // calls to current status methods
803
            String serviceName = SystemUtil.getContextURL() + "/" + PropertyService.getProperty("dataone.serviceName");
804
            Node node = new Node();
805
            node.setBaseURL(serviceName + "/" + nodeTypeString);
806
            node.setDescription(nodeDesc);
807

    
808
            // set the node's health information
809
            node.setState(NodeState.UP);
810
            
811
            // set the ping response to the current value
812
            Ping canPing = new Ping();
813
            canPing.setSuccess(false);
814
            try {
815
            	Date pingDate = ping();
816
                canPing.setSuccess(pingDate != null);
817
            } catch (BaseException e) {
818
                e.printStackTrace();
819
                // guess it can't be pinged
820
            }
821
            
822
            node.setPing(canPing);
823

    
824
            NodeReference identifier = new NodeReference();
825
            identifier.setValue(nodeId);
826
            node.setIdentifier(identifier);
827
            Subject s = new Subject();
828
            s.setValue(subject);
829
            node.addSubject(s);
830
            Subject contact = new Subject();
831
            contact.setValue(contactSubject);
832
            node.addContactSubject(contact);
833
            node.setName(nodeName);
834
            node.setReplicate(nodeReplicate);
835
            node.setSynchronize(nodeSynchronize);
836

    
837
            // services: MNAuthorization, MNCore, MNRead, MNReplication, MNStorage
838
            Services services = new Services();
839

    
840
            Service sMNCore = new Service();
841
            sMNCore.setName("MNCore");
842
            sMNCore.setVersion(mnCoreServiceVersion);
843
            sMNCore.setAvailable(mnCoreServiceAvailable);
844

    
845
            Service sMNRead = new Service();
846
            sMNRead.setName("MNRead");
847
            sMNRead.setVersion(mnReadServiceVersion);
848
            sMNRead.setAvailable(mnReadServiceAvailable);
849

    
850
            Service sMNAuthorization = new Service();
851
            sMNAuthorization.setName("MNAuthorization");
852
            sMNAuthorization.setVersion(mnAuthorizationServiceVersion);
853
            sMNAuthorization.setAvailable(mnAuthorizationServiceAvailable);
854

    
855
            Service sMNStorage = new Service();
856
            sMNStorage.setName("MNStorage");
857
            sMNStorage.setVersion(mnStorageServiceVersion);
858
            sMNStorage.setAvailable(mnStorageServiceAvailable);
859

    
860
            Service sMNReplication = new Service();
861
            sMNReplication.setName("MNReplication");
862
            sMNReplication.setVersion(mnReplicationServiceVersion);
863
            sMNReplication.setAvailable(mnReplicationServiceAvailable);
864

    
865
            services.addService(sMNRead);
866
            services.addService(sMNCore);
867
            services.addService(sMNAuthorization);
868
            services.addService(sMNStorage);
869
            services.addService(sMNReplication);
870
            node.setServices(services);
871

    
872
            // Set the schedule for synchronization
873
            Synchronization synchronization = new Synchronization();
874
            Schedule schedule = new Schedule();
875
            Date now = new Date();
876
            schedule.setYear(PropertyService.getProperty("dataone.nodeSynchronization.schedule.year"));
877
            schedule.setMon(PropertyService.getProperty("dataone.nodeSynchronization.schedule.mon"));
878
            schedule.setMday(PropertyService.getProperty("dataone.nodeSynchronization.schedule.mday"));
879
            schedule.setWday(PropertyService.getProperty("dataone.nodeSynchronization.schedule.wday"));
880
            schedule.setHour(PropertyService.getProperty("dataone.nodeSynchronization.schedule.hour"));
881
            schedule.setMin(PropertyService.getProperty("dataone.nodeSynchronization.schedule.min"));
882
            schedule.setSec(PropertyService.getProperty("dataone.nodeSynchronization.schedule.sec"));
883
            synchronization.setSchedule(schedule);
884
            synchronization.setLastHarvested(now);
885
            synchronization.setLastCompleteHarvest(now);
886
            node.setSynchronization(synchronization);
887

    
888
            node.setType(nodeType);
889
            return node;
890

    
891
        } catch (PropertyNotFoundException pnfe) {
892
            String msg = "MNodeService.getCapabilities(): " + "property not found: " + pnfe.getMessage();
893
            logMetacat.error(msg);
894
            throw new ServiceFailure("2162", msg);
895
        }
896
    }
897

    
898
    /**
899
     * Returns the number of operations that have been serviced by the node 
900
     * over time periods of one and 24 hours.
901
     * 
902
     * @param session - the Session object containing the credentials for the Subject
903
     * @param period - An ISO8601 compatible DateTime range specifying the time 
904
     *                 range for which to return operation statistics.
905
     * @param requestor - Limit to operations performed by given requestor identity.
906
     * @param event -  Enumerated value indicating the type of event being examined
907
     * @param format - Limit to events involving objects of the specified format
908
     * 
909
     * @return the desired log records
910
     * 
911
     * @throws InvalidToken
912
     * @throws ServiceFailure
913
     * @throws NotAuthorized
914
     * @throws InvalidRequest
915
     * @throws NotImplemented
916
     */
917
    public MonitorList getOperationStatistics(Session session, Date startTime, 
918
        Date endTime, Subject requestor, Event event, ObjectFormatIdentifier formatId)
919
        throws NotImplemented, ServiceFailure, NotAuthorized, InsufficientResources, UnsupportedType {
920

    
921
        MonitorList monitorList = new MonitorList();
922

    
923
        try {
924

    
925
            // get log records first
926
            Log logs = getLogRecords(session, startTime, endTime, event, 0, null);
927

    
928
            // TODO: aggregate by day or hour -- needs clarification
929
            int count = 1;
930
            for (LogEntry logEntry : logs.getLogEntryList()) {
931
                Identifier pid = logEntry.getIdentifier();
932
                Date logDate = logEntry.getDateLogged();
933
                // if we are filtering by format
934
                if (formatId != null) {
935
                    SystemMetadata sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
936
                    if (!sysmeta.getFormatId().getValue().equals(formatId.getValue())) {
937
                        // does not match
938
                        continue;
939
                    }
940
                }
941
                MonitorInfo item = new MonitorInfo();
942
                item.setCount(count);
943
                item.setDate(new java.sql.Date(logDate.getTime()));
944
                monitorList.addMonitorInfo(item);
945

    
946
            }
947
        } catch (Exception e) {
948
            e.printStackTrace();
949
            throw new ServiceFailure("2081", "Could not retrieve statistics: " + e.getMessage());
950
        }
951

    
952
        return monitorList;
953

    
954
    }
955

    
956
    /**
957
     * A callback method used by a CN to indicate to a MN that it cannot 
958
     * complete synchronization of the science metadata identified by pid.  Log
959
     * the event in the metacat event log.
960
     * 
961
     * @param session
962
     * @param syncFailed
963
     * 
964
     * @throws ServiceFailure
965
     * @throws NotAuthorized
966
     * @throws NotImplemented
967
     */
968
    @Override
969
    public boolean synchronizationFailed(Session session, SynchronizationFailed syncFailed) 
970
        throws NotImplemented, ServiceFailure, NotAuthorized {
971

    
972
        String localId;
973

    
974
        try {
975
            localId = IdentifierManager.getInstance().getLocalId(syncFailed.getPid());
976
        } catch (McdbDocNotFoundException e) {
977
            throw new ServiceFailure("2161", "The identifier specified by " + syncFailed.getPid() + " was not found on this node.");
978

    
979
        }
980
        // TODO: update the CN URL below when the CNRead.SynchronizationFailed
981
        // method is changed to include the URL as a parameter
982
        logMetacat.debug("Synchronization for the object identified by " + syncFailed.getPid() + " failed from " + syncFailed.getNodeId()
983
                + " Logging the event to the Metacat EventLog as a 'syncFailed' event.");
984
        // TODO: use the event type enum when the SYNCHRONIZATION_FAILED event is added
985
        String principal = Constants.SUBJECT_PUBLIC;
986
        if (session != null && session.getSubject() != null) {
987
          principal = session.getSubject().getValue();
988
        }
989
        try {
990
          EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), principal, localId, "synchronization_failed");
991
        } catch (Exception e) {
992
            throw new ServiceFailure("2161", "Could not log the error for: " + syncFailed.getPid());
993
        }
994
        //EventLog.getInstance().log("CN URL WILL GO HERE", 
995
        //  session.getSubject().getValue(), localId, Event.SYNCHRONIZATION_FAILED);
996
        return true;
997

    
998
    }
999

    
1000
    /**
1001
     * Essentially a get() but with different logging behavior
1002
     */
1003
    @Override
1004
    public InputStream getReplica(Session session, Identifier pid) 
1005
        throws NotAuthorized, NotImplemented, ServiceFailure, InvalidToken {
1006

    
1007
        logMetacat.info("MNodeService.getReplica() called.");
1008

    
1009
        // cannot be called by public
1010
        if (session == null) {
1011
        	throw new InvalidToken("2183", "No session was provided.");
1012
        }
1013
        
1014
        logMetacat.info("MNodeService.getReplica() called with parameters: \n" +
1015
             "\tSession.Subject      = " + session.getSubject().getValue() + "\n" +
1016
             "\tIdentifier           = " + pid.getValue());
1017

    
1018
        InputStream inputStream = null; // bytes to be returned
1019
        handler = new MetacatHandler(new Timer());
1020
        boolean allowed = false;
1021
        String localId; // the metacat docid for the pid
1022

    
1023
        // get the local docid from Metacat
1024
        try {
1025
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1026
        } catch (McdbDocNotFoundException e) {
1027
            throw new ServiceFailure("2181", "The object specified by " + 
1028
                    pid.getValue() + " does not exist at this node.");
1029
            
1030
        }
1031

    
1032
        Subject targetNodeSubject = session.getSubject();
1033

    
1034
        // check for authorization to replicate, null session to act as this source MN
1035
        try {
1036
            allowed = D1Client.getCN().isNodeAuthorized(null, targetNodeSubject, pid);
1037
        } catch (InvalidToken e1) {
1038
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1039
                + e1.getMessage());
1040
            
1041
        } catch (NotFound e1) {
1042
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1043
                    + e1.getMessage());
1044

    
1045
        } catch (InvalidRequest e1) {
1046
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1047
                    + e1.getMessage());
1048

    
1049
        }
1050

    
1051
        logMetacat.info("Called D1Client.isNodeAuthorized(). Allowed = " + allowed +
1052
            " for identifier " + pid.getValue());
1053

    
1054
        // if the person is authorized, perform the read
1055
        if (allowed) {
1056
            try {
1057
                inputStream = MetacatHandler.read(localId);
1058
            } catch (Exception e) {
1059
                throw new ServiceFailure("1020", "The object specified by " + 
1060
                    pid.getValue() + "could not be returned due to error: " + e.getMessage());
1061
            }
1062
        }
1063

    
1064
        // if we fail to set the input stream
1065
        if (inputStream == null) {
1066
            throw new ServiceFailure("2181", "The object specified by " + 
1067
                pid.getValue() + "does not exist at this node.");
1068
        }
1069

    
1070
        // log the replica event
1071
        String principal = null;
1072
        if (session.getSubject() != null) {
1073
            principal = session.getSubject().getValue();
1074
        }
1075
        EventLog.getInstance().log(request.getRemoteAddr(), 
1076
            request.getHeader("User-Agent"), principal, localId, "replicate");
1077

    
1078
        return inputStream;
1079
    }
1080

    
1081
    /**
1082
     * A method to notify the Member Node that the authoritative copy of 
1083
     * system metadata on the Coordinating Nodes has changed.
1084
     * 
1085
     * @param session   Session information that contains the identity of the 
1086
     *                  calling user as retrieved from the X.509 certificate 
1087
     *                  which must be traceable to the CILogon service.
1088
     * @param serialVersion   The serialVersion of the system metadata
1089
     * @param dateSysMetaLastModified  The time stamp for when the system metadata was changed
1090
     * @throws NotImplemented
1091
     * @throws ServiceFailure
1092
     * @throws NotAuthorized
1093
     * @throws InvalidRequest
1094
     * @throws InvalidToken
1095
     */
1096
    public boolean systemMetadataChanged(Session session, Identifier pid,
1097
        long serialVersion, Date dateSysMetaLastModified) 
1098
        throws NotImplemented, ServiceFailure, NotAuthorized, InvalidRequest,
1099
        InvalidToken {
1100
        
1101
        SystemMetadata currentLocalSysMeta = null;
1102
        SystemMetadata newSysMeta = null;
1103
        CNode cn = D1Client.getCN();
1104
        NodeList nodeList = null;
1105
        Subject callingSubject = null;
1106
        boolean allowed = false;
1107
        
1108
        // are we allowed to call this?
1109
        callingSubject = session.getSubject();
1110
        nodeList = cn.listNodes();
1111
        
1112
        for(Node node : nodeList.getNodeList()) {
1113
            // must be a CN
1114
            if ( node.getType().equals(NodeType.CN)) {
1115
               List<Subject> subjectList = node.getSubjectList();
1116
               // the calling subject must be in the subject list
1117
               if ( subjectList.contains(callingSubject)) {
1118
                   allowed = true;
1119
                   
1120
               }
1121
               
1122
            }
1123
        }
1124
        
1125
        if (!allowed ) {
1126
            String msg = "The subject identified by " + callingSubject.getValue() +
1127
              " is not authorized to call this service.";
1128
            throw new NotAuthorized("1331", msg);
1129
            
1130
        }
1131
        
1132
        // compare what we have locally to what is sent in the change notification
1133
        try {
1134
            currentLocalSysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1135
             
1136
        } catch (RuntimeException e) {
1137
            String msg = "SystemMetadata for pid " + pid.getValue() +
1138
              " couldn't be updated because it couldn't be found locally: " +
1139
              e.getMessage();
1140
            logMetacat.error(msg);
1141
            ServiceFailure sf = new ServiceFailure("1333", msg);
1142
            sf.initCause(e);
1143
            throw sf; 
1144
        }
1145
        
1146
        if (currentLocalSysMeta.getSerialVersion().longValue() < serialVersion ) {
1147
            try {
1148
                newSysMeta = cn.getSystemMetadata(null, pid);
1149
            } catch (NotFound e) {
1150
                // huh? you just said you had it
1151
            	String msg = "On updating the local copy of system metadata " + 
1152
                "for pid " + pid.getValue() +", the CN reports it is not found." +
1153
                " The error message was: " + e.getMessage();
1154
                logMetacat.error(msg);
1155
                ServiceFailure sf = new ServiceFailure("1333", msg);
1156
                sf.initCause(e);
1157
                throw sf;
1158
            }
1159
            
1160
            // update the local copy of system metadata for the pid
1161
            try {
1162
                HazelcastService.getInstance().getSystemMetadataMap().put(newSysMeta.getIdentifier(), newSysMeta);
1163
                logMetacat.info("Updated local copy of system metadata for pid " +
1164
                    pid.getValue() + " after change notification from the CN.");
1165
                
1166
            } catch (RuntimeException e) {
1167
                String msg = "SystemMetadata for pid " + pid.getValue() +
1168
                  " couldn't be updated: " +
1169
                  e.getMessage();
1170
                logMetacat.error(msg);
1171
                ServiceFailure sf = new ServiceFailure("1333", msg);
1172
                sf.initCause(e);
1173
                throw sf;
1174
            }
1175
        }
1176
        
1177
        return true;
1178
        
1179
    }
1180
    
1181
    /*
1182
     * Set the replication status for the object on the Coordinating Node
1183
     * 
1184
     * @param session - the session for the this target node
1185
     * @param pid - the identifier of the object being updated
1186
     * @param nodeId - the identifier of this target node
1187
     * @param status - the replication status to set
1188
     * @param failure - the exception to include, if any
1189
     */
1190
    private void setReplicationStatus(Session session, Identifier pid, 
1191
        NodeReference nodeId, ReplicationStatus status, BaseException failure) 
1192
        throws ServiceFailure, NotImplemented, NotAuthorized, 
1193
        InvalidRequest {
1194
        
1195
        // call the CN as the MN to set the replication status
1196
        try {
1197
            this.cn = D1Client.getCN();
1198
            this.cn.setReplicationStatus(session, pid, nodeId,
1199
                    status, failure);
1200
            
1201
        } catch (InvalidToken e) {
1202
            throw new ServiceFailure("2151",
1203
                    "Could not set the replication status on the CN (InvalidToken): " + 
1204
                    e.getMessage());
1205
            
1206
        } catch (NotFound e) {
1207
            throw new ServiceFailure("2151",
1208
                    "Could not set the replication status on the CN (NotFound): " + 
1209
                    e.getMessage());
1210
            
1211
        }
1212

    
1213

    
1214
    }
1215
    
1216
}
(3-3/5)