Project

General

Profile

« Previous | Next » 

Revision 520

Added by berkley about 24 years ago

a timer based thread that does Delta-T replication checking.

View differences:

src/edu/ucsb/nceas/metacat/replicationHandler.java
1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

  
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import org.xml.sax.AttributeList;
22
import org.xml.sax.ContentHandler;
23
import org.xml.sax.DTDHandler;
24
import org.xml.sax.EntityResolver;
25
import org.xml.sax.ErrorHandler;
26
import org.xml.sax.InputSource;
27
import org.xml.sax.XMLReader;
28
import org.xml.sax.SAXException;
29
import org.xml.sax.SAXParseException;
30
import org.xml.sax.helpers.XMLReaderFactory;
31

  
32
/**
33
 * This class handles deltaT replication checking.  Whenever this TimerTask
34
 * is fired it checks each server in xml_replication for updates and updates
35
 * the local db as needed.
36
 */
37
public class replicationHandler extends TimerTask
38
{
39
  MetaCatUtil util = new MetaCatUtil();
40
  Hashtable serverList = new Hashtable(); 
41
  Connection conn;
42
  PrintWriter out;
43
  
44
  public replicationHandler(PrintWriter o)
45
  {
46
    this.out = o;
47
  }
48
  
49
  /**
50
   * Method that implements TimerTask.run().  It runs whenever the timer is 
51
   * fired.
52
   */
53
  public void run()
54
  {
55
    System.out.println("replicationHandler is running");
56
    //find out the last_checked time of each server in the server list and
57
    //send a query to each server to see if there are any documents in 
58
    //xml_documents with an update_date > last_checked
59
    try
60
    {
61
      conn = util.openDBConnection();
62
      serverList = buildServerList(conn);
63
      System.out.println("Server list: " + serverList.toString());
64
      update(serverList, conn);
65
    }
66
    catch (Exception e)
67
    {
68
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
69
    }
70
  }
71
  
72
  private void update(Hashtable serverList, Connection conn)
73
  {
74
    PreparedStatement pstmt;
75
    Enumeration keys;
76
    int istreamInt;
77
    char istreamChar;
78
    StringBuffer serverResponse = new StringBuffer();
79
    String server;
80
    String update;
81
    Vector responses = new Vector();
82
    replMessageHandler message = new replMessageHandler();
83
    
84
    try
85
    {
86
      //build a list of servers with updated documents.  Choose the newest
87
      //one out of the list, update this server, update last_checked
88
      
89
      keys = serverList.keys();
90
      while(keys.hasMoreElements())
91
      { //update from one server at a time
92
        server = (String)(keys.nextElement());
93
        update = (String)(serverList.get(server));
94
        //send the server a date and it will send back any docid that has 
95
        //been modified after that date
96
        
97
        update = update.replace(' ', '+');
98
        
99
        URL u = new URL("http://" + server + "?update=" + update);
100
        InputStreamReader istream = new InputStreamReader(u.openStream());
101
        while((istreamInt = istream.read()) != -1)
102
        {
103
          istreamChar = (char)istreamInt;
104
          serverResponse.append(istreamChar);
105
        }
106
        responses.add(serverResponse.toString()); //list of updates
107
      }
108
      //System.out.println("responses: " + responses.toString());
109

  
110
      //initialize the parser
111
      XMLReader parser = initParser(message);
112
      for(int i=0; i<responses.size(); i++)
113
      { //parse the xml and get the result
114
        parser.parse(new InputSource(
115
                     new StringReader(
116
                     (String)(responses.elementAt(i)))));
117
        Vector v = new Vector(message.getResultVect());
118
        for(int j=0; j<v.size(); j++)
119
        {
120
          Vector w = new Vector((Vector)(v.elementAt(j)));
121
          System.out.print("param " + j + ": " + w.toString());
122
          //so now we have a list of the documents that need to be updated, so 
123
          //now we need to request them. from the server and update them here  
124
        }
125
        System.out.println("");
126
      }
127
      
128
      
129
    }
130
    catch(Exception e)
131
    {
132
      System.out.println("Error in replicationHandler.update(): " + 
133
                         e.getMessage());
134
    }
135
  }
136
  
137
  /**
138
   * Method to initialize the message parser
139
   */
140
  private static XMLReader initParser(replMessageHandler rmh)
141
          throws Exception
142
  {
143
    XMLReader parser = null;
144

  
145
    try {
146
      ContentHandler chandler = rmh;
147

  
148
      // Get an instance of the parser
149
      MetaCatUtil util = new MetaCatUtil();
150
      String parserName = util.getOption("saxparser");
151
      parser = XMLReaderFactory.createXMLReader(parserName);
152

  
153
      // Turn off validation
154
      parser.setFeature("http://xml.org/sax/features/validation", false);
155
      
156
      // Set Handlers in the parser
157
      /*
158
      parser.setProperty("http://xml.org/sax/properties/declaration-handler",
159
                         chandler);
160
      parser.setProperty("http://xml.org/sax/properties/lexical-handler",
161
                         chandler);
162
      */
163
      parser.setContentHandler((ContentHandler)chandler);
164
      parser.setErrorHandler((ErrorHandler)chandler);
165

  
166
    } catch (Exception e) {
167
      throw e;
168
    }
169

  
170
    return parser;
171
  }
172
  
173
  /**
174
   * Method to query xml_replication and build a hashtable of each server
175
   * and it's last update time.
176
   * @param conn a connection to the database
177
   */
178
  private Hashtable buildServerList(Connection conn)
179
  {
180
    Hashtable sl = new Hashtable();
181
    PreparedStatement pstmt;   
182
    try
183
    {
184
      pstmt = conn.prepareStatement("select server, last_checked from " +
185
                                    "xml_replication");
186
      pstmt.execute();
187
      ResultSet rs = pstmt.getResultSet();
188
      boolean tableHasRows = rs.next();
189
      while(tableHasRows)
190
      {
191
        sl.put(rs.getString(1), rs.getString(2));
192
        tableHasRows = rs.next();
193
      }
194
    }
195
    catch(Exception e)
196
    {
197
      System.out.println("error in replicationHandler.buildServerList(): " +
198
                         e.getMessage());
199
    }
200
    return sl;
201
  }
202
}
0 203

  

Also available in: Unified diff