Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *  Copyright: 2000-2011 Regents of the University of California and the
4
 *              National Center for Ecological Analysis and Synthesis
5
 *
6
 *   '$Author:  $'
7
 *     '$Date:  $'
8
 *
9
 * This program is free software; you can redistribute it and/or modify
10
 * it under the terms of the GNU General Public License as published by
11
 * the Free Software Foundation; either version 2 of the License, or
12
 * (at your option) any later version.
13
 *
14
 * This program is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 * GNU General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU General Public License
20
 * along with this program; if not, write to the Free Software
21
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22
 */
23

    
24
package edu.ucsb.nceas.metacat.dataone;
25

    
26
import java.io.ByteArrayInputStream;
27
import java.io.IOException;
28
import java.io.InputStream;
29
import java.math.BigInteger;
30
import java.security.NoSuchAlgorithmException;
31
import java.util.ArrayList;
32
import java.util.Calendar;
33
import java.util.Date;
34
import java.util.HashSet;
35
import java.util.List;
36
import java.util.Set;
37
import java.util.Timer;
38
import java.util.UUID;
39
import java.util.Vector;
40

    
41
import javax.servlet.http.HttpServletRequest;
42

    
43
import org.apache.commons.io.IOUtils;
44
import org.apache.log4j.Logger;
45
import org.apache.solr.client.solrj.SolrServerException;
46
import org.dataone.client.CNode;
47
import org.dataone.client.D1Client;
48
import org.dataone.client.MNode;
49
import org.dataone.client.auth.CertificateManager;
50
import org.dataone.configuration.Settings;
51
import org.dataone.service.exceptions.BaseException;
52
import org.dataone.service.exceptions.IdentifierNotUnique;
53
import org.dataone.service.exceptions.InsufficientResources;
54
import org.dataone.service.exceptions.InvalidRequest;
55
import org.dataone.service.exceptions.InvalidSystemMetadata;
56
import org.dataone.service.exceptions.InvalidToken;
57
import org.dataone.service.exceptions.NotAuthorized;
58
import org.dataone.service.exceptions.NotFound;
59
import org.dataone.service.exceptions.NotImplemented;
60
import org.dataone.service.exceptions.ServiceFailure;
61
import org.dataone.service.exceptions.SynchronizationFailed;
62
import org.dataone.service.exceptions.UnsupportedType;
63
import org.dataone.service.mn.tier1.v1.MNCore;
64
import org.dataone.service.mn.tier1.v1.MNRead;
65
import org.dataone.service.mn.tier2.v1.MNAuthorization;
66
import org.dataone.service.mn.tier3.v1.MNStorage;
67
import org.dataone.service.mn.tier4.v1.MNReplication;
68
import org.dataone.service.mn.v1.MNQuery;
69
import org.dataone.service.types.v1.Checksum;
70
import org.dataone.service.types.v1.DescribeResponse;
71
import org.dataone.service.types.v1.Event;
72
import org.dataone.service.types.v1.Identifier;
73
import org.dataone.service.types.v1.Log;
74
import org.dataone.service.types.v1.LogEntry;
75
import org.dataone.service.types.v1.MonitorInfo;
76
import org.dataone.service.types.v1.MonitorList;
77
import org.dataone.service.types.v1.Node;
78
import org.dataone.service.types.v1.NodeList;
79
import org.dataone.service.types.v1.NodeReference;
80
import org.dataone.service.types.v1.NodeState;
81
import org.dataone.service.types.v1.NodeType;
82
import org.dataone.service.types.v1.ObjectFormatIdentifier;
83
import org.dataone.service.types.v1.ObjectList;
84
import org.dataone.service.types.v1.Permission;
85
import org.dataone.service.types.v1.Ping;
86
import org.dataone.service.types.v1.ReplicationStatus;
87
import org.dataone.service.types.v1.Schedule;
88
import org.dataone.service.types.v1.Service;
89
import org.dataone.service.types.v1.Services;
90
import org.dataone.service.types.v1.Session;
91
import org.dataone.service.types.v1.Subject;
92
import org.dataone.service.types.v1.Synchronization;
93
import org.dataone.service.types.v1.SystemMetadata;
94
import org.dataone.service.types.v1.util.AuthUtils;
95
import org.dataone.service.types.v1.util.ChecksumUtil;
96
import org.dataone.service.types.v1_1.QueryEngineDescription;
97
import org.dataone.service.types.v1_1.QueryEngineList;
98
import org.dataone.service.types.v1_1.QueryField;
99
import org.dataone.service.util.Constants;
100

    
101
import edu.ucsb.nceas.ezid.EZIDException;
102
import edu.ucsb.nceas.metacat.DBQuery;
103
import edu.ucsb.nceas.metacat.EventLog;
104
import edu.ucsb.nceas.metacat.IdentifierManager;
105
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
106
import edu.ucsb.nceas.metacat.MetaCatServlet;
107
import edu.ucsb.nceas.metacat.MetacatHandler;
108
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
109
import edu.ucsb.nceas.metacat.index.MetacatSolrEngineDescriptionHandler;
110
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
111
import edu.ucsb.nceas.metacat.properties.PropertyService;
112
import edu.ucsb.nceas.metacat.shared.MetacatUtilException;
113
import edu.ucsb.nceas.metacat.util.DocumentUtil;
114
import edu.ucsb.nceas.metacat.util.SystemUtil;
115
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
116

    
117
/**
118
 * Represents Metacat's implementation of the DataONE Member Node 
119
 * service API. Methods implement the various MN* interfaces, and methods common
120
 * to both Member Node and Coordinating Node interfaces are found in the
121
 * D1NodeService base class.
122
 * 
123
 * Implements:
124
 * MNCore.ping()
125
 * MNCore.getLogRecords()
126
 * MNCore.getObjectStatistics()
127
 * MNCore.getOperationStatistics()
128
 * MNCore.getStatus()
129
 * MNCore.getCapabilities()
130
 * MNRead.get()
131
 * MNRead.getSystemMetadata()
132
 * MNRead.describe()
133
 * MNRead.getChecksum()
134
 * MNRead.listObjects()
135
 * MNRead.synchronizationFailed()
136
 * MNAuthorization.isAuthorized()
137
 * MNAuthorization.setAccessPolicy()
138
 * MNStorage.create()
139
 * MNStorage.update()
140
 * MNStorage.delete()
141
 * MNReplication.replicate()
142
 * 
143
 */
144
public class MNodeService extends D1NodeService 
145
    implements MNAuthorization, MNCore, MNRead, MNReplication, MNStorage, MNQuery {
146

    
147
    private static final String PATHQUERY = "pathquery";
148
	private static final String UUID_SCHEME = "UUID";
149
	private static final String DOI_SCHEME = "DOI";
150
	private static final String UUID_PREFIX = "urn:uuid:";
151

    
152
	/* the logger instance */
153
    private Logger logMetacat = null;
154
    
155
    /* A reference to a remote Memeber Node */
156
    private MNode mn;
157
    
158
    /* A reference to a Coordinating Node */
159
    private CNode cn;
160

    
161

    
162
    /**
163
     * Singleton accessor to get an instance of MNodeService.
164
     * 
165
     * @return instance - the instance of MNodeService
166
     */
167
    public static MNodeService getInstance(HttpServletRequest request) {
168
        return new MNodeService(request);
169
    }
170

    
171
    /**
172
     * Constructor, private for singleton access
173
     */
174
    private MNodeService(HttpServletRequest request) {
175
        super(request);
176
        logMetacat = Logger.getLogger(MNodeService.class);
177
        
178
        // set the Member Node certificate file location
179
        CertificateManager.getInstance().setCertificateLocation(Settings.getConfiguration().getString("D1Client.certificate.file"));
180
    }
181

    
182
    /**
183
     * Deletes an object from the Member Node, where the object is either a 
184
     * data object or a science metadata object.
185
     * 
186
     * @param session - the Session object containing the credentials for the Subject
187
     * @param pid - The object identifier to be deleted
188
     * 
189
     * @return pid - the identifier of the object used for the deletion
190
     * 
191
     * @throws InvalidToken
192
     * @throws ServiceFailure
193
     * @throws NotAuthorized
194
     * @throws NotFound
195
     * @throws NotImplemented
196
     * @throws InvalidRequest
197
     */
198
    @Override
199
    public Identifier delete(Session session, Identifier pid) 
200
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
201

    
202
    	// only admin of  the MN or the CN is allowed a full delete
203
        boolean allowed = false;
204
        allowed = isAdminAuthorized(session);
205
        if (!allowed) { 
206
            throw new NotAuthorized("1320", "The provided identity does not have " + "permission to delete objects on the Node.");
207
        }
208
    	
209
    	// defer to superclass implementation
210
        return super.delete(session, pid);
211
    }
