Project

General

Profile

« Previous | Next » 

Revision 6419

use pending replication task queue to check if node is authorized for replication. moved from old ReplicationService code

View differences:

CNodeService.java
25 25

  
26 26
import java.util.Date;
27 27
import java.util.List;
28
import java.util.Set;
28 29

  
29 30
import org.apache.log4j.Logger;
30 31
import org.dataone.configuration.Settings;
......
65 66
import com.hazelcast.core.Hazelcast;
66 67
import com.hazelcast.core.IMap;
67 68
import com.hazelcast.core.IQueue;
69
import com.hazelcast.query.SqlPredicate;
68 70

  
69 71
import edu.ucsb.nceas.metacat.EventLog;
70 72
import edu.ucsb.nceas.metacat.IdentifierManager;
......
107 109
  /* The Hazelcast distributed task id generator namespace */
108 110
  private String taskIds;
109 111
  
112
  /* The name of the pending replication tasks map */
113
  private String pendingTasksQueue;
114
  
110 115
  /* The Hazelcast distributed system metadata map */
111 116
  private IMap<NodeReference, Node> nodes;
112 117

  
113 118
  /* The Hazelcast distributed system metadata map */
114 119
  private IMap<Identifier, SystemMetadata> systemMetadata;
120
  
121
  /* The Hazelcast distributed pending replication tasks map*/
122
  private IMap<String, CNReplicationTask> pendingReplicationTasks;
115 123

  
116 124
  /* the logger instance */
117 125
  private Logger logMetacat = null;
......
150 158
        PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadata");
151 159
      taskIds = 
152 160
        PropertyService.getProperty("dataone.hazelcast.storageCluster.tasksIdGenerator");
161
      pendingTasksQueue = 
162
    	PropertyService.getProperty("dataone.hazelcast.replicationPendingTasks");
153 163
      
154 164
      // Become a DataONE-process cluster client
155 165
      String[] addresses = addressList.split(",");
156 166
      hzClient = 
157 167
        HazelcastClient.newHazelcastClient(this.groupName, this.groupPassword, addresses);
158 168
      nodes = hzClient.getMap(nodeMap);
159

  
169
      pendingReplicationTasks = hzClient.getMap(pendingTasksQueue);
170
      
160 171
      // Get a reference to the shared system metadata map as a cluster member
161 172
      systemMetadata = Hazelcast.getMap(systemMetadataMap);
162

  
173
      
163 174
      // Listen for changes to the system metadata map
164 175
      systemMetadata.addEntryListener(this, true);
165 176
      
......
826 837
    throws NotImplemented, NotAuthorized, InvalidToken, ServiceFailure, 
827 838
    NotFound, InvalidRequest {
828 839

  
829
    throw new NotImplemented("4870", "isReplicationAuthorized not implemented");
840
	// build a predicate like: 
841
	    // "pid                    = '{pid}                   ' AND 
842
	    //  pemission              = '{permission}            ' AND
843
	    //  originatingNodeSubject = '{originatingNodeSubject}' AND
844
	    //  targetNodeSubject      = '{targetNodeSubject}     '"
845
	    boolean isAllowed = false;
846
	    String query = "";
847
	    query += "pid = '";
848
	    query += pid;
849
	    query += "' AND permission = '";
850
	    query += replicatePermission.name();
851
	    query += "' AND originatingNodeSubject = '";
852
	    query += originatingNodeSession.getSubject().getValue();
853
	    query += "' AND targetNodeSubject = '";
854
	    query += targetNodeSubject.getValue();
855
	    query += "'";
856
	    
857
	    logMetacat.debug("Pending replication task query is: " + query);
858
	    // search the hzPendingReplicationTasks map for the  originating node subject, 
859
	    // target node subject, pid, and replicate permission
860
	    
861
	    Set<CNReplicationTask> tasks = 
862
	      (Set<CNReplicationTask>) this.pendingReplicationTasks.values(new SqlPredicate(query));
863
	    
864
	    // do we have a matching task?
865
	    if ( tasks.size() >= 1 ) {
866
	      isAllowed = true;
867
	      
868
	    }
869
	    
870
	    return isAllowed;
830 871
    
831 872
  }
832 873

  

Also available in: Unified diff