Project

General

Profile

« Previous | Next » 

Revision 522

Added by berkley about 24 years ago

changed naming scheme

View differences:

src/edu/ucsb/nceas/metacat/replMessageHandler.java
1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class that handles xml messages  passed by  the replication handler
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 */
13

  
14
package edu.ucsb.nceas.metacat;
15

  
16
import java.sql.*;
17
import java.util.Stack;
18
import java.util.Vector;
19
import java.util.Enumeration;
20
import java.util.EmptyStackException;
21

  
22
import org.xml.sax.Attributes;
23
import org.xml.sax.SAXException;
24
import org.xml.sax.SAXParseException;
25
import org.xml.sax.ext.DeclHandler;
26
import org.xml.sax.ext.LexicalHandler;
27
import org.xml.sax.helpers.DefaultHandler;
28

  
29
/** 
30
 * A database aware Class implementing callback bethods for the SAX parser to
31
 * call when processing the XML messages from the replication handler
32
 */
33
public class replMessageHandler extends DefaultHandler 
34
{
35
  private Vector updates = new Vector();
36
  private Vector indivUpdate = new Vector();
37
  String currentTag = new String();
38
  
39
  public replMessageHandler()
40
  {
41
  }
42
  
43
  /**
44
   * This method starts a new vector for each updatedDocument tag.
45
   */
46
  public void startElement(String uri, String localName, String qName, 
47
                           Attributes attributes) throws SAXException
48
  {
49
    currentTag = localName;
50
    if(localName.equals("updatedDocument"))
51
    {
52
      indivUpdate = new Vector();
53
    }
54
  }
55
  
56
  /**
57
   * This method write the indivUpdate to updates when it finds the end of
58
   */
59
  public void endElement(String uri, String localName, String qName) 
60
              throws SAXException
61
  {
62
    if(localName.equals("updatedDocument"))
63
    {
64
      updates.add(new Vector(indivUpdate));
65
    }
66
  }
67
  
68
  /**
69
   * Take the data out of the docid and date_updated fields
70
   */
71
  public void characters(char[] ch, int start, int length) throws SAXException
72
  {
73
    if(currentTag.equals("docid"))
74
    {
75
      indivUpdate.add(new String(ch, start, length));
76
    }
77
    if(currentTag.equals("date_updated"))
78
    {
79
      indivUpdate.add(new String(ch, start, length));
80
    }
81
  }
82
  
83
  public Vector getResultVect()
84
  {
85
    return updates;
86
  }
87
  
88
}
89 0

  
src/edu/ucsb/nceas/metacat/replicationHandler.java
1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

  
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import org.xml.sax.AttributeList;
22
import org.xml.sax.ContentHandler;
23
import org.xml.sax.DTDHandler;
24
import org.xml.sax.EntityResolver;
25
import org.xml.sax.ErrorHandler;
26
import org.xml.sax.InputSource;
27
import org.xml.sax.XMLReader;
28
import org.xml.sax.SAXException;
29
import org.xml.sax.SAXParseException;
30
import org.xml.sax.helpers.XMLReaderFactory;
31

  
32
/**
33
 * This class handles deltaT replication checking.  Whenever this TimerTask
34
 * is fired it checks each server in xml_replication for updates and updates
35
 * the local db as needed.
36
 */