212

    
213
    /**
214
     * Updates an existing object by creating a new object identified by 
215
     * newPid on the Member Node which explicitly obsoletes the object 
216
     * identified by pid through appropriate changes to the SystemMetadata 
217
     * of pid and newPid
218
     * 
219
     * @param session - the Session object containing the credentials for the Subject
220
     * @param pid - The identifier of the object to be updated
221
     * @param object - the new object bytes
222
     * @param sysmeta - the new system metadata describing the object
223
     * 
224
     * @return newPid - the identifier of the new object
225
     * 
226
     * @throws InvalidToken
227
     * @throws ServiceFailure
228
     * @throws NotAuthorized
229
     * @throws NotFound
230
     * @throws NotImplemented
231
     * @throws IdentifierNotUnique
232
     * @throws UnsupportedType
233
     * @throws InsufficientResources
234
     * @throws InvalidSystemMetadata
235
     * @throws InvalidRequest
236
     */
237
    @Override
238
    public Identifier update(Session session, Identifier pid, InputStream object, 
239
        Identifier newPid, SystemMetadata sysmeta) 
240
        throws InvalidToken, ServiceFailure, NotAuthorized, IdentifierNotUnique, 
241
        UnsupportedType, InsufficientResources, NotFound, 
242
        InvalidSystemMetadata, NotImplemented, InvalidRequest {
243

    
244
        String localId = null;
245
        boolean allowed = false;
246
        boolean isScienceMetadata = false;
247
        
248
        if (session == null) {
249
        	throw new InvalidToken("1210", "No session has been provided");
250
        }
251
        Subject subject = session.getSubject();
252

    
253
        // verify the pid is valid format
254
        if (!isValidIdentifier(pid)) {
255
        	throw new InvalidRequest("1202", "The provided identifier is invalid.");
256
        }
257

    
258
        // check for the existing identifier
259
        try {
260
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
261
            
262
        } catch (McdbDocNotFoundException e) {
263
            throw new InvalidRequest("1202", "The object with the provided " + 
264
                "identifier was not found.");
265
            
266
        }
267
        
268
        // set the originating node
269
        NodeReference originMemberNode = this.getCapabilities().getIdentifier();
270
        sysmeta.setOriginMemberNode(originMemberNode);
271
        
272
        // set the submitter to match the certificate
273
        sysmeta.setSubmitter(subject);
274
        // set the dates
275
        Date now = Calendar.getInstance().getTime();
276
        sysmeta.setDateSysMetadataModified(now);
277
        sysmeta.setDateUploaded(now);
278
        
279
        // make sure serial version is set to something
280
        BigInteger serialVersion = sysmeta.getSerialVersion();
281
        if (serialVersion == null) {
282
        	sysmeta.setSerialVersion(BigInteger.ZERO);
283
        }
284

    
285
        // does the subject have WRITE ( == update) priveleges on the pid?
286
        allowed = isAuthorized(session, pid, Permission.WRITE);
287

    
288
        if (allowed) {
289
        	
290
        	// check quality of SM
291
        	if (sysmeta.getObsoletedBy() != null) {
292
        		throw new InvalidSystemMetadata("1300", "Cannot include obsoletedBy when updating object");
293
        	}
294
        	if (sysmeta.getObsoletes() != null && !sysmeta.getObsoletes().getValue().equals(pid.getValue())) {
295
        		throw new InvalidSystemMetadata("1300", "The identifier provided in obsoletes does not match old Identifier");
296
        	}
297

    
298
            // get the existing system metadata for the object
299
            SystemMetadata existingSysMeta = getSystemMetadata(session, pid);
300

    
301
            // check for previous update
302
            // see: https://redmine.dataone.org/issues/3336
303
            Identifier existingObsoletedBy = existingSysMeta.getObsoletedBy();
304
            if (existingObsoletedBy != null) {
305
            	throw new InvalidRequest("1202", 
306
            			"The previous identifier has already been made obsolete by: " + existingObsoletedBy.getValue());
307
            }
308
            
309
            // add the newPid to the obsoletedBy list for the existing sysmeta
310
            existingSysMeta.setObsoletedBy(newPid);
311

    
312
            // then update the existing system metadata
313
            updateSystemMetadata(existingSysMeta);
314

    
315
            // prep the new system metadata, add pid to the affected lists
316
            sysmeta.setObsoletes(pid);
317
            //sysmeta.addDerivedFrom(pid);
318

    
319
            isScienceMetadata = isScienceMetadata(sysmeta);
320

    
321
            // do we have XML metadata or a data object?
322
            if (isScienceMetadata) {
323

    
324
                // update the science metadata XML document
325
                // TODO: handle non-XML metadata/data documents (like netCDF)
326
                // TODO: don't put objects into memory using stream to string
327
                String objectAsXML = "";
328
                try {
329
                    objectAsXML = IOUtils.toString(object, "UTF-8");
330
                    // give the old pid so we can calculate the new local id 
331
                    localId = insertOrUpdateDocument(objectAsXML, pid, session, "update");
332
                    // register the newPid and the generated localId
333
                    if (newPid != null) {
334
                        IdentifierManager.getInstance().createMapping(newPid.getValue(), localId);
335

    
336
                    }
337

    
338
                } catch (IOException e) {
339
                    String msg = "The Node is unable to create the object. " + "There was a problem converting the object to XML";
340
                    logMetacat.info(msg);
341
                    throw new ServiceFailure("1310", msg + ": " + e.getMessage());
342

    
343
                }
344

    
345
            } else {
346

    
347
                // update the data object
348
                localId = insertDataObject(object, newPid, session);
349

    
350
            }
351

    
352
            // and insert the new system metadata
353
            insertSystemMetadata(sysmeta);
354

    
355
            // log the update event
356
            EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), subject.getValue(), localId, Event.UPDATE.toString());
357
            
358
            // attempt to register the identifier - it checks if it is a doi
359
            try {
360
    			DOIService.getInstance().registerDOI(sysmeta);
361
    		} catch (EZIDException e) {
362
                throw new ServiceFailure("1190", "Could not register DOI: " + e.getMessage());
363
    		}
364

    
365
        } else {
366
            throw new NotAuthorized("1200", "The provided identity does not have " + "permission to UPDATE the object identified by " + pid.getValue()
367
                    + " on the Member Node.");
368
        }
369

    
370
        return newPid;
371
    }
372

    
373
    public Identifier create(Session session, Identifier pid, InputStream object, SystemMetadata sysmeta) throws InvalidToken, ServiceFailure, NotAuthorized,
