40 |
40 |
//initialize db connections to handle any update requests
|
41 |
41 |
MetaCatUtil util = new MetaCatUtil();
|
42 |
42 |
deltaT = util.getOption("deltaT");
|
43 |
|
//break off a thread to do the delta-T check
|
|
43 |
//the default deltaT can be set from metacat.properties
|
|
44 |
//create a thread to do the delta-T check but don't execute it yet
|
44 |
45 |
replicationDaemon = new Timer(true);
|
45 |
|
//replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0,
|
46 |
|
// new Integer(deltaT).intValue() * 1000);
|
47 |
|
//System.out.println("timer scheduled at: " + new Integer(deltaT).intValue()
|
48 |
|
// + " seconds");
|
49 |
|
|
50 |
46 |
}
|
51 |
47 |
|
52 |
48 |
public void destroy()
|
... | ... | |
102 |
98 |
if(rate < 30)
|
103 |
99 |
{
|
104 |
100 |
out.println("Replication deltaT rate cannot be less than 30!");
|
|
101 |
//deltaT<30 is a timing mess!
|
105 |
102 |
rate = 1000;
|
106 |
103 |
}
|
107 |
104 |
}
|
... | ... | |
124 |
121 |
}
|
125 |
122 |
else if(((String[])params.get("action"))[0].equals("update"))
|
126 |
123 |
{ //request an update list from the server
|
127 |
|
if(params.contains("servercheckcode"))
|
128 |
|
{ //the servercheckcode allows this server to check for updated
|
129 |
|
//file from other servers as well as just updated files from
|
130 |
|
//this server.
|
131 |
|
System.out.println("metacatreplication: servercheckcode: " +
|
132 |
|
((int[])params.get("servercheckcode"))[0]);
|
133 |
|
handleUpdateRequest(out, params, response,
|
134 |
|
((int[])params.get("servercheckcode"))[0]);
|
135 |
|
}
|
136 |
|
else
|
137 |
|
{
|
138 |
|
handleUpdateRequest2(out, params, response);
|
139 |
|
}
|
|
124 |
handleUpdateRequest(out, params, response);
|
140 |
125 |
}
|
141 |
126 |
else if(((String[])params.get("action"))[0].equals("read"))
|
142 |
127 |
{ //request a specific document from the server
|
... | ... | |
159 |
144 |
}
|
160 |
145 |
}
|
161 |
146 |
|
|
147 |
/**
|
|
148 |
* when a forcereplication request comes in, this method sends a read request
|
|
149 |
* to the requesting server for the specified docid.
|
|
150 |
*/
|
162 |
151 |
private void handleForceReplicateRequest(PrintWriter out, Hashtable params,
|
163 |
152 |
HttpServletResponse response)
|
164 |
153 |
{
|
165 |
|
System.out.println("in handleforcereplicaterequest");
|
|
154 |
//System.out.println("in handleforcereplicaterequest");
|
166 |
155 |
String server = ((String[])params.get("server"))[0];
|
|
156 |
//the server that the request came from
|
167 |
157 |
String docid = ((String[])params.get("docid"))[0];
|
|
158 |
//the docid of the document to get
|
168 |
159 |
String dbaction = "UPDATE";
|
|
160 |
//default action is update
|
169 |
161 |
boolean override = false;
|
170 |
162 |
int serverCode = 1;
|
171 |
163 |
|
172 |
164 |
try
|
173 |
165 |
{
|
174 |
166 |
if(params.containsKey("dbaction"))
|
175 |
|
{
|
|
167 |
{ //if the url contains a dbaction then the default action is overridden
|
176 |
168 |
dbaction = ((String[])params.get("dbaction"))[0];
|
177 |
169 |
serverCode = MetacatReplication.getServerCode(server);
|
178 |
|
override = true;
|
|
170 |
override = true; //we are now overriding the default action
|
179 |
171 |
}
|
180 |
|
System.out.println("action in forcereplicate is: " + dbaction);
|
181 |
|
System.out.println("serverCode in forcereplicate is: " + serverCode);
|
|
172 |
//System.out.println("action in forcereplicate is: " + dbaction);
|
|
173 |
//System.out.println("serverCode in forcereplicate is: " + serverCode);
|
182 |
174 |
|
183 |
175 |
int serverCheckCode = MetacatReplication.getServerCode(server);
|
184 |
176 |
URL u = new URL("http://" + server + "?action=read&docid=" + docid);
|
185 |
|
System.out.println("sending message: " + u.toString());
|
|
177 |
//System.out.println("sending message: " + u.toString());
|
186 |
178 |
String xmldoc = MetacatReplication.getURLContent(u);
|
|
179 |
//get the document to write
|
187 |
180 |
URL docinfourl = new URL("http://" + server +
|
188 |
181 |
"?action=getdocumentinfo&docid=" +
|
189 |
182 |
docid);
|
190 |
|
System.out.println("sending message: " + docinfourl.toString());
|
|
183 |
//we need to get the document's info so we can set the correct user
|
|
184 |
//and group once we get the document and write it to our DB
|
|
185 |
//System.out.println("sending message: " + docinfourl.toString());
|
191 |
186 |
String docInfoStr = MetacatReplication.getURLContent(docinfourl);
|
192 |
187 |
DocInfoHandler dih = new DocInfoHandler();
|
|
188 |
//dih is the parser for the docinfo xml format
|
193 |
189 |
XMLReader docinfoParser = ReplicationHandler.initParser(dih);
|
194 |
190 |
docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
|
195 |
191 |
Hashtable docinfoHash = dih.getDocInfo();
|
196 |
192 |
String user = (String)docinfoHash.get("user_owner");
|
197 |
193 |
String group = new String(user);
|
|
194 |
//right now the user and group are the same.
|
198 |
195 |
Connection conn = util.openDBConnection();
|
199 |
196 |
DocumentImpl.write(conn, new StringReader(xmldoc), null, dbaction, docid,
|
200 |
197 |
user, group, serverCode, override);
|
... | ... | |
258 |
255 |
|
259 |
256 |
/**
|
260 |
257 |
* Sends all of the xml_documents information encoded in xml to a requestor
|
|
258 |
* the format is:
|
|
259 |
* <!ELEMENT documentinfo (docid, docname, doctype, doctitle, user_owner,
|
|
260 |
* user_updated, public_access, rev)
|
|
261 |
* all of the subelements of document info are #PCDATA
|
261 |
262 |
*/
|
262 |
263 |
private void handleGetDocumentInfoRequest(PrintWriter out, Hashtable params,
|
263 |
264 |
HttpServletResponse response)
|
... | ... | |
275 |
276 |
sb.append("</doctitle><user_owner>").append(doc.getUserowner());
|
276 |
277 |
sb.append("</user_owner><user_updated>").append(doc.getUserupdated());
|
277 |
278 |
sb.append("</user_updated><public_access>").append(doc.getPublicaccess());
|
278 |
|
sb.append("</public_access></documentinfo>");
|
|
279 |
sb.append("</public_access><rev>").append(doc.getRev());
|
|
280 |
sb.append("</rev></documentinfo>");
|
279 |
281 |
response.setContentType("text/xml");
|
280 |
282 |
out.println(sb.toString());
|
281 |
283 |
conn.close();
|
... | ... | |
313 |
315 |
}
|
314 |
316 |
|
315 |
317 |
/**
|
316 |
|
* Does default handling for update requests
|
|
318 |
* Sends a list of all of the documents on this sever along with their
|
|
319 |
* revision numbers.
|
|
320 |
* The format is:
|
|
321 |
* <!ELEMENT replication (server, updates)>
|
|
322 |
* <!ELEMENT server (#PCDATA)>
|
|
323 |
* <!ELEMENT updates ((updatedDocument | deleteDocument)*)>
|
|
324 |
* <!ELEMENT updatedDocument (docid, rev)>
|
|
325 |
* <!ELEMENT deletedDocument (docid, rev)>
|
|
326 |
* <!ELEMENT docid (#PCDATA)>
|
|
327 |
* <!ELEMENT rev (#PCDATA)>
|
|
328 |
* note that the rev in deletedDocument is always empty. I just left
|
|
329 |
* it in there to make the parser implementation easier.
|
317 |
330 |
*/
|
318 |
331 |
private void handleUpdateRequest(PrintWriter out, Hashtable params,
|
319 |
|
HttpServletResponse response)
|
320 |
|
{
|
321 |
|
handleUpdateRequest(out, params, response, 1);
|
322 |
|
}
|
323 |
|
|
324 |
|
/**
|
325 |
|
* Initiates an update of all server in the xml_replication table.
|
326 |
|
* the remote host sends an update date. The local host (this method)
|
327 |
|
* queries the db for any document that was updated after the update date
|
328 |
|
* and returns a list of those documents to the remote host. It also
|
329 |
|
* sends back a list of the files that were locally deleted.
|
330 |
|
|
331 |
|
* serverCheckCode allows the requestor to request documents from this
|
332 |
|
* server that reside on a different server. Normally, this server
|
333 |
|
* should only replicate files that it owns but in the case of updating
|
334 |
|
* a file that does not belong to this server, it is needed. See
|
335 |
|
* DocumentImpl.write().
|
336 |
|
*/
|
337 |
|
private void handleUpdateRequest(PrintWriter out, Hashtable params,
|
338 |
|
HttpServletResponse response,
|
339 |
|
int serverCheckCode)
|
340 |
|
{
|
341 |
|
System.out.println("incoming update request for dt/time " +
|
342 |
|
((String[])params.get("date"))[0] +
|
343 |
|
" from external metacat");
|
344 |
|
response.setContentType("text/xml");
|
345 |
|
StringBuffer returnXML = new StringBuffer();
|
346 |
|
returnXML.append("<?xml version=\"1.0\"?><replication>");
|
347 |
|
|
348 |
|
StringBuffer sql = new StringBuffer();
|
349 |
|
String updateStr = ((String[])params.get("date"))[0];
|
350 |
|
updateStr = updateStr.replace('+', ' ');
|
351 |
|
//pseudo algorithm:
|
352 |
|
///////////////////////////////////////////////////////////////////////
|
353 |
|
//get the date/time from the requestor, query the db for any documents
|
354 |
|
//that have an update date later than the requested date and send
|
355 |
|
//those docids back to the requestor. If there are no newer documents
|
356 |
|
//then send back a null message.
|
357 |
|
///////////////////////////////////////////////////////////////////////
|
358 |
|
|
359 |
|
//Timestamp update = Timestamp.valueOf(updateStr);
|
360 |
|
SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
|
361 |
|
java.util.Date update = new java.util.Date();
|
362 |
|
ParsePosition pos = new ParsePosition(0);
|
363 |
|
update = formatter.parse(updateStr, pos);
|
364 |
|
String dateString = formatter.format(update);
|
365 |
|
sql.append("select docid, date_updated, server_location from ");
|
366 |
|
sql.append("xml_documents where date_updated > ");
|
367 |
|
sql.append("to_date('").append(dateString);
|
368 |
|
sql.append("','YY-MM-DD HH24:MI:SS')");
|
369 |
|
//System.out.println("sql: " + sql.toString());
|
370 |
|
|
371 |
|
//get any recently deleted documents
|
372 |
|
StringBuffer delsql = new StringBuffer();
|
373 |
|
delsql.append("select docid, date_updated, server_location from ");
|
374 |
|
delsql.append("xml_revisions where docid not in (select docid from ");
|
375 |
|
delsql.append("xml_documents) and date_updated > to_date('");
|
376 |
|
delsql.append(dateString).append("','YY-MM-DD HH24:MI:SS')");
|
377 |
|
|
378 |
|
try
|
379 |
|
{
|
380 |
|
Connection conn = util.openDBConnection();
|
381 |
|
PreparedStatement pstmt = conn.prepareStatement(sql.toString());
|
382 |
|
pstmt.execute();
|
383 |
|
ResultSet rs = pstmt.getResultSet();
|
384 |
|
boolean tablehasrows = rs.next();
|
385 |
|
|
386 |
|
//if a '1' should not represent localhost, add code here to query
|
387 |
|
//xml_replication for the proper serverid number
|
388 |
|
|
389 |
|
returnXML.append("<server>").append(util.getOption("server"));
|
390 |
|
returnXML.append(util.getOption("replicationpath"));
|
391 |
|
returnXML.append("</server><updates>");
|
392 |
|
while(tablehasrows)
|
393 |
|
{
|
394 |
|
String docid = rs.getString(1);
|
395 |
|
String dateUpdated = rs.getString(2);
|
396 |
|
int serverCode = rs.getInt(3);
|
397 |
|
if(serverCode == serverCheckCode)
|
398 |
|
{ //check that this document is from this server.
|
399 |
|
//servers only replicate their own documents!
|
400 |
|
returnXML.append("<updatedDocument><docid>").append(docid);
|
401 |
|
returnXML.append("</docid><date_updated>").append(dateUpdated);
|
402 |
|
returnXML.append("</date_updated></updatedDocument>");
|
403 |
|
}
|
404 |
|
tablehasrows = rs.next();
|
405 |
|
}
|
406 |
|
|
407 |
|
pstmt = conn.prepareStatement(delsql.toString());
|
408 |
|
pstmt.execute();
|
409 |
|
rs = pstmt.getResultSet();
|
410 |
|
tablehasrows = rs.next();
|
411 |
|
while(tablehasrows)
|
412 |
|
{ //handle the deleted documents
|
413 |
|
String docid = rs.getString(1);
|
414 |
|
String dateUpdated = rs.getString(2);
|
415 |
|
int serverCode = rs.getInt(3);
|
416 |
|
if(serverCode == 1)
|
417 |
|
{
|
418 |
|
returnXML.append("<deletedDocument><docid>").append(docid);
|
419 |
|
returnXML.append("</docid><date_updated>").append(dateUpdated);
|
420 |
|
returnXML.append("</date_updated></deletedDocument>");
|
421 |
|
}
|
422 |
|
tablehasrows = rs.next();
|
423 |
|
}
|
424 |
|
|
425 |
|
returnXML.append("</updates></replication>");
|
426 |
|
conn.close();
|
427 |
|
//System.out.println(returnXML.toString());
|
428 |
|
out.print(returnXML.toString());
|
429 |
|
}
|
430 |
|
catch(Exception e)
|
431 |
|
{
|
432 |
|
System.out.println("Exception in metacatReplication: " + e.getMessage());
|
433 |
|
}
|
434 |
|
}
|
435 |
|
|
436 |
|
/**
|
437 |
|
* Sends an update list based on rev numbers instead of dates.
|
438 |
|
*/
|
439 |
|
private void handleUpdateRequest2(PrintWriter out, Hashtable params,
|
440 |
332 |
HttpServletResponse response)
|
441 |
333 |
{
|
442 |
334 |
System.out.println("in handleUpdateRequest2");
|
... | ... | |
484 |
376 |
{ //handle the deleted documents
|
485 |
377 |
doclist.append("<deletedDocument><docid>").append(rs.getString(1));
|
486 |
378 |
doclist.append("</docid><rev></rev></deletedDocument>");
|
|
379 |
//note that rev is always empty for deleted docs
|
487 |
380 |
tablehasrows = rs.next();
|
488 |
381 |
}
|
489 |
382 |
|
... | ... | |
518 |
411 |
}
|
519 |
412 |
|
520 |
413 |
/**
|
521 |
|
* This method is what is run when a seperate thread is broken off to handle
|
522 |
|
* inserting a document from a remote replicated server.
|
|
414 |
* this method handles the timeout for a file lock. when a lock is
|
|
415 |
* granted it is granted for 30 seconds. When this thread runs out
|
|
416 |
* it deletes the docid from the queue, thus eliminating the lock.
|
523 |
417 |
*/
|
524 |
418 |
public void run()
|
525 |
419 |
{
|
526 |
420 |
try
|
527 |
421 |
{
|
528 |
|
System.out.println("thread started for docid: " +
|
529 |
|
(String)fileLocks.elementAt(0));
|
|
422 |
//System.out.println("thread started for docid: " +
|
|
423 |
// (String)fileLocks.elementAt(0));
|
530 |
424 |
Thread.sleep(30000); //the lock will expire in 30 seconds
|
531 |
|
System.out.println("thread for docid: " +
|
532 |
|
(String)fileLocks.elementAt(fileLocks.size() - 1) +
|
533 |
|
" exiting.");
|
|
425 |
//System.out.println("thread for docid: " +
|
|
426 |
// (String)fileLocks.elementAt(fileLocks.size() - 1) +
|
|
427 |
// " exiting.");
|
534 |
428 |
fileLocks.remove(fileLocks.size() - 1);
|
535 |
429 |
//fileLocks is treated as a FIFO queue. If there are more than one lock
|
536 |
430 |
//in the vector, the first one inserted will be removed.
|
cleaned up code, added more complete documentation of replication algorithms and datastructures.