37
public class replicationHandler extends TimerTask
38
{
39
  MetaCatUtil util = new MetaCatUtil();
40
  Hashtable serverList = new Hashtable(); 
41
  Connection conn;
42
  PrintWriter out;
43
  
44
  public replicationHandler(PrintWriter o)
45
  {
46
    this.out = o;
47
  }
48
  
49
  /**
50
   * Method that implements TimerTask.run().  It runs whenever the timer is 
51
   * fired.
52
   */
53
  public void run()
54
  {
55
    System.out.println("replicationHandler is running");
56
    //find out the last_checked time of each server in the server list and
57
    //send a query to each server to see if there are any documents in 
58
    //xml_documents with an update_date > last_checked
59
    try
60
    {
61
      conn = util.openDBConnection();
62
      serverList = buildServerList(conn);
63
      System.out.println("Server list: " + serverList.toString());
64
      update(serverList, conn);
65
    }
66
    catch (Exception e)
67
    {
68
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
69
    }
70
  }
71
  
72
  private void update(Hashtable serverList, Connection conn)
73
  {
74
    PreparedStatement pstmt;
75
    Enumeration keys;
76
    int istreamInt;
77
    char istreamChar;
78
    StringBuffer serverResponse = new StringBuffer();
79
    String server;
80
    String update;
81
    Vector responses = new Vector();
82
    replMessageHandler message = new replMessageHandler();
83
    
84
    try
85
    {
86
      //build a list of servers with updated documents.  Choose the newest
87
      //one out of the list, update this server, update last_checked
88
      
89
      keys = serverList.keys();
90
      while(keys.hasMoreElements())
91
      { //update from one server at a time
92
        server = (String)(keys.nextElement());
93
        update = (String)(serverList.get(server));
94
        //send the server a date and it will send back any docid that has 
95
        //been modified after that date
96
        
97
        update = update.replace(' ', '+');
98
        
99
        URL u = new URL("http://" + server + "?update=" + update);
100
        InputStreamReader istream = new InputStreamReader(u.openStream());
101
        while((istreamInt = istream.read()) != -1)
102
        {
103
          istreamChar = (char)istreamInt;
104
          serverResponse.append(istreamChar);
105
        }
106
        responses.add(serverResponse.toString()); //list of updates
107
      }
108
      //System.out.println("responses: " + responses.toString());
109

  
110
      //initialize the parser
111
      XMLReader parser = initParser(message);
112
      for(int i=0; i<responses.size(); i++)
113
      { //parse the xml and get the result
114
        parser.parse(new InputSource(
115
                     new StringReader(
116
                     (String)(responses.elementAt(i)))));
117
        Vector v = new Vector(message.getResultVect());
118
        for(int j=0; j<v.size(); j++)
119
        {
120
          Vector w = new Vector((Vector)(v.elementAt(j)));
121
          System.out.print("param " + j + ": " + w.toString());
122
          //so now we have a list of the documents that need to be updated, so 
123
          //now we need to request them. from the server and update them here  
124
        }
125
        System.out.println("");
126
      }
127
      
128
      
129
    }
130
    catch(Exception e)
131
    {
132
      System.out.println("Error in replicationHandler.update(): " + 
133
                         e.getMessage());
134
    }
135
  }
136
  
137
  /**
138
   * Method to initialize the message parser
139
   */
140
  private static XMLReader initParser(replMessageHandler rmh)
141
          throws Exception
142
  {
143
    XMLReader parser = null;
144

  
145
    try {
146
      ContentHandler chandler = rmh;
147

  
148
      // Get an instance of the parser
149
      MetaCatUtil util = new MetaCatUtil();
150
      String parserName = util.getOption("saxparser");
151
      parser = XMLReaderFactory.createXMLReader(parserName);
152

  
153
      // Turn off validation
154
      parser.setFeature("http://xml.org/sax/features/validation", false);
155
      
156
      // Set Handlers in the parser
157
      /*
158
      parser.setProperty("http://xml.org/sax/properties/declaration-handler",
159
                         chandler);
160
      parser.setProperty("http://xml.org/sax/properties/lexical-handler",
161
                         chandler);
162
      */
163
      parser.setContentHandler((ContentHandler)chandler);
164
      parser.setErrorHandler((ErrorHandler)chandler);
165

  
166
    } catch (Exception e) {
167
      throw e;
168
    }
169

  
170
    return parser;
171
  }
172
  
173
  /**
174
   * Method to query xml_replication and build a hashtable of each server
175
   * and it's last update time.
176
   * @param conn a connection to the database
177
   */
178
  private Hashtable buildServerList(Connection conn)
179
  {
180
    Hashtable sl = new Hashtable();
181
    PreparedStatement pstmt;   
182
    try
183
    {
184
      pstmt = conn.prepareStatement("select server, last_checked from " +
185
                                    "xml_replication");
186
      pstmt.execute();
187
      ResultSet rs = pstmt.getResultSet();
188
      boolean tableHasRows = rs.next();
189
      while(tableHasRows)
190
      {
191
        sl.put(rs.getString(1), rs.getString(2));
192
        tableHasRows = rs.next();
193
      }
194
    }
195
    catch(Exception e)
196
    {
197
      System.out.println("error in replicationHandler.buildServerList(): " +
198
                         e.getMessage());
199
    }
200
    return sl;
201
  }
202
}
203 0

  
src/edu/ucsb/nceas/metacat/metacatReplication.java
1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that implements replication for metacat
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 */
13

  
14
package edu.ucsb.nceas.metacat;
15

  
16
import java.util.*;
17
import java.io.*;
18
import java.sql.*;
19
import java.net.*;
20
import java.lang.*;
21
import java.text.*;
22
import javax.servlet.*;
23
import javax.servlet.http.*;
24
import oracle.xml.parser.v2.*;
25
import org.xml.sax.*;
26

  
27
public class metacatReplication extends HttpServlet
28
{  
29
  private String deltaT;
30
  Timer replicationDaemon;
31
  MetaCatUtil util = new MetaCatUtil();
32
  
33
  /**
34
   * Initialize the servlet by creating appropriate database connections
35
   */
36
  public void init(ServletConfig config) throws ServletException 
37
  {
38
    //initialize db connections to handle any update requests
39
    MetaCatUtil util = new MetaCatUtil();
40
    deltaT = util.getOption("deltaT");
41
    //break off a thread to do the delta-T check
42
    replicationDaemon = new Timer(true);
43
    //replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0, 
44
    //                                      new Integer(deltaT).intValue() * 1000);
45
    //System.out.println("timer scheduled at: " + new Integer(deltaT).intValue() 
46
    //                   + " seconds");
47
    
48
  }
49
  
50
  public void destroy() 
51
  {
52
    replicationDaemon.cancel();
53
    System.out.println("Replication daemon cancelled.");
54
  }
55
  
56
  public void doGet (HttpServletRequest request, HttpServletResponse response)
57
                     throws ServletException, IOException 
58
  {
59
    // Process the data and send back the response
60
    handleGetOrPost(request, response);
61
  }
62

  
63
  public void doPost(HttpServletRequest request, HttpServletResponse response)
64
                     throws ServletException, IOException 
65
  {
66
    // Process the data and send back the response
67
    handleGetOrPost(request, response);
68
  }
69
  
70
  private void handleGetOrPost(HttpServletRequest request, 
71
                               HttpServletResponse response) 
72
                               throws ServletException, IOException 
73
  {
74
    PrintWriter out = response.getWriter();
75
    
76
    Hashtable params = new Hashtable();
77
    Enumeration paramlist = request.getParameterNames();
78
    
79
    while (paramlist.hasMoreElements()) 
80
    {
81
      String name = (String)paramlist.nextElement();
82
      String[] value = request.getParameterValues(name);
83
      params.put(name, value);  
84
    }
85
    
86
    if(params.containsKey("stop"))
87
    {
88
      if(((String[])params.get("stop"))[0].equals("true"))
89
      {  
90
        replicationDaemon.cancel();
91
        out.println("Replication Handler Stopped");
92
      }
93
    }
94
    
95
    if(params.containsKey("start"))
96
    {
97
      if(((String[])params.get("start"))[0].equals("true"))
98
      {
99
        int rate;
100
        if(params.containsKey("rate"))
101
          rate = new Integer(
102
                 new String(((String[])params.get("rate"))[0])).intValue();
103
        else
104
          rate = 1000;
105

  
106
        out.println("New rate is: " + rate + " seconds.");
107
        replicationDaemon.cancel();
108
        replicationDaemon = new Timer(true);
109
        replicationDaemon.scheduleAtFixedRate(new replicationHandler(out), 0, 
110
                                              rate * 1000);
111
        out.println("Replication Handler Started");
112
      }
113
    }
114
    
115
    if(params.containsKey("update"))
116
    {
117
      handleUpdateRequest(out, params, response);
118
    }
119
  }
120
  
121
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
122
                                   HttpServletResponse response)
