Revision 9203
Added by rnahf over 9 years ago
src/edu/ucsb/nceas/metacat/replication/ReplicationHandler.java | ||
---|---|---|
40 | 40 |
import java.text.ParseException; |
41 | 41 |
import java.util.Date; |
42 | 42 |
import java.util.Hashtable; |
43 |
import java.util.Iterator; |
|
43 | 44 |
import java.util.TimerTask; |
44 | 45 |
import java.util.Vector; |
45 | 46 |
|
... | ... | |
144 | 145 |
*/ |
145 | 146 |
private void update() |
146 | 147 |
{ |
147 |
|
|
148 |
_xmlDocQueryCount = 0; |
|
149 |
_xmlRevQueryCount = 0; |
|
150 |
_xmlDocQueryTime = 0; |
|
151 |
_xmlRevQueryTime = 0; |
|
152 |
/* |
|
148 |
Vector<InputStream> responses = new Vector<InputStream>(); |
|
149 |
try { |
|
150 |
|
|
151 |
_xmlDocQueryCount = 0; |
|
152 |
_xmlRevQueryCount = 0; |
|
153 |
_xmlDocQueryTime = 0; |
|
154 |
_xmlRevQueryTime = 0; |
|
155 |
/* |
|
153 | 156 |
Pseudo-algorithm |
154 | 157 |
- request a doc list from each server in xml_replication |
155 | 158 |
- check the rev number of each of those documents agains the |
... | ... | |
161 | 164 |
- update last_checked to keep track of the last time it was checked. |
162 | 165 |
(this info is theoretically not needed using this system but probably |
163 | 166 |
should be kept anyway) |
164 |
*/ |
|
167 |
*/
|
|
165 | 168 |
|
166 |
ReplicationServer replServer = null; // Variable to store the |
|
167 |
// ReplicationServer got from |
|
168 |
// Server list |
|
169 |
String server = null; // Variable to store server name |
|
170 |
// String update; |
|
171 |
Vector<InputStream> responses = new Vector<InputStream>(); |
|
172 |
URL u; |
|
173 |
long replicationStartTime = System.currentTimeMillis(); |
|
174 |
long timeToGetServerList = 0; |
|
175 |
|
|
176 |
//Check for every server in server list to get updated list and put |
|
177 |
// them in to response |
|
178 |
long startTimeToGetServers = System.currentTimeMillis(); |
|
179 |
for (int i=0; i<serverList.size(); i++) |
|
180 |
{ |
|
181 |
// Get ReplicationServer object from server list |
|
182 |
replServer = serverList.serverAt(i); |
|
183 |
// Get server name from ReplicationServer object |
|
184 |
server = replServer.getServerName().trim(); |
|
185 |
InputStream result = null; |
|
186 |
logReplication.info("ReplicationHandler.update - full update started to: " + server); |
|
187 |
// Send command to that server to get updated docid information |
|
188 |
try |
|
169 |
ReplicationServer replServer = null; // Variable to store the |
|
170 |
// ReplicationServer got from |
|
171 |
// Server list |
|
172 |
String server = null; // Variable to store server name |
|
173 |
// String update; |
|
174 |
|
|
175 |
URL u; |
|
176 |
long replicationStartTime = System.currentTimeMillis(); |
|
177 |
long timeToGetServerList = 0; |
|
178 |
|
|
179 |
//Check for every server in server list to get updated list and put |
|
180 |
// them in to response |
|
181 |
long startTimeToGetServers = System.currentTimeMillis(); |
|
182 |
for (int i=0; i<serverList.size(); i++) |
|
189 | 183 |
{ |
190 |
u = new URL("https://" + server + "?server=" |
|
191 |
+MetacatUtil.getLocalReplicationServerName()+"&action=update"); |
|
192 |
logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString()); |
|
193 |
result = ReplicationService.getURLStream(u); |
|
184 |
// Get ReplicationServer object from server list |
|
185 |
replServer = serverList.serverAt(i); |
|
186 |
// Get server name from ReplicationServer object |
|
187 |
server = replServer.getServerName().trim(); |
|
188 |
InputStream result = null; |
|
189 |
logReplication.info("ReplicationHandler.update - full update started to: " + server); |
|
190 |
// Send command to that server to get updated docid information |
|
191 |
try |
|
192 |
{ |
|
193 |
u = new URL("https://" + server + "?server=" |
|
194 |
+MetacatUtil.getLocalReplicationServerName()+"&action=update"); |
|
195 |
logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString()); |
|
196 |
result = ReplicationService.getURLStream(u); |
|
197 |
} |
|
198 |
catch (Exception e) |
|
199 |
{ |
|
200 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG); |
|
201 |
logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+ |
|
202 |
"for server " + server + " because "+e.getMessage()); |
|
203 |
continue; |
|
204 |
} |
|
205 |
|
|
206 |
//logReplication.info("ReplicationHandler.update - docid: "+server+" "+result); |
|
207 |
//check if result have error or not, if has skip it. |
|
208 |
// TODO: check for error in stream |
|
209 |
//if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) { |
|
210 |
if (result == null) { |
|
211 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG); |
|
212 |
logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+ |
|
213 |
"for server " + server + " because "+result); |
|
214 |
continue; |
|
215 |
} |
|
216 |
//Add result to vector |
|
217 |
responses.add(result); |
|
194 | 218 |
} |
195 |
catch (Exception e) |
|
219 |
timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers; |
|
220 |
|
|
221 |
//make sure that there is updated file list |
|
222 |
//If response is null, metacat don't need do anything |
|
223 |
if (responses==null || responses.isEmpty()) |
|
196 | 224 |
{ |
197 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG); |
|
198 |
logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
|
|
199 |
"for server " + server + " because "+e.getMessage());
|
|
200 |
continue;
|
|
225 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
|
|
226 |
logReplication.info( "ReplicationHandler.update - No updated doc list for "+
|
|
227 |
"every server and failed to replicate");
|
|
228 |
return;
|
|
201 | 229 |
} |
202 | 230 |
|
203 |
//logReplication.info("ReplicationHandler.update - docid: "+server+" "+result); |
|
204 |
//check if result have error or not, if has skip it. |
|
205 |
// TODO: check for error in stream |
|
206 |
//if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) { |
|
207 |
if (result == null) { |
|
208 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG); |
|
209 |
logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+ |
|
210 |
"for server " + server + " because "+result); |
|
211 |
continue; |
|
212 |
} |
|
213 |
//Add result to vector |
|
214 |
responses.add(result); |
|
215 |
} |
|
216 |
timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers; |
|
217 | 231 |
|
218 |
//make sure that there is updated file list |
|
219 |
//If response is null, metacat don't need do anything |
|
220 |
if (responses==null || responses.isEmpty()) |
|
221 |
{ |
|
222 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG); |
|
223 |
logReplication.info( "ReplicationHandler.update - No updated doc list for "+ |
|
224 |
"every server and failed to replicate"); |
|
225 |
return; |
|
226 |
} |
|
232 |
//logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+ |
|
233 |
// "document information: "+ responses.toString()); |
|
227 | 234 |
|
235 |
long totalServerListParseTime = 0; |
|
236 |
// go through response vector(it contains updated vector and delete vector |
|
237 |
for(int i=0; i<responses.size(); i++) |
|
238 |
{ |
|
239 |
long startServerListParseTime = System.currentTimeMillis(); |
|
240 |
XMLReader parser; |
|
241 |
ReplMessageHandler message = new ReplMessageHandler(); |
|
242 |
try |
|
243 |
{ |
|
244 |
parser = initParser(message); |
|
245 |
} |
|
246 |
catch (Exception e) |
|
247 |
{ |
|
248 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG); |
|
249 |
logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " + |
|
250 |
" initParser for message and " +e.getMessage()); |
|
251 |
// stop replication |
|
252 |
return; |
|
253 |
} |
|
228 | 254 |
|
229 |
//logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+ |
|
230 |
// "document information: "+ responses.toString()); |
|
231 |
|
|
232 |
long totalServerListParseTime = 0; |
|
233 |
// go through response vector(it contains updated vector and delete vector |
|
234 |
for(int i=0; i<responses.size(); i++) |
|
235 |
{ |
|
236 |
long startServerListParseTime = System.currentTimeMillis(); |
|
237 |
XMLReader parser; |
|
238 |
ReplMessageHandler message = new ReplMessageHandler(); |
|
239 |
try |
|
255 |
try |
|
256 |
{ |
|
257 |
parser.parse(new InputSource(responses.elementAt(i))); |
|
258 |
} |
|
259 |
catch(Exception e) |
|
260 |
{ |
|
261 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG); |
|
262 |
logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+ |
|
263 |
"because "+ e.getMessage()); |
|
264 |
continue; |
|
265 |
} |
|
266 |
finally |
|
267 |
{ |
|
268 |
IOUtils.closeQuietly(responses.elementAt(i)); |
|
269 |
} |
|
270 |
//v is the list of updated documents |
|
271 |
Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect()); |
|
272 |
logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName()); |
|
273 |
//d is the list of deleted documents |
|
274 |
Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect()); |
|
275 |
logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName()); |
|
276 |
logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName()); |
|
277 |
logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName()); |
|
278 |
// go though every element in updated document vector |
|
279 |
handleDocList(updateList, DocumentImpl.DOCUMENTTABLE); |
|
280 |
//handle deleted docs |
|
281 |
for(int k=0; k<deleteList.size(); k++) |
|
282 |
{ //delete the deleted documents; |
|
283 |
Vector<String> w = new Vector<String>(deleteList.elementAt(k)); |
|
284 |
String docId = (String)w.elementAt(0); |
|
285 |
try |
|
286 |
{ |
|
287 |
handleDeleteSingleDocument(docId, server); |
|
288 |
} |
|
289 |
catch (Exception ee) |
|
290 |
{ |
|
291 |
continue; |
|
292 |
} |
|
293 |
}//for delete docs |
|
294 |
|
|
295 |
// handle replicate doc in xml_revision |
|
296 |
Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect()); |
|
297 |
logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName()); |
|
298 |
handleDocList(revisionList, DocumentImpl.REVISIONTABLE); |
|
299 |
DOCINSERTNUMBER = 1; |
|
300 |
DOCERRORNUMBER = 1; |
|
301 |
REVINSERTNUMBER = 1; |
|
302 |
REVERRORNUMBER = 1; |
|
303 |
|
|
304 |
// handle system metadata |
|
305 |
Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect(); |
|
306 |
for(int k = 0; k < systemMetadataList.size(); k++) { |
|
307 |
Vector<String> w = systemMetadataList.elementAt(k); |
|
308 |
String guid = (String) w.elementAt(0); |
|
309 |
String remoteserver = (String) w.elementAt(1); |
|
310 |
try { |
|
311 |
handleSystemMetadata(remoteserver, guid); |
|
312 |
} |
|
313 |
catch (Exception ee) { |
|
314 |
logMetacat.error("Error replicating system metedata for guid: " + guid, ee); |
|
315 |
continue; |
|
316 |
} |
|
317 |
} |
|
318 |
|
|
319 |
totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime); |
|
320 |
}//for response |
|
321 |
|
|
322 |
//updated last_checked |
|
323 |
for (int i=0;i<serverList.size(); i++) |
|
240 | 324 |
{ |
241 |
parser = initParser(message); |
|
242 |
} |
|
243 |
catch (Exception e) |
|
244 |
{ |
|
245 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG); |
|
246 |
logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " + |
|
247 |
" initParser for message and " +e.getMessage()); |
|
248 |
// stop replication |
|
249 |
return; |
|
250 |
} |
|
251 |
|
|
252 |
try |
|
253 |
{ |
|
254 |
parser.parse(new InputSource(responses.elementAt(i))); |
|
255 |
} |
|
256 |
catch(Exception e) |
|
257 |
{ |
|
258 |
logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG); |
|
259 |
logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+ |
|
260 |
"because "+ e.getMessage()); |
|
261 |
continue; |
|
262 |
} |
|
263 |
finally |
|
264 |
{ |
|
265 |
IOUtils.closeQuietly(responses.elementAt(i)); |
|
266 |
} |
|
267 |
//v is the list of updated documents |
|
268 |
Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect()); |
|
269 |
logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName()); |
|
270 |
//d is the list of deleted documents |
|
271 |
Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect()); |
|
272 |
logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName()); |
|
273 |
logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName()); |
|
274 |
logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName()); |
|
275 |
// go though every element in updated document vector |
|
276 |
handleDocList(updateList, DocumentImpl.DOCUMENTTABLE); |
|
277 |
//handle deleted docs |
|
278 |
for(int k=0; k<deleteList.size(); k++) |
|
279 |
{ //delete the deleted documents; |
|
280 |
Vector<String> w = new Vector<String>(deleteList.elementAt(k)); |
|
281 |
String docId = (String)w.elementAt(0); |
|
282 |
try |
|
283 |
{ |
|
284 |
handleDeleteSingleDocument(docId, server); |
|
285 |
} |
|
286 |
catch (Exception ee) |
|
287 |
{ |
|
288 |
continue; |
|
289 |
} |
|
290 |
}//for delete docs |
|
291 |
|
|
292 |
// handle replicate doc in xml_revision |
|
293 |
Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect()); |
|
294 |
logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName()); |
|
295 |
handleDocList(revisionList, DocumentImpl.REVISIONTABLE); |
|
296 |
DOCINSERTNUMBER = 1; |
|
297 |
DOCERRORNUMBER = 1; |
|
298 |
REVINSERTNUMBER = 1; |
|
299 |
REVERRORNUMBER = 1; |
|
300 |
|
|
301 |
// handle system metadata |
|
302 |
Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect(); |
|
303 |
for(int k = 0; k < systemMetadataList.size(); k++) { |
|
304 |
Vector<String> w = systemMetadataList.elementAt(k); |
|
305 |
String guid = (String) w.elementAt(0); |
|
306 |
String remoteserver = (String) w.elementAt(1); |
|
307 |
try { |
|
308 |
handleSystemMetadata(remoteserver, guid); |
|
309 |
} |
|
310 |
catch (Exception ee) { |
|
311 |
logMetacat.error("Error replicating system metedata for guid: " + guid, ee); |
|
312 |
continue; |
|
313 |
} |
|
314 |
} |
|
315 |
|
|
316 |
totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime); |
|
317 |
}//for response |
|
325 |
// Get ReplicationServer object from server list |
|
326 |
replServer = serverList.serverAt(i); |
|
327 |
try |
|
328 |
{ |
|
329 |
updateLastCheckTimeForSingleServer(replServer); |
|
330 |
} |
|
331 |
catch(Exception e) |
|
332 |
{ |
|
333 |
continue; |
|
334 |
} |
|
335 |
}//for |
|
318 | 336 |
|
319 |
//updated last_checked |
|
320 |
for (int i=0;i<serverList.size(); i++) |
|
321 |
{ |
|
322 |
// Get ReplicationServer object from server list |
|
323 |
replServer = serverList.serverAt(i); |
|
324 |
try |
|
325 |
{ |
|
326 |
updateLastCheckTimeForSingleServer(replServer); |
|
327 |
} |
|
328 |
catch(Exception e) |
|
329 |
{ |
|
330 |
continue; |
|
331 |
} |
|
332 |
}//for |
|
333 |
|
|
334 |
long replicationEndTime = System.currentTimeMillis(); |
|
335 |
logMetacat.debug("ReplicationHandler.update - Total replication time: " + |
|
336 |
(replicationEndTime - replicationStartTime)); |
|
337 |
logMetacat.debug("ReplicationHandler.update - time to get server list: " + |
|
338 |
timeToGetServerList); |
|
339 |
logMetacat.debug("ReplicationHandler.update - server list parse time: " + |
|
340 |
totalServerListParseTime); |
|
341 |
logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + |
|
342 |
_xmlDocQueryCount); |
|
343 |
logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + |
|
344 |
_xmlDocQueryTime + " ms"); |
|
345 |
logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + |
|
346 |
_xmlRevQueryCount); |
|
347 |
logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + |
|
348 |
_xmlRevQueryTime + " ms");; |
|
337 |
long replicationEndTime = System.currentTimeMillis(); |
|
338 |
logMetacat.debug("ReplicationHandler.update - Total replication time: " + |
|
339 |
(replicationEndTime - replicationStartTime)); |
|
340 |
logMetacat.debug("ReplicationHandler.update - time to get server list: " + |
|
341 |
timeToGetServerList); |
|
342 |
logMetacat.debug("ReplicationHandler.update - server list parse time: " + |
|
343 |
totalServerListParseTime); |
|
344 |
logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + |
|
345 |
_xmlDocQueryCount); |
|
346 |
logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + |
|
347 |
_xmlDocQueryTime + " ms"); |
|
348 |
logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + |
|
349 |
_xmlRevQueryCount); |
|
350 |
logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + |
|
351 |
_xmlRevQueryTime + " ms");; |
|
349 | 352 |
|
350 |
}//update |
|
353 |
} finally { // need to close all inputstreams unconditionally |
|
354 |
Iterator<InputStream> isit = responses.iterator(); |
|
355 |
while (isit.hasNext()) { |
|
356 |
IOUtils.closeQuietly(isit.next()); |
|
357 |
} |
|
358 |
} |
|
359 |
}//update |
|
351 | 360 |
|
352 | 361 |
/* Handle replicate single xml document*/ |
353 | 362 |
private void handleSingleXMLDocument(String remoteserver, String actions, |
Also available in: Unified diff
fixes #7092: added finally clause to ReplicationHandler.update method to unconditionally close the inputStreams it keeps in a Vector. Needs testing.