Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.security.NoSuchAlgorithmException;
29
import java.sql.SQLException;
30
import java.util.Calendar;
31
import java.util.Date;
32
import java.util.List;
33
import java.util.Timer;
34

    
35
import javax.servlet.http.HttpServletRequest;
36

    
37
import org.apache.commons.io.IOUtils;
38
import org.apache.log4j.Logger;
39
import org.dataone.client.CNode;
40
import org.dataone.client.D1Client;
41
import org.dataone.client.MNode;
42
import org.dataone.client.auth.CertificateManager;
43
import org.dataone.configuration.Settings;
44
import org.dataone.service.exceptions.BaseException;
45
import org.dataone.service.exceptions.IdentifierNotUnique;
46
import org.dataone.service.exceptions.InsufficientResources;
47
import org.dataone.service.exceptions.InvalidRequest;
48
import org.dataone.service.exceptions.InvalidSystemMetadata;
49
import org.dataone.service.exceptions.InvalidToken;
50
import org.dataone.service.exceptions.NotAuthorized;
51
import org.dataone.service.exceptions.NotFound;
52
import org.dataone.service.exceptions.NotImplemented;
53
import org.dataone.service.exceptions.ServiceFailure;
54
import org.dataone.service.exceptions.SynchronizationFailed;
55
import org.dataone.service.exceptions.UnsupportedType;
56
import org.dataone.service.mn.tier1.v1.MNCore;
57
import org.dataone.service.mn.tier1.v1.MNRead;
58
import org.dataone.service.mn.tier2.v1.MNAuthorization;
59
import org.dataone.service.mn.tier3.v1.MNStorage;
60
import org.dataone.service.mn.tier4.v1.MNReplication;
61
import org.dataone.service.types.v1.Checksum;
62
import org.dataone.service.types.v1.Event;
63
import org.dataone.service.types.v1.Group;
64
import org.dataone.service.types.v1.Identifier;
65
import org.dataone.service.types.v1.Log;
66
import org.dataone.service.types.v1.LogEntry;
67
import org.dataone.service.types.v1.MonitorInfo;
68
import org.dataone.service.types.v1.MonitorList;
69
import org.dataone.service.types.v1.Node;
70
import org.dataone.service.types.v1.NodeList;
71
import org.dataone.service.types.v1.NodeReference;
72
import org.dataone.service.types.v1.NodeState;
73
import org.dataone.service.types.v1.NodeType;
74
import org.dataone.service.types.v1.ObjectFormatIdentifier;
75
import org.dataone.service.types.v1.ObjectList;
76
import org.dataone.service.types.v1.Permission;
77
import org.dataone.service.types.v1.Ping;
78
import org.dataone.service.types.v1.ReplicationStatus;
79
import org.dataone.service.types.v1.Schedule;
80
import org.dataone.service.types.v1.Service;
81
import org.dataone.service.types.v1.Services;
82
import org.dataone.service.types.v1.Session;
83
import org.dataone.service.types.v1.Subject;
84
import org.dataone.service.types.v1.Synchronization;
85
import org.dataone.service.types.v1.SystemMetadata;
86
import org.dataone.service.types.v1.util.ChecksumUtil;
87
import org.dataone.service.util.Constants;
88

    
89
import edu.ucsb.nceas.metacat.DocumentImpl;
90
import edu.ucsb.nceas.metacat.EventLog;
91
import edu.ucsb.nceas.metacat.IdentifierManager;
92
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
93
import edu.ucsb.nceas.metacat.MetacatHandler;
94
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
95
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
96
import edu.ucsb.nceas.metacat.properties.PropertyService;
97
import edu.ucsb.nceas.metacat.util.SystemUtil;
98
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
99

    
100
/**
101
 * Represents Metacat's implementation of the DataONE Member Node 
102
 * service API. Methods implement the various MN* interfaces, and methods common
103
 * to both Member Node and Coordinating Node interfaces are found in the
104
 * D1NodeService base class.
105
 * 
106
 * Implements:
107
 * MNCore.ping()
108
 * MNCore.getLogRecords()
109
 * MNCore.getObjectStatistics()
110
 * MNCore.getOperationStatistics()
111
 * MNCore.getStatus()
112
 * MNCore.getCapabilities()
113
 * MNRead.get()
114
 * MNRead.getSystemMetadata()
115
 * MNRead.describe()
116
 * MNRead.getChecksum()
117
 * MNRead.listObjects()
118
 * MNRead.synchronizationFailed()
119
 * MNAuthorization.isAuthorized()
120
 * MNAuthorization.setAccessPolicy()
121
 * MNStorage.create()
122
 * MNStorage.update()
123
 * MNStorage.delete()
124
 * MNReplication.replicate()
125
 * 
126
 */