123
  {
124
    System.out.println("incoming request for dt/time " + 
125
                       ((String[])params.get("update"))[0] +
126
                       " from external metacat");
127
    response.setContentType("text/xml");
128
    out.println("<replication><textmessage>In metacatReplication</textmessage>");
129
    
130
    StringBuffer sql = new StringBuffer();
131
    StringBuffer returnXML = new StringBuffer();
132
    String updateStr = ((String[])params.get("update"))[0];
133
    updateStr = updateStr.replace('+', ' ');
134
    //pseudo algorithm:
135
    ///////////////////////////////////////////////////////////////////////
136
    //get the date/time from the requestor, query the db for any documents
137
    //that have an update date later than the requested date and send
138
    //those docids back to the requestor.  If there are no newer documents
139
    //then send back an up-to-date message.
140
    ///////////////////////////////////////////////////////////////////////
141
    
142
    //Timestamp update = Timestamp.valueOf(updateStr);
143
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
144
    java.util.Date update = new java.util.Date();
145
    ParsePosition pos = new ParsePosition(0);
146
    update = formatter.parse(updateStr, pos);
147
    String dateString = formatter.format(update);
148
    sql.append("select docid, date_updated from xml_documents where ");
149
    sql.append("date_updated > ");
150
    sql.append("to_date('").append(dateString).append("','YY-MM-DD HH24:MI:SS')");
151
    //System.out.println("sql: " + sql.toString());
152
    
153
    try
154
    {
155
      Connection conn = util.openDBConnection();
156
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
157
      pstmt.execute();
158
      ResultSet rs = pstmt.getResultSet();
159
      boolean tablehasrows = rs.next();
160
      returnXML.append("<server>").append(util.getOption("server"));
161
      returnXML.append("</server><updates>");
162
      while(tablehasrows)
163
      {
164
        returnXML.append("<updatedDocument><docid>").append(rs.getString(1));
165
        returnXML.append("</docid><date_updated>").append(rs.getString(2));
166
        returnXML.append("</date_updated></updatedDocument>");
167
        tablehasrows = rs.next();
168
      }
169
      returnXML.append("</updates></replication>");
170
      out.print(returnXML.toString());
171
    }
172
    catch(Exception e)
173
    {
174
      System.out.println("Exception in metacatReplication: " + e.getMessage());
175
    }
176
  }