374
            IdentifierNotUnique, UnsupportedType, InsufficientResources, InvalidSystemMetadata, NotImplemented, InvalidRequest {
375

    
376
        // check for null session
377
        if (session == null) {
378
          throw new InvalidToken("1110", "Session is required to WRITE to the Node.");
379
        }
380
        // set the submitter to match the certificate
381
        sysmeta.setSubmitter(session.getSubject());
382
        // set the originating node
383
        NodeReference originMemberNode = this.getCapabilities().getIdentifier();
384
        sysmeta.setOriginMemberNode(originMemberNode);
385
        sysmeta.setArchived(false);
386

    
387
        // set the dates
388
        Date now = Calendar.getInstance().getTime();
389
        sysmeta.setDateSysMetadataModified(now);
390
        sysmeta.setDateUploaded(now);
391
        
392
        // set the serial version
393
        sysmeta.setSerialVersion(BigInteger.ZERO);
394

    
395
        // check that we are not attempting to subvert versioning
396
        if (sysmeta.getObsoletes() != null && sysmeta.getObsoletes().getValue() != null) {
397
            throw new InvalidSystemMetadata("1180", 
398
              "The supplied system metadata is invalid. " +
399
              "The obsoletes field cannot have a value when creating entries.");
400
        }
401
        
402
        if (sysmeta.getObsoletedBy() != null && sysmeta.getObsoletedBy().getValue() != null) {
403
            throw new InvalidSystemMetadata("1180", 
404
              "The supplied system metadata is invalid. " +
405
              "The obsoletedBy field cannot have a value when creating entries.");
406
        }
407

    
408
        // call the shared impl
409
        Identifier resultPid = super.create(session, pid, object, sysmeta);
410
        
411
        // attempt to register the identifier - it checks if it is a doi
412
        try {
413
			DOIService.getInstance().registerDOI(sysmeta);
414
		} catch (EZIDException e) {
415
			ServiceFailure sf = new ServiceFailure("1190", "Could not register DOI: " + e.getMessage());
416
			sf.initCause(e);
417
            throw sf;
418
		}
419
        
420
        // return 
421
		return resultPid ;
422
    }
423

    
424
    /**
425
     * Called by a Coordinating Node to request that the Member Node create a 
426
     * copy of the specified object by retrieving it from another Member 
427
     * Node and storing it locally so that it can be made accessible to 
428
     * the DataONE system.
429
     * 
430
     * @param session - the Session object containing the credentials for the Subject
431
     * @param sysmeta - Copy of the CN held system metadata for the object
432
     * @param sourceNode - A reference to node from which the content should be 
433
     *                     retrieved. The reference should be resolved by 
434
     *                     checking the CN node registry.
435
     * 
436
     * @return true if the replication succeeds
437
     * 
438
     * @throws ServiceFailure
439
     * @throws NotAuthorized
440
     * @throws NotImplemented
441
     * @throws UnsupportedType
442
     * @throws InsufficientResources
443
     * @throws InvalidRequest
444
     */
445
    @Override
446
    public boolean replicate(Session session, SystemMetadata sysmeta,
447
            NodeReference sourceNode) throws NotImplemented, ServiceFailure,
448
            NotAuthorized, InvalidRequest, InsufficientResources,
449
            UnsupportedType {
450

    
451
        if (session != null && sysmeta != null && sourceNode != null) {
452
            logMetacat.info("MNodeService.replicate() called with parameters: \n" +
453
                            "\tSession.Subject      = "                           +
454
                            session.getSubject().getValue() + "\n"                +
455
                            "\tidentifier           = "                           + 
456
                            sysmeta.getIdentifier().getValue()                    +
457
                            "\n" + "\tSource NodeReference ="                     +
458
                            sourceNode.getValue());
459
        }
460
        boolean result = false;
461
        String nodeIdStr = null;
462
        NodeReference nodeId = null;
463

    
464
        // get the referenced object
465
        Identifier pid = sysmeta.getIdentifier();
466

    
467
        // get from the membernode
468
        // TODO: switch credentials for the server retrieval?
469
        this.mn = D1Client.getMN(sourceNode);
470
        this.cn = D1Client.getCN();
471
        InputStream object = null;
472
        Session thisNodeSession = null;
473
        SystemMetadata localSystemMetadata = null;
474
        BaseException failure = null;
475
        String localId = null;
476
        
477
        // TODO: check credentials
478
        // cannot be called by public
479
        if (session == null || session.getSubject() == null) {
480
            String msg = "No session was provided to replicate identifier " +
481
            sysmeta.getIdentifier().getValue();
482
            logMetacat.info(msg);
483
            throw new NotAuthorized("2152", msg);
484
            
485
        }
486

    
487

    
488
        // get the local node id
489
        try {
490
            nodeIdStr = PropertyService.getProperty("dataone.nodeId");
491
            nodeId = new NodeReference();
492
            nodeId.setValue(nodeIdStr);
493

    
494
        } catch (PropertyNotFoundException e1) {
495
            String msg = "Couldn't get dataone.nodeId property: " + e1.getMessage();
496
            failure = new ServiceFailure("2151", msg);
497
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
498
            logMetacat.error(msg);
499
            return true;
500

    
501
        }
502
        
503

    
504
        try {
505
            // do we already have a replica?
506
            try {
507
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
508
                // if we have a local id, get the local object
509
                try {
510
                    object = MetacatHandler.read(localId);
511
                } catch (Exception e) {
512
                	// NOTE: we may already know about this ID because it could be a data file described by a metadata file
513
                	// https://redmine.dataone.org/issues/2572
514
                	// TODO: fix this so that we don't prevent ourselves from getting replicas
515
                	
516
                    // let the CN know that the replication failed
517
                	logMetacat.warn("Object content not found on this node despite having localId: " + localId);
518
                	String msg = "Can't read the object bytes properly, replica is invalid.";
519
                    ServiceFailure serviceFailure = new ServiceFailure("2151", msg);
520
                    setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, serviceFailure);
521
                    logMetacat.warn(msg);
522
                    throw serviceFailure;
523
                    
524
                }
525

    
526
            } catch (McdbDocNotFoundException e) {
527
                logMetacat.info("No replica found. Continuing.");
528
                
529
            }
530
            
531
            // no local replica, get a replica
532
            if ( object == null ) {
533
                // session should be null to use the default certificate
534
                // location set in the Certificate manager
535
                object = mn.getReplica(thisNodeSession, pid);
536
                logMetacat.info("MNodeService.getReplica() called for identifier "
537
                                + pid.getValue());
538

    
539
            }
540

    
541
        } catch (InvalidToken e) {            
542
            String msg = "Could not retrieve object to replicate (InvalidToken): "+ e.getMessage();
543
            failure = new ServiceFailure("2151", msg);
544
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
545
            logMetacat.error(msg);
546
            throw new ServiceFailure("2151", msg);
547

    
548
        } catch (NotFound e) {
549
            String msg = "Could not retrieve object to replicate (NotFound): "+ e.getMessage();
550
            failure = new ServiceFailure("2151", msg);
551
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
552
            logMetacat.error(msg);
553
            throw new ServiceFailure("2151", msg);
554

    
555
        }
556

    
557
        // verify checksum on the object, if supported
558
        if (object.markSupported()) {
559
            Checksum givenChecksum = sysmeta.getChecksum();
560
            Checksum computedChecksum = null;
561
            try {
562
                computedChecksum = ChecksumUtil.checksum(object, givenChecksum.getAlgorithm());
563
                object.reset();
564

    
565
            } catch (Exception e) {
566
                String msg = "Error computing checksum on replica: " + e.getMessage();
567
                logMetacat.error(msg);
568
                ServiceFailure sf = new ServiceFailure("2151", msg);
569
                sf.initCause(e);
570
                throw sf;
571
            }
572
            if (!givenChecksum.getValue().equals(computedChecksum.getValue())) {
573
                logMetacat.error("Given    checksum for " + pid.getValue() + 
574
                    "is " + givenChecksum.getValue());
575
                logMetacat.error("Computed checksum for " + pid.getValue() + 
576
                    "is " + computedChecksum.getValue());
577
                throw new ServiceFailure("2151",
578
                        "Computed checksum does not match declared checksum");
579
            }
580
        }
