Revision 577
Added by berkley about 24 years ago
src/edu/ucsb/nceas/metacat/MetacatReplication.java | ||
---|---|---|
125 | 125 |
else if(((String[])params.get("action"))[0].equals("update")) |
126 | 126 |
{ //request an update list from the server |
127 | 127 |
if(params.contains("servercheckcode")) |
128 |
{ |
|
128 |
{ //the servercheckcode allows this server to check for updated |
|
129 |
//file from other servers as well as just updated files from |
|
130 |
//this server. |
|
129 | 131 |
System.out.println("metacatreplication: servercheckcode: " + |
130 | 132 |
((int[])params.get("servercheckcode"))[0]); |
131 | 133 |
handleUpdateRequest(out, params, response, |
... | ... | |
133 | 135 |
} |
134 | 136 |
else |
135 | 137 |
{ |
136 |
handleUpdateRequest(out, params, response); |
|
138 |
handleUpdateRequest2(out, params, response);
|
|
137 | 139 |
} |
138 | 140 |
} |
139 | 141 |
else if(((String[])params.get("action"))[0].equals("read")) |
... | ... | |
163 | 165 |
System.out.println("in handleforcereplicaterequest"); |
164 | 166 |
String server = ((String[])params.get("server"))[0]; |
165 | 167 |
String docid = ((String[])params.get("docid"))[0]; |
168 |
String dbaction = "UPDATE"; |
|
169 |
boolean override = false; |
|
170 |
int serverCode = 1; |
|
171 |
|
|
166 | 172 |
try |
167 | 173 |
{ |
174 |
if(params.containsKey("dbaction")) |
|
175 |
{ |
|
176 |
dbaction = ((String[])params.get("dbaction"))[0]; |
|
177 |
serverCode = MetacatReplication.getServerCode(server); |
|
178 |
override = true; |
|
179 |
} |
|
180 |
System.out.println("action in forcereplicate is: " + dbaction); |
|
181 |
System.out.println("serverCode in forcereplicate is: " + serverCode); |
|
182 |
|
|
168 | 183 |
int serverCheckCode = MetacatReplication.getServerCode(server); |
169 | 184 |
URL u = new URL("http://" + server + "?action=read&docid=" + docid); |
170 | 185 |
System.out.println("sending message: " + u.toString()); |
... | ... | |
181 | 196 |
String user = (String)docinfoHash.get("user_owner"); |
182 | 197 |
String group = new String(user); |
183 | 198 |
Connection conn = util.openDBConnection(); |
184 |
DocumentImpl.write(conn, new StringReader(xmldoc), "UPDATE", docid, user,
|
|
185 |
group, 1);
|
|
199 |
DocumentImpl.write(conn, new StringReader(xmldoc), null, dbaction, docid,
|
|
200 |
user, group, serverCode, override);
|
|
186 | 201 |
conn.close(); |
187 | 202 |
} |
188 | 203 |
catch(Exception e) |
... | ... | |
426 | 441 |
} |
427 | 442 |
|
428 | 443 |
/** |
444 |
* Sends an update list based on rev numbers instead of dates. |
|
445 |
*/ |
|
446 |
private void handleUpdateRequest2(PrintWriter out, Hashtable params, |
|
447 |
HttpServletResponse response) |
|
448 |
{ |
|
449 |
System.out.println("in handleUpdateRequest2"); |
|
450 |
try |
|
451 |
{ |
|
452 |
System.out.println("received update request"); |
|
453 |
StringBuffer docsql = new StringBuffer(); |
|
454 |
StringBuffer doclist = new StringBuffer(); |
|
455 |
|
|
456 |
//get all docs that reside on this server |
|
457 |
doclist.append("<?xml version=\"1.0\"?><replication>"); |
|
458 |
doclist.append("<server>").append(util.getOption("server")); |
|
459 |
doclist.append(util.getOption("replicationpath")); |
|
460 |
doclist.append("</server><updates>"); |
|
461 |
|
|
462 |
docsql.append("select docid, rev from xml_documents where "); |
|
463 |
docsql.append("server_location = 1"); |
|
464 |
|
|
465 |
//get any deleted documents |
|
466 |
StringBuffer delsql = new StringBuffer(); |
|
467 |
delsql.append("select distinct docid from "); |
|
468 |
delsql.append("xml_revisions where docid not in (select docid from "); |
|
469 |
delsql.append("xml_documents) and server_location = 1"); |
|
470 |
|
|
471 |
Connection conn = util.openDBConnection(); |
|
472 |
PreparedStatement pstmt = conn.prepareStatement(docsql.toString()); |
|
473 |
pstmt.execute(); |
|
474 |
ResultSet rs = pstmt.getResultSet(); |
|
475 |
boolean tablehasrows = rs.next(); |
|
476 |
while(tablehasrows) |
|
477 |
{ |
|
478 |
doclist.append("<updatedDocument>"); |
|
479 |
doclist.append("<docid>").append(rs.getString(1)); |
|
480 |
doclist.append("</docid><rev>").append(rs.getInt(2)); |
|
481 |
doclist.append("</rev>"); |
|
482 |
doclist.append("</updatedDocument>"); |
|
483 |
tablehasrows = rs.next(); |
|
484 |
} |
|
485 |
|
|
486 |
pstmt = conn.prepareStatement(delsql.toString()); |
|
487 |
pstmt.execute(); |
|
488 |
rs = pstmt.getResultSet(); |
|
489 |
tablehasrows = rs.next(); |
|
490 |
while(tablehasrows) |
|
491 |
{ //handle the deleted documents |
|
492 |
doclist.append("<deletedDocument><docid>").append(rs.getString(1)); |
|
493 |
doclist.append("</docid><rev></rev></deletedDocument>"); |
|
494 |
tablehasrows = rs.next(); |
|
495 |
} |
|
496 |
|
|
497 |
doclist.append("</updates></replication>"); |
|
498 |
//System.out.println("doclist: " + doclist.toString()); |
|
499 |
conn.close(); |
|
500 |
response.setContentType("text/xml"); |
|
501 |
out.println(doclist.toString()); |
|
502 |
} |
|
503 |
catch(Exception e) |
|
504 |
{ |
|
505 |
System.out.println("error in handleupdaterequest2: " + e.getMessage()); |
|
506 |
e.printStackTrace(System.out); |
|
507 |
} |
|
508 |
|
|
509 |
} |
|
510 |
|
|
511 |
/** |
|
429 | 512 |
* Sends the current system date to the remote server. Using this action |
430 | 513 |
* for replication gets rid of any problems with syncronizing clocks |
431 | 514 |
* because a time specific to a document is always kept on its home server. |
src/edu/ucsb/nceas/metacat/DocumentImpl.java | ||
---|---|---|
26 | 26 |
import java.util.Iterator; |
27 | 27 |
import java.util.Stack; |
28 | 28 |
import java.util.TreeSet; |
29 |
import java.util.Enumeration; |
|
29 | 30 |
|
30 | 31 |
import org.xml.sax.AttributeList; |
31 | 32 |
import org.xml.sax.ContentHandler; |
... | ... | |
728 | 729 |
String group, int serverCode ) |
729 | 730 |
throws Exception |
730 | 731 |
{ |
731 |
return write(conn, xml, null, |
|
732 |
action, docid, user, |
|
733 |
group, serverCode); |
|
732 |
return write(conn, xml, null, action, docid, user, group, serverCode); |
|
734 | 733 |
} |
735 | 734 |
|
735 |
public static String write( Connection conn, Reader xml, Reader acl, |
|
736 |
String action, String docid, String user, |
|
737 |
String group, int serverCode) throws Exception |
|
738 |
{ |
|
739 |
return write(conn, xml, acl, action, docid, user, group, serverCode, false); |
|
740 |
} |
|
741 |
|
|
736 | 742 |
/** |
737 | 743 |
* Write an XML file to the database, given a Reader |
738 | 744 |
* |
... | ... | |
744 | 750 |
|
745 | 751 |
public static String write( Connection conn, Reader xml, Reader acl, |
746 | 752 |
String action, String docid, String user, |
747 |
String group, int serverCode )
|
|
748 |
throws Exception |
|
753 |
String group, int serverCode, boolean override)
|
|
754 |
throws Exception
|
|
749 | 755 |
{ |
750 | 756 |
System.out.println("in write"); |
751 | 757 |
MetaCatUtil util = new MetaCatUtil(); |
... | ... | |
754 | 760 |
String newdocid = ac.generate(docid, action); |
755 | 761 |
|
756 | 762 |
System.out.println("action: " + action + " servercode: " + |
757 |
serverCode); |
|
758 |
if(serverCode != 1 && action.equals("UPDATE"))
|
|
763 |
serverCode + " override: " + override);
|
|
764 |
if((serverCode != 1 && action.equals("UPDATE")) && !override)
|
|
759 | 765 |
{ //if this document being written is not a resident of this server then |
760 | 766 |
//we need to try to get a lock from it's resident server. If the |
761 | 767 |
//resident server will not give a lock then we send the user a message |
... | ... | |
841 | 847 |
"merge your changes and try again."); |
842 | 848 |
} |
843 | 849 |
} |
844 |
System.out.println("======THIS GOT TOO FAR======="); |
|
845 | 850 |
|
846 | 851 |
if ( action.equals("UPDATE") ) { |
847 | 852 |
// check for 'write' permission for 'user' to update this document |
... | ... | |
859 | 864 |
conn.commit(); |
860 | 865 |
if ( acl != null ) |
861 | 866 |
{ |
862 |
if ( action.equals("UPDATE") ) |
|
867 |
if ( action.equals("UPDATE") )
|
|
863 | 868 |
{ |
864 | 869 |
Statement stmt = conn.createStatement(); |
865 | 870 |
stmt.execute("DELETE FROM xml_access WHERE docid='"+newdocid +"'"); |
... | ... | |
877 | 882 |
conn.setAutoCommit(true); |
878 | 883 |
throw e; |
879 | 884 |
} |
885 |
|
|
886 |
//force replicate out the new document to each server in our server list. |
|
887 |
if(serverCode == 1) |
|
888 |
{ |
|
889 |
Enumeration keys = (ReplicationHandler.buildServerList(conn)).keys(); |
|
890 |
while(keys.hasMoreElements()) |
|
891 |
{ |
|
892 |
String server = (String)(keys.nextElement()); |
|
893 |
URL comeAndGetIt = new URL("http://" + server + |
|
894 |
"?action=forcereplicate&server=" + |
|
895 |
util.getOption("server") + |
|
896 |
util.getOption("replicationpath") + |
|
897 |
"&docid=" + newdocid + "&dbaction=" + |
|
898 |
action); |
|
899 |
System.out.println("sending message: " + comeAndGetIt.toString()); |
|
900 |
String message = MetacatReplication.getURLContent(comeAndGetIt); |
|
901 |
} |
|
902 |
} |
|
880 | 903 |
|
881 | 904 |
if ( (docid != null) && !(newdocid.equals(docid)) ) |
882 | 905 |
{ |
src/edu/ucsb/nceas/metacat/ReplMessageHandler.java | ||
---|---|---|
96 | 96 |
indivDelete.add(new String(ch, start, length)); |
97 | 97 |
} |
98 | 98 |
|
99 |
if(currentTag.equals("rev") && update) |
|
100 |
{ |
|
101 |
indivUpdate.add(new String(ch, start, length)); |
|
102 |
indivUpdate.add(server); |
|
103 |
} |
|
104 |
else if(currentTag.equals("rev") && delete) |
|
105 |
{ |
|
106 |
indivDelete.add(new String(ch, start, length)); |
|
107 |
indivDelete.add(server); |
|
108 |
} |
|
109 |
|
|
99 | 110 |
if(currentTag.equals("date_updated") && update) |
100 | 111 |
{ |
101 | 112 |
indivUpdate.add(new String(ch, start, length)); |
... | ... | |
122 | 133 |
{ |
123 | 134 |
return deletes; |
124 | 135 |
} |
125 |
|
|
126 | 136 |
} |
src/edu/ucsb/nceas/metacat/ReplicationHandler.java | ||
---|---|---|
70 | 70 |
{ |
71 | 71 |
conn = util.openDBConnection(); |
72 | 72 |
serverList = buildServerList(conn); |
73 |
update(serverList, conn); |
|
73 |
update2(serverList, conn);
|
|
74 | 74 |
conn.close(); |
75 | 75 |
} |
76 | 76 |
catch (Exception e) |
... | ... | |
80 | 80 |
} |
81 | 81 |
|
82 | 82 |
/** |
83 |
* Method that uses revision taging for replication instead of update_date. |
|
84 |
*/ |
|
85 |
private void update2(Hashtable serverList, Connection conn) |
|
86 |
{ |
|
87 |
/* |
|
88 |
Pseudo-algorithm |
|
89 |
- request a doc list from each server in xml_replication |
|
90 |
- check the rev number of each of those documents agains the |
|
91 |
documents in the local database |
|
92 |
- pull any documents that have a lesser rev number on the local server |
|
93 |
from the remote server |
|
94 |
- delete any documents that still exist in the local xml_documents but |
|
95 |
are in the deletedDocuments tag of the remote host response. |
|
96 |
- update last_checked to keep track of the last time it was checked. |
|
97 |
(this info is theoretically not needed using this system but probably |
|
98 |
should be kept anyway) |
|
99 |
*/ |
|
100 |
Enumeration keys; |
|
101 |
String server; |
|
102 |
String update; |
|
103 |
ReplMessageHandler message = new ReplMessageHandler(); |
|
104 |
Vector responses = new Vector(); |
|
105 |
PreparedStatement pstmt; |
|
106 |
ResultSet rs; |
|
107 |
boolean tablehasrows; |
|
108 |
boolean flag=false; |
|
109 |
String action = new String(); |
|
110 |
XMLReader parser; |
|
111 |
URL u; |
|
112 |
|
|
113 |
//System.out.println("in update2"); |
|
114 |
|
|
115 |
try |
|
116 |
{ |
|
117 |
//System.out.println("init parser"); |
|
118 |
parser = initParser(message); |
|
119 |
keys = serverList.keys(); |
|
120 |
while(keys.hasMoreElements()) |
|
121 |
{ |
|
122 |
//System.out.println("get responses"); |
|
123 |
server = (String)(keys.nextElement()); |
|
124 |
u = new URL("http://" + server + "?action=update"); |
|
125 |
String result = MetacatReplication.getURLContent(u); |
|
126 |
//System.out.println(result); |
|
127 |
responses.add(result); |
|
128 |
} |
|
129 |
|
|
130 |
//System.out.println("responses: " + responses.toString()); |
|
131 |
|
|
132 |
for(int i=0; i<responses.size(); i++) |
|
133 |
{ //check each server for updated files |
|
134 |
//System.out.println("parsing responses"); |
|
135 |
parser.parse(new InputSource( |
|
136 |
new StringReader( |
|
137 |
(String)(responses.elementAt(i))))); |
|
138 |
Vector v = new Vector(message.getUpdatesVect()); |
|
139 |
//System.out.println("v: " + v.toString()); |
|
140 |
Vector d = new Vector(message.getDeletesVect()); |
|
141 |
//check the revs in u to see if there are any newer ones than |
|
142 |
//in the local DB. |
|
143 |
for(int j=0; j<v.size(); j++) |
|
144 |
{ |
|
145 |
Vector w = new Vector((Vector)(v.elementAt(j))); |
|
146 |
//System.out.println("w: " + w.toString()); |
|
147 |
String docid = (String)w.elementAt(0); |
|
148 |
//System.out.println("docid: " + docid); |
|
149 |
int rev = Integer.parseInt((String)w.elementAt(1)); |
|
150 |
//System.out.println("rev: " + rev); |
|
151 |
String docServer = (String)w.elementAt(2); |
|
152 |
//System.out.println("docServer: " + docServer); |
|
153 |
|
|
154 |
pstmt = conn.prepareStatement("select rev from xml_documents where "+ |
|
155 |
"docid like '" + docid + "'"); |
|
156 |
pstmt.execute(); |
|
157 |
rs = pstmt.getResultSet(); |
|
158 |
tablehasrows = rs.next(); |
|
159 |
if(tablehasrows) |
|
160 |
{ //check the revs for an update because this document is in the |
|
161 |
//local DB, it just might be out of date. |
|
162 |
int localrev = rs.getInt(1); |
|
163 |
if(localrev == rev) |
|
164 |
{ |
|
165 |
flag = false; |
|
166 |
} |
|
167 |
else if(localrev < rev) |
|
168 |
{//this document needs to be updated so send an read request |
|
169 |
action = "UPDATE"; |
|
170 |
flag = true; |
|
171 |
} |
|
172 |
} |
|
173 |
else |
|
174 |
{ //insert this document as new because it is not in the local DB |
|
175 |
action = "INSERT"; |
|
176 |
flag = true; |
|
177 |
} |
|
178 |
|
|
179 |
if(flag) |
|
180 |
{ //if the document needs to be updated or inserted, this is executed |
|
181 |
u = new URL("http://" + docServer + "?action=read&docid=" + |
|
182 |
docid); |
|
183 |
String newxmldoc = MetacatReplication.getURLContent(u); |
|
184 |
DocInfoHandler dih = new DocInfoHandler(); |
|
185 |
XMLReader docinfoParser = initParser(dih); |
|
186 |
URL docinfoUrl = new URL("http://" + docServer + |
|
187 |
"?action=getdocumentinfo&docid=" + |
|
188 |
docid); |
|
189 |
String docInfoStr = MetacatReplication.getURLContent(docinfoUrl); |
|
190 |
docinfoParser.parse(new InputSource(new StringReader(docInfoStr))); |
|
191 |
Hashtable docinfoHash = dih.getDocInfo(); |
|
192 |
int serverCode = MetacatReplication.getServerCode(docServer); |
|
193 |
System.out.println("updating doc: " + docid + " action: "+ action); |
|
194 |
String newDocid = DocumentImpl.write(conn, |
|
195 |
new StringReader(newxmldoc), |
|
196 |
null, |
|
197 |
action, |
|
198 |
docid, |
|
199 |
(String)docinfoHash.get("user_owner"), |
|
200 |
(String)docinfoHash.get("user_owner"), |
|
201 |
serverCode, |
|
202 |
true); |
|
203 |
} |
|
204 |
} |
|
205 |
|
|
206 |
for(int k=0; k<d.size(); k++) |
|
207 |
{ //delete the deleted documents; |
|
208 |
Vector w = new Vector((Vector)d.elementAt(k)); |
|
209 |
String docid = (String)w.elementAt(0); |
|
210 |
|
|
211 |
DocumentImpl.delete(conn, docid, null, null); |
|
212 |
System.out.println("Document " + docid + " deleted."); |
|
213 |
} |
|
214 |
|
|
215 |
keys = serverList.keys(); |
|
216 |
while(keys.hasMoreElements()) |
|
217 |
{ |
|
218 |
server = (String)(keys.nextElement()); |
|
219 |
URL dateurl = new URL("http://" + server + "?action=gettime"); |
|
220 |
String datexml = MetacatReplication.getURLContent(dateurl); |
|
221 |
String datestr = datexml.substring(11, datexml.indexOf('<', 11)); |
|
222 |
StringBuffer sql = new StringBuffer(); |
|
223 |
sql.append("update xml_replication set last_checked = to_date('"); |
|
224 |
sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where "); |
|
225 |
sql.append("server like '").append(server).append("'"); |
|
226 |
//System.out.println("sql: " + sql.toString()); |
|
227 |
pstmt = conn.prepareStatement(sql.toString()); |
|
228 |
pstmt.executeUpdate(); |
|
229 |
//conn.commit(); |
|
230 |
System.out.println("last_checked updated to " + datestr + " on " + |
|
231 |
server); |
|
232 |
} |
|
233 |
} |
|
234 |
} |
|
235 |
catch(Exception e) |
|
236 |
{ |
|
237 |
System.out.println("error in update2: " + e.getMessage()); |
|
238 |
e.printStackTrace(System.out); |
|
239 |
} |
|
240 |
} |
|
241 |
|
|
242 |
/** |
|
83 | 243 |
* Method to check each server in the servetList for updates |
84 | 244 |
*/ |
85 | 245 |
private void update(Hashtable serverList, Connection conn) |
... | ... | |
265 | 425 |
* and it's last update time. |
266 | 426 |
* @param conn a connection to the database |
267 | 427 |
*/ |
268 |
private Hashtable buildServerList(Connection conn)
|
|
428 |
public static Hashtable buildServerList(Connection conn)
|
|
269 | 429 |
{ |
270 | 430 |
Hashtable sl = new Hashtable(); |
271 | 431 |
PreparedStatement pstmt; |
Also available in: Unified diff
added replication_on_insert handling. Changed replication from date_updated base replication to revision number replication.