177
}
178 0

  
src/edu/ucsb/nceas/metacat/MetacatReplication.java
1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that implements replication for metacat
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 */
13

  
14
package edu.ucsb.nceas.metacat;
15

  
16
import java.util.*;
17
import java.io.*;
18
import java.sql.*;
19
import java.net.*;
20
import java.lang.*;
21
import java.text.*;
22
import javax.servlet.*;
23
import javax.servlet.http.*;
24
import oracle.xml.parser.v2.*;
25
import org.xml.sax.*;
26

  
27
public class MetacatReplication extends HttpServlet
28
{  
29
  private String deltaT;
30
  Timer replicationDaemon;
31
  MetaCatUtil util = new MetaCatUtil();
32
  
33
  /**
34
   * Initialize the servlet by creating appropriate database connections
35
   */
36
  public void init(ServletConfig config) throws ServletException 
37
  {
38
    //initialize db connections to handle any update requests
39
    MetaCatUtil util = new MetaCatUtil();
40
    deltaT = util.getOption("deltaT");
41
    //break off a thread to do the delta-T check
42
    replicationDaemon = new Timer(true);
43
    //replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0, 
44
    //                                      new Integer(deltaT).intValue() * 1000);
45
    //System.out.println("timer scheduled at: " + new Integer(deltaT).intValue() 
46
    //                   + " seconds");
47
    
48
  }
49
  
50
  public void destroy() 
51
  {
52
    replicationDaemon.cancel();
53
    System.out.println("Replication daemon cancelled.");
54
  }
55
  
56
  public void doGet (HttpServletRequest request, HttpServletResponse response)
57
                     throws ServletException, IOException 
58
  {
59
    // Process the data and send back the response
60
    handleGetOrPost(request, response);
61
  }
62

  
63
  public void doPost(HttpServletRequest request, HttpServletResponse response)
64
                     throws ServletException, IOException 
65
  {
66
    // Process the data and send back the response
67
    handleGetOrPost(request, response);
68
  }
69
  
70
  private void handleGetOrPost(HttpServletRequest request, 
71
                               HttpServletResponse response) 
72
                               throws ServletException, IOException 
73
  {
74
    PrintWriter out = response.getWriter();
75
    
76
    Hashtable params = new Hashtable();
77
    Enumeration paramlist = request.getParameterNames();
78
    
79
    while (paramlist.hasMoreElements()) 
80
    {
81
      String name = (String)paramlist.nextElement();
82
      String[] value = request.getParameterValues(name);
83
      params.put(name, value);  
84
    }
85
    
86
    if(params.containsKey("stop"))
87
    {
88
      if(((String[])params.get("stop"))[0].equals("true"))
89
      {  
90
        replicationDaemon.cancel();
91
        out.println("Replication Handler Stopped");
92
      }
93
    }
94
    
95
    if(params.containsKey("start"))
96
    {
97
      if(((String[])params.get("start"))[0].equals("true"))
98
      {
99
        int rate;
100
        if(params.containsKey("rate"))
101
          rate = new Integer(
102
                 new String(((String[])params.get("rate"))[0])).intValue();
103
        else
104
          rate = 1000;
105

  
106
        out.println("New rate is: " + rate + " seconds.");
107
        replicationDaemon.cancel();
108
        replicationDaemon = new Timer(true);
109
        replicationDaemon.scheduleAtFixedRate(new replicationHandler(out), 0, 
110
                                              rate * 1000);
111
        out.println("Replication Handler Started");
112
      }
113
    }
114
    
115
    if(params.containsKey("update"))
116
    {
117
      handleUpdateRequest(out, params, response);
118
    }
119
  }
120
  
121
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
122
                                   HttpServletResponse response)