581

    
582
        // add it to local store
583
        Identifier retPid;
584
        try {
585
            // skip the MN.create -- this mutates the system metadata and we don't want it to
586
            if ( localId == null ) {
587
                // TODO: this will fail if we already "know" about the identifier
588
            	// FIXME: see https://redmine.dataone.org/issues/2572
589
                retPid = super.create(session, pid, object, sysmeta);
590
                result = (retPid.getValue().equals(pid.getValue()));
591
            }
592
            
593
        } catch (Exception e) {
594
            String msg = "Could not save object to local store (" + e.getClass().getName() + "): " + e.getMessage();
595
            failure = new ServiceFailure("2151", msg);
596
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
597
            logMetacat.error(msg);
598
            throw new ServiceFailure("2151", msg);
599
            
600
        }
601

    
602
        // finish by setting the replication status
603
        setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.COMPLETED, null);
604
        return result;
605

    
606
    }
607

    
608
    /**
609
     * Return the object identified by the given object identifier
610
     * 
611
     * @param session - the Session object containing the credentials for the Subject
612
     * @param pid - the object identifier for the given object
613
     * 
614
     * @return inputStream - the input stream of the given object
615
     * 
616
     * @throws InvalidToken
617
     * @throws ServiceFailure
618
     * @throws NotAuthorized
619
     * @throws InvalidRequest
620
     * @throws NotImplemented
621
     */
622
    @Override
623
    public InputStream get(Session session, Identifier pid) 
624
    throws InvalidToken, ServiceFailure, NotAuthorized, NotFound, NotImplemented {
625

    
626
        return super.get(session, pid);
627

    
628
    }
629

    
630
    /**
631
     * Returns a Checksum for the specified object using an accepted hashing algorithm
632
     * 
633
     * @param session - the Session object containing the credentials for the Subject
634
     * @param pid - the object identifier for the given object
635
     * @param algorithm -  the name of an algorithm that will be used to compute 
636
     *                     a checksum of the bytes of the object
637
     * 
638
     * @return checksum - the checksum of the given object
639
     * 
640
     * @throws InvalidToken
641
     * @throws ServiceFailure
642
     * @throws NotAuthorized
643
     * @throws NotFound
644
     * @throws InvalidRequest
645
     * @throws NotImplemented
646
     */
647
    @Override
648
    public Checksum getChecksum(Session session, Identifier pid, String algorithm) 
649
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound,
650
        InvalidRequest, NotImplemented {
651

    
652
        Checksum checksum = null;
653

    
654
        InputStream inputStream = get(session, pid);
655

    
656
        try {
657
            checksum = ChecksumUtil.checksum(inputStream, algorithm);
658

    
659
        } catch (NoSuchAlgorithmException e) {
660
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned due to an internal error: "
661
                    + e.getMessage());
662
        } catch (IOException e) {
663
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned due to an internal error: "
664
                    + e.getMessage());
665
        }
666

    
667
        if (checksum == null) {
668
            throw new ServiceFailure("1410", "The checksum for the object specified by " + pid.getValue() + "could not be returned.");
669
        }
670

    
671
        return checksum;
672
    }
673

    
674
    /**
675
     * Return the system metadata for a given object
676
     * 
677
     * @param session - the Session object containing the credentials for the Subject
678
     * @param pid - the object identifier for the given object
679
     * 
680
     * @return inputStream - the input stream of the given system metadata object
681
     * 
682
     * @throws InvalidToken
683
     * @throws ServiceFailure
684
     * @throws NotAuthorized
685
     * @throws NotFound
686
     * @throws InvalidRequest
687
     * @throws NotImplemented
688
     */
689
    @Override
690
    public SystemMetadata getSystemMetadata(Session session, Identifier pid) 
691
        throws InvalidToken, ServiceFailure, NotAuthorized, NotFound,
692
        NotImplemented {
693

    
694
        return super.getSystemMetadata(session, pid);
695
    }
696

    
697
    /**
698
     * Retrieve the list of objects present on the MN that match the calling parameters
699
     * 
700
     * @param session - the Session object containing the credentials for the Subject
701
     * @param startTime - Specifies the beginning of the time range from which 
702
     *                    to return object (>=)
703
     * @param endTime - Specifies the beginning of the time range from which 
704
     *                  to return object (>=)
705
     * @param objectFormat - Restrict results to the specified object format
706
     * @param replicaStatus - Indicates if replicated objects should be returned in the list
707
     * @param start - The zero-based index of the first value, relative to the 
708
     *                first record of the resultset that matches the parameters.
709
     * @param count - The maximum number of entries that should be returned in 
710
     *                the response. The Member Node may return less entries 
711
     *                than specified in this value.
712
     * 
713
     * @return objectList - the list of objects matching the criteria
714
     * 
715
     * @throws InvalidToken
716
     * @throws ServiceFailure
717
     * @throws NotAuthorized
718
     * @throws InvalidRequest
719
     * @throws NotImplemented
720
     */
721
    @Override
722
    public ObjectList listObjects(Session session, Date startTime, Date endTime, ObjectFormatIdentifier objectFormatId, Boolean replicaStatus, Integer start,
723
            Integer count) throws NotAuthorized, InvalidRequest, NotImplemented, ServiceFailure, InvalidToken {
724

    
725
        ObjectList objectList = null;
726

    
727
        try {
728
        	// safeguard against large requests
729
            if (count == null || count > MAXIMUM_DB_RECORD_COUNT) {
730
            	count = MAXIMUM_DB_RECORD_COUNT;
731
            }
732
            objectList = IdentifierManager.getInstance().querySystemMetadata(startTime, endTime, objectFormatId, replicaStatus, start, count);
733
        } catch (Exception e) {
734
            throw new ServiceFailure("1580", "Error querying system metadata: " + e.getMessage());
735
        }
736

    
737
        return objectList;
738
    }
739

    
740
    /**
741
     * Return a description of the node's capabilities and services.
742
     * 
743
     * @return node - the technical capabilities of the Member Node
744
     * 
745
     * @throws ServiceFailure
746
     * @throws NotAuthorized
747
     * @throws InvalidRequest
748
     * @throws NotImplemented
749
     */
750
    @Override
751
    public Node getCapabilities() 