127
public class MNodeService extends D1NodeService 
128
    implements MNAuthorization, MNCore, MNRead, MNReplication, MNStorage {
129

    
130
    /* the logger instance */
131
    private Logger logMetacat = null;
132
    
133
    /* A reference to a remote Memeber Node */
134
    private MNode mn;
135
    
136
    /* A reference to a Coordinating Node */
137
    private CNode cn;
138

    
139

    
140
    /**
141
     * Singleton accessor to get an instance of MNodeService.
142
     * 
143
     * @return instance - the instance of MNodeService
144
     */
145
    public static MNodeService getInstance(HttpServletRequest request) {
146
        return new MNodeService(request);
147
    }
148

    
149
    /**
150
     * Constructor, private for singleton access
151
     */
152
    private MNodeService(HttpServletRequest request) {
153
        super(request);
154
        logMetacat = Logger.getLogger(MNodeService.class);
155
        
156
        // set the Member Node certificate file location
157
        CertificateManager.getInstance().setCertificateLocation(Settings.getConfiguration().getString("D1Client.certificate.file"));
158
    }
159

    
160
    /**
161
     * Deletes an object from the Member Node, where the object is either a 
162
     * data object or a science metadata object.
163
     * 
164
     * @param session - the Session object containing the credentials for the Subject
165
     * @param pid - The object identifier to be deleted
166
     * 
167
     * @return pid - the identifier of the object used for the deletion
168
     * 
169
     * @throws InvalidToken
170
     * @throws ServiceFailure
171
     * @throws NotAuthorized
172
     * @throws NotFound
173
     * @throws NotImplemented
174
     * @throws InvalidRequest
175
     */
176
    @Override
177
    public Identifier delete(Session session, Identifier pid) 
178
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
179

    
180
        String localId = null;
181
        boolean allowed = false;
182
        String username = Constants.SUBJECT_PUBLIC;
183
        String[] groupnames = null;
184
        if (session == null) {
185
        	throw new InvalidToken("1330", "No session has been provided");
186
        } else {
187
            username = session.getSubject().getValue();
188
            if (session.getSubjectInfo() != null) {
189
                List<Group> groupList = session.getSubjectInfo().getGroupList();
190
                if (groupList != null) {
191
                    groupnames = new String[groupList.size()];
192
                    for (int i = 0; i > groupList.size(); i++) {
193
                        groupnames[i] = groupList.get(i).getGroupName();
194
                    }
195
                }
196
            }
197
        }
198

    
199
        // do we have a valid pid?
200
        if (pid == null || pid.getValue().trim().equals("")) {
201
            throw new ServiceFailure("1350", "The provided identifier was invalid.");
202
        }
203

    
204
        // check for the existing identifier
205
        try {
206
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
207
        } catch (McdbDocNotFoundException e) {
208
            throw new NotFound("1340", "The object with the provided " + "identifier was not found.");
209
        }
210

    
211
        // does the subject have DELETE (a D1 CHANGE_PERMISSION level) priveleges on the pid?
212
        allowed = isAuthorized(session, pid, Permission.CHANGE_PERMISSION);
213
            
214

    
215
        if (allowed) {
216
            try {
217
                // delete the document
218
                DocumentImpl.delete(localId, username, groupnames, null);
219
                EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), username, localId, Event.DELETE.xmlValue());
220

    
221
                // archive it
222
                SystemMetadata sysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
223
                sysMeta.setArchived(true);
224
                HazelcastService.getInstance().getSystemMetadataMap().put(pid, sysMeta);
225
                
226
                // remove the system metadata for it
227
                //HazelcastService.getInstance().getSystemMetadataMap().remove(pid);
228
                
229
            } catch (McdbDocNotFoundException e) {
230
                throw new NotFound("1340", "The provided identifier was invalid.");
231

    
232
            } catch (SQLException e) {
233
                throw new ServiceFailure("1350", "There was a problem deleting the object." + "The error message was: " + e.getMessage());
234

    
235
            } catch (InsufficientKarmaException e) {
236
                throw new NotAuthorized("1320", "The provided identity does not have " + "permission to DELETE objects on the Member Node.");
237

    
238
            } catch (Exception e) { // for some reason DocumentImpl throws a general Exception
239
                throw new ServiceFailure("1350", "There was a problem deleting the object." + "The error message was: " + e.getMessage());
240
            }
241

    
242
        } else {
243
            throw new NotAuthorized("1320", "The provided identity does not have " + "permission to DELETE objects on the Member Node.");
244
        }
245

    
246
        return pid;
247
    }
248

    
249
    /**
250
     * Updates an existing object by creating a new object identified by 
251
     * newPid on the Member Node which explicitly obsoletes the object 
252
     * identified by pid through appropriate changes to the SystemMetadata 
253
     * of pid and newPid
254
     * 
255
     * @param session - the Session object containing the credentials for the Subject
256
     * @param pid - The identifier of the object to be updated
257
     * @param object - the new object bytes
258
     * @param sysmeta - the new system metadata describing the object
259
     * 
260
     * @return newPid - the identifier of the new object
261
     * 
262
     * @throws InvalidToken
263
     * @throws ServiceFailure
264
     * @throws NotAuthorized
265
     * @throws NotFound
266
     * @throws NotImplemented
267
     * @throws IdentifierNotUnique
268
     * @throws UnsupportedType
269
     * @throws InsufficientResources
270
     * @throws InvalidSystemMetadata
271
     * @throws InvalidRequest
272
     */
273
    @Override
274
    public Identifier update(Session session, Identifier pid, InputStream object, 
275
        Identifier newPid, SystemMetadata sysmeta) 
276
        throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
277
        UnsupportedType, InsufficientResources, NotFound, 