123
  {
124
    System.out.println("incoming request for dt/time " + 
125
                       ((String[])params.get("update"))[0] +
126
                       " from external metacat");
127
    response.setContentType("text/xml");
128
    out.println("<replication><textmessage>In metacatReplication</textmessage>");
129
    
130
    StringBuffer sql = new StringBuffer();
131
    StringBuffer returnXML = new StringBuffer();
132
    String updateStr = ((String[])params.get("update"))[0];
133
    updateStr = updateStr.replace('+', ' ');
134
    //pseudo algorithm:
135
    ///////////////////////////////////////////////////////////////////////
136
    //get the date/time from the requestor, query the db for any documents
137
    //that have an update date later than the requested date and send
138
    //those docids back to the requestor.  If there are no newer documents
139
    //then send back an up-to-date message.
140
    ///////////////////////////////////////////////////////////////////////
141
    
142
    //Timestamp update = Timestamp.valueOf(updateStr);
143
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
144
    java.util.Date update = new java.util.Date();
145
    ParsePosition pos = new ParsePosition(0);
146
    update = formatter.parse(updateStr, pos);
147
    String dateString = formatter.format(update);
148
    sql.append("select docid, date_updated from xml_documents where ");
149
    sql.append("date_updated > ");
150
    sql.append("to_date('").append(dateString).append("','YY-MM-DD HH24:MI:SS')");
151
    //System.out.println("sql: " + sql.toString());
152
    
153
    try
154
    {
155
      Connection conn = util.openDBConnection();
156
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
157
      pstmt.execute();
158
      ResultSet rs = pstmt.getResultSet();
159
      boolean tablehasrows = rs.next();
160
      returnXML.append("<server>").append(util.getOption("server"));
161
      returnXML.append("</server><updates>");
162
      while(tablehasrows)
163
      {
164
        returnXML.append("<updatedDocument><docid>").append(rs.getString(1));
165
        returnXML.append("</docid><date_updated>").append(rs.getString(2));
166
        returnXML.append("</date_updated></updatedDocument>");
167
        tablehasrows = rs.next();
168
      }
169
      returnXML.append("</updates></replication>");
170
      out.print(returnXML.toString());
171
    }
172
    catch(Exception e)
173
    {
174
      System.out.println("Exception in metacatReplication: " + e.getMessage());
175
    }
176
  }