752
        throws NotImplemented, ServiceFailure {
753

    
754
        String nodeName = null;
755
        String nodeId = null;
756
        String subject = null;
757
        String contactSubject = null;
758
        String nodeDesc = null;
759
        String nodeTypeString = null;
760
        NodeType nodeType = null;
761
        String mnCoreServiceVersion = null;
762
        String mnReadServiceVersion = null;
763
        String mnAuthorizationServiceVersion = null;
764
        String mnStorageServiceVersion = null;
765
        String mnReplicationServiceVersion = null;
766

    
767
        boolean nodeSynchronize = false;
768
        boolean nodeReplicate = false;
769
        boolean mnCoreServiceAvailable = false;
770
        boolean mnReadServiceAvailable = false;
771
        boolean mnAuthorizationServiceAvailable = false;
772
        boolean mnStorageServiceAvailable = false;
773
        boolean mnReplicationServiceAvailable = false;
774

    
775
        try {
776
            // get the properties of the node based on configuration information
777
            nodeName = PropertyService.getProperty("dataone.nodeName");
778
            nodeId = PropertyService.getProperty("dataone.nodeId");
779
            subject = PropertyService.getProperty("dataone.subject");
780
            contactSubject = PropertyService.getProperty("dataone.contactSubject");
781
            nodeDesc = PropertyService.getProperty("dataone.nodeDescription");
782
            nodeTypeString = PropertyService.getProperty("dataone.nodeType");
783
            nodeType = NodeType.convert(nodeTypeString);
784
            nodeSynchronize = new Boolean(PropertyService.getProperty("dataone.nodeSynchronize")).booleanValue();
785
            nodeReplicate = new Boolean(PropertyService.getProperty("dataone.nodeReplicate")).booleanValue();
786

    
787
            mnCoreServiceVersion = PropertyService.getProperty("dataone.mnCore.serviceVersion");
788
            mnReadServiceVersion = PropertyService.getProperty("dataone.mnRead.serviceVersion");
789
            mnAuthorizationServiceVersion = PropertyService.getProperty("dataone.mnAuthorization.serviceVersion");
790
            mnStorageServiceVersion = PropertyService.getProperty("dataone.mnStorage.serviceVersion");
791
            mnReplicationServiceVersion = PropertyService.getProperty("dataone.mnReplication.serviceVersion");
792

    
793
            mnCoreServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnCore.serviceAvailable")).booleanValue();
794
            mnReadServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnRead.serviceAvailable")).booleanValue();
795
            mnAuthorizationServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnAuthorization.serviceAvailable")).booleanValue();
796
            mnStorageServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnStorage.serviceAvailable")).booleanValue();
797
            mnReplicationServiceAvailable = new Boolean(PropertyService.getProperty("dataone.mnReplication.serviceAvailable")).booleanValue();
798

    
799
            // Set the properties of the node based on configuration information and
800
            // calls to current status methods
801
            String serviceName = SystemUtil.getSecureContextURL() + "/" + PropertyService.getProperty("dataone.serviceName");
802
            Node node = new Node();
803
            node.setBaseURL(serviceName + "/" + nodeTypeString);
804
            node.setDescription(nodeDesc);
805

    
806
            // set the node's health information
807
            node.setState(NodeState.UP);
808
            
809
            // set the ping response to the current value
810
            Ping canPing = new Ping();
811
            canPing.setSuccess(false);
812
            try {
813
            	Date pingDate = ping();
814
                canPing.setSuccess(pingDate != null);
815
            } catch (BaseException e) {
816
                e.printStackTrace();
817
                // guess it can't be pinged
818
            }
819
            
820
            node.setPing(canPing);
821

    
822
            NodeReference identifier = new NodeReference();
823
            identifier.setValue(nodeId);
824
            node.setIdentifier(identifier);
825
            Subject s = new Subject();
826
            s.setValue(subject);
827
            node.addSubject(s);
828
            Subject contact = new Subject();
829
            contact.setValue(contactSubject);
830
            node.addContactSubject(contact);
831
            node.setName(nodeName);
832
            node.setReplicate(nodeReplicate);
833
            node.setSynchronize(nodeSynchronize);
834

    
835
            // services: MNAuthorization, MNCore, MNRead, MNReplication, MNStorage
836
            Services services = new Services();
837

    
838
            Service sMNCore = new Service();
839
            sMNCore.setName("MNCore");
840
            sMNCore.setVersion(mnCoreServiceVersion);
841
            sMNCore.setAvailable(mnCoreServiceAvailable);
842

    
843
            Service sMNRead = new Service();
844
            sMNRead.setName("MNRead");
845
            sMNRead.setVersion(mnReadServiceVersion);
846
            sMNRead.setAvailable(mnReadServiceAvailable);
847

    
848
            Service sMNAuthorization = new Service();
849
            sMNAuthorization.setName("MNAuthorization");
850
            sMNAuthorization.setVersion(mnAuthorizationServiceVersion);
851
            sMNAuthorization.setAvailable(mnAuthorizationServiceAvailable);
852

    
853
            Service sMNStorage = new Service();
854
            sMNStorage.setName("MNStorage");
855
            sMNStorage.setVersion(mnStorageServiceVersion);
856
            sMNStorage.setAvailable(mnStorageServiceAvailable);
857

    
858
            Service sMNReplication = new Service();
859
            sMNReplication.setName("MNReplication");
860
            sMNReplication.setVersion(mnReplicationServiceVersion);
861
            sMNReplication.setAvailable(mnReplicationServiceAvailable);
862

    
863
            services.addService(sMNRead);
864
            services.addService(sMNCore);
865
            services.addService(sMNAuthorization);
866
            services.addService(sMNStorage);
867
            services.addService(sMNReplication);
868
            node.setServices(services);
869

    
870
            // Set the schedule for synchronization
871
            Synchronization synchronization = new Synchronization();
872
            Schedule schedule = new Schedule();
873
            Date now = new Date();
874
            schedule.setYear(PropertyService.getProperty("dataone.nodeSynchronization.schedule.year"));
875
            schedule.setMon(PropertyService.getProperty("dataone.nodeSynchronization.schedule.mon"));
876
            schedule.setMday(PropertyService.getProperty("dataone.nodeSynchronization.schedule.mday"));
877
            schedule.setWday(PropertyService.getProperty("dataone.nodeSynchronization.schedule.wday"));
878
            schedule.setHour(PropertyService.getProperty("dataone.nodeSynchronization.schedule.hour"));
879
            schedule.setMin(PropertyService.getProperty("dataone.nodeSynchronization.schedule.min"));
880
            schedule.setSec(PropertyService.getProperty("dataone.nodeSynchronization.schedule.sec"));
881
            synchronization.setSchedule(schedule);
882
            synchronization.setLastHarvested(now);
883
            synchronization.setLastCompleteHarvest(now);
884
            node.setSynchronization(synchronization);
885

    
886
            node.setType(nodeType);
887
            return node;
888

    
889
        } catch (PropertyNotFoundException pnfe) {
890
            String msg = "MNodeService.getCapabilities(): " + "property not found: " + pnfe.getMessage();
891
            logMetacat.error(msg);
892
            throw new ServiceFailure("2162", msg);
893
        }
894
    }
895

    
896
    /**
897
     * Returns the number of operations that have been serviced by the node 
898
     * over time periods of one and 24 hours.
899
     * 
900
     * @param session - the Session object containing the credentials for the Subject
901
     * @param period - An ISO8601 compatible DateTime range specifying the time 
902
     *                 range for which to return operation statistics.
903
     * @param requestor - Limit to operations performed by given requestor identity.
904
     * @param event -  Enumerated value indicating the type of event being examined
905
     * @param format - Limit to events involving objects of the specified format
906
     * 
907
     * @return the desired log records
908
     * 
909
     * @throws InvalidToken
910
     * @throws ServiceFailure
911
     * @throws NotAuthorized
912
     * @throws InvalidRequest
913
     * @throws NotImplemented
914
     */
915
    public MonitorList getOperationStatistics(Session session, Date startTime, 
916
        Date endTime, Subject requestor, Event event, ObjectFormatIdentifier formatId)
917
        throws NotImplemented, ServiceFailure, NotAuthorized, InsufficientResources, UnsupportedType {
918

    
919
        MonitorList monitorList = new MonitorList();
920

    
921
        try {
922

    
923
            // get log records first
924
            Log logs = getLogRecords(session, startTime, endTime, event, null, 0, null);
925

    
926
            // TODO: aggregate by day or hour -- needs clarification
927
            int count = 1;
928
            for (LogEntry logEntry : logs.getLogEntryList()) {
929
                Identifier pid = logEntry.getIdentifier();
930
                Date logDate = logEntry.getDateLogged();
931
                // if we are filtering by format
932
                if (formatId != null) {
933
                    SystemMetadata sysmeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
934
                    if (!sysmeta.getFormatId().getValue().equals(formatId.getValue())) {
935
                        // does not match
936
                        continue;
937
                    }
938
                }
939
                MonitorInfo item = new MonitorInfo();
940
                item.setCount(count);
941
                item.setDate(new java.sql.Date(logDate.getTime()));
942
                monitorList.addMonitorInfo(item);
943

    
944
            }
945
        } catch (Exception e) {
946
            e.printStackTrace();
947
            throw new ServiceFailure("2081", "Could not retrieve statistics: " + e.getMessage());
948
        }
949

    
950
        return monitorList;
951

    
952
    }
