Revision 522
Added by berkley about 24 years ago
src/edu/ucsb/nceas/metacat/replMessageHandler.java | ||
---|---|---|
1 |
/** |
|
2 |
* '$RCSfile$' |
|
3 |
* Purpose: A class that handles xml messages passed by the replication handler |
|
4 |
* Copyright: 2000 Regents of the University of California and the |
|
5 |
* National Center for Ecological Analysis and Synthesis |
|
6 |
* Authors: Chad Berkley |
|
7 |
* Release: @release@ |
|
8 |
* |
|
9 |
* '$Author$' |
|
10 |
* '$Date$' |
|
11 |
* '$Revision$' |
|
12 |
*/ |
|
13 |
|
|
14 |
package edu.ucsb.nceas.metacat; |
|
15 |
|
|
16 |
import java.sql.*; |
|
17 |
import java.util.Stack; |
|
18 |
import java.util.Vector; |
|
19 |
import java.util.Enumeration; |
|
20 |
import java.util.EmptyStackException; |
|
21 |
|
|
22 |
import org.xml.sax.Attributes; |
|
23 |
import org.xml.sax.SAXException; |
|
24 |
import org.xml.sax.SAXParseException; |
|
25 |
import org.xml.sax.ext.DeclHandler; |
|
26 |
import org.xml.sax.ext.LexicalHandler; |
|
27 |
import org.xml.sax.helpers.DefaultHandler; |
|
28 |
|
|
29 |
/** |
|
30 |
* A database aware Class implementing callback bethods for the SAX parser to |
|
31 |
* call when processing the XML messages from the replication handler |
|
32 |
*/ |
|
33 |
public class replMessageHandler extends DefaultHandler |
|
34 |
{ |
|
35 |
private Vector updates = new Vector(); |
|
36 |
private Vector indivUpdate = new Vector(); |
|
37 |
String currentTag = new String(); |
|
38 |
|
|
39 |
public replMessageHandler() |
|
40 |
{ |
|
41 |
} |
|
42 |
|
|
43 |
/** |
|
44 |
* This method starts a new vector for each updatedDocument tag. |
|
45 |
*/ |
|
46 |
public void startElement(String uri, String localName, String qName, |
|
47 |
Attributes attributes) throws SAXException |
|
48 |
{ |
|
49 |
currentTag = localName; |
|
50 |
if(localName.equals("updatedDocument")) |
|
51 |
{ |
|
52 |
indivUpdate = new Vector(); |
|
53 |
} |
|
54 |
} |
|
55 |
|
|
56 |
/** |
|
57 |
* This method write the indivUpdate to updates when it finds the end of |
|
58 |
*/ |
|
59 |
public void endElement(String uri, String localName, String qName) |
|
60 |
throws SAXException |
|
61 |
{ |
|
62 |
if(localName.equals("updatedDocument")) |
|
63 |
{ |
|
64 |
updates.add(new Vector(indivUpdate)); |
|
65 |
} |
|
66 |
} |
|
67 |
|
|
68 |
/** |
|
69 |
* Take the data out of the docid and date_updated fields |
|
70 |
*/ |
|
71 |
public void characters(char[] ch, int start, int length) throws SAXException |
|
72 |
{ |
|
73 |
if(currentTag.equals("docid")) |
|
74 |
{ |
|
75 |
indivUpdate.add(new String(ch, start, length)); |
|
76 |
} |
|
77 |
if(currentTag.equals("date_updated")) |
|
78 |
{ |
|
79 |
indivUpdate.add(new String(ch, start, length)); |
|
80 |
} |
|
81 |
} |
|
82 |
|
|
83 |
public Vector getResultVect() |
|
84 |
{ |
|
85 |
return updates; |
|
86 |
} |
|
87 |
|
|
88 |
} |
|
89 | 0 |
src/edu/ucsb/nceas/metacat/replicationHandler.java | ||
---|---|---|
1 |
/** |
|
2 |
* '$RCSfile$' |
|
3 |
* Purpose: A class to asyncronously do delta-T replication checking |
|
4 |
* Copyright: 2000 Regents of the University of California and the |
|
5 |
* National Center for Ecological Analysis and Synthesis |
|
6 |
* Authors: Chad Berkley |
|
7 |
* Release: @release@ |
|
8 |
* |
|
9 |
* '$Author$' |
|
10 |
* '$Date$' |
|
11 |
* '$Revision$' |
|
12 |
*/ |
|
13 |
|
|
14 |
package edu.ucsb.nceas.metacat; |
|
15 |
|
|
16 |
import java.sql.*; |
|
17 |
import java.util.*; |
|
18 |
import java.lang.Thread; |
|
19 |
import java.io.*; |
|
20 |
import java.net.*; |
|
21 |
import org.xml.sax.AttributeList; |
|
22 |
import org.xml.sax.ContentHandler; |
|
23 |
import org.xml.sax.DTDHandler; |
|
24 |
import org.xml.sax.EntityResolver; |
|
25 |
import org.xml.sax.ErrorHandler; |
|
26 |
import org.xml.sax.InputSource; |
|
27 |
import org.xml.sax.XMLReader; |
|
28 |
import org.xml.sax.SAXException; |
|
29 |
import org.xml.sax.SAXParseException; |
|
30 |
import org.xml.sax.helpers.XMLReaderFactory; |
|
31 |
|
|
32 |
/** |
|
33 |
* This class handles deltaT replication checking. Whenever this TimerTask |
|
34 |
* is fired it checks each server in xml_replication for updates and updates |
|
35 |
* the local db as needed. |
|
36 |
*/ |
|
37 |
public class replicationHandler extends TimerTask |
|
38 |
{ |
|
39 |
MetaCatUtil util = new MetaCatUtil(); |
|
40 |
Hashtable serverList = new Hashtable(); |
|
41 |
Connection conn; |
|
42 |
PrintWriter out; |
|
43 |
|
|
44 |
public replicationHandler(PrintWriter o) |
|
45 |
{ |
|
46 |
this.out = o; |
|
47 |
} |
|
48 |
|
|
49 |
/** |
|
50 |
* Method that implements TimerTask.run(). It runs whenever the timer is |
|
51 |
* fired. |
|
52 |
*/ |
|
53 |
public void run() |
|
54 |
{ |
|
55 |
System.out.println("replicationHandler is running"); |
|
56 |
//find out the last_checked time of each server in the server list and |
|
57 |
//send a query to each server to see if there are any documents in |
|
58 |
//xml_documents with an update_date > last_checked |
|
59 |
try |
|
60 |
{ |
|
61 |
conn = util.openDBConnection(); |
|
62 |
serverList = buildServerList(conn); |
|
63 |
System.out.println("Server list: " + serverList.toString()); |
|
64 |
update(serverList, conn); |
|
65 |
} |
|
66 |
catch (Exception e) |
|
67 |
{ |
|
68 |
System.out.println("Error in replicationHandler.run(): " + e.getMessage()); |
|
69 |
} |
|
70 |
} |
|
71 |
|
|
72 |
private void update(Hashtable serverList, Connection conn) |
|
73 |
{ |
|
74 |
PreparedStatement pstmt; |
|
75 |
Enumeration keys; |
|
76 |
int istreamInt; |
|
77 |
char istreamChar; |
|
78 |
StringBuffer serverResponse = new StringBuffer(); |
|
79 |
String server; |
|
80 |
String update; |
|
81 |
Vector responses = new Vector(); |
|
82 |
replMessageHandler message = new replMessageHandler(); |
|
83 |
|
|
84 |
try |
|
85 |
{ |
|
86 |
//build a list of servers with updated documents. Choose the newest |
|
87 |
//one out of the list, update this server, update last_checked |
|
88 |
|
|
89 |
keys = serverList.keys(); |
|
90 |
while(keys.hasMoreElements()) |
|
91 |
{ //update from one server at a time |
|
92 |
server = (String)(keys.nextElement()); |
|
93 |
update = (String)(serverList.get(server)); |
|
94 |
//send the server a date and it will send back any docid that has |
|
95 |
//been modified after that date |
|
96 |
|
|
97 |
update = update.replace(' ', '+'); |
|
98 |
|
|
99 |
URL u = new URL("http://" + server + "?update=" + update); |
|
100 |
InputStreamReader istream = new InputStreamReader(u.openStream()); |
|
101 |
while((istreamInt = istream.read()) != -1) |
|
102 |
{ |
|
103 |
istreamChar = (char)istreamInt; |
|
104 |
serverResponse.append(istreamChar); |
|
105 |
} |
|
106 |
responses.add(serverResponse.toString()); //list of updates |
|
107 |
} |
|
108 |
//System.out.println("responses: " + responses.toString()); |
|
109 |
|
|
110 |
//initialize the parser |
|
111 |
XMLReader parser = initParser(message); |
|
112 |
for(int i=0; i<responses.size(); i++) |
|
113 |
{ //parse the xml and get the result |
|
114 |
parser.parse(new InputSource( |
|
115 |
new StringReader( |
|
116 |
(String)(responses.elementAt(i))))); |
|
117 |
Vector v = new Vector(message.getResultVect()); |
|
118 |
for(int j=0; j<v.size(); j++) |
|
119 |
{ |
|
120 |
Vector w = new Vector((Vector)(v.elementAt(j))); |
|
121 |
System.out.print("param " + j + ": " + w.toString()); |
|
122 |
//so now we have a list of the documents that need to be updated, so |
|
123 |
//now we need to request them. from the server and update them here |
|
124 |
} |
|
125 |
System.out.println(""); |
|
126 |
} |
|
127 |
|
|
128 |
|
|
129 |
} |
|
130 |
catch(Exception e) |
|
131 |
{ |
|
132 |
System.out.println("Error in replicationHandler.update(): " + |
|
133 |
e.getMessage()); |
|
134 |
} |
|
135 |
} |
|
136 |
|
|
137 |
/** |
|
138 |
* Method to initialize the message parser |
|
139 |
*/ |
|
140 |
private static XMLReader initParser(replMessageHandler rmh) |
|
141 |
throws Exception |
|
142 |
{ |
|
143 |
XMLReader parser = null; |
|
144 |
|
|
145 |
try { |
|
146 |
ContentHandler chandler = rmh; |
|
147 |
|
|
148 |
// Get an instance of the parser |
|
149 |
MetaCatUtil util = new MetaCatUtil(); |
|
150 |
String parserName = util.getOption("saxparser"); |
|
151 |
parser = XMLReaderFactory.createXMLReader(parserName); |
|
152 |
|
|
153 |
// Turn off validation |
|
154 |
parser.setFeature("http://xml.org/sax/features/validation", false); |
|
155 |
|
|
156 |
// Set Handlers in the parser |
|
157 |
/* |
|
158 |
parser.setProperty("http://xml.org/sax/properties/declaration-handler", |
|
159 |
chandler); |
|
160 |
parser.setProperty("http://xml.org/sax/properties/lexical-handler", |
|
161 |
chandler); |
|
162 |
*/ |
|
163 |
parser.setContentHandler((ContentHandler)chandler); |
|
164 |
parser.setErrorHandler((ErrorHandler)chandler); |
|
165 |
|
|
166 |
} catch (Exception e) { |
|
167 |
throw e; |
|
168 |
} |
|
169 |
|
|
170 |
return parser; |
|
171 |
} |
|
172 |
|
|
173 |
/** |
|
174 |
* Method to query xml_replication and build a hashtable of each server |
|
175 |
* and it's last update time. |
|
176 |
* @param conn a connection to the database |
|
177 |
*/ |
|
178 |
private Hashtable buildServerList(Connection conn) |
|
179 |
{ |
|
180 |
Hashtable sl = new Hashtable(); |
|
181 |
PreparedStatement pstmt; |
|
182 |
try |
|
183 |
{ |
|
184 |
pstmt = conn.prepareStatement("select server, last_checked from " + |
|
185 |
"xml_replication"); |
|
186 |
pstmt.execute(); |
|
187 |
ResultSet rs = pstmt.getResultSet(); |
|
188 |
boolean tableHasRows = rs.next(); |
|
189 |
while(tableHasRows) |
|
190 |
{ |
|
191 |
sl.put(rs.getString(1), rs.getString(2)); |
|
192 |
tableHasRows = rs.next(); |
|
193 |
} |
|
194 |
} |
|
195 |
catch(Exception e) |
|
196 |
{ |
|
197 |
System.out.println("error in replicationHandler.buildServerList(): " + |
|
198 |
e.getMessage()); |
|
199 |
} |
|
200 |
return sl; |
|
201 |
} |
|
202 |
} |
|
203 | 0 |
src/edu/ucsb/nceas/metacat/metacatReplication.java | ||
---|---|---|
1 |
/** |
|
2 |
* '$RCSfile$' |
|
3 |
* Purpose: A Class that implements replication for metacat |
|
4 |
* Copyright: 2000 Regents of the University of California and the |
|
5 |
* National Center for Ecological Analysis and Synthesis |
|
6 |
* Authors: Chad Berkley |
|
7 |
* Release: @release@ |
|
8 |
* |
|
9 |
* '$Author$' |
|
10 |
* '$Date$' |
|
11 |
* '$Revision$' |
|
12 |
*/ |
|
13 |
|
|
14 |
package edu.ucsb.nceas.metacat; |
|
15 |
|
|
16 |
import java.util.*; |
|
17 |
import java.io.*; |
|
18 |
import java.sql.*; |
|
19 |
import java.net.*; |
|
20 |
import java.lang.*; |
|
21 |
import java.text.*; |
|
22 |
import javax.servlet.*; |
|
23 |
import javax.servlet.http.*; |
|
24 |
import oracle.xml.parser.v2.*; |
|
25 |
import org.xml.sax.*; |
|
26 |
|
|
27 |
public class metacatReplication extends HttpServlet |
|
28 |
{ |
|
29 |
private String deltaT; |
|
30 |
Timer replicationDaemon; |
|
31 |
MetaCatUtil util = new MetaCatUtil(); |
|
32 |
|
|
33 |
/** |
|
34 |
* Initialize the servlet by creating appropriate database connections |
|
35 |
*/ |
|
36 |
public void init(ServletConfig config) throws ServletException |
|
37 |
{ |
|
38 |
//initialize db connections to handle any update requests |
|
39 |
MetaCatUtil util = new MetaCatUtil(); |
|
40 |
deltaT = util.getOption("deltaT"); |
|
41 |
//break off a thread to do the delta-T check |
|
42 |
replicationDaemon = new Timer(true); |
|
43 |
//replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0, |
|
44 |
// new Integer(deltaT).intValue() * 1000); |
|
45 |
//System.out.println("timer scheduled at: " + new Integer(deltaT).intValue() |
|
46 |
// + " seconds"); |
|
47 |
|
|
48 |
} |
|
49 |
|
|
50 |
public void destroy() |
|
51 |
{ |
|
52 |
replicationDaemon.cancel(); |
|
53 |
System.out.println("Replication daemon cancelled."); |
|
54 |
} |
|
55 |
|
|
56 |
public void doGet (HttpServletRequest request, HttpServletResponse response) |
|
57 |
throws ServletException, IOException |
|
58 |
{ |
|
59 |
// Process the data and send back the response |
|
60 |
handleGetOrPost(request, response); |
|
61 |
} |
|
62 |
|
|
63 |
public void doPost(HttpServletRequest request, HttpServletResponse response) |
|
64 |
throws ServletException, IOException |
|
65 |
{ |
|
66 |
// Process the data and send back the response |
|
67 |
handleGetOrPost(request, response); |
|
68 |
} |
|
69 |
|
|
70 |
private void handleGetOrPost(HttpServletRequest request, |
|
71 |
HttpServletResponse response) |
|
72 |
throws ServletException, IOException |
|
73 |
{ |
|
74 |
PrintWriter out = response.getWriter(); |
|
75 |
|
|
76 |
Hashtable params = new Hashtable(); |
|
77 |
Enumeration paramlist = request.getParameterNames(); |
|
78 |
|
|
79 |
while (paramlist.hasMoreElements()) |
|
80 |
{ |
|
81 |
String name = (String)paramlist.nextElement(); |
|
82 |
String[] value = request.getParameterValues(name); |
|
83 |
params.put(name, value); |
|
84 |
} |
|
85 |
|
|
86 |
if(params.containsKey("stop")) |
|
87 |
{ |
|
88 |
if(((String[])params.get("stop"))[0].equals("true")) |
|
89 |
{ |
|
90 |
replicationDaemon.cancel(); |
|
91 |
out.println("Replication Handler Stopped"); |
|
92 |
} |
|
93 |
} |
|
94 |
|
|
95 |
if(params.containsKey("start")) |
|
96 |
{ |
|
97 |
if(((String[])params.get("start"))[0].equals("true")) |
|
98 |
{ |
|
99 |
int rate; |
|
100 |
if(params.containsKey("rate")) |
|
101 |
rate = new Integer( |
|
102 |
new String(((String[])params.get("rate"))[0])).intValue(); |
|
103 |
else |
|
104 |
rate = 1000; |
|
105 |
|
|
106 |
out.println("New rate is: " + rate + " seconds."); |
|
107 |
replicationDaemon.cancel(); |
|
108 |
replicationDaemon = new Timer(true); |
|
109 |
replicationDaemon.scheduleAtFixedRate(new replicationHandler(out), 0, |
|
110 |
rate * 1000); |
|
111 |
out.println("Replication Handler Started"); |
|
112 |
} |
|
113 |
} |
|
114 |
|
|
115 |
if(params.containsKey("update")) |
|
116 |
{ |
|
117 |
handleUpdateRequest(out, params, response); |
|
118 |
} |
|
119 |
} |
|
120 |
|
|
121 |
private void handleUpdateRequest(PrintWriter out, Hashtable params, |
|
122 |
HttpServletResponse response) |
|
123 |
{ |
|
124 |
System.out.println("incoming request for dt/time " + |
|
125 |
((String[])params.get("update"))[0] + |
|
126 |
" from external metacat"); |
|
127 |
response.setContentType("text/xml"); |
|
128 |
out.println("<replication><textmessage>In metacatReplication</textmessage>"); |
|
129 |
|
|
130 |
StringBuffer sql = new StringBuffer(); |
|
131 |
StringBuffer returnXML = new StringBuffer(); |
|
132 |
String updateStr = ((String[])params.get("update"))[0]; |
|
133 |
updateStr = updateStr.replace('+', ' '); |
|
134 |
//pseudo algorithm: |
|
135 |
/////////////////////////////////////////////////////////////////////// |
|
136 |
//get the date/time from the requestor, query the db for any documents |
|
137 |
//that have an update date later than the requested date and send |
|
138 |
//those docids back to the requestor. If there are no newer documents |
|
139 |
//then send back an up-to-date message. |
|
140 |
/////////////////////////////////////////////////////////////////////// |
|
141 |
|
|
142 |
//Timestamp update = Timestamp.valueOf(updateStr); |
|
143 |
SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss"); |
|
144 |
java.util.Date update = new java.util.Date(); |
|
145 |
ParsePosition pos = new ParsePosition(0); |
|
146 |
update = formatter.parse(updateStr, pos); |
|
147 |
String dateString = formatter.format(update); |
|
148 |
sql.append("select docid, date_updated from xml_documents where "); |
|
149 |
sql.append("date_updated > "); |
|
150 |
sql.append("to_date('").append(dateString).append("','YY-MM-DD HH24:MI:SS')"); |
|
151 |
//System.out.println("sql: " + sql.toString()); |
|
152 |
|
|
153 |
try |
|
154 |
{ |
|
155 |
Connection conn = util.openDBConnection(); |
|
156 |
PreparedStatement pstmt = conn.prepareStatement(sql.toString()); |
|
157 |
pstmt.execute(); |
|
158 |
ResultSet rs = pstmt.getResultSet(); |
|
159 |
boolean tablehasrows = rs.next(); |
|
160 |
returnXML.append("<server>").append(util.getOption("server")); |
|
161 |
returnXML.append("</server><updates>"); |
|
162 |
while(tablehasrows) |
|
163 |
{ |
|
164 |
returnXML.append("<updatedDocument><docid>").append(rs.getString(1)); |
|
165 |
returnXML.append("</docid><date_updated>").append(rs.getString(2)); |
|
166 |
returnXML.append("</date_updated></updatedDocument>"); |
|
167 |
tablehasrows = rs.next(); |
|
168 |
} |
|
169 |
returnXML.append("</updates></replication>"); |
|
170 |
out.print(returnXML.toString()); |
|
171 |
} |
|
172 |
catch(Exception e) |
|
173 |
{ |
|
174 |
System.out.println("Exception in metacatReplication: " + e.getMessage()); |
|
175 |
} |
|
176 |
} |
|
177 |
} |
|
178 | 0 |
src/edu/ucsb/nceas/metacat/MetacatReplication.java | ||
---|---|---|
1 |
/** |
|
2 |
* '$RCSfile$' |
|
3 |
* Purpose: A Class that implements replication for metacat |
|
4 |
* Copyright: 2000 Regents of the University of California and the |
|
5 |
* National Center for Ecological Analysis and Synthesis |
|
6 |
* Authors: Chad Berkley |
|
7 |
* Release: @release@ |
|
8 |
* |
|
9 |
* '$Author$' |
|
10 |
* '$Date$' |
|
11 |
* '$Revision$' |
|
12 |
*/ |
|
13 |
|
|
14 |
package edu.ucsb.nceas.metacat; |
|
15 |
|
|
16 |
import java.util.*; |
|
17 |
import java.io.*; |
|
18 |
import java.sql.*; |
|
19 |
import java.net.*; |
|
20 |
import java.lang.*; |
|
21 |
import java.text.*; |
|
22 |
import javax.servlet.*; |
|
23 |
import javax.servlet.http.*; |
|
24 |
import oracle.xml.parser.v2.*; |
|
25 |
import org.xml.sax.*; |
|
26 |
|
|
27 |
public class MetacatReplication extends HttpServlet |
|
28 |
{ |
|
29 |
private String deltaT; |
|
30 |
Timer replicationDaemon; |
|
31 |
MetaCatUtil util = new MetaCatUtil(); |
|
32 |
|
|
33 |
/** |
|
34 |
* Initialize the servlet by creating appropriate database connections |
|
35 |
*/ |
|
36 |
public void init(ServletConfig config) throws ServletException |
|
37 |
{ |
|
38 |
//initialize db connections to handle any update requests |
|
39 |
MetaCatUtil util = new MetaCatUtil(); |
|
40 |
deltaT = util.getOption("deltaT"); |
|
41 |
//break off a thread to do the delta-T check |
|
42 |
replicationDaemon = new Timer(true); |
|
43 |
//replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0, |
|
44 |
// new Integer(deltaT).intValue() * 1000); |
|
45 |
//System.out.println("timer scheduled at: " + new Integer(deltaT).intValue() |
|
46 |
// + " seconds"); |
|
47 |
|
|
48 |
} |
|
49 |
|
|
50 |
public void destroy() |
|
51 |
{ |
|
52 |
replicationDaemon.cancel(); |
|
53 |
System.out.println("Replication daemon cancelled."); |
|
54 |
} |
|
55 |
|
|
56 |
public void doGet (HttpServletRequest request, HttpServletResponse response) |
|
57 |
throws ServletException, IOException |
|
58 |
{ |
|
59 |
// Process the data and send back the response |
|
60 |
handleGetOrPost(request, response); |
|
61 |
} |
|
62 |
|
|
63 |
public void doPost(HttpServletRequest request, HttpServletResponse response) |
|
64 |
throws ServletException, IOException |
|
65 |
{ |
|
66 |
// Process the data and send back the response |
|
67 |
handleGetOrPost(request, response); |
|
68 |
} |
|
69 |
|
|
70 |
private void handleGetOrPost(HttpServletRequest request, |
|
71 |
HttpServletResponse response) |
|
72 |
throws ServletException, IOException |
|
73 |
{ |
|
74 |
PrintWriter out = response.getWriter(); |
|
75 |
|
|
76 |
Hashtable params = new Hashtable(); |
|
77 |
Enumeration paramlist = request.getParameterNames(); |
|
78 |
|
|
79 |
while (paramlist.hasMoreElements()) |
|
80 |
{ |
|
81 |
String name = (String)paramlist.nextElement(); |
|
82 |
String[] value = request.getParameterValues(name); |
|
83 |
params.put(name, value); |
|
84 |
} |
|
85 |
|
|
86 |
if(params.containsKey("stop")) |
|
87 |
{ |
|
88 |
if(((String[])params.get("stop"))[0].equals("true")) |
|
89 |
{ |
|
90 |
replicationDaemon.cancel(); |
|
91 |
out.println("Replication Handler Stopped"); |
|
92 |
} |
|
93 |
} |
|
94 |
|
|
95 |
if(params.containsKey("start")) |
|
96 |
{ |
|
97 |
if(((String[])params.get("start"))[0].equals("true")) |
|
98 |
{ |
|
99 |
int rate; |
|
100 |
if(params.containsKey("rate")) |
|
101 |
rate = new Integer( |
|
102 |
new String(((String[])params.get("rate"))[0])).intValue(); |
|
103 |
else |
|
104 |
rate = 1000; |
|
105 |
|
|
106 |
out.println("New rate is: " + rate + " seconds."); |
|
107 |
replicationDaemon.cancel(); |
|
108 |
replicationDaemon = new Timer(true); |
|
109 |
replicationDaemon.scheduleAtFixedRate(new replicationHandler(out), 0, |
|
110 |
rate * 1000); |
|
111 |
out.println("Replication Handler Started"); |
|
112 |
} |
|
113 |
} |
|
114 |
|
|
115 |
if(params.containsKey("update")) |
|
116 |
{ |
|
117 |
handleUpdateRequest(out, params, response); |
|
118 |
} |
|
119 |
} |
|
120 |
|
|
121 |
private void handleUpdateRequest(PrintWriter out, Hashtable params, |
|
122 |
HttpServletResponse response) |
|
123 |
{ |
|
124 |
System.out.println("incoming request for dt/time " + |
|
125 |
((String[])params.get("update"))[0] + |
|
126 |
" from external metacat"); |
|
127 |
response.setContentType("text/xml"); |
|
128 |
out.println("<replication><textmessage>In metacatReplication</textmessage>"); |
|
129 |
|
|
130 |
StringBuffer sql = new StringBuffer(); |
|
131 |
StringBuffer returnXML = new StringBuffer(); |
|
132 |
String updateStr = ((String[])params.get("update"))[0]; |
|
133 |
updateStr = updateStr.replace('+', ' '); |
|
134 |
//pseudo algorithm: |
|
135 |
/////////////////////////////////////////////////////////////////////// |
|
136 |
//get the date/time from the requestor, query the db for any documents |
|
137 |
//that have an update date later than the requested date and send |
|
138 |
//those docids back to the requestor. If there are no newer documents |
|
139 |
//then send back an up-to-date message. |
|
140 |
/////////////////////////////////////////////////////////////////////// |
|
141 |
|
|
142 |
//Timestamp update = Timestamp.valueOf(updateStr); |
|
143 |
SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss"); |
|
144 |
java.util.Date update = new java.util.Date(); |
|
145 |
ParsePosition pos = new ParsePosition(0); |
|
146 |
update = formatter.parse(updateStr, pos); |
|
147 |
String dateString = formatter.format(update); |
|
148 |
sql.append("select docid, date_updated from xml_documents where "); |
|
149 |
sql.append("date_updated > "); |
|
150 |
sql.append("to_date('").append(dateString).append("','YY-MM-DD HH24:MI:SS')"); |
|
151 |
//System.out.println("sql: " + sql.toString()); |
|
152 |
|
|
153 |
try |
|
154 |
{ |
|
155 |
Connection conn = util.openDBConnection(); |
|
156 |
PreparedStatement pstmt = conn.prepareStatement(sql.toString()); |
|
157 |
pstmt.execute(); |
|
158 |
ResultSet rs = pstmt.getResultSet(); |
|
159 |
boolean tablehasrows = rs.next(); |
|
160 |
returnXML.append("<server>").append(util.getOption("server")); |
|
161 |
returnXML.append("</server><updates>"); |
|
162 |
while(tablehasrows) |
|
163 |
{ |
|
164 |
returnXML.append("<updatedDocument><docid>").append(rs.getString(1)); |
|
165 |
returnXML.append("</docid><date_updated>").append(rs.getString(2)); |
|
166 |
returnXML.append("</date_updated></updatedDocument>"); |
|
167 |
tablehasrows = rs.next(); |
|
168 |
} |
|
169 |
returnXML.append("</updates></replication>"); |
|
170 |
out.print(returnXML.toString()); |
|
171 |
} |
|
172 |
catch(Exception e) |
|
173 |
{ |
|
174 |
System.out.println("Exception in metacatReplication: " + e.getMessage()); |
|
175 |
} |
|
176 |
} |
|
177 |
} |
|
0 | 178 |
src/edu/ucsb/nceas/metacat/ReplMessageHandler.java | ||
---|---|---|
1 |
/** |
|
2 |
* '$RCSfile$' |
|
3 |
* Purpose: A class that handles xml messages passed by the replication handler |
|
4 |
* Copyright: 2000 Regents of the University of California and the |
|
5 |
* National Center for Ecological Analysis and Synthesis |
|
6 |
* Authors: Chad Berkley |
|
7 |
* Release: @release@ |
|
8 |
* |
|
9 |
* '$Author$' |
|
10 |
* '$Date$' |
|
11 |
* '$Revision$' |
|
12 |
*/ |
|
13 |
|
|
14 |
package edu.ucsb.nceas.metacat; |
|
15 |
|
|
16 |
import java.sql.*; |
|
17 |
import java.util.Stack; |
|
18 |
import java.util.Vector; |
|
19 |
import java.util.Enumeration; |
|
20 |
import java.util.EmptyStackException; |
|
21 |
|
|
22 |
import org.xml.sax.Attributes; |
|
23 |
import org.xml.sax.SAXException; |
|
24 |
import org.xml.sax.SAXParseException; |
|
25 |
import org.xml.sax.ext.DeclHandler; |
|
26 |
import org.xml.sax.ext.LexicalHandler; |
|
27 |
import org.xml.sax.helpers.DefaultHandler; |
|
28 |
|
|
29 |
/** |
|
30 |
* A database aware Class implementing callback bethods for the SAX parser to |
|
31 |
* call when processing the XML messages from the replication handler |
|
32 |
*/ |
|
33 |
public class ReplMessageHandler extends DefaultHandler |
|
34 |
{ |
|
35 |
private Vector updates = new Vector(); |
|
36 |
private Vector indivUpdate = new Vector(); |
|
37 |
String currentTag = new String(); |
|
38 |
|
|
39 |
public ReplMessageHandler() |
|
40 |
{ |
|
41 |
} |
|
42 |
|
|
43 |
/** |
|
44 |
* This method starts a new vector for each updatedDocument tag. |
|
45 |
*/ |
|
46 |
public void startElement(String uri, String localName, String qName, |
|
47 |
Attributes attributes) throws SAXException |
|
48 |
{ |
|
49 |
currentTag = localName; |
|
50 |
if(localName.equals("updatedDocument")) |
|
51 |
{ |
|
52 |
indivUpdate = new Vector(); |
|
53 |
} |
|
54 |
} |
|
55 |
|
|
56 |
/** |
|
57 |
* This method write the indivUpdate to updates when it finds the end of |
|
58 |
*/ |
|
59 |
public void endElement(String uri, String localName, String qName) |
|
60 |
throws SAXException |
|
61 |
{ |
|
62 |
if(localName.equals("updatedDocument")) |
|
63 |
{ |
|
64 |
updates.add(new Vector(indivUpdate)); |
|
65 |
} |
|
66 |
} |
|
67 |
|
|
68 |
/** |
|
69 |
* Take the data out of the docid and date_updated fields |
|
70 |
*/ |
|
71 |
public void characters(char[] ch, int start, int length) throws SAXException |
|
72 |
{ |
|
73 |
if(currentTag.equals("docid")) |
|
74 |
{ |
|
75 |
indivUpdate.add(new String(ch, start, length)); |
|
76 |
} |
|
77 |
if(currentTag.equals("date_updated")) |
|
78 |
{ |
|
79 |
indivUpdate.add(new String(ch, start, length)); |
|
80 |
} |
|
81 |
} |
|
82 |
|
|
83 |
public Vector getResultVect() |
|
84 |
{ |
|
85 |
return updates; |
|
86 |
} |
|
87 |
|
|
88 |
} |
|
0 | 89 |
src/edu/ucsb/nceas/metacat/ReplicationHandler.java | ||
---|---|---|
1 |
/** |
|
2 |
* '$RCSfile$' |
|
3 |
* Purpose: A class to asyncronously do delta-T replication checking |
|
4 |
* Copyright: 2000 Regents of the University of California and the |
|
5 |
* National Center for Ecological Analysis and Synthesis |
|
6 |
* Authors: Chad Berkley |
|
7 |
* Release: @release@ |
|
8 |
* |
|
9 |
* '$Author$' |
|
10 |
* '$Date$' |
|
11 |
* '$Revision$' |
|
12 |
*/ |
|
13 |
|
|
14 |
package edu.ucsb.nceas.metacat; |
|
15 |
|
|
16 |
import java.sql.*; |
|
17 |
import java.util.*; |
|
18 |
import java.lang.Thread; |
|
19 |
import java.io.*; |
|
20 |
import java.net.*; |
|
21 |
import org.xml.sax.AttributeList; |
|
22 |
import org.xml.sax.ContentHandler; |
|
23 |
import org.xml.sax.DTDHandler; |
|
24 |
import org.xml.sax.EntityResolver; |
|
25 |
import org.xml.sax.ErrorHandler; |
|
26 |
import org.xml.sax.InputSource; |
|
27 |
import org.xml.sax.XMLReader; |
|
28 |
import org.xml.sax.SAXException; |
|
29 |
import org.xml.sax.SAXParseException; |
|
30 |
import org.xml.sax.helpers.XMLReaderFactory; |
|
31 |
|
|
32 |
/** |
|
33 |
* This class handles deltaT replication checking. Whenever this TimerTask |
|
34 |
* is fired it checks each server in xml_replication for updates and updates |
|
35 |
* the local db as needed. |
|
36 |
*/ |
|
37 |
public class ReplicationHandler extends TimerTask |
|
38 |
{ |
|
39 |
MetaCatUtil util = new MetaCatUtil(); |
|
40 |
Hashtable serverList = new Hashtable(); |
|
41 |
Connection conn; |
|
42 |
PrintWriter out; |
|
43 |
|
|
44 |
public ReplicationHandler(PrintWriter o) |
|
45 |
{ |
|
46 |
this.out = o; |
|
47 |
} |
|
48 |
|
|
49 |
/** |
|
50 |
* Method that implements TimerTask.run(). It runs whenever the timer is |
|
51 |
* fired. |
|
52 |
*/ |
|
53 |
public void run() |
|
54 |
{ |
|
55 |
System.out.println("replicationHandler is running"); |
|
56 |
//find out the last_checked time of each server in the server list and |
|
57 |
//send a query to each server to see if there are any documents in |
|
58 |
//xml_documents with an update_date > last_checked |
|
59 |
try |
|
60 |
{ |
|
61 |
conn = util.openDBConnection(); |
|
62 |
serverList = buildServerList(conn); |
|
63 |
System.out.println("Server list: " + serverList.toString()); |
|
64 |
update(serverList, conn); |
|
65 |
} |
|
66 |
catch (Exception e) |
|
67 |
{ |
|
68 |
System.out.println("Error in replicationHandler.run(): " + e.getMessage()); |
|
69 |
} |
|
70 |
} |
|
71 |
|
|
72 |
/** |
|
73 |
* Method to check each server in the servetList for updates |
|
74 |
*/ |
|
75 |
private void update(Hashtable serverList, Connection conn) |
|
76 |
{ |
|
77 |
/* |
|
78 |
Pseudo-algorithm: |
|
79 |
-build a list of servers that need to be checked |
|
80 |
-check each server to see if any documents were modified after the |
|
81 |
server's update_date |
|
82 |
-if there are documents that need to be updated pull them here |
|
83 |
and update the one currently in the database or add a new one |
|
84 |
if it is not already here. |
|
85 |
-update each servers' update_date to the current time |
|
86 |
*/ |
|
87 |
PreparedStatement pstmt; |
|
88 |
Enumeration keys; |
|
89 |
int istreamInt; |
|
90 |
char istreamChar; |
|
91 |
StringBuffer serverResponse = new StringBuffer(); |
|
92 |
String server; |
|
93 |
String update; |
|
94 |
Vector responses = new Vector(); |
|
95 |
ReplMessageHandler message = new ReplMessageHandler(); |
|
96 |
|
|
97 |
try |
|
98 |
{ |
|
99 |
//build a list of servers with updated documents. Choose the newest |
|
100 |
//one out of the list, update this server, update last_checked |
|
101 |
|
|
102 |
keys = serverList.keys(); |
|
103 |
while(keys.hasMoreElements()) |
|
104 |
{ //update from one server at a time |
|
105 |
server = (String)(keys.nextElement()); |
|
106 |
update = (String)(serverList.get(server)); |
|
107 |
//send the server a date and it will send back any docid that has |
|
108 |
//been modified after that date |
|
109 |
|
|
110 |
update = update.replace(' ', '+'); |
|
111 |
|
|
112 |
URL u = new URL("http://" + server + "?update=" + update); |
|
113 |
InputStreamReader istream = new InputStreamReader(u.openStream()); |
|
114 |
while((istreamInt = istream.read()) != -1) |
|
115 |
{ |
|
116 |
istreamChar = (char)istreamInt; |
|
117 |
serverResponse.append(istreamChar); |
|
118 |
} |
|
119 |
responses.add(serverResponse.toString()); //list of updates |
|
120 |
} |
|
121 |
//System.out.println("responses: " + responses.toString()); |
|
122 |
|
|
123 |
//initialize the parser |
|
124 |
XMLReader parser = initParser(message); |
|
125 |
for(int i=0; i<responses.size(); i++) |
|
126 |
{ //parse the xml and get the result |
|
127 |
parser.parse(new InputSource( |
|
128 |
new StringReader( |
|
129 |
(String)(responses.elementAt(i))))); |
|
130 |
Vector v = new Vector(message.getResultVect()); |
|
131 |
for(int j=0; j<v.size(); j++) |
|
132 |
{ |
|
133 |
Vector w = new Vector((Vector)(v.elementAt(j))); |
|
134 |
System.out.print("param " + j + ": " + w.toString()); |
|
135 |
//so now we have a list of the documents that need to be updated, so |
|
136 |
//now we need to request them. from the server and update them here |
|
137 |
} |
|
138 |
System.out.println(""); |
|
139 |
} |
|
140 |
|
|
141 |
|
|
142 |
} |
|
143 |
catch(Exception e) |
|
144 |
{ |
|
145 |
System.out.println("Error in replicationHandler.update(): " + |
|
146 |
e.getMessage()); |
|
147 |
} |
|
148 |
} |
|
149 |
|
|
150 |
/** |
|
151 |
* Method to initialize the message parser |
|
152 |
*/ |
|
153 |
private static XMLReader initParser(ReplMessageHandler rmh) |
|
154 |
throws Exception |
|
155 |
{ |
|
156 |
XMLReader parser = null; |
|
157 |
|
|
158 |
try { |
|
159 |
ContentHandler chandler = rmh; |
|
160 |
|
|
161 |
// Get an instance of the parser |
|
162 |
MetaCatUtil util = new MetaCatUtil(); |
|
163 |
String parserName = util.getOption("saxparser"); |
|
164 |
parser = XMLReaderFactory.createXMLReader(parserName); |
|
165 |
|
|
166 |
// Turn off validation |
|
167 |
parser.setFeature("http://xml.org/sax/features/validation", false); |
|
168 |
|
|
169 |
parser.setContentHandler((ContentHandler)chandler); |
|
170 |
parser.setErrorHandler((ErrorHandler)chandler); |
|
171 |
|
|
172 |
} catch (Exception e) { |
|
173 |
throw e; |
|
174 |
} |
|
175 |
|
|
176 |
return parser; |
|
177 |
} |
|
178 |
|
|
179 |
/** |
|
180 |
* Method to query xml_replication and build a hashtable of each server |
|
181 |
* and it's last update time. |
|
182 |
* @param conn a connection to the database |
|
183 |
*/ |
|
184 |
private Hashtable buildServerList(Connection conn) |
|
185 |
{ |
|
186 |
Hashtable sl = new Hashtable(); |
|
187 |
PreparedStatement pstmt; |
|
188 |
try |
|
189 |
{ |
|
190 |
pstmt = conn.prepareStatement("select server, last_checked from " + |
|
191 |
"xml_replication"); |
|
192 |
pstmt.execute(); |
|
193 |
ResultSet rs = pstmt.getResultSet(); |
|
194 |
boolean tableHasRows = rs.next(); |
|
195 |
while(tableHasRows) |
|
196 |
{ |
|
197 |
sl.put(rs.getString(1), rs.getString(2)); |
|
198 |
tableHasRows = rs.next(); |
|
199 |
} |
|
200 |
} |
|
201 |
catch(Exception e) |
|
202 |
{ |
|
203 |
System.out.println("error in replicationHandler.buildServerList(): " + |
|
204 |
e.getMessage()); |
|
205 |
} |
|
206 |
return sl; |
|
207 |
} |
|
208 |
} |
|
0 | 209 |
Also available in: Unified diff
changed naming scheme