Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-11-13 15:31:02 -0800 (Mon, 13 Nov 2000) $'
11
 * '$Revision: 533 $'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import org.xml.sax.AttributeList;
22
import org.xml.sax.ContentHandler;
23
import org.xml.sax.DTDHandler;
24
import org.xml.sax.EntityResolver;
25
import org.xml.sax.ErrorHandler;
26
import org.xml.sax.InputSource;
27
import org.xml.sax.XMLReader;
28
import org.xml.sax.SAXException;
29
import org.xml.sax.SAXParseException;
30
import org.xml.sax.helpers.XMLReaderFactory;
31

    
32
/**
33
 * This class handles deltaT replication checking.  Whenever this TimerTask
34
 * is fired it checks each server in xml_replication for updates and updates
35
 * the local db as needed.
36
 */
37
public class ReplicationHandler extends TimerTask
38
{
39
  MetaCatUtil util = new MetaCatUtil();
40
  Hashtable serverList = new Hashtable(); 
41
  Connection conn;
42
  PrintWriter out;
43
  
44
  public ReplicationHandler(PrintWriter o)
45
  {
46
    this.out = o;
47
  }
48
  
49
  /**
50
   * Method that implements TimerTask.run().  It runs whenever the timer is 
51
   * fired.
52
   */
53
  public void run()
54
  {
55
    //find out the last_checked time of each server in the server list and
56
    //send a query to each server to see if there are any documents in 
57
    //xml_documents with an update_date > last_checked
58
    try
59
    {
60
      conn = util.openDBConnection();
61
      serverList = buildServerList(conn);
62
      update(serverList, conn);
63
      conn.close();
64
    }
65
    catch (Exception e)
66
    {
67
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
68
    }
69
  }
70
  
71
  /**
72
   * Method to check each server in the servetList for updates
73
   */
74
  private void update(Hashtable serverList, Connection conn)
75
  {
76
    /*
77
     Pseudo-algorithm:
78
     -build a list of servers that need to be checked
79
     -check each server to see if any documents were modified after the 
80
      server's update_date
81
     -if there are documents that need to be updated pull them here
82
      and update the one currently in the database or add a new one
83
      if it is not already here.
84
     -update each servers' update_date to the current time
85
    */
86
    PreparedStatement pstmt;
87
    Enumeration keys;
88
    int istreamInt;
89
    char istreamChar;
90
    StringBuffer serverResponse = new StringBuffer();
91
    String server;
92
    String update;
93
    Vector responses = new Vector();
94
    ReplMessageHandler message = new ReplMessageHandler();
95
    Hashtable updateDocs = new Hashtable();
96
    URL u;
97
    InputStreamReader istream;
98
    
99
    try
100
    {
101
      //build a list of servers with updated documents.  Choose the newest
102
      //one out of the list, update this server, update last_checked
103
      
104
      keys = serverList.keys();
105
      while(keys.hasMoreElements())
106
      { //update from one server at a time
107
        server = (String)(keys.nextElement());
108
        update = (String)(serverList.get(server));
109
        //send the server a date and it will send back any docid that has 
110
        //been modified after that date
111
        
112
        update = update.replace(' ', '+');
113
        
114
        u = new URL("http://" + server + "?update=" + update);
115
        istream = new InputStreamReader(u.openStream());
116
        while((istreamInt = istream.read()) != -1)
117
        {
118
          istreamChar = (char)istreamInt;
119
          serverResponse.append(istreamChar);
120
        }
121
        responses.add(serverResponse.toString()); //list of updates
122
      }
123

    
124
      //initialize the parser
125
      XMLReader parser = initParser(message);
126
      for(int i=0; i<responses.size(); i++)
127
      { //parse the xml and get the result
128
        parser.parse(new InputSource(
129
                     new StringReader(
130
                     (String)(responses.elementAt(i)))));
131
        Vector v = new Vector(message.getResultVect());
132
        for(int j=0; j<v.size(); j++)
133
        { //go through each update vector and update or insert
134
          //a new document
135
          Vector w = new Vector((Vector)(v.elementAt(j)));
136
          String docid = (String)w.elementAt(0);
137
          String docServer = (String)w.elementAt(2);
138

    
139
          //send a message to the server requesting each document
140
          URL getDocURL = new URL("http://" + docServer + "?getdocument="+
141
                      docid);
142
          InputStreamReader getDocIstream = new InputStreamReader(
143
                                                getDocURL.openStream());
144

    
145
          //update the document into the DB
146
          String action = getAction(docid);
147
          String newDocid = DocumentImpl.write(conn, getDocIstream, action, 
148
                                               docid, null, null);
149
          //System.out.println("newDocid: " + newDocid);
150
        }
151
      }
152
      //update the last_update field for each server to the current date/time
153
      
154
      
155
    }
156
    catch(Exception e)
157
    {
158
      System.out.println("Error in replicationHandler.update(): " + 
159
                         e.getMessage());
160
      e.printStackTrace(System.out);
161
    }
162
  }
163
  
164
  /**
165
   * Checks to see if a document is already in the DB.  Returns
166
   * "UPDATE" if it is, "INSERT" if it isn't
167
   */
168
  private static String getAction(String docid)
169
  {
170
    try
171
    {
172
      MetaCatUtil util = new MetaCatUtil();
173
      StringBuffer sql = new StringBuffer();
174
      sql.append("select docid from xml_documents where docid like '");
175
      sql.append(docid).append("'");
176
      Connection conn = util.openDBConnection();
177
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
178
      pstmt.execute();
179
      ResultSet rs = pstmt.getResultSet();
180

    
181
      if(rs.next())
182
      {
183
        conn.close();
184
        return "UPDATE";
185
      }
186
      else
187
      {
188
        conn.close();
189
        return "INSERT";
190
      }
191
    }
192
    catch(Exception e)
193
    {
194
      System.out.println("error in replicationHandler.getAction: " + 
195
                          e.getMessage());
196
    }
197
    return "";
198
  }
199
  
200
  /**
201
   * Method to initialize the message parser
202
   */
203
  private static XMLReader initParser(ReplMessageHandler rmh)
204
          throws Exception
205
  {
206
    XMLReader parser = null;
207

    
208
    try {
209
      ContentHandler chandler = rmh;
210

    
211
      // Get an instance of the parser
212
      MetaCatUtil util = new MetaCatUtil();
213
      String parserName = util.getOption("saxparser");
214
      parser = XMLReaderFactory.createXMLReader(parserName);
215

    
216
      // Turn off validation
217
      parser.setFeature("http://xml.org/sax/features/validation", false);
218
      
219
      parser.setContentHandler((ContentHandler)chandler);
220
      parser.setErrorHandler((ErrorHandler)chandler);
221

    
222
    } catch (Exception e) {
223
      throw e;
224
    }
225

    
226
    return parser;
227
  }
228
  
229
  /**
230
   * Method to query xml_replication and build a hashtable of each server
231
   * and it's last update time.
232
   * @param conn a connection to the database
233
   */
234
  private Hashtable buildServerList(Connection conn)
235
  {
236
    Hashtable sl = new Hashtable();
237
    PreparedStatement pstmt;   
238
    try
239
    {
240
      pstmt = conn.prepareStatement("select server, last_checked from " +
241
                                    "xml_replication");
242
      pstmt.execute();
243
      ResultSet rs = pstmt.getResultSet();
244
      boolean tableHasRows = rs.next();
245
      while(tableHasRows)
246
      {
247
        sl.put(rs.getString(1), rs.getString(2));
248
        tableHasRows = rs.next();
249
      }
250
    }
251
    catch(Exception e)
252
    {
253
      System.out.println("error in replicationHandler.buildServerList(): " +
254
                         e.getMessage());
255
    }
256
    return sl;
257
  }
258
}
(33-33/36)