278
        InvalidSystemMetadata, NotImplemented, InvalidRequest {
279

    
280
        String localId = null;
281
        boolean allowed = false;
282
        boolean isScienceMetadata = false;
283
        
284
        if (session == null) {
285
        	throw new InvalidToken("1210", "No session has been provided");
286
        }
287
        Subject subject = session.getSubject();
288

    
289
        // do we have a valid pid?
290
        if (pid == null || pid.getValue().trim().equals("")) {
291
            throw new InvalidRequest("1202", "The provided identifier was invalid.");
292
            
293
        }
294

    
295
        // check for the existing identifier
296
        try {
297
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
298
            
299
        } catch (McdbDocNotFoundException e) {
300
            throw new InvalidRequest("1202", "The object with the provided " + 
301
                "identifier was not found.");
302
            
303
        }
304
        
305
        // set the originating node
306
        NodeReference originMemberNode = this.getCapabilities().getIdentifier();
307
        sysmeta.setOriginMemberNode(originMemberNode);
308
        
309
        // set the submitter to match the certificate
310
        sysmeta.setSubmitter(subject);
311
        // set the dates
312
        Date now = Calendar.getInstance().getTime();
313
        sysmeta.setDateSysMetadataModified(now);
314
        sysmeta.setDateUploaded(now);
315

    
316
        // does the subject have WRITE ( == update) priveleges on the pid?
317
        allowed = isAuthorized(session, pid, Permission.WRITE);
318

    
319
        if (allowed) {
320
        	
321
        	// check quality of SM
322
        	if (sysmeta.getObsoletedBy() != null) {
323
        		throw new InvalidSystemMetadata("1300", "Cannot include obsoletedBy when updating object");
324
        	}
325
        	if (sysmeta.getObsoletes() != null && !sysmeta.getObsoletes().getValue().equals(pid.getValue())) {
326
        		throw new InvalidSystemMetadata("1300", "The identifier provided in obsoletes does not match old Identifier");
327
        	}
328

    
329
            // get the existing system metadata for the object
330
            SystemMetadata existingSysMeta = getSystemMetadata(session, pid);
331

    
332
            // add the newPid to the obsoletedBy list for the existing sysmeta
333
            existingSysMeta.setObsoletedBy(newPid);
334

    
335
            // then update the existing system metadata
336
            updateSystemMetadata(existingSysMeta);
337

    
338
            // prep the new system metadata, add pid to the affected lists
339
            sysmeta.setObsoletes(pid);
340
            //sysmeta.addDerivedFrom(pid);
341

    
342
            isScienceMetadata = isScienceMetadata(sysmeta);
343

    
344
            // do we have XML metadata or a data object?
345
            if (isScienceMetadata) {
346

    
347
                // update the science metadata XML document
348
                // TODO: handle non-XML metadata/data documents (like netCDF)
349
                // TODO: don't put objects into memory using stream to string
350
                String objectAsXML = "";
351
                try {
352
                    objectAsXML = IOUtils.toString(object, "UTF-8");
353
                    localId = insertOrUpdateDocument(objectAsXML, newPid, session, "update");
354
                    // register the newPid and the generated localId
355
                    if (newPid != null) {
356
                        IdentifierManager.getInstance().createMapping(newPid.getValue(), localId);
357

    
358
                    }
359

    
360
                } catch (IOException e) {
361
                    String msg = "The Node is unable to create the object. " + "There was a problem converting the object to XML";
362
                    logMetacat.info(msg);
363
                    throw new ServiceFailure("1310", msg + ": " + e.getMessage());
364

    
365
                }
366

    
367
            } else {
368

    
369
                // update the data object
370
                localId = insertDataObject(object, newPid, session);
371

    
372
            }
373

    
374
            // and insert the new system metadata
375
            insertSystemMetadata(sysmeta);
376

    
377
            // log the update event
378
            EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), subject.getValue(), localId, Event.UPDATE.toString());
379

    
380
        } else {
381
            throw new NotAuthorized("1200", "The provided identity does not have " + "permission to UPDATE the object identified by " + pid.getValue()
382
                    + " on the Member Node.");
383
        }
384

    
385
        return newPid;
386
    }
387

    
388
    public Identifier create(Session session, Identifier pid, InputStream object, SystemMetadata sysmeta) throws InvalidToken, ServiceFailure, NotAuthorized,
389
            IdentifierNotUnique, UnsupportedType, InsufficientResources, InvalidSystemMetadata, NotImplemented, InvalidRequest {
390

    
391
        // check for null session
392
        if (session == null) {
393
          throw new InvalidToken("1110", "Session is required to WRITE to the Node.");
394
        }
395
        // set the submitter to match the certificate
396
        sysmeta.setSubmitter(session.getSubject());
397
        // set the originating node
398
        NodeReference originMemberNode = this.getCapabilities().getIdentifier();
399
        sysmeta.setOriginMemberNode(originMemberNode);
400
        sysmeta.setArchived(false);
401

    
402
        // set the dates
403
        Date now = Calendar.getInstance().getTime();
404
        sysmeta.setDateSysMetadataModified(now);
405
        sysmeta.setDateUploaded(now);
406
        // call the shared impl
407
        return super.create(session, pid, object, sysmeta);
408
    }
409

    
410
    /**
411
     * Called by a Coordinating Node to request that the Member Node create a 
412
     * copy of the specified object by retrieving it from another Member 
413
     * Node and storing it locally so that it can be made accessible to 
414
     * the DataONE system.
415
     * 
416
     * @param session - the Session object containing the credentials for the Subject
417
     * @param sysmeta - Copy of the CN held system metadata for the object
418
     * @param sourceNode - A reference to node from which the content should be 
419
     *                     retrieved. The reference should be resolved by 
420
     *                     checking the CN node registry.
421
     * 
422
     * @return true if the replication succeeds
423
     * 
424
     * @throws ServiceFailure
425
     * @throws NotAuthorized
426
     * @throws NotImplemented
427
     * @throws UnsupportedType
428
     * @throws InsufficientResources
429
     * @throws InvalidRequest
430
     */
431
    @Override
432
    public boolean replicate(Session session, SystemMetadata sysmeta,
433
            NodeReference sourceNode) throws NotImplemented, ServiceFailure,
434
            NotAuthorized, InvalidRequest, InsufficientResources,
