Project

General

Profile

« Previous | Next » 

Revision 9544

Added by Jing Tao over 8 years ago

Close the input stream object on the MN.replicate method.

View differences:

src/edu/ucsb/nceas/metacat/admin/upgrade/dataone/GenerateORE.java
224 224
				
225 225
				// get the original ORE map
226 226
				InputStream originalOreStream = mn.get(null, orePid);
227
				Map<Identifier, Map<Identifier, List<Identifier>>> originalOre = ResourceMapFactory.getInstance().parseResourceMap(originalOreStream);
227
				Map<Identifier, Map<Identifier, List<Identifier>>> originalOre = null;
228
				try {
229
				     originalOre = ResourceMapFactory.getInstance().parseResourceMap(originalOreStream);
230
				} finally {
231
				    IOUtils.closeQuietly(originalOreStream);
232
				}
233
				
228 234

  
229 235
				// generate the updated ORE map, in this case we aren't changing any values, just altering the serialization using a newer foresite library
230 236
				Identifier updatedOrePid = new Identifier();
src/edu/ucsb/nceas/metacat/dataone/MNodeService.java
711 711

  
712 712
        }
713 713
        
714

  
715 714
        try {
716
            // do we already have a replica?
717 715
            try {
718
                localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
719
                // if we have a local id, get the local object
716
                // do we already have a replica?
720 717
                try {
721
                    object = MetacatHandler.read(localId);
722
                } catch (Exception e) {
723
                	// NOTE: we may already know about this ID because it could be a data file described by a metadata file
724
                	// https://redmine.dataone.org/issues/2572
725
                	// TODO: fix this so that we don't prevent ourselves from getting replicas
726
                	
727
                    // let the CN know that the replication failed
728
                	logMetacat.warn("Object content not found on this node despite having localId: " + localId);
729
                	String msg = "Can't read the object bytes properly, replica is invalid.";
730
                    ServiceFailure serviceFailure = new ServiceFailure("2151", msg);
731
                    setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, serviceFailure);
732
                    logMetacat.warn(msg);
733
                    throw serviceFailure;
718
                    localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
719
                    // if we have a local id, get the local object
720
                    try {
721
                        object = MetacatHandler.read(localId);
722
                    } catch (Exception e) {
723
                        // NOTE: we may already know about this ID because it could be a data file described by a metadata file
724
                        // https://redmine.dataone.org/issues/2572
725
                        // TODO: fix this so that we don't prevent ourselves from getting replicas
726
                        
727
                        // let the CN know that the replication failed
728
                        logMetacat.warn("Object content not found on this node despite having localId: " + localId);
729
                        String msg = "Can't read the object bytes properly, replica is invalid.";
730
                        ServiceFailure serviceFailure = new ServiceFailure("2151", msg);
731
                        setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, serviceFailure);
732
                        logMetacat.warn(msg);
733
                        throw serviceFailure;
734
                        
735
                    }
736

  
737
                } catch (McdbDocNotFoundException e) {
738
                    logMetacat.info("No replica found. Continuing.");
734 739
                    
740
                } catch (SQLException ee) {
741
                    throw new ServiceFailure("2151", "Couldn't identify the local id of the object with the specified identifier "
742
                                            +pid.getValue()+" since - "+ee.getMessage());
735 743
                }
744
                
745
                // no local replica, get a replica
746
                if ( object == null ) {
747
                    /*boolean success = true;
748
                    try {
749
                        //use the v2 ping api to connect the source node
750
                        mn.ping();
751
                    } catch (Exception e) {
752
                        success = false;
753
                    }*/
754
                    D1NodeVersionChecker checker = new D1NodeVersionChecker(sourceNode);
755
                    String nodeVersion = checker.getVersion("MNRead");
756
                    if(nodeVersion != null && nodeVersion.equals(D1NodeVersionChecker.V1)) {
757
                        //The source node is a v1 node, we use the v1 api
758
                        org.dataone.client.v1.MNode mNodeV1 =  org.dataone.client.v1.itk.D1Client.getMN(sourceNode);
759
                        object = mNodeV1.getReplica(thisNodeSession, pid);
760
                    } else if (nodeVersion != null && nodeVersion.equals(D1NodeVersionChecker.V2)){
761
                     // session should be null to use the default certificate
762
                        // location set in the Certificate manager
763
                        MNode mn = D1Client.getMN(sourceNode);
764
                        object = mn.getReplica(thisNodeSession, pid);
765
                    } else {
766
                        throw new ServiceFailure("2151", "The version of MNRead service is "+nodeVersion+" in the source node "+sourceNode.getValue()+" and it is supported. Please check the information in the cn");
767
                    }
768
                    
769
                    logMetacat.info("MNodeService.getReplica() called for identifier "
770
                                    + pid.getValue());
736 771

  
737
            } catch (McdbDocNotFoundException e) {
738
                logMetacat.info("No replica found. Continuing.");
739
                
740
            } catch (SQLException ee) {
741
                throw new ServiceFailure("2151", "Couldn't identify the local id of the object with the specified identifier "
742
                                        +pid.getValue()+" since - "+ee.getMessage());
772
                }