953

    
954
    /**
955
     * A callback method used by a CN to indicate to a MN that it cannot 
956
     * complete synchronization of the science metadata identified by pid.  Log
957
     * the event in the metacat event log.
958
     * 
959
     * @param session
960
     * @param syncFailed
961
     * 
962
     * @throws ServiceFailure
963
     * @throws NotAuthorized
964
     * @throws NotImplemented
965
     */
966
    @Override
967
    public boolean synchronizationFailed(Session session, SynchronizationFailed syncFailed) 
968
        throws NotImplemented, ServiceFailure, NotAuthorized {
969

    
970
        String localId;
971
        Identifier pid;
972
        if ( syncFailed.getPid() != null ) {
973
            pid = new Identifier();
974
            pid.setValue(syncFailed.getPid());
975
            boolean allowed;
976
            
977
            //are we allowed? only CNs
978
            try {
979
                allowed = isAdminAuthorized(session);
980
                if ( !allowed ){
981
                    throw new NotAuthorized("2162", 
982
                            "Not allowed to call synchronizationFailed() on this node.");
983
                }
984
            } catch (InvalidToken e) {
985
                throw new NotAuthorized("2162", 
986
                        "Not allowed to call synchronizationFailed() on this node.");
987

    
988
            }
989
            
990
        } else {
991
            throw new ServiceFailure("2161", "The identifier cannot be null.");
992

    
993
        }
994
        
995
        try {
996
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
997
        } catch (McdbDocNotFoundException e) {
998
            throw new ServiceFailure("2161", "The identifier specified by " + 
999
                    syncFailed.getPid() + " was not found on this node.");
1000

    
1001
        }
1002
        // TODO: update the CN URL below when the CNRead.SynchronizationFailed
1003
        // method is changed to include the URL as a parameter
1004
        logMetacat.debug("Synchronization for the object identified by " + 
1005
                pid.getValue() + " failed from " + syncFailed.getNodeId() + 
1006
                " Logging the event to the Metacat EventLog as a 'syncFailed' event.");
1007
        // TODO: use the event type enum when the SYNCHRONIZATION_FAILED event is added
1008
        String principal = Constants.SUBJECT_PUBLIC;
1009
        if (session != null && session.getSubject() != null) {
1010
          principal = session.getSubject().getValue();
1011
        }
1012
        try {
1013
          EventLog.getInstance().log(request.getRemoteAddr(), request.getHeader("User-Agent"), principal, localId, "synchronization_failed");
1014
        } catch (Exception e) {
1015
            throw new ServiceFailure("2161", "Could not log the error for: " + pid.getValue());
1016
        }
1017
        //EventLog.getInstance().log("CN URL WILL GO HERE", 
1018
        //  session.getSubject().getValue(), localId, Event.SYNCHRONIZATION_FAILED);
1019
        return true;
1020

    
1021
    }
1022

    
1023
    /**
1024
     * Essentially a get() but with different logging behavior
1025
     */
1026
    @Override
1027
    public InputStream getReplica(Session session, Identifier pid) 
1028
        throws NotAuthorized, NotImplemented, ServiceFailure, InvalidToken {
1029

    
1030
        logMetacat.info("MNodeService.getReplica() called.");
1031

    
1032
        // cannot be called by public
1033
        if (session == null) {
1034
        	throw new InvalidToken("2183", "No session was provided.");
1035
        }
1036
        
1037
        logMetacat.info("MNodeService.getReplica() called with parameters: \n" +
1038
             "\tSession.Subject      = " + session.getSubject().getValue() + "\n" +
1039
             "\tIdentifier           = " + pid.getValue());
1040

    
1041
        InputStream inputStream = null; // bytes to be returned
1042
        handler = new MetacatHandler(new Timer());
1043
        boolean allowed = false;
1044
        String localId; // the metacat docid for the pid
1045

    
1046
        // get the local docid from Metacat
1047
        try {
1048
            localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
1049
        } catch (McdbDocNotFoundException e) {
1050
            throw new ServiceFailure("2181", "The object specified by " + 
1051
                    pid.getValue() + " does not exist at this node.");
1052
            
1053
        }
1054

    
1055
        Subject targetNodeSubject = session.getSubject();
1056

    
1057
        // check for authorization to replicate, null session to act as this source MN
1058
        try {
1059
            allowed = D1Client.getCN().isNodeAuthorized(null, targetNodeSubject, pid);
1060
        } catch (InvalidToken e1) {
1061
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1062
                + e1.getMessage());
1063
            
1064
        } catch (NotFound e1) {
1065
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1066
                    + e1.getMessage());
1067

    
1068
        } catch (InvalidRequest e1) {
1069
            throw new ServiceFailure("2181", "Could not determine if node is authorized: " 
1070
                    + e1.getMessage());
1071

    
1072
        }
1073

    
1074
        logMetacat.info("Called D1Client.isNodeAuthorized(). Allowed = " + allowed +
1075
            " for identifier " + pid.getValue());
1076

    
1077
        // if the person is authorized, perform the read
1078
        if (allowed) {
1079
            try {
1080
                inputStream = MetacatHandler.read(localId);
1081
            } catch (Exception e) {
1082
                throw new ServiceFailure("1020", "The object specified by " + 
1083
                    pid.getValue() + "could not be returned due to error: " + e.getMessage());
1084
            }
1085
        }
1086

    
1087
        // if we fail to set the input stream
1088
        if (inputStream == null) {
1089
            throw new ServiceFailure("2181", "The object specified by " + 
1090
                pid.getValue() + "does not exist at this node.");
1091
        }
1092

    
1093
        // log the replica event
1094
        String principal = null;
1095
        if (session.getSubject() != null) {
1096
            principal = session.getSubject().getValue();
1097
        }
1098
        EventLog.getInstance().log(request.getRemoteAddr(), 
1099
            request.getHeader("User-Agent"), principal, localId, "replicate");
1100

    
1101
        return inputStream;
1102
    }
1103

    
1104
    /**
1105
     * A method to notify the Member Node that the authoritative copy of 
1106
     * system metadata on the Coordinating Nodes has changed.
1107
     * 
1108
     * @param session   Session information that contains the identity of the 
1109
     *                  calling user as retrieved from the X.509 certificate 
1110
     *                  which must be traceable to the CILogon service.
1111
     * @param serialVersion   The serialVersion of the system metadata
1112
     * @param dateSysMetaLastModified  The time stamp for when the system metadata was changed
1113
     * @throws NotImplemented
1114
     * @throws ServiceFailure
1115
     * @throws NotAuthorized
1116
     * @throws InvalidRequest
1117
     * @throws InvalidToken
1118
     */
1119
    public boolean systemMetadataChanged(Session session, Identifier pid,
1120
        long serialVersion, Date dateSysMetaLastModified) 
1121
        throws NotImplemented, ServiceFailure, NotAuthorized, InvalidRequest,