435
            UnsupportedType {
436

    
437
        if (session != null && sysmeta != null && sourceNode != null) {
438
            logMetacat.info("MNodeService.replicate() called with parameters: \n" +
439
                            "\tSession.Subject      = "                           +
440
                            session.getSubject().getValue() + "\n"                +
441
                            "\tSystemMetadata       = " + sysmeta.toString()      +
442
                            "\n" + "\tSource NodeReference ="                     +
443
                            sourceNode.getValue());
444
        }
445
        boolean result = false;
446
        String nodeIdStr = null;
447
        NodeReference nodeId = null;
448

    
449
        // get the referenced object
450
        Identifier pid = sysmeta.getIdentifier();
451

    
452
        // get from the membernode
453
        // TODO: switch credentials for the server retrieval?
454
        this.mn = D1Client.getMN(sourceNode);
455
        this.cn = D1Client.getCN();
456
        InputStream object = null;
457
        Session thisNodeSession = null;
458
        SystemMetadata localSystemMetadata = null;
459
        BaseException failure = null;
460
        String localId = null;
461
        
462
        // TODO: check credentials
463
        // cannot be called by public
464
        if (session.getSubject() == null) {
465
            String msg = "No session was provided.";
466
            failure = new NotAuthorized("2152", msg);
467
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
468
            logMetacat.info(msg);
469
            return true;
470
        }
471

    
472

    
473
        // get the local node id
474
        try {
475
            nodeIdStr = PropertyService.getProperty("dataone.memberNodeId");
476
            nodeId = new NodeReference();
477
            nodeId.setValue(nodeIdStr);
478

    
479
        } catch (PropertyNotFoundException e1) {
480
            String msg = "Couldn't get dataone.memberNodeId property: " + e1.getMessage();
481
            failure = new ServiceFailure("2151", msg);
482
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
483
            logMetacat.error(msg);
484
            return true;
485

    
486
        }
487
        
488

    
489
        try {
490
            // do we already have a replica?
491
            try {
492
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
493

    
494
                String msg = "Can't read the object bytes properly, replica is invalid.";
495
                ServiceFailure serviceFailure = new ServiceFailure("2151", msg);
496
                
497
                // if we have a local id, get the local object
498
                try {
499
                    object = MetacatHandler.read(localId);
500
                    
501
                } catch (Exception e) {
502
                    // let the CN know that the replication failed
503
                    setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, serviceFailure);  
504
                    throw serviceFailure;
505
                    
506
                }
507

    
508
            } catch (McdbDocNotFoundException e) {
509
                logMetacat.info("No replica found. Continuing.");
510
                
511
            }
512
            
513
            // no local replica, get a replica
514
            if ( object == null ) {
515
                // session should be null to use the default certificate
516
                // location set in the Certificate manager
517
                object = mn.getReplica(thisNodeSession, pid);
518
                logMetacat.info("MNodeService.replicate() called for identifier "
519
                                + pid.getValue());
520

    
521
            }
522

    
523
        } catch (InvalidToken e) {            
524
            String msg = "Could not retrieve object to replicate (InvalidToken): "+ e.getMessage();
525
            failure = new ServiceFailure("2151", msg);
526
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
527
            logMetacat.error(msg);
528
            throw new ServiceFailure("2151", msg);
529

    
530
        } catch (NotFound e) {
531
            String msg = "Could not retrieve object to replicate (NotFound): "+ e.getMessage();
532
            failure = new ServiceFailure("2151", msg);
533
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
534
            logMetacat.error(msg);
535
            throw new ServiceFailure("2151", msg);
536

    
537
        }
538

    
539
        // verify checksum on the object, if supported
540
        if (object.markSupported()) {
541
            Checksum givenChecksum = sysmeta.getChecksum();
542
            Checksum computedChecksum = null;
543
            try {
544
                computedChecksum = ChecksumUtil.checksum(object,
545
                        givenChecksum.getAlgorithm());
546
                object.reset();
547

    
548
            } catch (Exception e) {
549
                String msg = "Error computing checksum on replica: "
550
                        + e.getMessage();
551
                ServiceFailure sf = new ServiceFailure("2151", msg);
552
                sf.initCause(e);
553
                throw sf;
554
            }
555
            if (!givenChecksum.getValue().equals(computedChecksum.getValue())) {
556
                logMetacat.debug("Given    checksum for " + pid.getValue() + 
557
                    "is " + givenChecksum.getValue());
558
                logMetacat.debug("Computed checksum for " + pid.getValue() + 
559
                    "is " + computedChecksum.getValue());
560
                throw new ServiceFailure("2151",
561
                        "Computed checksum does not match declared checksum");
562
            }
563
        }
564

    
565
        // add it to local store
566
        Identifier retPid;
567
        try {
568
            // skip the MN.create -- this mutates the system metadata and we
569
            // dont want it to
570
            if ( localId == null ) {
571
                
572
                retPid = super.create(session, pid, object, sysmeta);
573
                result = (retPid.getValue().equals(pid.getValue()));
574
            }
575
            
576
        } catch (InvalidToken e) {
577
            String msg = "Could not save object to local store (InvalidToken): " + e.getMessage();
578
            failure = new ServiceFailure("2151", msg);
579
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
580
            logMetacat.error(msg);
581
            throw new ServiceFailure("2151", msg);
582
        
583
        } catch (IdentifierNotUnique e) {
584
            String msg = "Could not save object to local store (IdentifierNotUnique): " + e.getMessage();
585
            failure = new ServiceFailure("2151", msg);
586
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
587
            logMetacat.error(msg);
588
            throw new ServiceFailure("2151", msg);
589
        
590
        } catch (InvalidSystemMetadata e) {
591
            String msg = "Could not save object to local store (InvalidSystemMetadata): " + e.getMessage();
592
            failure = new ServiceFailure("2151", msg);
593
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
594
            logMetacat.error(msg);
595
            throw new ServiceFailure("2151", msg);
596
            
597
        }
598

    
599
        // finish by setting the replication status
600
        setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.COMPLETED, null);
601
        return result;
602

    
603
    }
604

    
605
    /**
606
     * Return the object identified by the given object identifier
607
     * 
608
     * @param session - the Session object containing the credentials for the Subject
609
     * @param pid - the object identifier for the given object
610
     * 
611
     * @return inputStream - the input stream of the given object
612
     * 
613
     * @throws InvalidToken
614
     * @throws ServiceFailure
615
     * @throws NotAuthorized
616
     * @throws InvalidRequest
617
     * @throws NotImplemented
618
     */
619
    @Override
620
    public InputStream get(Session session, Identifier pid) 
621
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
622

    
623
        return super.get(session, pid);
624

    
625
    }
