Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.ByteArrayInputStream;
27
import java.io.File;
28
import java.io.FileInputStream;
29
import java.io.FileOutputStream;
30
import java.io.IOException;
31
import java.io.InputStream;
32
import java.io.UnsupportedEncodingException;
33
import java.math.BigInteger;
34
import java.net.URISyntaxException;
35
import java.security.NoSuchAlgorithmException;
36
import java.util.ArrayList;
37
import java.util.Calendar;
38
import java.util.Date;
39
import java.util.HashSet;
40
import java.util.List;
41
import java.util.Map;
42
import java.util.Set;
43
import java.util.Timer;
44
import java.util.UUID;
45
import java.util.Vector;
46

    
47
import javax.servlet.http.HttpServletRequest;
48

    
49
import org.apache.commons.io.IOUtils;
50
import org.apache.log4j.Logger;
51
import org.dataone.client.CNode;
52
import org.dataone.client.D1Client;
53
import org.dataone.client.MNode;
54
import org.dataone.client.auth.CertificateManager;
55
import org.dataone.client.formats.ObjectFormatInfo;
56
import org.dataone.configuration.Settings;
57
import org.dataone.ore.ResourceMapFactory;
58
import org.dataone.service.exceptions.BaseException;
59
import org.dataone.service.exceptions.IdentifierNotUnique;
60
import org.dataone.service.exceptions.InsufficientResources;
61
import org.dataone.service.exceptions.InvalidRequest;
62
import org.dataone.service.exceptions.InvalidSystemMetadata;
63
import org.dataone.service.exceptions.InvalidToken;
64
import org.dataone.service.exceptions.NotAuthorized;
65
import org.dataone.service.exceptions.NotFound;
66
import org.dataone.service.exceptions.NotImplemented;
67
import org.dataone.service.exceptions.ServiceFailure;
68
import org.dataone.service.exceptions.SynchronizationFailed;
69
import org.dataone.service.exceptions.UnsupportedType;
70
import org.dataone.service.mn.tier1.v1.MNCore;
71
import org.dataone.service.mn.tier1.v1.MNRead;
72
import org.dataone.service.mn.tier2.v1.MNAuthorization;
73
import org.dataone.service.mn.tier3.v1.MNStorage;
74
import org.dataone.service.mn.tier4.v1.MNReplication;
75
import org.dataone.service.mn.v1.MNQuery;
76
import org.dataone.service.types.v1.Checksum;
77
import org.dataone.service.types.v1.DescribeResponse;
78
import org.dataone.service.types.v1.Event;
79
import org.dataone.service.types.v1.Identifier;
80
import org.dataone.service.types.v1.Log;
81
import org.dataone.service.types.v1.LogEntry;
82
import org.dataone.service.types.v1.MonitorInfo;
83
import org.dataone.service.types.v1.MonitorList;
84
import org.dataone.service.types.v1.Node;
85
import org.dataone.service.types.v1.NodeList;
86
import org.dataone.service.types.v1.NodeReference;
87
import org.dataone.service.types.v1.NodeState;
88
import org.dataone.service.types.v1.NodeType;
89
import org.dataone.service.types.v1.ObjectFormatIdentifier;
90
import org.dataone.service.types.v1.ObjectList;
91
import org.dataone.service.types.v1.Permission;
92
import org.dataone.service.types.v1.Ping;
93
import org.dataone.service.types.v1.ReplicationStatus;
94
import org.dataone.service.types.v1.Schedule;
95
import org.dataone.service.types.v1.Service;
96
import org.dataone.service.types.v1.Services;
97
import org.dataone.service.types.v1.Session;
98
import org.dataone.service.types.v1.Subject;
99
import org.dataone.service.types.v1.Synchronization;
100
import org.dataone.service.types.v1.SystemMetadata;
101
import org.dataone.service.types.v1.util.AuthUtils;
102
import org.dataone.service.types.v1.util.ChecksumUtil;
103
import org.dataone.service.types.v1_1.QueryEngineDescription;
104
import org.dataone.service.types.v1_1.QueryEngineList;
105
import org.dataone.service.types.v1_1.QueryField;
106
import org.dataone.service.util.Constants;
107
import org.dspace.foresite.OREException;
108
import org.dspace.foresite.OREParserException;
109
import org.dspace.foresite.ORESerialiserException;
110
import org.dspace.foresite.ResourceMap;
111

    
112
import edu.ucsb.nceas.ezid.EZIDException;
113
import edu.ucsb.nceas.metacat.DBQuery;
114
import edu.ucsb.nceas.metacat.EventLog;
115
import edu.ucsb.nceas.metacat.IdentifierManager;
116
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
117
import edu.ucsb.nceas.metacat.MetaCatServlet;
118
import edu.ucsb.nceas.metacat.MetacatHandler;
119

    
120
import edu.ucsb.nceas.metacat.common.query.EnabledQueryEngines;
121
import edu.ucsb.nceas.metacat.common.query.stream.ContentTypeByteArrayInputStream;
122
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
123
import edu.ucsb.nceas.metacat.index.MetacatSolrEngineDescriptionHandler;
124
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
125
import edu.ucsb.nceas.metacat.properties.PropertyService;
126
import edu.ucsb.nceas.metacat.shared.MetacatUtilException;
127
import edu.ucsb.nceas.metacat.util.DocumentUtil;
128
import edu.ucsb.nceas.metacat.util.SystemUtil;
129
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
130
import gov.loc.repository.bagit.Bag;
131
import gov.loc.repository.bagit.BagFactory;
132
import gov.loc.repository.bagit.writer.Writer;
133
import gov.loc.repository.bagit.writer.impl.ZipWriter;
134

    
135
/**
136
 * Represents Metacat's implementation of the DataONE Member Node 
137
 * service API. Methods implement the various MN* interfaces, and methods common
138
 * to both Member Node and Coordinating Node interfaces are found in the
139
 * D1NodeService base class.
140
 * 
141
 * Implements:
142
 * MNCore.ping()
143
 * MNCore.getLogRecords()
144
 * MNCore.getObjectStatistics()
145
 * MNCore.getOperationStatistics()
146
 * MNCore.getStatus()
147
 * MNCore.getCapabilities()
148
 * MNRead.get()
149
 * MNRead.getSystemMetadata()
150
 * MNRead.describe()
151
 * MNRead.getChecksum()
152
 * MNRead.listObjects()
153
 * MNRead.synchronizationFailed()
154
 * MNAuthorization.isAuthorized()
155
 * MNAuthorization.setAccessPolicy()
156
 * MNStorage.create()
157
 * MNStorage.update()
158
 * MNStorage.delete()
159
 * MNReplication.replicate()
160
 * 
161
 */
162
public class MNodeService extends D1NodeService 
163
    implements MNAuthorization, MNCore, MNRead, MNReplication, MNStorage, MNQuery {
164

    
165
    //private static final String PATHQUERY = "pathquery";
166
	public static final String UUID_SCHEME = "UUID";
167
	public static final String DOI_SCHEME = "DOI";
168
	private static final String UUID_PREFIX = "urn:uuid:";
169

    
170
	/* the logger instance */
171
    private Logger logMetacat = null;
172
    
173
    /* A reference to a remote Memeber Node */
174
    private MNode mn;
175
    
176
    /* A reference to a Coordinating Node */
177
    private CNode cn;
178

    
179

    
180
    /**
181
     * Singleton accessor to get an instance of MNodeService.
182
     * 
183
     * @return instance - the instance of MNodeService
184
     */
185
    public static MNodeService getInstance(HttpServletRequest request) {
186
        return new MNodeService(request);
187
    }
188

    
189
    /**
190
     * Constructor, private for singleton access
191
     */
192
    private MNodeService(HttpServletRequest request) {
193
        super(request);
194
        logMetacat = Logger.getLogger(MNodeService.class);
195
        
196
        // set the Member Node certificate file location
197
        CertificateManager.getInstance().setCertificateLocation(Settings.getConfiguration().getString("D1Client.certificate.file"));
198
    }
199

    
200
    /**
201
     * Deletes an object from the Member Node, where the object is either a 
202
     * data object or a science metadata object.
203
     * 
204
     * @param session - the Session object containing the credentials for the Subject
205
     * @param pid - The object identifier to be deleted
206
     * 
207
     * @return pid - the identifier of the object used for the deletion
208
     * 
209
     * @throws InvalidToken
210
     * @throws ServiceFailure
211
     * @throws NotAuthorized
212
     * @throws NotFound
213
     * @throws NotImplemented
214
     * @throws InvalidRequest
215
     */
216
    @Override
217
    public Identifier delete(Session session, Identifier pid) 
218
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
219

    
220
    	// only admin of  the MN or the CN is allowed a full delete
221
        boolean allowed = false;
222
        allowed = isAdminAuthorized(session);
223
        if (!allowed) { 
224
            throw new NotAuthorized("1320", "The provided identity does not have " + "permission to delete objects on the Node.");
225
        }
226
    	
227
    	// defer to superclass implementation
228
        return super.delete(session, pid);
229
    }