1122
        InvalidToken {
1123
        
1124
        // cannot be called by public
1125
        if (session == null) {
1126
        	throw new InvalidToken("2183", "No session was provided.");
1127
        }
1128

    
1129
        SystemMetadata currentLocalSysMeta = null;
1130
        SystemMetadata newSysMeta = null;
1131
        CNode cn = D1Client.getCN();
1132
        NodeList nodeList = null;
1133
        Subject callingSubject = null;
1134
        boolean allowed = false;
1135
        
1136
        // are we allowed to call this?
1137
        callingSubject = session.getSubject();
1138
        nodeList = cn.listNodes();
1139
        
1140
        for(Node node : nodeList.getNodeList()) {
1141
            // must be a CN
1142
            if ( node.getType().equals(NodeType.CN)) {
1143
               List<Subject> subjectList = node.getSubjectList();
1144
               // the calling subject must be in the subject list
1145
               if ( subjectList.contains(callingSubject)) {
1146
                   allowed = true;
1147
                   
1148
               }
1149
               
1150
            }
1151
        }
1152
        
1153
        if (!allowed ) {
1154
            String msg = "The subject identified by " + callingSubject.getValue() +
1155
              " is not authorized to call this service.";
1156
            throw new NotAuthorized("1331", msg);
1157
            
1158
        }
1159
        
1160
        // compare what we have locally to what is sent in the change notification
1161
        try {
1162
            currentLocalSysMeta = HazelcastService.getInstance().getSystemMetadataMap().get(pid);
1163
             
1164
        } catch (RuntimeException e) {
1165
            String msg = "SystemMetadata for pid " + pid.getValue() +
1166
              " couldn't be updated because it couldn't be found locally: " +
1167
              e.getMessage();
1168
            logMetacat.error(msg);
1169
            ServiceFailure sf = new ServiceFailure("1333", msg);
1170
            sf.initCause(e);
1171
            throw sf; 
1172
        }
1173
        
1174
        if (currentLocalSysMeta.getSerialVersion().longValue() < serialVersion ) {
1175
            try {
1176
                newSysMeta = cn.getSystemMetadata(null, pid);
1177
            } catch (NotFound e) {
1178
                // huh? you just said you had it
1179
            	String msg = "On updating the local copy of system metadata " + 
1180
                "for pid " + pid.getValue() +", the CN reports it is not found." +
1181
                " The error message was: " + e.getMessage();
1182
                logMetacat.error(msg);
1183
                ServiceFailure sf = new ServiceFailure("1333", msg);
1184
                sf.initCause(e);
1185
                throw sf;
1186
            }
1187
            
1188
            // update the local copy of system metadata for the pid
1189
            try {
1190
                HazelcastService.getInstance().getSystemMetadataMap().put(newSysMeta.getIdentifier(), newSysMeta);
1191
                logMetacat.info("Updated local copy of system metadata for pid " +
1192
                    pid.getValue() + " after change notification from the CN.");
1193
                
1194
            } catch (RuntimeException e) {
1195
                String msg = "SystemMetadata for pid " + pid.getValue() +
1196
                  " couldn't be updated: " +
1197
                  e.getMessage();
1198
                logMetacat.error(msg);
1199
                ServiceFailure sf = new ServiceFailure("1333", msg);
1200
                sf.initCause(e);
1201
                throw sf;
1202
            }
1203
        }
1204
        
1205
        return true;
1206
        
1207
    }
1208
    
1209
    /*
1210
     * Set the replication status for the object on the Coordinating Node
1211
     * 
1212
     * @param session - the session for the this target node
1213
     * @param pid - the identifier of the object being updated
1214
     * @param nodeId - the identifier of this target node
1215
     * @param status - the replication status to set
1216
     * @param failure - the exception to include, if any
1217
     */
1218
    private void setReplicationStatus(Session session, Identifier pid, 
1219
        NodeReference nodeId, ReplicationStatus status, BaseException failure) 
1220
        throws ServiceFailure, NotImplemented, NotAuthorized, 
1221
        InvalidRequest {
1222
        
1223
        // call the CN as the MN to set the replication status
1224
        try {
1225
            this.cn = D1Client.getCN();
1226
            this.cn.setReplicationStatus(session, pid, nodeId,
1227
                    status, failure);
1228
            
1229
        } catch (InvalidToken e) {
1230
        	String msg = "Could not set the replication status for " + pid.getValue() + " on the CN (InvalidToken): " + e.getMessage();
1231
            logMetacat.error(msg);
1232
        	throw new ServiceFailure("2151",
1233
                    msg);
1234
            
1235
        } catch (NotFound e) {
1236
        	String msg = "Could not set the replication status for " + pid.getValue() + " on the CN (NotFound): " + e.getMessage();
1237
            logMetacat.error(msg);
1238
        	throw new ServiceFailure("2151",
1239
                    msg);
1240
            
1241
        }
1242
    }
1243

    
1244
	@Override
1245
	public Identifier generateIdentifier(Session session, String scheme, String fragment)
1246
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1247
			InvalidRequest {
1248
		
1249
		Identifier identifier = new Identifier();
1250
		
1251
		// handle different schemes
1252
		if (scheme.equalsIgnoreCase(UUID_SCHEME)) {
1253
			// UUID
1254
			UUID uuid = UUID.randomUUID();
1255
            identifier.setValue(UUID_PREFIX + uuid.toString());
1256
		} else if (scheme.equalsIgnoreCase(DOI_SCHEME)) {
1257
			// generate a DOI
1258
			try {
1259
				identifier = DOIService.getInstance().generateDOI();
1260
			} catch (EZIDException e) {
1261
				ServiceFailure sf = new ServiceFailure("2191", "Could not generate DOI: " + e.getMessage());
1262
				sf.initCause(e);
1263
				throw sf;
1264
			}
1265
		} else {
1266
			// default if we don't know the scheme
1267
			if (fragment != null) {
1268
				// for now, just autogen with fragment
1269
				String autogenId = DocumentUtil.generateDocumentId(fragment, 0);
1270
				identifier.setValue(autogenId);			
1271
			} else {
1272
				// autogen with no fragment
1273
				String autogenId = DocumentUtil.generateDocumentId(0);
1274
				identifier.setValue(autogenId);
1275
			}
1276
		}
1277
		
1278
		// TODO: reserve the identifier with the CN. We can only do this when
1279
		// 1) the MN is part of a CN cluster
1280
		// 2) the request is from an authenticated user
1281
		
1282
		return identifier;
1283
	}
1284

    
1285
	@Override
1286
	public boolean isAuthorized(Identifier pid, Permission permission)
1287
			throws ServiceFailure, InvalidRequest, InvalidToken, NotFound,
1288
			NotAuthorized, NotImplemented {
1289

    
1290
		return isAuthorized(null, pid, permission);
1291
	}
1292

    
1293
	@Override
1294
	public boolean systemMetadataChanged(Identifier pid, long serialVersion, Date dateSysMetaLastModified)
1295
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1296
			InvalidRequest {
1297

    
1298
		return systemMetadataChanged(null, pid, serialVersion, dateSysMetaLastModified);
1299
	}
1300

    
1301
	@Override
1302
	public Log getLogRecords(Date fromDate, Date toDate, Event event, String pidFilter,
1303
			Integer start, Integer count) throws InvalidRequest, InvalidToken,
1304
			NotAuthorized, NotImplemented, ServiceFailure {
1305

    
1306
		return getLogRecords(null, fromDate, toDate, event, pidFilter, start, count);
1307
	}
1308

    
1309
	@Override
1310
	public DescribeResponse describe(Identifier pid) throws InvalidToken,
1311
			NotAuthorized, NotImplemented, ServiceFailure, NotFound {
1312

    
1313
		return describe(null, pid);
1314
	}
1315

    
1316
	@Override
1317
	public InputStream get(Identifier pid) throws InvalidToken, NotAuthorized,
1318
			NotImplemented, ServiceFailure, NotFound, InsufficientResources {
1319

    
1320
		return get(null, pid);
1321
	}
1322

    
1323
	@Override
1324
	public Checksum getChecksum(Identifier pid, String algorithm)
1325
			throws InvalidRequest, InvalidToken, NotAuthorized, NotImplemented,
1326
			ServiceFailure, NotFound {
1327

    
1328
		return getChecksum(null, pid, algorithm);
1329
	}