626

    
627
    /**
628
     * Returns a Checksum for the specified object using an accepted hashing algorithm
629
     * 
630
     * @param session - the Session object containing the credentials for the Subject
631
     * @param pid - the object identifier for the given object
632
     * @param algorithm -  the name of an algorithm that will be used to compute 
633
     *                     a checksum of the bytes of the object
634
     * 
635
     * @return checksum - the checksum of the given object
636
     * 
637
     * @throws InvalidToken
638
     * @throws ServiceFailure
639
     * @throws NotAuthorized
640
     * @throws NotFound
641
     * @throws InvalidRequest
642
     * @throws NotImplemented
643
     */
644
    @Override
645
    public Checksum getChecksum(Session session, Identifier pid, String algorithm) 
646
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound,
647
        InvalidRequest, NotImplemented {
648

    
649
        Checksum checksum = null;
650

    
651
        InputStream inputStream = get(session, pid);
652

    
653
        try {
654
            checksum = ChecksumUtil.checksum(inputStream, algorithm);
655

    
656
        } catch (NoSuchAlgorithmException e) {
657
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned due to an internal error: "
658
                    + e.getMessage());
659
        } catch (IOException e) {
660
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned due to an internal error: "
661
                    + e.getMessage());
662
        }
663

    
664
        if (checksum == null) {
665
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned.");
666
        }
667

    
668
        return checksum;
669
    }
670

    
671
    /**
672
     * Return the system metadata for a given object
673
     * 
674
     * @param session - the Session object containing the credentials for the Subject
675
     * @param pid - the object identifier for the given object
676
     * 
677
     * @return inputStream - the input stream of the given system metadata object
678
     * 
679
     * @throws InvalidToken
680
     * @throws ServiceFailure
681
     * @throws NotAuthorized
682
     * @throws NotFound
683
     * @throws InvalidRequest
684
     * @throws NotImplemented
685
     */
686
    @Override
687
    public SystemMetadata getSystemMetadata(Session session, Identifier pid) 
688
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound,
689
        NotImplemented {
690

    
691
        return super.getSystemMetadata(session, pid);
692
    }
693

    
694
    /**
695
     * Retrieve the list of objects present on the MN that match the calling parameters
696
     * 
697
     * @param session - the Session object containing the credentials for the Subject
698
     * @param startTime - Specifies the beginning of the time range from which 
699
     *                    to return object (>=)
700
     * @param endTime - Specifies the beginning of the time range from which 
701
     *                  to return object (>=)
702
     * @param objectFormat - Restrict results to the specified object format
703
     * @param replicaStatus - Indicates if replicated objects should be returned in the list
704
     * @param start - The zero-based index of the first value, relative to the 
705
     *                first record of the resultset that matches the parameters.
706
     * @param count - The maximum number of entries that should be returned in 
707
     *                the response. The Member Node may return less entries 
708
     *                than specified in this value.
709
     * 
710
     * @return objectList - the list of objects matching the criteria
711
     * 
712
     * @throws InvalidToken
713
     * @throws ServiceFailure
714
     * @throws NotAuthorized
715
     * @throws InvalidRequest
716
     * @throws NotImplemented
717
     */
718
    @Override
719
    public ObjectList listObjects(Session session, Date startTime, Date endTime, ObjectFormatIdentifier objectFormatId, Boolean replicaStatus, Integer start,
720
            Integer count) throws NotAuthorized, InvalidRequest, NotImplemented, ServiceFailure, InvalidToken {
721

    
722
        ObjectList objectList = null;
723

    
724
        try {
725
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, objectFormatId, replicaStatus, start, count);
726
        } catch (Exception e) {
727
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
728
        }
729

    
730
        return objectList;
731
    }
732

    
733
    /**
734
     * Return a description of the node's capabilities and services.
735
     * 
736
     * @return node - the technical capabilities of the Member Node
737
     * 
738
     * @throws ServiceFailure
739
     * @throws NotAuthorized
740
     * @throws InvalidRequest
741
     * @throws NotImplemented
742
     */
743
    @Override
744
    public Node getCapabilities() 