773

  
774
            } catch (InvalidToken e) {            
775
                String msg = "Could not retrieve object to replicate (InvalidToken): "+ e.getMessage();
776
                failure = new ServiceFailure("2151", msg);
777
                setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
778
                logMetacat.error(msg);
779
                throw new ServiceFailure("2151", msg);
780

  
781
            } catch (NotFound e) {
782
                String msg = "Could not retrieve object to replicate (NotFound): "+ e.getMessage();
783
                failure = new ServiceFailure("2151", msg);
784
                setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
785
                logMetacat.error(msg);
786
                throw new ServiceFailure("2151", msg);
787

  
788
            } catch (NotAuthorized e) {
789
                String msg = "Could not retrieve object to replicate (NotAuthorized): "+ e.getMessage();
790
                failure = new ServiceFailure("2151", msg);
791
                setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
792
                logMetacat.error(msg);
793
                throw new ServiceFailure("2151", msg);
794
            } catch (NotImplemented e) {
795
                String msg = "Could not retrieve object to replicate (mn.getReplica NotImplemented): "+ e.getMessage();
796
                failure = new ServiceFailure("2151", msg);
797
                setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
798
                logMetacat.error(msg);
799
                throw new ServiceFailure("2151", msg);
800
            } catch (ServiceFailure e) {
801
                String msg = "Could not retrieve object to replicate (ServiceFailure): "+ e.getMessage();
802
                failure = new ServiceFailure("2151", msg);
803
                setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
804
                logMetacat.error(msg);
805
                throw new ServiceFailure("2151", msg);
806
            } catch (InsufficientResources e) {
807
                String msg = "Could not retrieve object to replicate (InsufficientResources): "+ e.getMessage();
808
                failure = new ServiceFailure("2151", msg);
809
                setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
810
                logMetacat.error(msg);
811
                throw new ServiceFailure("2151", msg);
743 812
            }
744
            
745
            // no local replica, get a replica
746
            if ( object == null ) {
747
                /*boolean success = true;
813

  
814
            // verify checksum on the object, if supported
815
            if (object.markSupported()) {
816
                Checksum givenChecksum = sysmeta.getChecksum();
817
                Checksum computedChecksum = null;
748 818
                try {
749
                    //use the v2 ping api to connect the source node
750
                    mn.ping();
819
                    computedChecksum = ChecksumUtil.checksum(object, givenChecksum.getAlgorithm());
820
                    object.reset();
821

  
751 822
                } catch (Exception e) {
752
                    success = false;
753
                }*/
754
                D1NodeVersionChecker checker = new D1NodeVersionChecker(sourceNode);
755
                String nodeVersion = checker.getVersion("MNRead");
756
                if(nodeVersion != null && nodeVersion.equals(D1NodeVersionChecker.V1)) {
757
                    //The source node is a v1 node, we use the v1 api
758
                    org.dataone.client.v1.MNode mNodeV1 =  org.dataone.client.v1.itk.D1Client.getMN(sourceNode);
759
                    object = mNodeV1.getReplica(thisNodeSession, pid);
760
                } else if (nodeVersion != null && nodeVersion.equals(D1NodeVersionChecker.V2)){
761
                 // session should be null to use the default certificate
762
                    // location set in the Certificate manager
763
                    MNode mn = D1Client.getMN(sourceNode);
764
                    object = mn.getReplica(thisNodeSession, pid);
765
                } else {
766
                    throw new ServiceFailure("2151", "The version of MNRead service is "+nodeVersion+" in the source node "+sourceNode.getValue()+" and it is supported. Please check the information in the cn");
823
                    String msg = "Error computing checksum on replica: " + e.getMessage();
824
                    logMetacat.error(msg);
825
                    ServiceFailure sf = new ServiceFailure("2151", msg);
826
                    sf.initCause(e);
827
                    setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, sf);
828
                    throw sf;
767 829
                }
768
                
769
                logMetacat.info("MNodeService.getReplica() called for identifier "
770
                                + pid.getValue());
771

  
830
                if (!givenChecksum.getValue().equals(computedChecksum.getValue())) {
831
                    logMetacat.error("Given    checksum for " + pid.getValue() + 
832
                        "is " + givenChecksum.getValue());
833
                    logMetacat.error("Computed checksum for " + pid.getValue() + 
834
                        "is " + computedChecksum.getValue());
835
                    String msg = "Computed checksum does not match declared checksum";
836
                    failure = new ServiceFailure("2151", msg);
837
                    setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
838
                    throw new ServiceFailure("2151", msg);
839
                }