230

    
231
    /**
232
     * Updates an existing object by creating a new object identified by 
233
     * newPid on the Member Node which explicitly obsoletes the object 
234
     * identified by pid through appropriate changes to the SystemMetadata 
235
     * of pid and newPid
236
     * 
237
     * @param session - the Session object containing the credentials for the Subject
238
     * @param pid - The identifier of the object to be updated
239
     * @param object - the new object bytes
240
     * @param sysmeta - the new system metadata describing the object
241
     * 
242
     * @return newPid - the identifier of the new object
243
     * 
244
     * @throws InvalidToken
245
     * @throws ServiceFailure
246
     * @throws NotAuthorized
247
     * @throws NotFound
248
     * @throws NotImplemented
249
     * @throws IdentifierNotUnique
250
     * @throws UnsupportedType
251
     * @throws InsufficientResources
252
     * @throws InvalidSystemMetadata
253
     * @throws InvalidRequest
254
     */
255
    @Override
256
    public Identifier update(Session session, Identifier pid, InputStream object, 
257
        Identifier newPid, SystemMetadata sysmeta) 
258
        throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
259
        UnsupportedType, InsufficientResources, NotFound, 
260
        InvalidSystemMetadata, NotImplemented, InvalidRequest {
261

    
262
        String localId = null;
263
        boolean allowed = false;
264
        boolean isScienceMetadata = false;
265
        
266
        if (session == null) {
267
        	throw new InvalidToken("1210", "No session has been provided");
268
        }
269
        Subject subject = session.getSubject();
270

    
271
        // verify the pid is valid format
272
        if (!isValidIdentifier(pid)) {
273
        	throw new InvalidRequest("1202", "The provided identifier is invalid.");
274
        }
275

    
276
        // check for the existing identifier
277
        try {
278
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
279
            
280
        } catch (McdbDocNotFoundException e) {
281
            throw new InvalidRequest("1202", "The object with the provided " + 
282
                "identifier was not found.");
283
            
284
        }
285
        
286
        // set the originating node
287
        NodeReference originMemberNode = this.getCapabilities().getIdentifier();
288
        sysmeta.setOriginMemberNode(originMemberNode);
289
        
290
        // set the submitter to match the certificate
291
        sysmeta.setSubmitter(subject);
292
        // set the dates
293
        Date now = Calendar.getInstance().getTime();
294
        sysmeta.setDateSysMetadataModified(now);
295
        sysmeta.setDateUploaded(now);
296
        
297
        // make sure serial version is set to something
298
        BigInteger serialVersion = sysmeta.getSerialVersion();
299
        if (serialVersion == null) {
300
        	sysmeta.setSerialVersion(BigInteger.ZERO);
301
        }
302

    
303
        // does the subject have WRITE ( == update) priveleges on the pid?
304
        allowed = isAuthorized(session, pid, Permission.WRITE);
305

    
306
        if (allowed) {
307
        	
308
        	// check quality of SM
309
        	if (sysmeta.getObsoletedBy() != null) {
310
        		throw new InvalidSystemMetadata("1300", "Cannot include obsoletedBy when updating object");
311
        	}
312
        	if (sysmeta.getObsoletes() != null && !sysmeta.getObsoletes().getValue().equals(pid.getValue())) {
313
        		throw new InvalidSystemMetadata("1300", "The identifier provided in obsoletes does not match old Identifier");
314
        	}
315

    
316
            // get the existing system metadata for the object
317
            SystemMetadata existingSysMeta = getSystemMetadata(session, pid);
318

    
319
            // check for previous update
320
            // see: https://redmine.dataone.org/issues/3336
321
            Identifier existingObsoletedBy = existingSysMeta.getObsoletedBy();
322
            if (existingObsoletedBy != null) {
323
            	throw new InvalidRequest("1202", 
324
            			"The previous identifier has already been made obsolete by: " + existingObsoletedBy.getValue());
325
            }
326
            
327
            // add the newPid to the obsoletedBy list for the existing sysmeta
328
            existingSysMeta.setObsoletedBy(newPid);
329

    
330
            // then update the existing system metadata
331
            updateSystemMetadata(existingSysMeta);
332

    
333
            // prep the new system metadata, add pid to the affected lists
334
            sysmeta.setObsoletes(pid);
335
            //sysmeta.addDerivedFrom(pid);
336

    
337
            isScienceMetadata = isScienceMetadata(sysmeta);
338

    
339
            // do we have XML metadata or a data object?
340
            if (isScienceMetadata) {
341

    
342
                // update the science metadata XML document
343
                // TODO: handle non-XML metadata/data documents (like netCDF)
344
                // TODO: don't put objects into memory using stream to string
345
                String objectAsXML = "";
346
                try {
347
                    objectAsXML = IOUtils.toString(object, "UTF-8");
348
                    // give the old pid so we can calculate the new local id 
349
                    localId = insertOrUpdateDocument(objectAsXML, pid, session, "update");
350
                    // register the newPid and the generated localId
351
                    if (newPid != null) {
352
                        IdentifierManager.getInstance().createMapping(newPid.getValue(), localId);
353

    
354
                    }
355

    
356
                } catch (IOException e) {
357
                    String msg = "The Node is unable to create the object. " + "There was a problem converting the object to XML";
358
                    logMetacat.info(msg);
359
                    throw new ServiceFailure("1310", msg + ": " + e.getMessage());
360

    
361
                }
362

    
363
            } else {
364

    
365
                // update the data object
366
                localId = insertDataObject(object, newPid, session);
367

    
368
            }
369

    
370
            // and insert the new system metadata
371
            insertSystemMetadata(sysmeta);
372

    
373
            // log the update event
374
            EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), subject.getValue(), localId, Event.UPDATE.toString());
375
            
376
            // attempt to register the identifier - it checks if it is a doi
377
            try {
378
    			DOIService.getInstance().registerDOI(sysmeta);
379
    		} catch (EZIDException e) {
380
                throw new ServiceFailure("1190", "Could not register DOI: " + e.getMessage());
381
    		}
382

    
383
        } else {
384
            throw new NotAuthorized("1200", "The provided identity does not have " + "permission to UPDATE the object identified by " + pid.getValue()
385
                    + " on the Member Node.");
386
        }
387

    
388
        return newPid;
389
    }
390

    
391
    public Identifier create(Session session, Identifier pid, InputStream object, SystemMetadata sysmeta) throws InvalidToken, ServiceFailure, NotAuthorized,
392
            IdentifierNotUnique, UnsupportedType, InsufficientResources, InvalidSystemMetadata, NotImplemented, InvalidRequest {
393

    
394
        // check for null session
395
        if (session == null) {
396
          throw new InvalidToken("1110", "Session is required to WRITE to the Node.");
397
        }
398
        // set the submitter to match the certificate
399
        sysmeta.setSubmitter(session.getSubject());
400
        // set the originating node
401
        NodeReference originMemberNode = this.getCapabilities().getIdentifier();
402
        sysmeta.setOriginMemberNode(originMemberNode);
403
        sysmeta.setArchived(false);
404

    
405
        // set the dates
406
        Date now = Calendar.getInstance().getTime();
407
        sysmeta.setDateSysMetadataModified(now);
408
        sysmeta.setDateUploaded(now);
409
        
410
        // set the serial version
411
        sysmeta.setSerialVersion(BigInteger.ZERO);
412

    
413
        // check that we are not attempting to subvert versioning
414
        if (sysmeta.getObsoletes() != null && sysmeta.getObsoletes().getValue() != null) {
415
            throw new InvalidSystemMetadata("1180", 
416
              "The supplied system metadata is invalid. " +
417
              "The obsoletes field cannot have a value when creating entries.");
418
        }
419
        
420
        if (sysmeta.getObsoletedBy() != null && sysmeta.getObsoletedBy().getValue() != null) {
421
            throw new InvalidSystemMetadata("1180", 
422
              "The supplied system metadata is invalid. " +
423
              "The obsoletedBy field cannot have a value when creating entries.");
424
        }
425

    
426
        // call the shared impl
427
        Identifier resultPid = super.create(session, pid, object, sysmeta);
428
        
429
        // attempt to register the identifier - it checks if it is a doi
430
        try {
431
			DOIService.getInstance().registerDOI(sysmeta);
432
		} catch (EZIDException e) {
433
			ServiceFailure sf = new ServiceFailure("1190", "Could not register DOI: " + e.getMessage());
434
			sf.initCause(e);
435
            throw sf;
436
		}
437
        
438
        // return 
439
		return resultPid ;
440
    }