745
        throws NotImplemented, ServiceFailure {
746

    
747
        String nodeName = null;
748
        String nodeId = null;
749
        String subject = null;
750
        String contactSubject = null;
751
        String nodeDesc = null;
752
        String nodeTypeString = null;
753
        NodeType nodeType = null;
754
        String mnCoreServiceVersion = null;
755
        String mnReadServiceVersion = null;
756
        String mnAuthorizationServiceVersion = null;
757
        String mnStorageServiceVersion = null;
758
        String mnReplicationServiceVersion = null;
759

    
760
        boolean nodeSynchronize = false;
761
        boolean nodeReplicate = false;
762
        boolean mnCoreServiceAvailable = false;
763
        boolean mnReadServiceAvailable = false;
764
        boolean mnAuthorizationServiceAvailable = false;
765
        boolean mnStorageServiceAvailable = false;
766
        boolean mnReplicationServiceAvailable = false;
767

    
768
        try {
769
            // get the properties of the node based on configuration information
770
            nodeName = PropertyService.getProperty("dataone.nodeName");
771
            nodeId = PropertyService.getProperty("dataone.memberNodeId");
772
            subject = PropertyService.getProperty("dataone.subject");
773
            contactSubject = PropertyService.getProperty("dataone.contactSubject");
774
            nodeDesc = PropertyService.getProperty("dataone.nodeDescription");
775
            nodeTypeString = PropertyService.getProperty("dataone.nodeType");
776
            nodeType = NodeType.convert(nodeTypeString);
777
            nodeSynchronize = new Boolean(PropertyService.getProperty("dataone.nodeSynchronize")).booleanValue();
778
            nodeReplicate = new Boolean(PropertyService.getProperty("dataone.nodeReplicate")).booleanValue();
779

    
780
            mnCoreServiceVersion = PropertyService.getProperty("dataone.mnCore.serviceVersion");
781
            mnReadServiceVersion = PropertyService.getProperty("dataone.mnRead.serviceVersion");
782
            mnAuthorizationServiceVersion = PropertyService.getProperty("dataone.mnAuthorization.serviceVersion");
783
            mnStorageServiceVersion = PropertyService.getProperty("dataone.mnStorage.serviceVersion");
784
            mnReplicationServiceVersion = PropertyService.getProperty("dataone.mnReplication.serviceVersion");
785

    
786
            mnCoreServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnCore.serviceAvailable")).booleanValue();
787
            mnReadServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnRead.serviceAvailable")).booleanValue();
788
            mnAuthorizationServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnAuthorization.serviceAvailable")).booleanValue();
789
            mnStorageServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnStorage.serviceAvailable")).booleanValue();
790
            mnReplicationServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnReplication.serviceAvailable")).booleanValue();
791

    
792
            // Set the properties of the node based on configuration information and
793
            // calls to current status methods
794
            String serviceName = SystemUtil.getContextURL() + "/" + PropertyService.getProperty("dataone.serviceName");
795
            Node node = new Node();
796
            node.setBaseURL(serviceName + "/" + nodeTypeString);
797
            node.setDescription(nodeDesc);
798

    
799
            // set the node's health information
800
            node.setState(NodeState.UP);
801
            
802
            // set the ping response to the current value
803
            Ping canPing = new Ping();
804
            canPing.setSuccess(false);
805
            try {
806
            	Date pingDate = ping();
807
                canPing.setSuccess(pingDate != null);
808
            } catch (BaseException e) {
809
                e.printStackTrace();
810
                // guess it can't be pinged
811
            }
812
            
813
            node.setPing(canPing);
814

    
815
            NodeReference identifier = new NodeReference();
816
            identifier.setValue(nodeId);
817
            node.setIdentifier(identifier);
818
            Subject s = new Subject();
819
            s.setValue(subject);
820
            node.addSubject(s);
821
            Subject contact = new Subject();
822
            contact.setValue(contactSubject);
823
            node.addContactSubject(contact);
824
            node.setName(nodeName);
825
            node.setReplicate(nodeReplicate);
826
            node.setSynchronize(nodeSynchronize);
827

    
828
            // services: MNAuthorization, MNCore, MNRead, MNReplication, MNStorage
829
            Services services = new Services();
830

    
831
            Service sMNCore = new Service();
832
            sMNCore.setName("MNCore");
833
            sMNCore.setVersion(mnCoreServiceVersion);
834
            sMNCore.setAvailable(mnCoreServiceAvailable);
835

    
836
            Service sMNRead = new Service();
837
            sMNRead.setName("MNRead");
838
            sMNRead.setVersion(mnReadServiceVersion);
839
            sMNRead.setAvailable(mnReadServiceAvailable);
840

    
841
            Service sMNAuthorization = new Service();
842
            sMNAuthorization.setName("MNAuthorization");
843
            sMNAuthorization.setVersion(mnAuthorizationServiceVersion);
844
            sMNAuthorization.setAvailable(mnAuthorizationServiceAvailable);
845

    
846
            Service sMNStorage = new Service();
847
            sMNStorage.setName("MNStorage");
848
            sMNStorage.setVersion(mnStorageServiceVersion);
849
            sMNStorage.setAvailable(mnStorageServiceAvailable);
850

    
851
            Service sMNReplication = new Service();
852
            sMNReplication.setName("MNReplication");
853
            sMNReplication.setVersion(mnReplicationServiceVersion);
854
            sMNReplication.setAvailable(mnReplicationServiceAvailable);
855

    
856
            services.addService(sMNRead);
857
            services.addService(sMNCore);
858
            services.addService(sMNAuthorization);
859
            services.addService(sMNStorage);
860
            services.addService(sMNReplication);
861
            node.setServices(services);
862

    
863
            // Set the schedule for synchronization
864
            Synchronization synchronization = new Synchronization();
865
            Schedule schedule = new Schedule();
866
            Date now = new Date();
867
            schedule.setYear(PropertyService.getProperty("dataone.nodeSynchronization.schedule.year"));
868
            schedule.setMon(PropertyService.getProperty("dataone.nodeSynchronization.schedule.mon"));
869
            schedule.setMday(PropertyService.getProperty("dataone.nodeSynchronization.schedule.mday"));
870
            schedule.setWday(PropertyService.getProperty("dataone.nodeSynchronization.schedule.wday"));
871
            schedule.setHour(PropertyService.getProperty("dataone.nodeSynchronization.schedule.hour"));
872
            schedule.setMin(PropertyService.getProperty("dataone.nodeSynchronization.schedule.min"));
873
            schedule.setSec(PropertyService.getProperty("dataone.nodeSynchronization.schedule.sec"));
874
            synchronization.setSchedule(schedule);
875
            synchronization.setLastHarvested(now);
876
            synchronization.setLastCompleteHarvest(now);
877
            node.setSynchronization(synchronization);
878

    
879
            node.setType(nodeType);
880
            return node;
881

    
882
        } catch (PropertyNotFoundException pnfe) {
883
            String msg = "MNodeService.getCapabilities(): " + "property not found: " + pnfe.getMessage();
884
            logMetacat.error(msg);
885
            throw new ServiceFailure("2162", msg);
886
        }
887
    }
888

    
889
    /**
890
     * Returns the number of operations that have been serviced by the node 
891
     * over time periods of one and 24 hours.
892
     * 
893
     * @param session - the Session object containing the credentials for the Subject
894
     * @param period - An ISO8601 compatible DateTime range specifying the time 
895
     *                 range for which to return operation statistics.
896
     * @param requestor - Limit to operations performed by given requestor identity.
897
     * @param event -  Enumerated value indicating the type of event being examined
898
     * @param format - Limit to events involving objects of the specified format
899
     * 
900
     * @return the desired log records
901
     * 
902
     * @throws InvalidToken
903
     * @throws ServiceFailure
904
     * @throws NotAuthorized
905
     * @throws InvalidRequest
906
     * @throws NotImplemented
907
     */
908
    public MonitorList getOperationStatistics(Session session, Date startTime, 
909
        Date endTime, Subject requestor, Event event, ObjectFormatIdentifier formatId)