772 840
            }
773 841

  
774
        } catch (InvalidToken e) {            
775
            String msg = "Could not retrieve object to replicate (InvalidToken): "+ e.getMessage();
776
            failure = new ServiceFailure("2151", msg);
777
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
778
            logMetacat.error(msg);
779
            throw new ServiceFailure("2151", msg);
780

  
781
        } catch (NotFound e) {
782
            String msg = "Could not retrieve object to replicate (NotFound): "+ e.getMessage();
783
            failure = new ServiceFailure("2151", msg);
784
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
785
            logMetacat.error(msg);
786
            throw new ServiceFailure("2151", msg);
787

  
788
        } catch (NotAuthorized e) {
789
            String msg = "Could not retrieve object to replicate (NotAuthorized): "+ e.getMessage();
790
            failure = new ServiceFailure("2151", msg);
791
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
792
            logMetacat.error(msg);
793
            throw new ServiceFailure("2151", msg);
794
        } catch (NotImplemented e) {
795
            String msg = "Could not retrieve object to replicate (mn.getReplica NotImplemented): "+ e.getMessage();
796
            failure = new ServiceFailure("2151", msg);
797
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
798
            logMetacat.error(msg);
799
            throw new ServiceFailure("2151", msg);
800
        } catch (ServiceFailure e) {
801
            String msg = "Could not retrieve object to replicate (ServiceFailure): "+ e.getMessage();
802
            failure = new ServiceFailure("2151", msg);
803
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
804
            logMetacat.error(msg);
805
            throw new ServiceFailure("2151", msg);
806
        } catch (InsufficientResources e) {
807
            String msg = "Could not retrieve object to replicate (InsufficientResources): "+ e.getMessage();
808
            failure = new ServiceFailure("2151", msg);
809
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
810
            logMetacat.error(msg);
811
            throw new ServiceFailure("2151", msg);
812
        }
813

  
814
        // verify checksum on the object, if supported
815
        if (object.markSupported()) {
816
            Checksum givenChecksum = sysmeta.getChecksum();
817
            Checksum computedChecksum = null;
842
            // add it to local store
843
            Identifier retPid;
818 844
            try {
819
                computedChecksum = ChecksumUtil.checksum(object, givenChecksum.getAlgorithm());
820
                object.reset();
821

  
845
                // skip the MN.create -- this mutates the system metadata and we don't want it to
846
                if ( localId == null ) {
847
                    // TODO: this will fail if we already "know" about the identifier
848
                    // FIXME: see https://redmine.dataone.org/issues/2572
849
                    retPid = super.create(session, pid, object, sysmeta);
850
                    result = (retPid.getValue().equals(pid.getValue()));
851
                }
852
                
822 853
            } catch (Exception e) {
823
                String msg = "Error computing checksum on replica: " + e.getMessage();
824
                logMetacat.error(msg);
825
                ServiceFailure sf = new ServiceFailure("2151", msg);
826
                sf.initCause(e);
827
                setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, sf);
828
                throw sf;
829
            }
830
            if (!givenChecksum.getValue().equals(computedChecksum.getValue())) {
831
                logMetacat.error("Given    checksum for " + pid.getValue() + 
832
                    "is " + givenChecksum.getValue());
833
                logMetacat.error("Computed checksum for " + pid.getValue() + 
834
                    "is " + computedChecksum.getValue());
835
                String msg = "Computed checksum does not match declared checksum";
854
                String msg = "Could not save object to local store (" + e.getClass().getName() + "): " + e.getMessage();
836 855
                failure = new ServiceFailure("2151", msg);
837 856
                setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
857
                logMetacat.error(msg);
838 858
                throw new ServiceFailure("2151", msg);
859
                
839 860
            }
861
        } finally {
862
            IOUtils.closeQuietly(object);
840 863
        }
864
        
841 865

  
842
        // add it to local store
843
        Identifier retPid;
844
        try {
845
            // skip the MN.create -- this mutates the system metadata and we don't want it to
846
            if ( localId == null ) {
847
                // TODO: this will fail if we already "know" about the identifier
848
            	// FIXME: see https://redmine.dataone.org/issues/2572
849
                retPid = super.create(session, pid, object, sysmeta);
850
                result = (retPid.getValue().equals(pid.getValue()));
851
            }
852
            
853
        } catch (Exception e) {
854
            String msg = "Could not save object to local store (" + e.getClass().getName() + "): " + e.getMessage();
855
            failure = new ServiceFailure("2151", msg);
856
            setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
857
            logMetacat.error(msg);
858
            throw new ServiceFailure("2151", msg);
859
            
860
        }
861

  
862 866
        // finish by setting the replication status
863 867
        setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.COMPLETED, null);
864 868
        return result;

Also available in: Unified diff