441

    
442
    /**
443
     * Called by a Coordinating Node to request that the Member Node create a 
444
     * copy of the specified object by retrieving it from another Member 
445
     * Node and storing it locally so that it can be made accessible to 
446
     * the DataONE system.
447
     * 
448
     * @param session - the Session object containing the credentials for the Subject
449
     * @param sysmeta - Copy of the CN held system metadata for the object
450
     * @param sourceNode - A reference to node from which the content should be 
451
     *                     retrieved. The reference should be resolved by 
452
     *                     checking the CN node registry.
453
     * 
454
     * @return true if the replication succeeds
455
     * 
456
     * @throws ServiceFailure
457
     * @throws NotAuthorized
458
     * @throws NotImplemented
459
     * @throws UnsupportedType
460
     * @throws InsufficientResources
461
     * @throws InvalidRequest
462
     */
463
    @Override
464
    public boolean replicate(Session session, SystemMetadata sysmeta,
465
            NodeReference sourceNode) throws NotImplemented, ServiceFailure,
466
            NotAuthorized, InvalidRequest, InsufficientResources,
467
            UnsupportedType {
468

    
469
        if (session != null && sysmeta != null && sourceNode != null) {
470
            logMetacat.info("MNodeService.replicate() called with parameters: \n" +
471
                            "\tSession.Subject      = "                           +
472
                            session.getSubject().getValue() + "\n"                +
473
                            "\tidentifier           = "                           + 
474
                            sysmeta.getIdentifier().getValue()                    +
475
                            "\n" + "\tSource NodeReference ="                     +
476
                            sourceNode.getValue());
477
        }
478
        boolean result = false;
479
        String nodeIdStr = null;
480
        NodeReference nodeId = null;
481

    
482
        // get the referenced object
483
        Identifier pid = sysmeta.getIdentifier();
484

    
485
        // get from the membernode
486
        // TODO: switch credentials for the server retrieval?
487
        this.mn = D1Client.getMN(sourceNode);
488
        this.cn = D1Client.getCN();
489
        InputStream object = null;
490
        Session thisNodeSession = null;
491
        SystemMetadata localSystemMetadata = null;
492
        BaseException failure = null;
493
        String localId = null;
494
        
495
        // TODO: check credentials
496
        // cannot be called by public
497
        if (session == null || session.getSubject() == null) {
498
            String msg = "No session was provided to replicate identifier " +
499
            sysmeta.getIdentifier().getValue();
500
            logMetacat.info(msg);
501
            throw new NotAuthorized("2152", msg);
502
            
503
        }
504

    
505

    
506
        // get the local node id
507
        try {
508
            nodeIdStr = PropertyService.getProperty("dataone.nodeId");
509
            nodeId = new NodeReference();
510
            nodeId.setValue(nodeIdStr);
511

    
512
        } catch (PropertyNotFoundException e1) {
513
            String msg = "Couldn't get dataone.nodeId property: " + e1.getMessage();
514
            failure = new ServiceFailure("2151", msg);
515
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
516
            logMetacat.error(msg);
517
            return true;
518

    
519
        }
520
        
521

    
522
        try {
523
            // do we already have a replica?
524
            try {
525
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
526
                // if we have a local id, get the local object
527
                try {
528
                    object = MetacatHandler.read(localId);
529
                } catch (Exception e) {
530
                	// NOTE: we may already know about this ID because it could be a data file described by a metadata file
531
                	// https://redmine.dataone.org/issues/2572
532
                	// TODO: fix this so that we don't prevent ourselves from getting replicas
533
                	
534
                    // let the CN know that the replication failed
535
                	logMetacat.warn("Object content not found on this node despite having localId: " + localId);
536
                	String msg = "Can't read the object bytes properly, replica is invalid.";
537
                    ServiceFailure serviceFailure = new ServiceFailure("2151", msg);
538
                    setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, serviceFailure);
539
                    logMetacat.warn(msg);
540
                    throw serviceFailure;
541
                    
542
                }
543

    
544
            } catch (McdbDocNotFoundException e) {
545
                logMetacat.info("No replica found. Continuing.");
546
                
547
            }
548
            
549
            // no local replica, get a replica
550
            if ( object == null ) {
551
                // session should be null to use the default certificate
552
                // location set in the Certificate manager
553
                object = mn.getReplica(thisNodeSession, pid);
554
                logMetacat.info("MNodeService.getReplica() called for identifier "
555
                                + pid.getValue());
556

    
557
            }
558

    
559
        } catch (InvalidToken e) {            
560
            String msg = "Could not retrieve object to replicate (InvalidToken): "+ e.getMessage();
561
            failure = new ServiceFailure("2151", msg);
562
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
563
            logMetacat.error(msg);
564
            throw new ServiceFailure("2151", msg);
565

    
566
        } catch (NotFound e) {
567
            String msg = "Could not retrieve object to replicate (NotFound): "+ e.getMessage();
568
            failure = new ServiceFailure("2151", msg);
569
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
570
            logMetacat.error(msg);
571
            throw new ServiceFailure("2151", msg);
572

    
573
        }
574

    
575
        // verify checksum on the object, if supported
576
        if (object.markSupported()) {
577
            Checksum givenChecksum = sysmeta.getChecksum();
578
            Checksum computedChecksum = null;
579
            try {
580
                computedChecksum = ChecksumUtil.checksum(object, givenChecksum.getAlgorithm());
581
                object.reset();
582

    
583
            } catch (Exception e) {
584
                String msg = "Error computing checksum on replica: " + e.getMessage();
585
                logMetacat.error(msg);
586
                ServiceFailure sf = new ServiceFailure("2151", msg);
587
                sf.initCause(e);
588
                throw sf;
589
            }
590
            if (!givenChecksum.getValue().equals(computedChecksum.getValue())) {
591
                logMetacat.error("Given    checksum for " + pid.getValue() + 
592
                    "is " + givenChecksum.getValue());
593
                logMetacat.error("Computed checksum for " + pid.getValue() + 
594
                    "is " + computedChecksum.getValue());
595
                throw new ServiceFailure("2151",
596
                        "Computed checksum does not match declared checksum");
597
            }
598
        }
599

    
600
        // add it to local store
601
        Identifier retPid;
602
        try {
603
            // skip the MN.create -- this mutates the system metadata and we don't want it to
604
            if ( localId == null ) {
605
                // TODO: this will fail if we already "know" about the identifier
606
            	// FIXME: see https://redmine.dataone.org/issues/2572
607
                retPid = super.create(session, pid, object, sysmeta);
608
                result = (retPid.getValue().equals(pid.getValue()));
609
            }
610
            
611
        } catch (Exception e) {
612
            String msg = "Could not save object to local store (" + e.getClass().getName() + "): " + e.getMessage();
613
            failure = new ServiceFailure("2151", msg);
614
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
615
            logMetacat.error(msg);
616
            throw new ServiceFailure("2151", msg);
617
            
618
        }
619

    
620
        // finish by setting the replication status
621
        setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.COMPLETED, null);
622
        return result;
623

    
624
    }
625

    
626
    /**
627
     * Return the object identified by the given object identifier
628
     * 
629
     * @param session - the Session object containing the credentials for the Subject
630
     * @param pid - the object identifier for the given object
631
     * 
632
     * @return inputStream - the input stream of the given object
633
     * 
634
     * @throws InvalidToken
635
     * @throws ServiceFailure
636
     * @throws NotAuthorized
637
     * @throws InvalidRequest
638
     * @throws NotImplemented
639
     */
640
    @Override
641
    public InputStream get(Session session, Identifier pid) 
642
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
643

    
644
        return super.get(session, pid);
645

    
646
    }
647

    
648
    /**
649
     * Returns a Checksum for the specified object using an accepted hashing algorithm
650
     * 
651
     * @param session - the Session object containing the credentials for the Subject
652
     * @param pid - the object identifier for the given object
653
     * @param algorithm -  the name of an algorithm that will be used to compute 
654
     *                     a checksum of the bytes of the object
655
     * 
656
     * @return checksum - the checksum of the given object
657
     * 
658
     * @throws InvalidToken
659
     * @throws ServiceFailure
660
     * @throws NotAuthorized
661
     * @throws NotFound
662
     * @throws InvalidRequest
663
     * @throws NotImplemented
664
     */
665
    @Override
666
    public Checksum getChecksum(Session session, Identifier pid, String algorithm) 
667
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound,
668
        InvalidRequest, NotImplemented {
669

    
670
        Checksum checksum = null;
671

    
672
        InputStream inputStream = get(session, pid);
673

    
674
        try {
675
            checksum = ChecksumUtil.checksum(inputStream, algorithm);
676

    
677
        } catch (NoSuchAlgorithmException e) {
678
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned due to an internal error: "
679
                    + e.getMessage());
680
        } catch (IOException e) {
681
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned due to an internal error: "
682
                    + e.getMessage());
683
        }
684

    
685
        if (checksum == null) {
686
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned.");
687
        }
688

    
689
        return checksum;
690
    }