910
        throws NotImplemented, ServiceFailure, NotAuthorized, InsufficientResources, UnsupportedType {
911

    
912
        MonitorList monitorList = new MonitorList();
913

    
914
        try {
915

    
916
            // get log records first
917
            Log logs = getLogRecords(session, startTime, endTime, event, 0, null);
918

    
919
            // TODO: aggregate by day or hour -- needs clarification
920
            int count = 1;
921
            for (LogEntry logEntry : logs.getLogEntryList()) {
922
                Identifier pid = logEntry.getIdentifier();
923
                Date logDate = logEntry.getDateLogged();
924
                // if we are filtering by format
925
                if (formatId != null) {
926
                    SystemMetadata sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
927
                    if (!sysmeta.getFormatId().getValue().equals(formatId.getValue())) {
928
                        // does not match
929
                        continue;
930
                    }
931
                }
932
                MonitorInfo item = new MonitorInfo();
933
                item.setCount(count);
934
                item.setDate(new java.sql.Date(logDate.getTime()));
935
                monitorList.addMonitorInfo(item);
936

    
937
            }
938
        } catch (Exception e) {
939
            e.printStackTrace();
940
            throw new ServiceFailure("2081", "Could not retrieve statistics: " + e.getMessage());
941
        }
942

    
943
        return monitorList;
944

    
945
    }
946

    
947
    /**
948
     * A callback method used by a CN to indicate to a MN that it cannot 
949
     * complete synchronization of the science metadata identified by pid.  Log
950
     * the event in the metacat event log.
951
     * 
952
     * @param session
953
     * @param syncFailed
954
     * 
955
     * @throws ServiceFailure
956
     * @throws NotAuthorized
957
     * @throws NotImplemented
958
     */
959
    @Override
960
    public boolean synchronizationFailed(Session session, SynchronizationFailed syncFailed) 
961
        throws NotImplemented, ServiceFailure, NotAuthorized {
962

    
963
        String localId;
964

    
965
        try {
966
            localId = IdentifierManager.getInstance().getLocalId(syncFailed.getPid());
967
        } catch (McdbDocNotFoundException e) {
968
            throw new ServiceFailure("2161", "The identifier specified by " + syncFailed.getPid() + " was not found on this node.");
969

    
970
        }
971
        // TODO: update the CN URL below when the CNRead.SynchronizationFailed
972
        // method is changed to include the URL as a parameter
973
        logMetacat.debug("Synchronization for the object identified by " + syncFailed.getPid() + " failed from " + syncFailed.getNodeId()
974
                + " Logging the event to the Metacat EventLog as a 'syncFailed' event.");
975
        // TODO: use the event type enum when the SYNCHRONIZATION_FAILED event is added
976
        String principal = Constants.SUBJECT_PUBLIC;
977
        if (session != null && session.getSubject() != null) {
978
          principal = session.getSubject().getValue();
979
        }
980
        try {
981
          EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), principal, localId, "synchronization_failed");
982
        } catch (Exception e) {
983
            throw new ServiceFailure("2161", "Could not log the error for: " + syncFailed.getPid());
984
        }
985
        //EventLog.getInstance().log("CN URL WILL GO HERE", 
986
        //  session.getSubject().getValue(), localId, Event.SYNCHRONIZATION_FAILED);
987
        return true;
988

    
989
    }
990

    
991
    /**
992
     * Essentially a get() but with different logging behavior
993
     */
994
    @Override
995
    public InputStream getReplica(Session session, Identifier pid) 
996
        throws NotAuthorized, NotImplemented, ServiceFailure, InvalidToken {
997

    
998
        logMetacat.info("MNodeService.getReplica() called.");
999

    
1000
        // cannot be called by public
1001
        if (session == null) {
1002
        	throw new InvalidToken("2183", "No session was provided.");
1003
        }
1004
        
1005
        logMetacat.info("MNodeService.getReplica() called with parameters: \n" +
1006
             "\tSession.Subject      = " + session.getSubject().getValue() + "\n" +
1007
             "\tIdentifier           = " + pid.getValue());
1008

    
1009
        InputStream inputStream = null; // bytes to be returned
1010
        handler = new MetacatHandler(new Timer());
1011
        boolean allowed = false;
1012
        String localId; // the metacat docid for the pid
1013

    
1014
        // get the local docid from Metacat
1015
        try {
1016
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1017
        } catch (McdbDocNotFoundException e) {
1018
            throw new ServiceFailure("2181", "The object specified by " + 
1019
                    pid.getValue() + " does not exist at this node.");
1020
            
1021
        }
1022

    
1023
        Subject targetNodeSubject = session.getSubject();
1024

    
1025
        // check for authorization to replicate, null session to act as this source MN
1026
        try {
1027
            allowed = D1Client.getCN().isNodeAuthorized(null, targetNodeSubject, pid);
1028
        } catch (InvalidToken e1) {
1029
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1030
                + e1.getMessage());
1031
            
1032
        } catch (NotFound e1) {
1033
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1034
                    + e1.getMessage());
1035

    
1036
        } catch (InvalidRequest e1) {
1037
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1038
                    + e1.getMessage());
1039

    
1040
        }
1041

    
1042
        logMetacat.info("Called D1Client.isNodeAuthorized(). Allowed = " + allowed +
1043
            " for identifier " + pid.getValue());
1044

    
1045
        // if the person is authorized, perform the read
1046
        if (allowed) {
1047
            try {
1048
                inputStream = MetacatHandler.read(localId);
1049
            } catch (Exception e) {
1050
                throw new ServiceFailure("1020", "The object specified by " + 
1051
                    pid.getValue() + "could not be returned due to error: " + e.getMessage());
1052
            }
1053
        }
1054

    
1055
        // if we fail to set the input stream
1056
        if (inputStream == null) {
1057
            throw new ServiceFailure("2181", "The object specified by " + 
1058
                pid.getValue() + "does not exist at this node.");
1059
        }
