Revision 6419
Added by ben leinfelder about 13 years ago
src/edu/ucsb/nceas/metacat/dataone/CNodeService.java | ||
---|---|---|
25 | 25 |
|
26 | 26 |
import java.util.Date; |
27 | 27 |
import java.util.List; |
28 |
import java.util.Set; |
|
28 | 29 |
|
29 | 30 |
import org.apache.log4j.Logger; |
30 | 31 |
import org.dataone.configuration.Settings; |
... | ... | |
65 | 66 |
import com.hazelcast.core.Hazelcast; |
66 | 67 |
import com.hazelcast.core.IMap; |
67 | 68 |
import com.hazelcast.core.IQueue; |
69 |
import com.hazelcast.query.SqlPredicate; |
|
68 | 70 |
|
69 | 71 |
import edu.ucsb.nceas.metacat.EventLog; |
70 | 72 |
import edu.ucsb.nceas.metacat.IdentifierManager; |
... | ... | |
107 | 109 |
/* The Hazelcast distributed task id generator namespace */ |
108 | 110 |
private String taskIds; |
109 | 111 |
|
112 |
/* The name of the pending replication tasks map */ |
|
113 |
private String pendingTasksQueue; |
|
114 |
|
|
110 | 115 |
/* The Hazelcast distributed system metadata map */ |
111 | 116 |
private IMap<NodeReference, Node> nodes; |
112 | 117 |
|
113 | 118 |
/* The Hazelcast distributed system metadata map */ |
114 | 119 |
private IMap<Identifier, SystemMetadata> systemMetadata; |
120 |
|
|
121 |
/* The Hazelcast distributed pending replication tasks map*/ |
|
122 |
private IMap<String, CNReplicationTask> pendingReplicationTasks; |
|
115 | 123 |
|
116 | 124 |
/* the logger instance */ |
117 | 125 |
private Logger logMetacat = null; |
... | ... | |
150 | 158 |
PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadata"); |
151 | 159 |
taskIds = |
152 | 160 |
PropertyService.getProperty("dataone.hazelcast.storageCluster.tasksIdGenerator"); |
161 |
pendingTasksQueue = |
|
162 |
PropertyService.getProperty("dataone.hazelcast.replicationPendingTasks"); |
|
153 | 163 |
|
154 | 164 |
// Become a DataONE-process cluster client |
155 | 165 |
String[] addresses = addressList.split(","); |
156 | 166 |
hzClient = |
157 | 167 |
HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses); |
158 | 168 |
nodes = hzClient.getMap(nodeMap); |
159 |
|
|
169 |
pendingReplicationTasks = hzClient.getMap(pendingTasksQueue); |
|
170 |
|
|
160 | 171 |
// Get a reference to the shared system metadata map as a cluster member |
161 | 172 |
systemMetadata = Hazelcast.getMap(systemMetadataMap); |
162 |
|
|
173 |
|
|
163 | 174 |
// Listen for changes to the system metadata map |
164 | 175 |
systemMetadata.addEntryListener(this, true); |
165 | 176 |
|
... | ... | |
826 | 837 |
throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, |
827 | 838 |
NotFound, InvalidRequest { |
828 | 839 |
|
829 |
throw new NotImplemented("4870", "isReplicationAuthorized not implemented"); |
|
840 |
// build a predicate like: |
|
841 |
// "pid = '{pid} ' AND |
|
842 |
// pemission = '{permission} ' AND |
|
843 |
// originatingNodeSubject = '{originatingNodeSubject}' AND |
|
844 |
// targetNodeSubject = '{targetNodeSubject} '" |
|
845 |
boolean isAllowed = false; |
|
846 |
String query = ""; |
|
847 |
query += "pid = '"; |
|
848 |
query += pid; |
|
849 |
query += "' AND permission = '"; |
|
850 |
query += replicatePermission.name(); |
|
851 |
query += "' AND originatingNodeSubject = '"; |
|
852 |
query += originatingNodeSession.getSubject().getValue(); |
|
853 |
query += "' AND targetNodeSubject = '"; |
|
854 |
query += targetNodeSubject.getValue(); |
|
855 |
query += "'"; |
|
856 |
|
|
857 |
logMetacat.debug("Pending replication task query is: " + query); |
|
858 |
// search the hzPendingReplicationTasks map for the originating node subject, |
|
859 |
// target node subject, pid, and replicate permission |
|
860 |
|
|
861 |
Set<CNReplicationTask> tasks = |
|
862 |
(Set<CNReplicationTask>) this.pendingReplicationTasks.values(new SqlPredicate(query)); |
|
863 |
|
|
864 |
// do we have a matching task? |
|
865 |
if ( tasks.size() >= 1 ) { |
|
866 |
isAllowed = true; |
|
867 |
|
|
868 |
} |
|
869 |
|
|
870 |
return isAllowed; |
|
830 | 871 |
|
831 | 872 |
} |
832 | 873 |
|
Also available in: Unified diff
use pending replication task queue to check if node is authorized for replication. moved from old ReplicationService code