691

    
692
    /**
693
     * Return the system metadata for a given object
694
     * 
695
     * @param session - the Session object containing the credentials for the Subject
696
     * @param pid - the object identifier for the given object
697
     * 
698
     * @return inputStream - the input stream of the given system metadata object
699
     * 
700
     * @throws InvalidToken
701
     * @throws ServiceFailure
702
     * @throws NotAuthorized
703
     * @throws NotFound
704
     * @throws InvalidRequest
705
     * @throws NotImplemented
706
     */
707
    @Override
708
    public SystemMetadata getSystemMetadata(Session session, Identifier pid) 
709
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound,
710
        NotImplemented {
711

    
712
        return super.getSystemMetadata(session, pid);
713
    }
714

    
715
    /**
716
     * Retrieve the list of objects present on the MN that match the calling parameters
717
     * 
718
     * @param session - the Session object containing the credentials for the Subject
719
     * @param startTime - Specifies the beginning of the time range from which 
720
     *                    to return object (>=)
721
     * @param endTime - Specifies the beginning of the time range from which 
722
     *                  to return object (>=)
723
     * @param objectFormat - Restrict results to the specified object format
724
     * @param replicaStatus - Indicates if replicated objects should be returned in the list
725
     * @param start - The zero-based index of the first value, relative to the 
726
     *                first record of the resultset that matches the parameters.
727
     * @param count - The maximum number of entries that should be returned in 
728
     *                the response. The Member Node may return less entries 
729
     *                than specified in this value.
730
     * 
731
     * @return objectList - the list of objects matching the criteria
732
     * 
733
     * @throws InvalidToken
734
     * @throws ServiceFailure
735
     * @throws NotAuthorized
736
     * @throws InvalidRequest
737
     * @throws NotImplemented
738
     */
739
    @Override
740
    public ObjectList listObjects(Session session, Date startTime, Date endTime, ObjectFormatIdentifier objectFormatId, Boolean replicaStatus, Integer start,
741
            Integer count) throws NotAuthorized, InvalidRequest, NotImplemented, ServiceFailure, InvalidToken {
742

    
743
        ObjectList objectList = null;
744

    
745
        try {
746
        	// safeguard against large requests
747
            if (count == null || count > MAXIMUM_DB_RECORD_COUNT) {
748
            	count = MAXIMUM_DB_RECORD_COUNT;
749
            }
750
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, objectFormatId, replicaStatus, start, count);
751
        } catch (Exception e) {
752
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
753
        }
754

    
755
        return objectList;
756
    }
757

    
758
    /**
759
     * Return a description of the node's capabilities and services.
760
     * 
761
     * @return node - the technical capabilities of the Member Node
762
     * 
763
     * @throws ServiceFailure
764
     * @throws NotAuthorized
765
     * @throws InvalidRequest
766
     * @throws NotImplemented
767
     */
768
    @Override
769
    public Node getCapabilities() 
770
        throws NotImplemented, ServiceFailure {
771

    
772
        String nodeName = null;
773
        String nodeId = null;
774
        String subject = null;
775
        String contactSubject = null;
776
        String nodeDesc = null;
777
        String nodeTypeString = null;
778
        NodeType nodeType = null;
779
        String mnCoreServiceVersion = null;
780
        String mnReadServiceVersion = null;
781
        String mnAuthorizationServiceVersion = null;
782
        String mnStorageServiceVersion = null;
783
        String mnReplicationServiceVersion = null;
784

    
785
        boolean nodeSynchronize = false;
786
        boolean nodeReplicate = false;
787
        boolean mnCoreServiceAvailable = false;
788
        boolean mnReadServiceAvailable = false;
789
        boolean mnAuthorizationServiceAvailable = false;
790
        boolean mnStorageServiceAvailable = false;
791
        boolean mnReplicationServiceAvailable = false;
792

    
793
        try {
794
            // get the properties of the node based on configuration information
795
            nodeName = PropertyService.getProperty("dataone.nodeName");
796
            nodeId = PropertyService.getProperty("dataone.nodeId");
797
            subject = PropertyService.getProperty("dataone.subject");
798
            contactSubject = PropertyService.getProperty("dataone.contactSubject");
799
            nodeDesc = PropertyService.getProperty("dataone.nodeDescription");
800
            nodeTypeString = PropertyService.getProperty("dataone.nodeType");
801
            nodeType = NodeType.convert(nodeTypeString);
802
            nodeSynchronize = new Boolean(PropertyService.getProperty("dataone.nodeSynchronize")).booleanValue();
803
            nodeReplicate = new Boolean(PropertyService.getProperty("dataone.nodeReplicate")).booleanValue();
804

    
805
            mnCoreServiceVersion = PropertyService.getProperty("dataone.mnCore.serviceVersion");
806
            mnReadServiceVersion = PropertyService.getProperty("dataone.mnRead.serviceVersion");
807
            mnAuthorizationServiceVersion = PropertyService.getProperty("dataone.mnAuthorization.serviceVersion");
808
            mnStorageServiceVersion = PropertyService.getProperty("dataone.mnStorage.serviceVersion");
809
            mnReplicationServiceVersion = PropertyService.getProperty("dataone.mnReplication.serviceVersion");
810

    
811
            mnCoreServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnCore.serviceAvailable")).booleanValue();
812
            mnReadServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnRead.serviceAvailable")).booleanValue();
813
            mnAuthorizationServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnAuthorization.serviceAvailable")).booleanValue();
814
            mnStorageServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnStorage.serviceAvailable")).booleanValue();
815
            mnReplicationServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnReplication.serviceAvailable")).booleanValue();
816

    
817
            // Set the properties of the node based on configuration information and
818
            // calls to current status methods
819
            String serviceName = SystemUtil.getSecureContextURL() + "/" + PropertyService.getProperty("dataone.serviceName");
820
            Node node = new Node();
821
            node.setBaseURL(serviceName + "/" + nodeTypeString);
822
            node.setDescription(nodeDesc);
823

    
824
            // set the node's health information
825
            node.setState(NodeState.UP);
826
            
827
            // set the ping response to the current value
828
            Ping canPing = new Ping();
829
            canPing.setSuccess(false);
830
            try {
831
            	Date pingDate = ping();
832
                canPing.setSuccess(pingDate != null);
833
            } catch (BaseException e) {
834
                e.printStackTrace();
835
                // guess it can't be pinged
836
            }
837
            
838
            node.setPing(canPing);
839

    
840
            NodeReference identifier = new NodeReference();
841
            identifier.setValue(nodeId);
842
            node.setIdentifier(identifier);
843
            Subject s = new Subject();
844
            s.setValue(subject);
845
            node.addSubject(s);
846
            Subject contact = new Subject();
847
            contact.setValue(contactSubject);
848
            node.addContactSubject(contact);
849
            node.setName(nodeName);
850
            node.setReplicate(nodeReplicate);
851
            node.setSynchronize(nodeSynchronize);
852

    
853
            // services: MNAuthorization, MNCore, MNRead, MNReplication, MNStorage
854
            Services services = new Services();
855

    
856
            Service sMNCore = new Service();
857
            sMNCore.setName("MNCore");
858
            sMNCore.setVersion(mnCoreServiceVersion);
859
            sMNCore.setAvailable(mnCoreServiceAvailable);
860

    
861
            Service sMNRead = new Service();
862
            sMNRead.setName("MNRead");
863
            sMNRead.setVersion(mnReadServiceVersion);
864
            sMNRead.setAvailable(mnReadServiceAvailable);
865

    
866
            Service sMNAuthorization = new Service();
867
            sMNAuthorization.setName("MNAuthorization");
868
            sMNAuthorization.setVersion(mnAuthorizationServiceVersion);
869
            sMNAuthorization.setAvailable(mnAuthorizationServiceAvailable);
870

    
871
            Service sMNStorage = new Service();
872
            sMNStorage.setName("MNStorage");
873
            sMNStorage.setVersion(mnStorageServiceVersion);
874
            sMNStorage.setAvailable(mnStorageServiceAvailable);
875

    
876
            Service sMNReplication = new Service();
877
            sMNReplication.setName("MNReplication");
878
            sMNReplication.setVersion(mnReplicationServiceVersion);
879
            sMNReplication.setAvailable(mnReplicationServiceAvailable);
880

    
881
            services.addService(sMNRead);
882
            services.addService(sMNCore);
883
            services.addService(sMNAuthorization);
884
            services.addService(sMNStorage);
885
            services.addService(sMNReplication);
886
            node.setServices(services);
887

    
888
            // Set the schedule for synchronization
889
            Synchronization synchronization = new Synchronization();
890
            Schedule schedule = new Schedule();
891
            Date now = new Date();
892
            schedule.setYear(PropertyService.getProperty("dataone.nodeSynchronization.schedule.year"));
893
            schedule.setMon(PropertyService.getProperty("dataone.nodeSynchronization.schedule.mon"));
894
            schedule.setMday(PropertyService.getProperty("dataone.nodeSynchronization.schedule.mday"));
895
            schedule.setWday(PropertyService.getProperty("dataone.nodeSynchronization.schedule.wday"));
896
            schedule.setHour(PropertyService.getProperty("dataone.nodeSynchronization.schedule.hour"));
897
            schedule.setMin(PropertyService.getProperty("dataone.nodeSynchronization.schedule.min"));
898
            schedule.setSec(PropertyService.getProperty("dataone.nodeSynchronization.schedule.sec"));
899
            synchronization.setSchedule(schedule);
900
            synchronization.setLastHarvested(now);
901
            synchronization.setLastCompleteHarvest(now);
902
            node.setSynchronization(synchronization);
903

    
904
            node.setType(nodeType);
905
            return node;
906

    
907
        } catch (PropertyNotFoundException pnfe) {
908
            String msg = "MNodeService.getCapabilities(): " + "property not found: " + pnfe.getMessage();
909
            logMetacat.error(msg);
910
            throw new ServiceFailure("2162", msg);
911
        }
912
    }
