Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: jones $'
10
 *     '$Date: 2000-11-27 13:55:08 -0800 (Mon, 27 Nov 2000) $'
11
 * '$Revision: 566 $'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import java.text.*;
22
import org.xml.sax.AttributeList;
23
import org.xml.sax.ContentHandler;
24
import org.xml.sax.DTDHandler;
25
import org.xml.sax.EntityResolver;
26
import org.xml.sax.ErrorHandler;
27
import org.xml.sax.InputSource;
28
import org.xml.sax.XMLReader;
29
import org.xml.sax.SAXException;
30
import org.xml.sax.SAXParseException;
31
import org.xml.sax.helpers.XMLReaderFactory;
32
import org.xml.sax.helpers.DefaultHandler;
33

    
34

    
35

    
36
/**
37
 * This class handles deltaT replication checking.  Whenever this TimerTask
38
 * is fired it checks each server in xml_replication for updates and updates
39
 * the local db as needed.
40
 */
41
public class ReplicationHandler extends TimerTask
42
{
43
  MetaCatUtil util = new MetaCatUtil();
44
  Hashtable serverList = new Hashtable(); 
45
  Connection conn;
46
  PrintWriter out;
47
  
48
  public ReplicationHandler(PrintWriter o)
49
  {
50
    this.out = o;
51
  }
52
  
53
  /**
54
   * Method that implements TimerTask.run().  It runs whenever the timer is 
55
   * fired.
56
   */
57
  public void run()
58
  {
59
    //find out the last_checked time of each server in the server list and
60
    //send a query to each server to see if there are any documents in 
61
    //xml_documents with an update_date > last_checked
62
    try
63
    {
64
      conn = util.openDBConnection();
65
      serverList = buildServerList(conn);
66
      update(serverList, conn);
67
      conn.close();
68
    }
69
    catch (Exception e)
70
    {
71
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
72
    }
73
  }
74
  
75
  /**
76
   * Method to check each server in the servetList for updates
77
   */
78
  private void update(Hashtable serverList, Connection conn)
79
  {
80
    /*
81
     Pseudo-algorithm:
82
     -build a list of servers that need to be checked
83
     -check each server to see if any documents were modified after the 
84
      server's update_date
85
     -if there are documents that need to be updated pull them here
86
      and update the one currently in the database or add a new one
87
      if it is not already here.
88
     -update each servers' update_date to the current time
89
    */
90
    PreparedStatement pstmt;
91
    Enumeration keys;
92
    int istreamInt;
93
    char istreamChar;
94
    StringBuffer serverResponse = new StringBuffer();
95
    String server;
96
    String update;
97
    Vector responses = new Vector();
98
    ReplMessageHandler message = new ReplMessageHandler();
99
    Hashtable updateDocs = new Hashtable();
100
    URL u;
101
    InputStreamReader istream;
102
    
103
    try
104
    {
105
      //build a list of servers with updated documents.  Choose the newest
106
      //one out of the list, update this server, update last_checked
107
      
108
      keys = serverList.keys();
109
      while(keys.hasMoreElements())
110
      { //update from one server at a time
111
        server = (String)(keys.nextElement());
112
        update = (String)(serverList.get(server));
113
        //send the server a date and it will send back any docid that has 
114
        //been modified after that date
115
        
116
        update = update.replace(' ', '+'); 
117
        
118
        u = new URL("http://" + server + "?action=update&date=" + update);
119
        istream = new InputStreamReader(u.openStream());
120
        serverResponse = new StringBuffer();
121
        while((istreamInt = istream.read()) != -1)
122
        {
123
          istreamChar = (char)istreamInt;
124
          serverResponse.append(istreamChar);
125
        }
126
        responses.add(serverResponse.toString()); //list of updates
127
      }
128

    
129
      //initialize the parser
130
      XMLReader parser = initParser(message);
131
      //System.out.println("response: " + responses.toString());
132
      //System.out.println("<--end response-->");
133
      for(int i=0; i<responses.size(); i++)
134
      { //parse the xml and get the result
135
        parser.parse(new InputSource(
136
                     new StringReader(
137
                     (String)(responses.elementAt(i)))));
138
        Vector v = new Vector(message.getUpdatesVect());
139
        Vector d = new Vector(message.getDeletesVect());
140
        for(int j=0; j<v.size(); j++)
141
        { //go through each update vector and update or insert
142
          //a new document
143
          Vector w = new Vector((Vector)(v.elementAt(j)));
144
          String docid = (String)w.elementAt(0);
145
          String docServer = (String)w.elementAt(2);
146

    
147
          //send a message to the server requesting each document
148
          URL getDocURL = new URL("http://" + docServer + 
149
                                  "?action=read&docid="+ docid);
150
          InputStreamReader getDocIstream = new InputStreamReader(
151
                                                getDocURL.openStream()); 
152

    
153
          //the following while loop should not be needed.  see the note
154
          //below before the DocumentImpl.write() call.
155
          serverResponse = new StringBuffer();
156
          while((istreamInt = getDocIstream.read()) != -1)
157
          {
158
            istreamChar = (char)istreamInt;
159
            serverResponse.append(istreamChar);
160
          }
161
          //System.out.println("<<<<<<document>>>>>");
162
          //System.out.println(serverResponse.toString());
163
          //System.out.println("<<<<<<end document>>>>>");
164
          
165
          pstmt = conn.prepareStatement("select serverid from " +
166
                                         "xml_replication where server " +
167
                                         "like '" + docServer + "'");
168
          pstmt.execute();
169
          ResultSet rs = pstmt.getResultSet();
170
          boolean tablehasrows = rs.next();
171
          int serverCode = 0;
172
          if(tablehasrows)
173
          {
174
            serverCode = rs.getInt(1);
175
            //System.out.println("servercode: " + serverCode);
176
          }
177
          else
178
          {
179
            System.out.println("error: server not registered");
180
          }
181
          
182
          if(serverCode != 0)
183
          {
184
            //update the document into the DB
185
            String action = getAction(docid);
186
            //System.out.println("action: " + action + " docid: " + docid);
187
          
188
            //note that getDocIstream is commented out below.  This should
189
            //work as a param to this method but it doesn'.  I don't know why
190
            //but putting a string reader there works but not an 
191
            //inputStreamReader.
192
            DocInfoHandler dih = new DocInfoHandler();
193
            XMLReader docinfoParser = initParser(dih);
194
            URL docinfoUrl = new URL("http://" + docServer + 
195
                                     "?action=getdocumentinfo&docid=" +
196
                                     docid);
197
            InputStreamReader isr = new InputStreamReader(
198
                                        docinfoUrl.openStream());
199
            StringBuffer docInfoBuffer = new StringBuffer();
200
            while((istreamInt = isr.read()) != 1)
201
            {
202
              istreamChar = (char)istreamInt;
203
              docInfoBuffer.append(istreamChar);
204
            }
205
            
206
            docinfoParser.parse(new InputSource(new StringReader(
207
                                                docInfoBuffer.toString())));
208
            Hashtable docinfoHash = dih.getDocInfo();
209
            
210
            String newDocid = DocumentImpl.write(conn, 
211
                               new StringReader(serverResponse.toString())
212
                               /*getDocIstream*/, 
213
                               action, 
214
                               docid, (String)docinfoHash.get("user_owner"), 
215
                               (String)docinfoHash.get("user_owner"), 
216
                               serverCode);
217
            System.out.println("newDocid: " + newDocid + " " + action + "ED");
218
          }
219
        }
220
        
221
        for(int k=0; k<d.size(); k++)
222
        {
223
          Vector w = new Vector((Vector)d.elementAt(k));
224
          String docid = (String)w.elementAt(0);
225
          
226
          DocumentImpl.delete(conn, docid, null, null);
227
          System.out.println("Document " + docid + " deleted.");
228
        }
229
      }
230

    
231
      //update the last_update field for each server to the current date/time      
232
      SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
233
      java.util.Date newDate = new java.util.Date(System.currentTimeMillis());
234
      ParsePosition pos = new ParsePosition(0);
235
      String dateString = formatter.format(newDate);
236
      //System.out.println("dateString: " + dateString);
237
      StringBuffer sql = new StringBuffer();
238
      sql.append("update xml_replication set last_checked = to_date('");
239
      sql.append(dateString).append("', 'YY-MM-DD HH24:MI:SS')");
240
      //System.out.println("sql: " + sql.toString());
241
      pstmt = conn.prepareStatement(sql.toString());
242
      pstmt.executeUpdate();
243
      //conn.commit();
244
      System.out.println("last_checked updated: " + dateString);
245
  
246
    }
247
    catch(Exception e)
248
    {
249
      System.out.println("Error in replicationHandler.update(): " + 
250
                         e.getMessage());
251
      e.printStackTrace(System.out);
252
    }
253
  }
254
  
255
  /**
256
   * Checks to see if a document is already in the DB.  Returns
257
   * "UPDATE" if it is, "INSERT" if it isn't
258
   */
259
  private static String getAction(String docid)
260
  {
261
    try
262
    {
263
      MetaCatUtil util = new MetaCatUtil();
264
      StringBuffer sql = new StringBuffer();
265
      sql.append("select docid from xml_documents where docid like '");
266
      sql.append(docid).append("'");
267
      Connection conn = util.openDBConnection();
268
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
269
      pstmt.execute();
270
      ResultSet rs = pstmt.getResultSet();
271

    
272
      if(rs.next())
273
      {
274
        conn.close();
275
        return "UPDATE";
276
      }
277
      else
278
      {
279
        conn.close();
280
        return "INSERT";
281
      }
282
    }
283
    catch(Exception e)
284
    {
285
      System.out.println("error in replicationHandler.getAction: " + 
286
                          e.getMessage());
287
    }
288
    return "";
289
  }
290
  
291
  /**
292
   * Method to initialize the message parser
293
   */
294
  private static XMLReader initParser(DefaultHandler dh)
295
          throws Exception
296
  {
297
    XMLReader parser = null;
298

    
299
    try {
300
      ContentHandler chandler = dh;
301

    
302
      // Get an instance of the parser
303
      MetaCatUtil util = new MetaCatUtil();
304
      String parserName = util.getOption("saxparser");
305
      parser = XMLReaderFactory.createXMLReader(parserName);
306

    
307
      // Turn off validation
308
      parser.setFeature("http://xml.org/sax/features/validation", false);
309
      
310
      parser.setContentHandler((ContentHandler)chandler);
311
      parser.setErrorHandler((ErrorHandler)chandler);
312

    
313
    } catch (Exception e) {
314
      throw e;
315
    }
316

    
317
    return parser;
318
  }
319
  
320
  /**
321
   * Method to query xml_replication and build a hashtable of each server
322
   * and it's last update time.
323
   * @param conn a connection to the database
324
   */
325
  private Hashtable buildServerList(Connection conn)
326
  {
327
    Hashtable sl = new Hashtable();
328
    PreparedStatement pstmt;   
329
    try
330
    {
331
      pstmt = conn.prepareStatement("select server, last_checked from " +
332
                                    "xml_replication");
333
      pstmt.execute();
334
      ResultSet rs = pstmt.getResultSet();
335
      boolean tableHasRows = rs.next();
336
      while(tableHasRows)
337
      {
338
        String server = rs.getString(1);
339
        String last_checked = rs.getString(2);
340
        if(!server.equals("localhost"))
341
        {
342
          sl.put(server, last_checked);
343
        }
344
        tableHasRows = rs.next();
345
      }
346
    }
347
    catch(Exception e)
348
    {
349
      System.out.println("error in replicationHandler.buildServerList(): " +
350
                         e.getMessage());
351
    }
352
    return sl;
353
  }
354
}
(35-35/36)