Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-11-20 09:11:53 -0800 (Mon, 20 Nov 2000) $'
11
 * '$Revision: 554 $'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import java.text.*;
22
import org.xml.sax.AttributeList;
23
import org.xml.sax.ContentHandler;
24
import org.xml.sax.DTDHandler;
25
import org.xml.sax.EntityResolver;
26
import org.xml.sax.ErrorHandler;
27
import org.xml.sax.InputSource;
28
import org.xml.sax.XMLReader;
29
import org.xml.sax.SAXException;
30
import org.xml.sax.SAXParseException;
31
import org.xml.sax.helpers.XMLReaderFactory;
32

    
33
/**
34
 * This class handles deltaT replication checking.  Whenever this TimerTask
35
 * is fired it checks each server in xml_replication for updates and updates
36
 * the local db as needed.
37
 */
38
public class ReplicationHandler extends TimerTask
39
{
40
  MetaCatUtil util = new MetaCatUtil();
41
  Hashtable serverList = new Hashtable(); 
42
  Connection conn;
43
  PrintWriter out;
44
  
45
  public ReplicationHandler(PrintWriter o)
46
  {
47
    this.out = o;
48
  }
49
  
50
  /**
51
   * Method that implements TimerTask.run().  It runs whenever the timer is 
52
   * fired.
53
   */
54
  public void run()
55
  {
56
    //find out the last_checked time of each server in the server list and
57
    //send a query to each server to see if there are any documents in 
58
    //xml_documents with an update_date > last_checked
59
    try
60
    {
61
      conn = util.openDBConnection();
62
      serverList = buildServerList(conn);
63
      update(serverList, conn);
64
      conn.close();
65
    }
66
    catch (Exception e)
67
    {
68
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
69
    }
70
  }
71
  
72
  /**
73
   * Method to check each server in the servetList for updates
74
   */
75
  private void update(Hashtable serverList, Connection conn)
76
  {
77
    /*
78
     Pseudo-algorithm:
79
     -build a list of servers that need to be checked
80
     -check each server to see if any documents were modified after the 
81
      server's update_date
82
     -if there are documents that need to be updated pull them here
83
      and update the one currently in the database or add a new one
84
      if it is not already here.
85
     -update each servers' update_date to the current time
86
    */
87
    PreparedStatement pstmt;
88
    Enumeration keys;
89
    int istreamInt;
90
    char istreamChar;
91
    StringBuffer serverResponse = new StringBuffer();
92
    String server;
93
    String update;
94
    Vector responses = new Vector();
95
    ReplMessageHandler message = new ReplMessageHandler();
96
    Hashtable updateDocs = new Hashtable();
97
    URL u;
98
    InputStreamReader istream;
99
    
100
    try
101
    {
102
      //build a list of servers with updated documents.  Choose the newest
103
      //one out of the list, update this server, update last_checked
104
      
105
      keys = serverList.keys();
106
      while(keys.hasMoreElements())
107
      { //update from one server at a time
108
        server = (String)(keys.nextElement());
109
        update = (String)(serverList.get(server));
110
        //send the server a date and it will send back any docid that has 
111
        //been modified after that date
112
        
113
        update = update.replace(' ', '+'); 
114
        
115
        u = new URL("http://" + server + "?action=update&date=" + update);
116
        istream = new InputStreamReader(u.openStream());
117
        serverResponse = new StringBuffer();
118
        while((istreamInt = istream.read()) != -1)
119
        {
120
          istreamChar = (char)istreamInt;
121
          serverResponse.append(istreamChar);
122
        }
123
        responses.add(serverResponse.toString()); //list of updates
124
      }
125

    
126
      //initialize the parser
127
      XMLReader parser = initParser(message);
128
      //System.out.println("response: " + responses.toString());
129
      //System.out.println("<--end response-->");
130
      for(int i=0; i<responses.size(); i++)
131
      { //parse the xml and get the result
132
        parser.parse(new InputSource(
133
                     new StringReader(
134
                     (String)(responses.elementAt(i)))));
135
        Vector v = new Vector(message.getUpdatesVect());
136
        Vector d = new Vector(message.getDeletesVect());
137
        for(int j=0; j<v.size(); j++)
138
        { //go through each update vector and update or insert
139
          //a new document
140
          Vector w = new Vector((Vector)(v.elementAt(j)));
141
          String docid = (String)w.elementAt(0);
142
          String docServer = (String)w.elementAt(2);
143

    
144
          //send a message to the server requesting each document
145
          URL getDocURL = new URL("http://" + docServer + 
146
                                  "?action=getdocument&docid="+ docid);
147
          InputStreamReader getDocIstream = new InputStreamReader(
148
                                                getDocURL.openStream()); 
149

    
150
          //the following while loop should not be needed.  see the note
151
          //below before the DocumentImpl.write() call.
152
          serverResponse = new StringBuffer();
153
          while((istreamInt = getDocIstream.read()) != -1)
154
          {
155
            istreamChar = (char)istreamInt;
156
            serverResponse.append(istreamChar);
157
          }
158
          //System.out.println("<<<<<<document>>>>>");
159
          //System.out.println(serverResponse.toString());
160
          //System.out.println("<<<<<<end document>>>>>");
161
          
162
          pstmt = conn.prepareStatement("select serverid from " +
163
                                         "xml_replication where server " +
164
                                         "like '" + docServer + "'");
165
          pstmt.execute();
166
          ResultSet rs = pstmt.getResultSet();
167
          boolean tablehasrows = rs.next();
168
          int serverCode = 0;
169
          if(tablehasrows)
170
          {
171
            serverCode = rs.getInt(1);
172
            //System.out.println("servercode: " + serverCode);
173
          }
174
          else
175
          {
176
            System.out.println("error: server not registered");
177
          }
178
          
179
          if(serverCode != 0)
180
          {
181
            //update the document into the DB
182
            String action = getAction(docid);
183
            //System.out.println("action: " + action + " docid: " + docid);
184
          
185
            //note that getDocIstream is commented out below.  This should
186
            //work as a param to this method but it doesn'.  I don't know why
187
            //but putting a string reader there works but not an 
188
            //inputStreamReader.
189
            String newDocid = DocumentImpl.write(conn, 
190
                                               new StringReader(serverResponse.toString())
191
                                               /*getDocIstream*/, 
192
                                               action, 
193
                                               docid, null, null, serverCode);
194
            System.out.println("newDocid: " + newDocid + " " + action + "ED");
195
          }
196
        }
197
        
198
        for(int k=0; k<d.size(); k++)
199
        {
200
          Vector w = new Vector((Vector)d.elementAt(k));
201
          String docid = (String)w.elementAt(0);
202
          
203
          DocumentImpl.delete(conn, docid, null, null);
204
        }
205
      }
206

    
207
      //update the last_update field for each server to the current date/time      
208
      SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
209
      java.util.Date newDate = new java.util.Date(System.currentTimeMillis());
210
      ParsePosition pos = new ParsePosition(0);
211
      String dateString = formatter.format(newDate);
212
      //System.out.println("dateString: " + dateString);
213
      StringBuffer sql = new StringBuffer();
214
      sql.append("update xml_replication set last_checked = to_date('");
215
      sql.append(dateString).append("', 'YY-MM-DD HH24:MI:SS')");
216
      //System.out.println("sql: " + sql.toString());
217
      pstmt = conn.prepareStatement(sql.toString());
218
      pstmt.executeUpdate();
219
      //conn.commit();
220
      System.out.println("last_checked updated: " + dateString);
221
  
222
    }
223
    catch(Exception e)
224
    {
225
      System.out.println("Error in replicationHandler.update(): " + 
226
                         e.getMessage());
227
      e.printStackTrace(System.out);
228
    }
229
  }
230
  
231
  /**
232
   * Checks to see if a document is already in the DB.  Returns
233
   * "UPDATE" if it is, "INSERT" if it isn't
234
   */
235
  private static String getAction(String docid)
236
  {
237
    try
238
    {
239
      MetaCatUtil util = new MetaCatUtil();
240
      StringBuffer sql = new StringBuffer();
241
      sql.append("select docid from xml_documents where docid like '");
242
      sql.append(docid).append("'");
243
      Connection conn = util.openDBConnection();
244
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
245
      pstmt.execute();
246
      ResultSet rs = pstmt.getResultSet();
247

    
248
      if(rs.next())
249
      {
250
        conn.close();
251
        return "UPDATE";
252
      }
253
      else
254
      {
255
        conn.close();
256
        return "INSERT";
257
      }
258
    }
259
    catch(Exception e)
260
    {
261
      System.out.println("error in replicationHandler.getAction: " + 
262
                          e.getMessage());
263
    }
264
    return "";
265
  }
266
  
267
  /**
268
   * Method to initialize the message parser
269
   */
270
  private static XMLReader initParser(ReplMessageHandler rmh)
271
          throws Exception
272
  {
273
    XMLReader parser = null;
274

    
275
    try {
276
      ContentHandler chandler = rmh;
277

    
278
      // Get an instance of the parser
279
      MetaCatUtil util = new MetaCatUtil();
280
      String parserName = util.getOption("saxparser");
281
      parser = XMLReaderFactory.createXMLReader(parserName);
282

    
283
      // Turn off validation
284
      parser.setFeature("http://xml.org/sax/features/validation", false);
285
      
286
      parser.setContentHandler((ContentHandler)chandler);
287
      parser.setErrorHandler((ErrorHandler)chandler);
288

    
289
    } catch (Exception e) {
290
      throw e;
291
    }
292

    
293
    return parser;
294
  }
295
  
296
  /**
297
   * Method to query xml_replication and build a hashtable of each server
298
   * and it's last update time.
299
   * @param conn a connection to the database
300
   */
301
  private Hashtable buildServerList(Connection conn)
302
  {
303
    Hashtable sl = new Hashtable();
304
    PreparedStatement pstmt;   
305
    try
306
    {
307
      pstmt = conn.prepareStatement("select server, last_checked from " +
308
                                    "xml_replication");
309
      pstmt.execute();
310
      ResultSet rs = pstmt.getResultSet();
311
      boolean tableHasRows = rs.next();
312
      while(tableHasRows)
313
      {
314
        String server = rs.getString(1);
315
        String last_checked = rs.getString(2);
316
        if(!server.equals("localhost"))
317
        {
318
          sl.put(server, last_checked);
319
        }
320
        tableHasRows = rs.next();
321
      }
322
    }
323
    catch(Exception e)
324
    {
325
      System.out.println("error in replicationHandler.buildServerList(): " +
326
                         e.getMessage());
327
    }
328
    return sl;
329
  }
330
}
(33-33/34)