913

    
914
    /**
915
     * Returns the number of operations that have been serviced by the node 
916
     * over time periods of one and 24 hours.
917
     * 
918
     * @param session - the Session object containing the credentials for the Subject
919
     * @param period - An ISO8601 compatible DateTime range specifying the time 
920
     *                 range for which to return operation statistics.
921
     * @param requestor - Limit to operations performed by given requestor identity.
922
     * @param event -  Enumerated value indicating the type of event being examined
923
     * @param format - Limit to events involving objects of the specified format
924
     * 
925
     * @return the desired log records
926
     * 
927
     * @throws InvalidToken
928
     * @throws ServiceFailure
929
     * @throws NotAuthorized
930
     * @throws InvalidRequest
931
     * @throws NotImplemented
932
     */
933
    public MonitorList getOperationStatistics(Session session, Date startTime, 
934
        Date endTime, Subject requestor, Event event, ObjectFormatIdentifier formatId)
935
        throws NotImplemented, ServiceFailure, NotAuthorized, InsufficientResources, UnsupportedType {
936

    
937
        MonitorList monitorList = new MonitorList();
938

    
939
        try {
940

    
941
            // get log records first
942
            Log logs = getLogRecords(session, startTime, endTime, event, null, 0, null);
943

    
944
            // TODO: aggregate by day or hour -- needs clarification
945
            int count = 1;
946
            for (LogEntry logEntry : logs.getLogEntryList()) {
947
                Identifier pid = logEntry.getIdentifier();
948
                Date logDate = logEntry.getDateLogged();
949
                // if we are filtering by format
950
                if (formatId != null) {
951
                    SystemMetadata sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
952
                    if (!sysmeta.getFormatId().getValue().equals(formatId.getValue())) {
953
                        // does not match
954
                        continue;
955
                    }
956
                }
957
                MonitorInfo item = new MonitorInfo();
958
                item.setCount(count);
959
                item.setDate(new java.sql.Date(logDate.getTime()));
960
                monitorList.addMonitorInfo(item);
961

    
962
            }
963
        } catch (Exception e) {
964
            e.printStackTrace();
965
            throw new ServiceFailure("2081", "Could not retrieve statistics: " + e.getMessage());
966
        }
967

    
968
        return monitorList;
969

    
970
    }
971

    
972
    /**
973
     * A callback method used by a CN to indicate to a MN that it cannot 
974
     * complete synchronization of the science metadata identified by pid.  Log
975
     * the event in the metacat event log.
976
     * 
977
     * @param session
978
     * @param syncFailed
979
     * 
980
     * @throws ServiceFailure
981
     * @throws NotAuthorized
982
     * @throws NotImplemented
983
     */
984
    @Override
985
    public boolean synchronizationFailed(Session session, SynchronizationFailed syncFailed) 
986
        throws NotImplemented, ServiceFailure, NotAuthorized {
987

    
988
        String localId;
989
        Identifier pid;
990
        if ( syncFailed.getPid() != null ) {
991
            pid = new Identifier();
992
            pid.setValue(syncFailed.getPid());
993
            boolean allowed;
994
            
995
            //are we allowed? only CNs
996
            try {
997
                allowed = isAdminAuthorized(session);
998
                if ( !allowed ){
999
                    throw new NotAuthorized("2162", 
1000
                            "Not allowed to call synchronizationFailed() on this node.");
1001
                }
1002
            } catch (InvalidToken e) {
1003
                throw new NotAuthorized("2162", 
1004
                        "Not allowed to call synchronizationFailed() on this node.");
1005

    
1006
            }
1007
            
1008
        } else {
1009
            throw new ServiceFailure("2161", "The identifier cannot be null.");
1010

    
1011
        }
1012
        
1013
        try {
1014
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1015
        } catch (McdbDocNotFoundException e) {
1016
            throw new ServiceFailure("2161", "The identifier specified by " + 
1017
                    syncFailed.getPid() + " was not found on this node.");
1018

    
1019
        }
1020
        // TODO: update the CN URL below when the CNRead.SynchronizationFailed
1021
        // method is changed to include the URL as a parameter
1022
        logMetacat.debug("Synchronization for the object identified by " + 
1023
                pid.getValue() + " failed from " + syncFailed.getNodeId() + 
1024
                " Logging the event to the Metacat EventLog as a 'syncFailed' event.");
1025
        // TODO: use the event type enum when the SYNCHRONIZATION_FAILED event is added
1026
        String principal = Constants.SUBJECT_PUBLIC;
1027
        if (session != null && session.getSubject() != null) {
1028
          principal = session.getSubject().getValue();
1029
        }
1030
        try {
1031
          EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), principal, localId, "synchronization_failed");
1032
        } catch (Exception e) {
1033
            throw new ServiceFailure("2161", "Could not log the error for: " + pid.getValue());
1034
        }
1035
        //EventLog.getInstance().log("CN URL WILL GO HERE", 
1036
        //  session.getSubject().getValue(), localId, Event.SYNCHRONIZATION_FAILED);
1037
        return true;
1038

    
1039
    }
1040

    
1041
    /**
1042
     * Essentially a get() but with different logging behavior
1043
     */
1044
    @Override
1045
    public InputStream getReplica(Session session, Identifier pid) 
1046
        throws NotAuthorized, NotImplemented, ServiceFailure, InvalidToken {
1047

    
1048
        logMetacat.info("MNodeService.getReplica() called.");
1049

    
1050
        // cannot be called by public
1051
        if (session == null) {
1052
        	throw new InvalidToken("2183", "No session was provided.");
1053
        }
1054
        
1055
        logMetacat.info("MNodeService.getReplica() called with parameters: \n" +
1056
             "\tSession.Subject      = " + session.getSubject().getValue() + "\n" +
1057
             "\tIdentifier           = " + pid.getValue());
1058

    
1059
        InputStream inputStream = null; // bytes to be returned
1060
        handler = new MetacatHandler(new Timer());
1061
        boolean allowed = false;
1062
        String localId; // the metacat docid for the pid
1063

    
1064
        // get the local docid from Metacat
1065
        try {
1066
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1067
        } catch (McdbDocNotFoundException e) {
1068
            throw new ServiceFailure("2181", "The object specified by " + 
1069
                    pid.getValue() + " does not exist at this node.");
1070
            
1071
        }
1072

    
1073
        Subject targetNodeSubject = session.getSubject();
1074

    
1075
        // check for authorization to replicate, null session to act as this source MN
1076
        try {
1077
            allowed = D1Client.getCN().isNodeAuthorized(null, targetNodeSubject, pid);
1078
        } catch (InvalidToken e1) {
1079
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1080
                + e1.getMessage());
1081
            
1082
        } catch (NotFound e1) {
1083
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1084
                    + e1.getMessage());
1085

    
1086
        } catch (InvalidRequest e1) {
1087
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1088
                    + e1.getMessage());
1089

    
1090
        }
1091

    
1092
        logMetacat.info("Called D1Client.isNodeAuthorized(). Allowed = " + allowed +
1093
            " for identifier " + pid.getValue());
1094

    
1095
        // if the person is authorized, perform the read
1096
        if (allowed) {
1097
            try {
1098
                inputStream = MetacatHandler.read(localId);
1099
            } catch (Exception e) {
1100
                throw new ServiceFailure("1020", "The object specified by " + 
1101
                    pid.getValue() + "could not be returned due to error: " + e.getMessage());
1102
            }
1103
        }
1104

    
1105
        // if we fail to set the input stream
1106
        if (inputStream == null) {
1107
            throw new ServiceFailure("2181", "The object specified by " + 
1108
                pid.getValue() + "does not exist at this node.");
1109
        }
1110

    
1111
        // log the replica event
1112
        String principal = null;
1113
        if (session.getSubject() != null) {
1114
            principal = session.getSubject().getValue();
1115
        }
1116
        EventLog.getInstance().log(request.getRemoteAddr(), 
1117
            request.getHeader("User-Agent"), principal, localId, "replicate");