177
}
0 178

  
src/edu/ucsb/nceas/metacat/ReplMessageHandler.java
1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class that handles xml messages  passed by  the replication handler
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 */
13

  
14
package edu.ucsb.nceas.metacat;
15

  
16
import java.sql.*;
17
import java.util.Stack;
18
import java.util.Vector;
19
import java.util.Enumeration;
20
import java.util.EmptyStackException;
21

  
22
import org.xml.sax.Attributes;
23
import org.xml.sax.SAXException;
24
import org.xml.sax.SAXParseException;
25
import org.xml.sax.ext.DeclHandler;
26
import org.xml.sax.ext.LexicalHandler;
27
import org.xml.sax.helpers.DefaultHandler;
28

  
29
/** 
30
 * A database aware Class implementing callback bethods for the SAX parser to
31
 * call when processing the XML messages from the replication handler
32
 */
33
public class ReplMessageHandler extends DefaultHandler 
34
{
35
  private Vector updates = new Vector();
36
  private Vector indivUpdate = new Vector();
37
  String currentTag = new String();
38
  
39
  public ReplMessageHandler()
40
  {
41
  }
42
  
43
  /**
44
   * This method starts a new vector for each updatedDocument tag.
45
   */
46
  public void startElement(String uri, String localName, String qName, 
47
                           Attributes attributes) throws SAXException
48
  {
49
    currentTag = localName;
50
    if(localName.equals("updatedDocument"))
51
    {
52
      indivUpdate = new Vector();
53
    }
54
  }
55
  
56
  /**
57
   * This method write the indivUpdate to updates when it finds the end of
58
   */
59
  public void endElement(String uri, String localName, String qName) 
60
              throws SAXException
61
  {
62
    if(localName.equals("updatedDocument"))
63
    {
64
      updates.add(new Vector(indivUpdate));
65
    }
66
  }
67
  
68
  /**
69
   * Take the data out of the docid and date_updated fields
70
   */
71
  public void characters(char[] ch, int start, int length) throws SAXException
72
  {
73
    if(currentTag.equals("docid"))
74
    {
75
      indivUpdate.add(new String(ch, start, length));
76
    }
77
    if(currentTag.equals("date_updated"))
78
    {
79
      indivUpdate.add(new String(ch, start, length));
80
    }
81
  }
82
  
83
  public Vector getResultVect()
84
  {
85
    return updates;
86
  }
87
  
88
}
0 89

  
src/edu/ucsb/nceas/metacat/ReplicationHandler.java
1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

  
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import org.xml.sax.AttributeList;
22
import org.xml.sax.ContentHandler;
23
import org.xml.sax.DTDHandler;
24
import org.xml.sax.EntityResolver;
25
import org.xml.sax.ErrorHandler;
26
import org.xml.sax.InputSource;
27
import org.xml.sax.XMLReader;
28
import org.xml.sax.SAXException;
29
import org.xml.sax.SAXParseException;
30
import org.xml.sax.helpers.XMLReaderFactory;
31

  
32
/**
33
 * This class handles deltaT replication checking.  Whenever this TimerTask
34
 * is fired it checks each server in xml_replication for updates and updates
35
 * the local db as needed.
36
 */