1330

    
1331
	@Override
1332
	public SystemMetadata getSystemMetadata(Identifier pid)
1333
			throws InvalidToken, NotAuthorized, NotImplemented, ServiceFailure,
1334
			NotFound {
1335

    
1336
		return getSystemMetadata(null, pid);
1337
	}
1338

    
1339
	@Override
1340
	public ObjectList listObjects(Date startTime, Date endTime,
1341
			ObjectFormatIdentifier objectFormatId, Boolean replicaStatus, Integer start,
1342
			Integer count) throws InvalidRequest, InvalidToken, NotAuthorized,
1343
			NotImplemented, ServiceFailure {
1344

    
1345
		return listObjects(null, startTime, endTime, objectFormatId, replicaStatus, start, count);
1346
	}
1347

    
1348
	@Override
1349
	public boolean synchronizationFailed(SynchronizationFailed syncFailed)
1350
			throws InvalidToken, NotAuthorized, NotImplemented, ServiceFailure {
1351

    
1352
		return synchronizationFailed(null, syncFailed);
1353
	}
1354

    
1355
	@Override
1356
	public InputStream getReplica(Identifier pid) throws InvalidToken,
1357
			NotAuthorized, NotImplemented, ServiceFailure, NotFound,
1358
			InsufficientResources {
1359

    
1360
		return getReplica(null, pid);
1361
	}
1362

    
1363
	@Override
1364
	public boolean replicate(SystemMetadata sysmeta, NodeReference sourceNode)
1365
			throws NotImplemented, ServiceFailure, NotAuthorized,
1366
			InvalidRequest, InvalidToken, InsufficientResources,
1367
			UnsupportedType {
1368

    
1369
		return replicate(null, sysmeta, sourceNode);
1370
	}
1371

    
1372
	@Override
1373
	public Identifier create(Identifier pid, InputStream object,
1374
			SystemMetadata sysmeta) throws IdentifierNotUnique,
1375
			InsufficientResources, InvalidRequest, InvalidSystemMetadata,
1376
			InvalidToken, NotAuthorized, NotImplemented, ServiceFailure,
1377
			UnsupportedType {
1378

    
1379
		return create(null, pid, object, sysmeta);
1380
	}
1381

    
1382
	@Override
1383
	public Identifier delete(Identifier pid) throws InvalidToken,
1384
			ServiceFailure, NotAuthorized, NotFound, NotImplemented {
1385

    
1386
		return delete(null, pid);
1387
	}
1388

    
1389
	@Override
1390
	public Identifier generateIdentifier(String scheme, String fragment)
1391
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1392
			InvalidRequest {
1393

    
1394
		return generateIdentifier(null, scheme, fragment);
1395
	}
1396

    
1397
	@Override
1398
	public Identifier update(Identifier pid, InputStream object,
1399
			Identifier newPid, SystemMetadata sysmeta) throws IdentifierNotUnique,
1400
			InsufficientResources, InvalidRequest, InvalidSystemMetadata,
1401
			InvalidToken, NotAuthorized, NotImplemented, ServiceFailure,
1402
			UnsupportedType, NotFound {
1403

    
1404
		return update(null, pid, object, newPid, sysmeta);
1405
	}
1406

    
1407
	@Override
1408
	public QueryEngineDescription getQueryEngineDescription(String engine)
1409
			throws InvalidToken, ServiceFailure, NotAuthorized, NotImplemented,
1410
			NotFound {
1411
	    if(engine != null && engine.equals(PATHQUERY)) {
1412
	        QueryEngineDescription qed = new QueryEngineDescription();
1413
	        qed.setName(PATHQUERY);
1414
	        qed.setQueryEngineVersion("1.0");
1415
	        qed.addAdditionalInfo("This is the traditional structured query for Metacat");
1416
	        Vector<String> pathsForIndexing = null;
1417
	        try {
1418
	            pathsForIndexing = SystemUtil.getPathsForIndexing();
1419
	        } catch (MetacatUtilException e) {
1420
	            logMetacat.warn("Could not get index paths", e);
1421
	        }
1422
	        for (String fieldName: pathsForIndexing) {
1423
	            QueryField field = new QueryField();
1424
	            field.addDescription("Indexed field for path '" + fieldName + "'");
1425
	            field.setName(fieldName);
1426
	            field.setReturnable(true);
1427
	            field.setSearchable(true);
1428
	            field.setSortable(false);
1429
	            // TODO: determine type and multivaluedness
1430
	            field.setType(String.class.getName());
1431
	            //field.setMultivalued(true);
1432
	            qed.addQueryField(field);
1433
	        }
1434
	        return qed;
1435
	    } else if (engine != null && engine.equals(MetacatSolrIndex.SOLRQUERY)) {
1436
	        try {
1437
	            QueryEngineDescription qed = MetacatSolrEngineDescriptionHandler.getInstance().getQueryEngineDescritpion();
1438
	            return qed;
1439
	        } catch (Exception e) {
1440
	            e.printStackTrace();
1441
	            throw new ServiceFailure("Solr server error", e.getMessage());
1442
	        }
1443
	    } else {
1444
	        throw new NotFound("404", "The Metacat member node can't find the query engine - "+engine);
1445
	    }
1446
		
1447
	}
1448

    
1449
	@Override
1450
	public QueryEngineList listQueryEngines() throws InvalidToken,
1451
			ServiceFailure, NotAuthorized, NotImplemented {
1452
		QueryEngineList qel = new QueryEngineList();
1453
		// support pathquery initially
1454
		qel.addQueryEngine(PATHQUERY);
1455
		qel.addQueryEngine(MetacatSolrIndex.SOLRQUERY);
1456
		return qel;
1457
	}
1458

    
1459
	@Override
1460
	public InputStream query(String engine, String query) throws InvalidToken,
1461
			ServiceFailure, NotAuthorized, InvalidRequest, NotImplemented,
1462
			NotFound {
1463
	    String user = Constants.SUBJECT_PUBLIC;
1464
        String[] groups= null;
1465
        Set<Subject> subjects = null;
1466
        if (session != null) {
1467
            user = session.getSubject().getValue();
1468
            subjects = AuthUtils.authorizedClientSubjects(session);
1469
            if (subjects != null) {
1470
                List<String> groupList = new ArrayList<String>();
1471
                for (Subject subject: subjects) {
1472
                    groupList.add(subject.getValue());
1473
                }
1474
                groups = groupList.toArray(new String[0]);
1475
            }
1476
        } else {
1477
            //add the public user subject to the set 
1478
            Subject subject = new Subject();
1479
            subject.setValue(Constants.SUBJECT_PUBLIC);
1480
            subjects = new HashSet();
1481
            subjects.add(subject);
1482
        }
1483
        //System.out.println("====== user is "+user);
1484
        //System.out.println("====== groups are "+groups);
1485
		if (engine != null && engine.equals(PATHQUERY)) {
1486
			try {
1487
				DBQuery queryobj = new DBQuery();
1488
				
1489
				String results = queryobj.performPathquery(query, user, groups);
1490
				return new ByteArrayInputStream(results.getBytes(MetaCatServlet.DEFAULT_ENCODING));
1491

    
1492
			} catch (Exception e) {
1493
				
1494
			}
1495
			
1496
		} else if (engine != null && engine.equals(MetacatSolrIndex.SOLRQUERY)) {
1497
		    logMetacat.info("The query is ==================================== \n"+query);
1498
		    try {
1499
		        
1500
                return MetacatSolrIndex.getInstance().query(query, subjects);
1501
            } catch (Exception e) {
1502
                // TODO Auto-generated catch block
1503
                throw new ServiceFailure("Solr server error", e.getMessage());
1504
            } 
1505
		}
1506
		return null;
1507
	}
1508
    
1509
}
(4-4/6)