1118

    
1119
        return inputStream;
1120
    }
1121

    
1122
    /**
1123
     * A method to notify the Member Node that the authoritative copy of 
1124
     * system metadata on the Coordinating Nodes has changed.
1125
     * 
1126
     * @param session   Session information that contains the identity of the 
1127
     *                  calling user as retrieved from the X.509 certificate 
1128
     *                  which must be traceable to the CILogon service.
1129
     * @param serialVersion   The serialVersion of the system metadata
1130
     * @param dateSysMetaLastModified  The time stamp for when the system metadata was changed
1131
     * @throws NotImplemented
1132
     * @throws ServiceFailure
1133
     * @throws NotAuthorized
1134
     * @throws InvalidRequest
1135
     * @throws InvalidToken
1136
     */
1137
    public boolean systemMetadataChanged(Session session, Identifier pid,
1138
        long serialVersion, Date dateSysMetaLastModified) 
1139
        throws NotImplemented, ServiceFailure, NotAuthorized, InvalidRequest,
1140
        InvalidToken {
1141
        
1142
        // cannot be called by public
1143
        if (session == null) {
1144
        	throw new InvalidToken("2183", "No session was provided.");
1145
        }
1146

    
1147
        SystemMetadata currentLocalSysMeta = null;
1148
        SystemMetadata newSysMeta = null;
1149
        CNode cn = D1Client.getCN();
1150
        NodeList nodeList = null;
1151
        Subject callingSubject = null;
1152
        boolean allowed = false;
1153
        
1154
        // are we allowed to call this?
1155
        callingSubject = session.getSubject();
1156
        nodeList = cn.listNodes();
1157
        
1158
        for(Node node : nodeList.getNodeList()) {
1159
            // must be a CN
1160
            if ( node.getType().equals(NodeType.CN)) {
1161
               List<Subject> subjectList = node.getSubjectList();
1162
               // the calling subject must be in the subject list
1163
               if ( subjectList.contains(callingSubject)) {
1164
                   allowed = true;
1165
                   
1166
               }
1167
               
1168
            }
1169
        }
1170
        
1171
        if (!allowed ) {
1172
            String msg = "The subject identified by " + callingSubject.getValue() +
1173
              " is not authorized to call this service.";
1174
            throw new NotAuthorized("1331", msg);
1175
            
1176
        }
1177
        
1178
        // compare what we have locally to what is sent in the change notification
1179
        try {
1180
            currentLocalSysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1181
             
1182
        } catch (RuntimeException e) {
1183
            String msg = "SystemMetadata for pid " + pid.getValue() +
1184
              " couldn't be updated because it couldn't be found locally: " +
1185
              e.getMessage();
1186
            logMetacat.error(msg);
1187
            ServiceFailure sf = new ServiceFailure("1333", msg);
1188
            sf.initCause(e);
1189
            throw sf; 
1190
        }
1191
        
1192
        if (currentLocalSysMeta.getSerialVersion().longValue() < serialVersion ) {
1193
            try {
1194
                newSysMeta = cn.getSystemMetadata(null, pid);
1195
            } catch (NotFound e) {
1196
                // huh? you just said you had it
1197
            	String msg = "On updating the local copy of system metadata " + 
1198
                "for pid " + pid.getValue() +", the CN reports it is not found." +
1199
                " The error message was: " + e.getMessage();
1200
                logMetacat.error(msg);
1201
                ServiceFailure sf = new ServiceFailure("1333", msg);
1202
                sf.initCause(e);
1203
                throw sf;
1204
            }
1205
            
1206
            // update the local copy of system metadata for the pid
1207
            try {
1208
                HazelcastService.getInstance().getSystemMetadataMap().put(newSysMeta.getIdentifier(), newSysMeta);
1209
                // submit for indexing
1210
                HazelcastService.getInstance().getIndexQueue().add(newSysMeta);
1211
                logMetacat.info("Updated local copy of system metadata for pid " +
1212
                    pid.getValue() + " after change notification from the CN.");
1213
                
1214
            } catch (RuntimeException e) {
1215
                String msg = "SystemMetadata for pid " + pid.getValue() +
1216
                  " couldn't be updated: " +
1217
                  e.getMessage();
1218
                logMetacat.error(msg);
1219
                ServiceFailure sf = new ServiceFailure("1333", msg);
1220
                sf.initCause(e);
1221
                throw sf;
1222
            }
1223
        }
1224
        
1225
        return true;
1226
        
1227
    }
1228
    
1229
    /*
1230
     * Set the replication status for the object on the Coordinating Node
1231
     * 
1232
     * @param session - the session for the this target node
1233
     * @param pid - the identifier of the object being updated
1234
     * @param nodeId - the identifier of this target node
1235
     * @param status - the replication status to set
1236
     * @param failure - the exception to include, if any
1237
     */
1238
    private void setReplicationStatus(Session session, Identifier pid, 
1239
        NodeReference nodeId, ReplicationStatus status, BaseException failure) 
1240
        throws ServiceFailure, NotImplemented, NotAuthorized, 
1241
        InvalidRequest {
1242
        
1243
        // call the CN as the MN to set the replication status
1244
        try {
1245
            this.cn = D1Client.getCN();
1246
            this.cn.setReplicationStatus(session, pid, nodeId,
1247
                    status, failure);
1248
            
1249
        } catch (InvalidToken e) {
1250
        	String msg = "Could not set the replication status for " + pid.getValue() + " on the CN (InvalidToken): " + e.getMessage();
1251
            logMetacat.error(msg);
1252
        	throw new ServiceFailure("2151",
1253
                    msg);
1254
            
1255
        } catch (NotFound e) {
1256
        	String msg = "Could not set the replication status for " + pid.getValue() + " on the CN (NotFound): " + e.getMessage();
1257
            logMetacat.error(msg);
1258
        	throw new ServiceFailure("2151",
1259
                    msg);
1260
            
1261
        }
1262
    }
1263

    
1264
	@Override
1265
	public Identifier generateIdentifier(Session session, String scheme, String fragment)
1266
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1267
			InvalidRequest {
1268
		
1269
		Identifier identifier = new Identifier();
1270
		
1271
		// handle different schemes
1272
		if (scheme.equalsIgnoreCase(UUID_SCHEME)) {
1273
			// UUID
1274
			UUID uuid = UUID.randomUUID();
1275
            identifier.setValue(UUID_PREFIX + uuid.toString());
1276
		} else if (scheme.equalsIgnoreCase(DOI_SCHEME)) {
1277
			// generate a DOI
1278
			try {
1279
				identifier = DOIService.getInstance().generateDOI();
1280
			} catch (EZIDException e) {
1281
				ServiceFailure sf = new ServiceFailure("2191", "Could not generate DOI: " + e.getMessage());
1282
				sf.initCause(e);
1283
				throw sf;
1284
			}
1285
		} else {
1286
			// default if we don't know the scheme
1287
			if (fragment != null) {
1288
				// for now, just autogen with fragment
1289
				String autogenId = DocumentUtil.generateDocumentId(fragment, 0);
1290
				identifier.setValue(autogenId);			
1291
			} else {
1292
				// autogen with no fragment
1293
				String autogenId = DocumentUtil.generateDocumentId(0);
1294
				identifier.setValue(autogenId);
1295
			}
1296
		}
1297
		
1298
		// TODO: reserve the identifier with the CN. We can only do this when
1299
		// 1) the MN is part of a CN cluster
1300
		// 2) the request is from an authenticated user
1301
		
1302
		return identifier;
1303
	}
1304

    
1305
	@Override
1306
	public boolean isAuthorized(Identifier pid, Permission permission)
1307
			throws ServiceFailure, InvalidRequest, InvalidToken, NotFound,
1308
			NotAuthorized, NotImplemented {
1309

    
1310
		return isAuthorized(null, pid, permission);
1311
	}
1312

    
1313
	@Override
1314
	public boolean systemMetadataChanged(Identifier pid, long serialVersion, Date dateSysMetaLastModified)
1315
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1316
			InvalidRequest {
1317

    
1318
		return systemMetadataChanged(null, pid, serialVersion, dateSysMetaLastModified);
1319
	}
1320

    
1321
	@Override
1322
	public Log getLogRecords(Date fromDate, Date toDate, Event event, String pidFilter,
1323
			Integer start, Integer count) throws InvalidRequest, InvalidToken,
1324
			NotAuthorized, NotImplemented, ServiceFailure {
1325

    
1326
		return getLogRecords(null, fromDate, toDate, event, pidFilter, start, count);
1327
	}
1328

    
1329
	@Override
1330
	public DescribeResponse describe(Identifier pid) throws InvalidToken,
1331
			NotAuthorized, NotImplemented, ServiceFailure, NotFound {
1332

    
1333
		return describe(null, pid);
1334
	}
1335

    
1336
	@Override
1337
	public InputStream get(Identifier pid) throws InvalidToken, NotAuthorized,
