711 |
711 |
|
712 |
712 |
}
|
713 |
713 |
|
714 |
|
|
715 |
714 |
try {
|
716 |
|
// do we already have a replica?
|
717 |
715 |
try {
|
718 |
|
localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
|
719 |
|
// if we have a local id, get the local object
|
|
716 |
// do we already have a replica?
|
720 |
717 |
try {
|
721 |
|
object = MetacatHandler.read(localId);
|
722 |
|
} catch (Exception e) {
|
723 |
|
// NOTE: we may already know about this ID because it could be a data file described by a metadata file
|
724 |
|
// https://redmine.dataone.org/issues/2572
|
725 |
|
// TODO: fix this so that we don't prevent ourselves from getting replicas
|
726 |
|
|
727 |
|
// let the CN know that the replication failed
|
728 |
|
logMetacat.warn("Object content not found on this node despite having localId: " + localId);
|
729 |
|
String msg = "Can't read the object bytes properly, replica is invalid.";
|
730 |
|
ServiceFailure serviceFailure = new ServiceFailure("2151", msg);
|
731 |
|
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, serviceFailure);
|
732 |
|
logMetacat.warn(msg);
|
733 |
|
throw serviceFailure;
|
|
718 |
localId = IdentifierManager.getInstance().getLocalId(pid.getValue());
|
|
719 |
// if we have a local id, get the local object
|
|
720 |
try {
|
|
721 |
object = MetacatHandler.read(localId);
|
|
722 |
} catch (Exception e) {
|
|
723 |
// NOTE: we may already know about this ID because it could be a data file described by a metadata file
|
|
724 |
// https://redmine.dataone.org/issues/2572
|
|
725 |
// TODO: fix this so that we don't prevent ourselves from getting replicas
|
|
726 |
|
|
727 |
// let the CN know that the replication failed
|
|
728 |
logMetacat.warn("Object content not found on this node despite having localId: " + localId);
|
|
729 |
String msg = "Can't read the object bytes properly, replica is invalid.";
|
|
730 |
ServiceFailure serviceFailure = new ServiceFailure("2151", msg);
|
|
731 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, serviceFailure);
|
|
732 |
logMetacat.warn(msg);
|
|
733 |
throw serviceFailure;
|
|
734 |
|
|
735 |
}
|
|
736 |
|
|
737 |
} catch (McdbDocNotFoundException e) {
|
|
738 |
logMetacat.info("No replica found. Continuing.");
|
734 |
739 |
|
|
740 |
} catch (SQLException ee) {
|
|
741 |
throw new ServiceFailure("2151", "Couldn't identify the local id of the object with the specified identifier "
|
|
742 |
+pid.getValue()+" since - "+ee.getMessage());
|
735 |
743 |
}
|
|
744 |
|
|
745 |
// no local replica, get a replica
|
|
746 |
if ( object == null ) {
|
|
747 |
/*boolean success = true;
|
|
748 |
try {
|
|
749 |
//use the v2 ping api to connect the source node
|
|
750 |
mn.ping();
|
|
751 |
} catch (Exception e) {
|
|
752 |
success = false;
|
|
753 |
}*/
|
|
754 |
D1NodeVersionChecker checker = new D1NodeVersionChecker(sourceNode);
|
|
755 |
String nodeVersion = checker.getVersion("MNRead");
|
|
756 |
if(nodeVersion != null && nodeVersion.equals(D1NodeVersionChecker.V1)) {
|
|
757 |
//The source node is a v1 node, we use the v1 api
|
|
758 |
org.dataone.client.v1.MNode mNodeV1 = org.dataone.client.v1.itk.D1Client.getMN(sourceNode);
|
|
759 |
object = mNodeV1.getReplica(thisNodeSession, pid);
|
|
760 |
} else if (nodeVersion != null && nodeVersion.equals(D1NodeVersionChecker.V2)){
|
|
761 |
// session should be null to use the default certificate
|
|
762 |
// location set in the Certificate manager
|
|
763 |
MNode mn = D1Client.getMN(sourceNode);
|
|
764 |
object = mn.getReplica(thisNodeSession, pid);
|
|
765 |
} else {
|
|
766 |
throw new ServiceFailure("2151", "The version of MNRead service is "+nodeVersion+" in the source node "+sourceNode.getValue()+" and it is supported. Please check the information in the cn");
|
|
767 |
}
|
|
768 |
|
|
769 |
logMetacat.info("MNodeService.getReplica() called for identifier "
|
|
770 |
+ pid.getValue());
|
736 |
771 |
|
737 |
|
} catch (McdbDocNotFoundException e) {
|
738 |
|
logMetacat.info("No replica found. Continuing.");
|
739 |
|
|
740 |
|
} catch (SQLException ee) {
|
741 |
|
throw new ServiceFailure("2151", "Couldn't identify the local id of the object with the specified identifier "
|
742 |
|
+pid.getValue()+" since - "+ee.getMessage());
|
|
772 |
}
|
|
773 |
|
|
774 |
} catch (InvalidToken e) {
|
|
775 |
String msg = "Could not retrieve object to replicate (InvalidToken): "+ e.getMessage();
|
|
776 |
failure = new ServiceFailure("2151", msg);
|
|
777 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
|
778 |
logMetacat.error(msg);
|
|
779 |
throw new ServiceFailure("2151", msg);
|
|
780 |
|
|
781 |
} catch (NotFound e) {
|
|
782 |
String msg = "Could not retrieve object to replicate (NotFound): "+ e.getMessage();
|
|
783 |
failure = new ServiceFailure("2151", msg);
|
|
784 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
|
785 |
logMetacat.error(msg);
|
|
786 |
throw new ServiceFailure("2151", msg);
|
|
787 |
|
|
788 |
} catch (NotAuthorized e) {
|
|
789 |
String msg = "Could not retrieve object to replicate (NotAuthorized): "+ e.getMessage();
|
|
790 |
failure = new ServiceFailure("2151", msg);
|
|
791 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
|
792 |
logMetacat.error(msg);
|
|
793 |
throw new ServiceFailure("2151", msg);
|
|
794 |
} catch (NotImplemented e) {
|
|
795 |
String msg = "Could not retrieve object to replicate (mn.getReplica NotImplemented): "+ e.getMessage();
|
|
796 |
failure = new ServiceFailure("2151", msg);
|
|
797 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
|
798 |
logMetacat.error(msg);
|
|
799 |
throw new ServiceFailure("2151", msg);
|
|
800 |
} catch (ServiceFailure e) {
|
|
801 |
String msg = "Could not retrieve object to replicate (ServiceFailure): "+ e.getMessage();
|
|
802 |
failure = new ServiceFailure("2151", msg);
|
|
803 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
|
804 |
logMetacat.error(msg);
|
|
805 |
throw new ServiceFailure("2151", msg);
|
|
806 |
} catch (InsufficientResources e) {
|
|
807 |
String msg = "Could not retrieve object to replicate (InsufficientResources): "+ e.getMessage();
|
|
808 |
failure = new ServiceFailure("2151", msg);
|
|
809 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
|
810 |
logMetacat.error(msg);
|
|
811 |
throw new ServiceFailure("2151", msg);
|
743 |
812 |
}
|
744 |
|
|
745 |
|
// no local replica, get a replica
|
746 |
|
if ( object == null ) {
|
747 |
|
/*boolean success = true;
|
|
813 |
|
|
814 |
// verify checksum on the object, if supported
|
|
815 |
if (object.markSupported()) {
|
|
816 |
Checksum givenChecksum = sysmeta.getChecksum();
|
|
817 |
Checksum computedChecksum = null;
|
748 |
818 |
try {
|
749 |
|
//use the v2 ping api to connect the source node
|
750 |
|
mn.ping();
|
|
819 |
computedChecksum = ChecksumUtil.checksum(object, givenChecksum.getAlgorithm());
|
|
820 |
object.reset();
|
|
821 |
|
751 |
822 |
} catch (Exception e) {
|
752 |
|
success = false;
|
753 |
|
}*/
|
754 |
|
D1NodeVersionChecker checker = new D1NodeVersionChecker(sourceNode);
|
755 |
|
String nodeVersion = checker.getVersion("MNRead");
|
756 |
|
if(nodeVersion != null && nodeVersion.equals(D1NodeVersionChecker.V1)) {
|
757 |
|
//The source node is a v1 node, we use the v1 api
|
758 |
|
org.dataone.client.v1.MNode mNodeV1 = org.dataone.client.v1.itk.D1Client.getMN(sourceNode);
|
759 |
|
object = mNodeV1.getReplica(thisNodeSession, pid);
|
760 |
|
} else if (nodeVersion != null && nodeVersion.equals(D1NodeVersionChecker.V2)){
|
761 |
|
// session should be null to use the default certificate
|
762 |
|
// location set in the Certificate manager
|
763 |
|
MNode mn = D1Client.getMN(sourceNode);
|
764 |
|
object = mn.getReplica(thisNodeSession, pid);
|
765 |
|
} else {
|
766 |
|
throw new ServiceFailure("2151", "The version of MNRead service is "+nodeVersion+" in the source node "+sourceNode.getValue()+" and it is supported. Please check the information in the cn");
|
|
823 |
String msg = "Error computing checksum on replica: " + e.getMessage();
|
|
824 |
logMetacat.error(msg);
|
|
825 |
ServiceFailure sf = new ServiceFailure("2151", msg);
|
|
826 |
sf.initCause(e);
|
|
827 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, sf);
|
|
828 |
throw sf;
|
767 |
829 |
}
|
768 |
|
|
769 |
|
logMetacat.info("MNodeService.getReplica() called for identifier "
|
770 |
|
+ pid.getValue());
|
771 |
|
|
|
830 |
if (!givenChecksum.getValue().equals(computedChecksum.getValue())) {
|
|
831 |
logMetacat.error("Given checksum for " + pid.getValue() +
|
|
832 |
"is " + givenChecksum.getValue());
|
|
833 |
logMetacat.error("Computed checksum for " + pid.getValue() +
|
|
834 |
"is " + computedChecksum.getValue());
|
|
835 |
String msg = "Computed checksum does not match declared checksum";
|
|
836 |
failure = new ServiceFailure("2151", msg);
|
|
837 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
|
838 |
throw new ServiceFailure("2151", msg);
|
|
839 |
}
|
772 |
840 |
}
|
773 |
841 |
|
774 |
|
} catch (InvalidToken e) {
|
775 |
|
String msg = "Could not retrieve object to replicate (InvalidToken): "+ e.getMessage();
|
776 |
|
failure = new ServiceFailure("2151", msg);
|
777 |
|
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
778 |
|
logMetacat.error(msg);
|
779 |
|
throw new ServiceFailure("2151", msg);
|
780 |
|
|
781 |
|
} catch (NotFound e) {
|
782 |
|
String msg = "Could not retrieve object to replicate (NotFound): "+ e.getMessage();
|
783 |
|
failure = new ServiceFailure("2151", msg);
|
784 |
|
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
785 |
|
logMetacat.error(msg);
|
786 |
|
throw new ServiceFailure("2151", msg);
|
787 |
|
|
788 |
|
} catch (NotAuthorized e) {
|
789 |
|
String msg = "Could not retrieve object to replicate (NotAuthorized): "+ e.getMessage();
|
790 |
|
failure = new ServiceFailure("2151", msg);
|
791 |
|
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
792 |
|
logMetacat.error(msg);
|
793 |
|
throw new ServiceFailure("2151", msg);
|
794 |
|
} catch (NotImplemented e) {
|
795 |
|
String msg = "Could not retrieve object to replicate (mn.getReplica NotImplemented): "+ e.getMessage();
|
796 |
|
failure = new ServiceFailure("2151", msg);
|
797 |
|
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
798 |
|
logMetacat.error(msg);
|
799 |
|
throw new ServiceFailure("2151", msg);
|
800 |
|
} catch (ServiceFailure e) {
|
801 |
|
String msg = "Could not retrieve object to replicate (ServiceFailure): "+ e.getMessage();
|
802 |
|
failure = new ServiceFailure("2151", msg);
|
803 |
|
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
804 |
|
logMetacat.error(msg);
|
805 |
|
throw new ServiceFailure("2151", msg);
|
806 |
|
} catch (InsufficientResources e) {
|
807 |
|
String msg = "Could not retrieve object to replicate (InsufficientResources): "+ e.getMessage();
|
808 |
|
failure = new ServiceFailure("2151", msg);
|
809 |
|
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
810 |
|
logMetacat.error(msg);
|
811 |
|
throw new ServiceFailure("2151", msg);
|
812 |
|
}
|
813 |
|
|
814 |
|
// verify checksum on the object, if supported
|
815 |
|
if (object.markSupported()) {
|
816 |
|
Checksum givenChecksum = sysmeta.getChecksum();
|
817 |
|
Checksum computedChecksum = null;
|
|
842 |
// add it to local store
|
|
843 |
Identifier retPid;
|
818 |
844 |
try {
|
819 |
|
computedChecksum = ChecksumUtil.checksum(object, givenChecksum.getAlgorithm());
|
820 |
|
object.reset();
|
821 |
|
|
|
845 |
// skip the MN.create -- this mutates the system metadata and we don't want it to
|
|
846 |
if ( localId == null ) {
|
|
847 |
// TODO: this will fail if we already "know" about the identifier
|
|
848 |
// FIXME: see https://redmine.dataone.org/issues/2572
|
|
849 |
retPid = super.create(session, pid, object, sysmeta);
|
|
850 |
result = (retPid.getValue().equals(pid.getValue()));
|
|
851 |
}
|
|
852 |
|
822 |
853 |
} catch (Exception e) {
|
823 |
|
String msg = "Error computing checksum on replica: " + e.getMessage();
|
824 |
|
logMetacat.error(msg);
|
825 |
|
ServiceFailure sf = new ServiceFailure("2151", msg);
|
826 |
|
sf.initCause(e);
|
827 |
|
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, sf);
|
828 |
|
throw sf;
|
829 |
|
}
|
830 |
|
if (!givenChecksum.getValue().equals(computedChecksum.getValue())) {
|
831 |
|
logMetacat.error("Given checksum for " + pid.getValue() +
|
832 |
|
"is " + givenChecksum.getValue());
|
833 |
|
logMetacat.error("Computed checksum for " + pid.getValue() +
|
834 |
|
"is " + computedChecksum.getValue());
|
835 |
|
String msg = "Computed checksum does not match declared checksum";
|
|
854 |
String msg = "Could not save object to local store (" + e.getClass().getName() + "): " + e.getMessage();
|
836 |
855 |
failure = new ServiceFailure("2151", msg);
|
837 |
856 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
|
857 |
logMetacat.error(msg);
|
838 |
858 |
throw new ServiceFailure("2151", msg);
|
|
859 |
|
839 |
860 |
}
|
|
861 |
} finally {
|
|
862 |
IOUtils.closeQuietly(object);
|
840 |
863 |
}
|
|
864 |
|
841 |
865 |
|
842 |
|
// add it to local store
|
843 |
|
Identifier retPid;
|
844 |
|
try {
|
845 |
|
// skip the MN.create -- this mutates the system metadata and we don't want it to
|
846 |
|
if ( localId == null ) {
|
847 |
|
// TODO: this will fail if we already "know" about the identifier
|
848 |
|
// FIXME: see https://redmine.dataone.org/issues/2572
|
849 |
|
retPid = super.create(session, pid, object, sysmeta);
|
850 |
|
result = (retPid.getValue().equals(pid.getValue()));
|
851 |
|
}
|
852 |
|
|
853 |
|
} catch (Exception e) {
|
854 |
|
String msg = "Could not save object to local store (" + e.getClass().getName() + "): " + e.getMessage();
|
855 |
|
failure = new ServiceFailure("2151", msg);
|
856 |
|
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.FAILED, failure);
|
857 |
|
logMetacat.error(msg);
|
858 |
|
throw new ServiceFailure("2151", msg);
|
859 |
|
|
860 |
|
}
|
861 |
|
|
862 |
866 |
// finish by setting the replication status
|
863 |
867 |
setReplicationStatus(thisNodeSession, pid, nodeId, ReplicationStatus.COMPLETED, null);
|
864 |
868 |
return result;
|
Close the input stream object on the MN.replicate method.