1060

    
1061
        // log the replica event
1062
        String principal = null;
1063
        if (session.getSubject() != null) {
1064
            principal = session.getSubject().getValue();
1065
        }
1066
        EventLog.getInstance().log(request.getRemoteAddr(), 
1067
            request.getHeader("User-Agent"), principal, localId, "replicate");
1068

    
1069
        return inputStream;
1070
    }
1071

    
1072
    /**
1073
     * A method to notify the Member Node that the authoritative copy of 
1074
     * system metadata on the Coordinating Nodes has changed.
1075
     * 
1076
     * @param session   Session information that contains the identity of the 
1077
     *                  calling user as retrieved from the X.509 certificate 
1078
     *                  which must be traceable to the CILogon service.
1079
     * @param serialVersion   The serialVersion of the system metadata
1080
     * @param dateSysMetaLastModified  The time stamp for when the system metadata was changed
1081
     * @throws NotImplemented
1082
     * @throws ServiceFailure
1083
     * @throws NotAuthorized
1084
     * @throws InvalidRequest
1085
     * @throws InvalidToken
1086
     */
1087
    public boolean systemMetadataChanged(Session session, Identifier pid,
1088
        long serialVersion, Date dateSysMetaLastModified) 
1089
        throws NotImplemented, ServiceFailure, NotAuthorized, InvalidRequest,
1090
        InvalidToken {
1091
        
1092
        SystemMetadata currentLocalSysMeta = null;
1093
        SystemMetadata newSysMeta = null;
1094
        CNode cn = D1Client.getCN();
1095
        NodeList nodeList = null;
1096
        Subject callingSubject = null;
1097
        boolean allowed = false;
1098
        
1099
        // are we allowed to call this?
1100
        callingSubject = session.getSubject();
1101
        nodeList = cn.listNodes();
1102
        
1103
        for(Node node : nodeList.getNodeList()) {
1104
            // must be a CN
1105
            if ( node.getType().equals(NodeType.CN)) {
1106
               List<Subject> subjectList = node.getSubjectList();
1107
               // the calling subject must be in the subject list
1108
               if ( subjectList.contains(callingSubject)) {
1109
                   allowed = true;
1110
                   
1111
               }
1112
               
1113
            }
1114
        }
1115
        
1116
        if (!allowed ) {
1117
            String msg = "The subject identified by " + callingSubject.getValue() +
1118
              " is not authorized to call this service.";
1119
            throw new NotAuthorized("1331", msg);
1120
            
1121
        }
1122
        
1123
        // compare what we have locally to what is sent in the change notification
1124
        try {
1125
            currentLocalSysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1126
             
1127
        } catch (RuntimeException e) {
1128
            String msg = "SystemMetadata for pid " + pid.getValue() +
1129
              " couldn't be updated because it couldn't be found locally: " +
1130
              e.getMessage();
1131
            logMetacat.error(msg);
1132
            ServiceFailure sf = new ServiceFailure("1333", msg);
1133
            sf.initCause(e);
1134
            throw sf; 
1135
        }
1136
        
1137
        if (currentLocalSysMeta.getSerialVersion().longValue() < serialVersion ) {
1138
            try {
1139
                newSysMeta = cn.getSystemMetadata(null, pid);
1140
            } catch (NotFound e) {
1141
                // huh? you just said you had it
1142
            	String msg = "On updating the local copy of system metadata " + 
1143
                "for pid " + pid.getValue() +", the CN reports it is not found." +
1144
                " The error message was: " + e.getMessage();
1145
                logMetacat.error(msg);
1146
                ServiceFailure sf = new ServiceFailure("1333", msg);
1147
                sf.initCause(e);
1148
                throw sf;
1149
            }
1150
            
1151
            // update the local copy of system metadata for the pid
1152
            try {
1153
                HazelcastService.getInstance().getSystemMetadataMap().put(newSysMeta.getIdentifier(), newSysMeta);
1154
                logMetacat.info("Updated local copy of system metadata for pid " +
1155
                    pid.getValue() + " after change notification from the CN.");
1156
                
1157
            } catch (RuntimeException e) {
1158
                String msg = "SystemMetadata for pid " + pid.getValue() +
1159
                  " couldn't be updated: " +
1160
                  e.getMessage();
1161
                logMetacat.error(msg);
1162
                ServiceFailure sf = new ServiceFailure("1333", msg);
1163
                sf.initCause(e);
1164
                throw sf;
1165
            }
1166
        }
1167
        
1168
        return true;
1169
        
1170
    }
1171
    
1172
    /*
1173
     * Set the replication status for the object on the Coordinating Node
1174
     * 
1175
     * @param session - the session for the this target node
1176
     * @param pid - the identifier of the object being updated
1177
     * @param nodeId - the identifier of this target node
1178
     * @param status - the replication status to set
1179
     * @param failure - the exception to include, if any
1180
     */
1181
    private void setReplicationStatus(Session session, Identifier pid, 
1182
        NodeReference nodeId, ReplicationStatus status, BaseException failure) 
1183
        throws ServiceFailure, NotImplemented, NotAuthorized, 
1184
        InvalidRequest {
1185
        
1186
        // call the CN as the MN to set the replication status
1187
        try {
1188
            this.cn = D1Client.getCN();
1189
            this.cn.setReplicationStatus(session, pid, nodeId,
1190
                    status, failure);
1191
            
1192
        } catch (InvalidToken e) {
1193
            throw new ServiceFailure("2151",
1194
                    "Could not set the replication status on the CN (InvalidToken): " + 
1195
                    e.getMessage());
1196
            
1197
        } catch (NotFound e) {
1198
            throw new ServiceFailure("2151",
1199
                    "Could not set the replication status on the CN (NotFound): " + 
1200
                    e.getMessage());
1201
            
1202
        }
1203

    
1204

    
1205
    }
1206
    
1207
}
(3-3/5)