1338
			NotImplemented, ServiceFailure, NotFound, InsufficientResources {
1339

    
1340
		return get(null, pid);
1341
	}
1342

    
1343
	@Override
1344
	public Checksum getChecksum(Identifier pid, String algorithm)
1345
			throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1346
			ServiceFailure, NotFound {
1347

    
1348
		return getChecksum(null, pid, algorithm);
1349
	}
1350

    
1351
	@Override
1352
	public SystemMetadata getSystemMetadata(Identifier pid)
1353
			throws InvalidToken, NotAuthorized, NotImplemented, ServiceFailure,
1354
			NotFound {
1355

    
1356
		return getSystemMetadata(null, pid);
1357
	}
1358

    
1359
	@Override
1360
	public ObjectList listObjects(Date startTime, Date endTime,
1361
			ObjectFormatIdentifier objectFormatId, Boolean replicaStatus, Integer start,
1362
			Integer count) throws InvalidRequest, InvalidToken, NotAuthorized,
1363
			NotImplemented, ServiceFailure {
1364

    
1365
		return listObjects(null, startTime, endTime, objectFormatId, replicaStatus, start, count);
1366
	}
1367

    
1368
	@Override
1369
	public boolean synchronizationFailed(SynchronizationFailed syncFailed)
1370
			throws InvalidToken, NotAuthorized, NotImplemented, ServiceFailure {
1371

    
1372
		return synchronizationFailed(null, syncFailed);
1373
	}
1374

    
1375
	@Override
1376
	public InputStream getReplica(Identifier pid) throws InvalidToken,
1377
			NotAuthorized, NotImplemented, ServiceFailure, NotFound,
1378
			InsufficientResources {
1379

    
1380
		return getReplica(null, pid);
1381
	}
1382

    
1383
	@Override
1384
	public boolean replicate(SystemMetadata sysmeta, NodeReference sourceNode)
1385
			throws NotImplemented, ServiceFailure, NotAuthorized,
1386
			InvalidRequest, InvalidToken, InsufficientResources,
1387
			UnsupportedType {
1388

    
1389
		return replicate(null, sysmeta, sourceNode);
1390
	}
1391

    
1392
	@Override
1393
	public Identifier create(Identifier pid, InputStream object,
1394
			SystemMetadata sysmeta) throws IdentifierNotUnique,
1395
			InsufficientResources, InvalidRequest, InvalidSystemMetadata,
1396
			InvalidToken, NotAuthorized, NotImplemented, ServiceFailure,
1397
			UnsupportedType {
1398

    
1399
		return create(null, pid, object, sysmeta);
1400
	}
1401

    
1402
	@Override
1403
	public Identifier delete(Identifier pid) throws InvalidToken,
1404
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1405

    
1406
		return delete(null, pid);
1407
	}
1408

    
1409
	@Override
1410
	public Identifier generateIdentifier(String scheme, String fragment)
1411
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1412
			InvalidRequest {
1413

    
1414
		return generateIdentifier(null, scheme, fragment);
1415
	}
1416

    
1417
	@Override
1418
	public Identifier update(Identifier pid, InputStream object,
1419
			Identifier newPid, SystemMetadata sysmeta) throws IdentifierNotUnique,
1420
			InsufficientResources, InvalidRequest, InvalidSystemMetadata,
1421
			InvalidToken, NotAuthorized, NotImplemented, ServiceFailure,
1422
			UnsupportedType, NotFound {
1423

    
1424
		return update(null, pid, object, newPid, sysmeta);
1425
	}
1426

    
1427
	@Override
1428
	public QueryEngineDescription getQueryEngineDescription(String engine)
1429
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1430
			NotFound {
1431
	    if(engine != null && engine.equals(EnabledQueryEngines.PATHQUERYENGINE)) {
1432
	        QueryEngineDescription qed = new QueryEngineDescription();
1433
	        qed.setName(EnabledQueryEngines.PATHQUERYENGINE);
1434
	        qed.setQueryEngineVersion("1.0");
1435
	        qed.addAdditionalInfo("This is the traditional structured query for Metacat");
1436
	        Vector<String> pathsForIndexing = null;
1437
	        try {
1438
	            pathsForIndexing = SystemUtil.getPathsForIndexing();
1439
	        } catch (MetacatUtilException e) {
1440
	            logMetacat.warn("Could not get index paths", e);
1441
	        }
1442
	        for (String fieldName: pathsForIndexing) {
1443
	            QueryField field = new QueryField();
1444
	            field.addDescription("Indexed field for path '" + fieldName + "'");
1445
	            field.setName(fieldName);
1446
	            field.setReturnable(true);
1447
	            field.setSearchable(true);
1448
	            field.setSortable(false);
1449
	            // TODO: determine type and multivaluedness
1450
	            field.setType(String.class.getName());
1451
	            //field.setMultivalued(true);
1452
	            qed.addQueryField(field);
1453
	        }
1454
	        return qed;
1455
	    } else if (engine != null && engine.equals(EnabledQueryEngines.SOLRENGINE)) {
1456
	        if(!EnabledQueryEngines.getInstance().isEnabled(EnabledQueryEngines.SOLRENGINE)) {
1457
                throw new NotImplemented("0000", "MNodeService.getQueryEngineDescription - the query engine "+engine +" hasn't been implemented or has been disabled.");
1458
            }
1459
	        try {
1460
	            QueryEngineDescription qed = MetacatSolrEngineDescriptionHandler.getInstance().getQueryEngineDescritpion();
1461
	            return qed;
1462
	        } catch (Exception e) {
1463
	            e.printStackTrace();
1464
	            throw new ServiceFailure("Solr server error", e.getMessage());
1465
	        }
1466
	    } else {
1467
	        throw new NotFound("404", "The Metacat member node can't find the query engine - "+engine);
1468
	    }
1469
		
1470
	}
1471

    
1472
	@Override
1473
	public QueryEngineList listQueryEngines() throws InvalidToken,
1474
			ServiceFailure, NotAuthorized, NotImplemented {
1475
		QueryEngineList qel = new QueryEngineList();
1476
		//qel.addQueryEngine(EnabledQueryEngines.PATHQUERYENGINE);
1477
		//qel.addQueryEngine(EnabledQueryEngines.SOLRENGINE);
1478
		List<String> enables = EnabledQueryEngines.getInstance().getEnabled();
1479
		for(String name : enables) {
1480
		    qel.addQueryEngine(name);
1481
		}
1482
		return qel;
1483
	}
1484

    
1485
	@Override
1486
	public InputStream query(String engine, String query) throws InvalidToken,
1487
			ServiceFailure, NotAuthorized, InvalidRequest, NotImplemented,
1488
			NotFound {
1489
	    String user = Constants.SUBJECT_PUBLIC;
1490
        String[] groups= null;
1491
        Set<Subject> subjects = null;
1492
        if (session != null) {
1493
            user = session.getSubject().getValue();
1494
            subjects = AuthUtils.authorizedClientSubjects(session);
1495
            if (subjects != null) {
1496
                List<String> groupList = new ArrayList<String>();
1497
                for (Subject subject: subjects) {
1498
                    groupList.add(subject.getValue());
1499
                }
1500
                groups = groupList.toArray(new String[0]);
1501
            }
1502
        } else {
1503
            //add the public user subject to the set 
1504
            Subject subject = new Subject();
1505
            subject.setValue(Constants.SUBJECT_PUBLIC);
1506
            subjects = new HashSet<Subject>();
1507
            subjects.add(subject);
1508
        }
1509
        //System.out.println("====== user is "+user);
1510
        //System.out.println("====== groups are "+groups);
1511
		if (engine != null && engine.equals(EnabledQueryEngines.PATHQUERYENGINE)) {
1512
			try {
1513
				DBQuery queryobj = new DBQuery();
1514
				
1515
				String results = queryobj.performPathquery(query, user, groups);
1516
				ContentTypeByteArrayInputStream ctbais = new ContentTypeByteArrayInputStream(results.getBytes(MetaCatServlet.DEFAULT_ENCODING));
1517
				ctbais.setContentType("text/xml");
1518
				return ctbais;
1519

    
1520
			} catch (Exception e) {
1521
				throw new ServiceFailure("Pathquery error", e.getMessage());
1522
			}
1523
			
1524
		} else if (engine != null && engine.equals(EnabledQueryEngines.SOLRENGINE)) {
1525
		    if(!EnabledQueryEngines.getInstance().isEnabled(EnabledQueryEngines.SOLRENGINE)) {
1526
		        throw new NotImplemented("0000", "MNodeService.query - the query engine "+engine +" hasn't been implemented or has been disabled.");
1527
		    }
1528
		    logMetacat.info("The query is ==================================== \n"+query);
1529
		    try {
1530
		        
1531
                return MetacatSolrIndex.getInstance().query(query, subjects);
1532
            } catch (Exception e) {
1533
                // TODO Auto-generated catch block
1534
                throw new ServiceFailure("Solr server error", e.getMessage());
1535
            } 
1536
		}
1537
		return null;
1538
	}