37
public class ReplicationHandler extends TimerTask
38
{
39
  MetaCatUtil util = new MetaCatUtil();
40
  Hashtable serverList = new Hashtable(); 
41
  Connection conn;
42
  PrintWriter out;
43
  
44
  public ReplicationHandler(PrintWriter o)
45
  {
46
    this.out = o;
47
  }
48
  
49
  /**
50
   * Method that implements TimerTask.run().  It runs whenever the timer is 
51
   * fired.
52
   */
53
  public void run()
54
  {
55
    System.out.println("replicationHandler is running");
56
    //find out the last_checked time of each server in the server list and
57
    //send a query to each server to see if there are any documents in 
58
    //xml_documents with an update_date > last_checked
59
    try
60
    {
61
      conn = util.openDBConnection();
62
      serverList = buildServerList(conn);
63
      System.out.println("Server list: " + serverList.toString());
64
      update(serverList, conn);
65
    }
66
    catch (Exception e)
67
    {
68
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
69
    }
70
  }
71
  
72
  /**
73
   * Method to check each server in the servetList for updates
74
   */
75
  private void update(Hashtable serverList, Connection conn)
76
  {
77
    /*
78
     Pseudo-algorithm:
79
     -build a list of servers that need to be checked
80
     -check each server to see if any documents were modified after the 
81
      server's update_date
82
     -if there are documents that need to be updated pull them here
83
      and update the one currently in the database or add a new one
84
      if it is not already here.
85
     -update each servers' update_date to the current time
86
    */
87
    PreparedStatement pstmt;
88
    Enumeration keys;
89
    int istreamInt;
90
    char istreamChar;
91
    StringBuffer serverResponse = new StringBuffer();
92
    String server;
93
    String update;
94
    Vector responses = new Vector();
95
    ReplMessageHandler message = new ReplMessageHandler();
96
    
97
    try
98
    {
99
      //build a list of servers with updated documents.  Choose the newest
100
      //one out of the list, update this server, update last_checked
101
      
102
      keys = serverList.keys();
103
      while(keys.hasMoreElements())
104
      { //update from one server at a time
105
        server = (String)(keys.nextElement());
106
        update = (String)(serverList.get(server));
107
        //send the server a date and it will send back any docid that has 
108
        //been modified after that date
109
        
110
        update = update.replace(' ', '+');
111
        
112
        URL u = new URL("http://" + server + "?update=" + update);
113
        InputStreamReader istream = new InputStreamReader(u.openStream());
114
        while((istreamInt = istream.read()) != -1)
115
        {
116
          istreamChar = (char)istreamInt;
117
          serverResponse.append(istreamChar);
118
        }
119
        responses.add(serverResponse.toString()); //list of updates
120
      }
121
      //System.out.println("responses: " + responses.toString());
122

  
123
      //initialize the parser
124
      XMLReader parser = initParser(message);
125
      for(int i=0; i<responses.size(); i++)
126
      { //parse the xml and get the result
127
        parser.parse(new InputSource(
128
                     new StringReader(
129
                     (String)(responses.elementAt(i)))));
130
        Vector v = new Vector(message.getResultVect());
131
        for(int j=0; j<v.size(); j++)
132
        {
133
          Vector w = new Vector((Vector)(v.elementAt(j)));
134
          System.out.print("param " + j + ": " + w.toString());
135
          //so now we have a list of the documents that need to be updated, so 
136
          //now we need to request them. from the server and update them here  
137
        }
138
        System.out.println("");
139
      }
140
      
141
      
142
    }
143
    catch(Exception e)
144
    {
145
      System.out.println("Error in replicationHandler.update(): " + 
146
                         e.getMessage());
147
    }
148
  }
149
  
150
  /**
151
   * Method to initialize the message parser
152
   */
153
  private static XMLReader initParser(ReplMessageHandler rmh)
154
          throws Exception
155
  {
156
    XMLReader parser = null;
157

  
158
    try {
159
      ContentHandler chandler = rmh;
160

  
161
      // Get an instance of the parser
162
      MetaCatUtil util = new MetaCatUtil();
163
      String parserName = util.getOption("saxparser");
164
      parser = XMLReaderFactory.createXMLReader(parserName);
165

  
166
      // Turn off validation
167
      parser.setFeature("http://xml.org/sax/features/validation", false);
168
      
169
      parser.setContentHandler((ContentHandler)chandler);
170
      parser.setErrorHandler((ErrorHandler)chandler);
171

  
172
    } catch (Exception e) {
173
      throw e;
174
    }
175

  
176
    return parser;
177
  }
178
  
179
  /**
180
   * Method to query xml_replication and build a hashtable of each server
181
   * and it's last update time.
182
   * @param conn a connection to the database
183
   */
184
  private Hashtable buildServerList(Connection conn)
185
  {
186
    Hashtable sl = new Hashtable();
187
    PreparedStatement pstmt;   
188
    try
189
    {
190
      pstmt = conn.prepareStatement("select server, last_checked from " +
191
                                    "xml_replication");
192
      pstmt.execute();
193
      ResultSet rs = pstmt.getResultSet();
194
      boolean tableHasRows = rs.next();
195
      while(tableHasRows)
196
      {
197
        sl.put(rs.getString(1), rs.getString(2));
198
        tableHasRows = rs.next();
199
      }
200
    }
201
    catch(Exception e)
202
    {
203
      System.out.println("error in replicationHandler.buildServerList(): " +
204
                         e.getMessage());
205
    }
206
    return sl;
207
  }
208
}
0 209

  

Also available in: Unified diff