1539
	
1540
	/**
1541
	 * Given an existing Science Metadata PID, this method mints a DOI
1542
	 * and updates the original object "publishing" the update with the DOI.
1543
	 * This includes updating the ORE map that describes the Science Metadata+data.
1544
	 * TODO: ensure all referenced objects allow public read
1545
	 * 
1546
	 * @see https://projects.ecoinformatics.org/ecoinfo/issues/6014
1547
	 * 
1548
	 * @param originalIdentifier
1549
	 * @param request
1550
	 * @throws InvalidRequest
1551
	 * @throws EZIDException
1552
	 * @throws InvalidToken
1553
	 * @throws NotAuthorized
1554
	 * @throws NotImplemented
1555
	 * @throws ServiceFailure
1556
	 * @throws NotFound
1557
	 * @throws InsufficientResources
1558
	 * @throws IdentifierNotUnique
1559
	 * @throws InvalidSystemMetadata
1560
	 * @throws UnsupportedType
1561
	 * @throws McdbDocNotFoundException 
1562
	 * @throws OREParserException 
1563
	 * @throws URISyntaxException 
1564
	 * @throws OREException 
1565
	 * @throws UnsupportedEncodingException 
1566
	 * @throws ORESerialiserException 
1567
	 * @throws NoSuchAlgorithmException 
1568
	 */
1569
	public Identifier publish(Session session, Identifier originalIdentifier) throws InvalidRequest, EZIDException, InvalidToken, NotAuthorized, NotImplemented, ServiceFailure, NotFound, InsufficientResources, IdentifierNotUnique, InvalidSystemMetadata, UnsupportedType, McdbDocNotFoundException, UnsupportedEncodingException, OREException, URISyntaxException, OREParserException, ORESerialiserException, NoSuchAlgorithmException {
1570
		
1571
		// mint a DOI for the new revision
1572
		Identifier newIdentifier = this.generateIdentifier(session, MNodeService.DOI_SCHEME, null);
1573
		
1574
		// get the original SM and update the values
1575
		SystemMetadata sysmeta = this.getSystemMetadata(session, originalIdentifier);
1576
		sysmeta.setIdentifier(newIdentifier);
1577
		sysmeta.setObsoletes(originalIdentifier);
1578
		sysmeta.setObsoletedBy(null);
1579
		
1580
		// get the bytes
1581
		InputStream inputStream = this.get(session, originalIdentifier);
1582
		
1583
		// update the object
1584
		this.update(session, originalIdentifier, inputStream , newIdentifier, sysmeta);
1585
		
1586
		// TODO: update ORE that references the scimeta
1587
		String localId = IdentifierManager.getInstance().getLocalId(originalIdentifier.getValue());
1588
		Identifier potentialOreIdentifier = new Identifier();
1589
		potentialOreIdentifier.setValue(SystemMetadataFactory.RESOURCE_MAP_PREFIX + localId);
1590
		
1591
		InputStream oreInputStream = null;
1592
		try {
1593
			oreInputStream = this.get(session, potentialOreIdentifier);
1594
		} catch (NotFound nf) {
1595
			// this is probably okay for many sci meta data docs
1596
			logMetacat.warn("No potential ORE map found for: " + potentialOreIdentifier.getValue());
1597
		}
1598
		if (oreInputStream != null) {
1599
			Identifier newOreIdentifier = MNodeService.getInstance(request).generateIdentifier(session, MNodeService.UUID_SCHEME, null);
1600

    
1601
			Map<Identifier, Map<Identifier, List<Identifier>>> resourceMapStructure = ResourceMapFactory.getInstance().parseResourceMap(oreInputStream);
1602
			Map<Identifier, List<Identifier>> sciMetaMap = resourceMapStructure.get(potentialOreIdentifier);
1603
			List<Identifier> dataIdentifiers = sciMetaMap.get(originalIdentifier);
1604
			
1605
			// TODO: ensure all data package objects allow public read
1606

    
1607
			// reconstruct the ORE with the new identifiers
1608
			sciMetaMap.remove(originalIdentifier);
1609
			sciMetaMap.put(newIdentifier, dataIdentifiers);
1610
			
1611
			ResourceMap resourceMap = ResourceMapFactory.getInstance().createResourceMap(newOreIdentifier, sciMetaMap);
1612
			String resourceMapString = ResourceMapFactory.getInstance().serializeResourceMap(resourceMap);
1613
			
1614
			// get the original ORE SM and update the values
1615
			SystemMetadata oreSysMeta = this.getSystemMetadata(session, potentialOreIdentifier);
1616
			oreSysMeta.setIdentifier(newOreIdentifier);
1617
			oreSysMeta.setObsoletes(potentialOreIdentifier);
1618
			oreSysMeta.setObsoletedBy(null);
1619
			oreSysMeta.setSize(BigInteger.valueOf(resourceMapString.getBytes("UTF-8").length));
1620
			oreSysMeta.setChecksum(ChecksumUtil.checksum(resourceMapString.getBytes("UTF-8"), oreSysMeta.getChecksum().getAlgorithm()));
1621
			
1622
			// save the updated ORE
1623
			this.update(
1624
					session, 
1625
					potentialOreIdentifier, 
1626
					new ByteArrayInputStream(resourceMapString.getBytes("UTF-8")), 
1627
					newOreIdentifier, 
1628
					oreSysMeta);
1629
			
1630
		}
1631
		
1632
		return newIdentifier;
1633
	}
1634
	
1635
	/**
1636
	 * Packages the given package in a Bagit collection for download
1637
	 * @param pid
1638
	 * @throws IOException 
1639
	 * @throws NotImplemented 
1640
	 * @throws NotFound 
1641
	 * @throws NotAuthorized 
1642
	 * @throws ServiceFailure 
1643
	 * @throws InvalidToken 
1644
	 * @throws OREParserException 
1645
	 * @throws URISyntaxException 
1646
	 * @throws OREException 
1647
	 */
1648
	public InputStream getPackage(Session session, Identifier pid) throws IOException, InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented, OREException, URISyntaxException, OREParserException {
1649
		InputStream bagInputStream = null;
1650
		BagFactory bagFactory = new BagFactory();
1651
		Bag bag = bagFactory.createBag();
1652
		
1653
		// track the temp files we use so we can delete them when finished
1654
		List<File> tempFiles = new ArrayList<File>();
1655
		
1656
		// the pids to include in the package
1657
		List<Identifier> packagePids = new ArrayList<Identifier>();
1658
		
1659
		// find the package contents
1660
		SystemMetadata sysMeta = this.getSystemMetadata(session, pid);
1661
		if (ObjectFormatService.getInstance().getFormat(sysMeta.getFormatId()).getFormatType().equals("RESOURCE")) {
1662
			InputStream oreInputStream = this.get(session, pid);
1663
			Map<Identifier, Map<Identifier, List<Identifier>>> resourceMapStructure = ResourceMapFactory.getInstance().parseResourceMap(oreInputStream);
1664
			packagePids.addAll(resourceMapStructure.keySet());
1665
			for (Map<Identifier, List<Identifier>> entries: resourceMapStructure.values()) {
1666
				packagePids.addAll(entries.keySet());
1667
				for (List<Identifier> dataPids: entries.values()) {
1668
					packagePids.addAll(dataPids);
1669
				}
1670
			}
1671
		} else {
1672
			// just the lone pid in this package
1673
			packagePids.add(pid);
1674
		}
1675

    
1676
		// loop through the package contents
1677
		for (Identifier entryPid: packagePids) {
1678
			SystemMetadata entrySysMeta = this.getSystemMetadata(session, entryPid);
1679
			String extension = ObjectFormatInfo.instance().getExtension(entrySysMeta.getFormatId().getValue());
1680
	        String prefix = entryPid.getValue();
1681
			File tempFile = File.createTempFile(prefix + "_bagit", extension);
1682
			tempFiles.add(tempFile);
1683
			InputStream entryInputStream = this.get(session, entryPid);
1684
			IOUtils.copy(entryInputStream, new FileOutputStream(tempFile));
1685
			bag.addFileToPayload(tempFile);
1686
		}
1687
		
1688
		bag = bag.makeComplete();
1689
		File bagFile = File.createTempFile("bagit", ".zip");
1690
		// TODO: delete more confidently
1691
		bagFile.deleteOnExit();
1692
		bag.setFile(bagFile);
1693
		Writer zipWriter = new ZipWriter(bagFactory);
1694
		bag.write(zipWriter, bagFile);
1695
		bagFile = bag.getFile();
1696
		bagInputStream = new FileInputStream(bagFile);
1697
		
1698
		// clean up temp entry files
1699
		for (File tf: tempFiles) {
1700
			tf.delete();
1701
		}
1702
		
1703
		return bagInputStream;
1704

    
1705
	}
1706
    
1707
}
(4-4/6)