Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-12-06 13:26:52 -0800 (Wed, 06 Dec 2000) $'
11
 * '$Revision: 584 $'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import java.text.*;
22
import org.xml.sax.AttributeList;
23
import org.xml.sax.ContentHandler;
24
import org.xml.sax.DTDHandler;
25
import org.xml.sax.EntityResolver;
26
import org.xml.sax.ErrorHandler;
27
import org.xml.sax.InputSource;
28
import org.xml.sax.XMLReader;
29
import org.xml.sax.SAXException;
30
import org.xml.sax.SAXParseException;
31
import org.xml.sax.helpers.XMLReaderFactory;
32
import org.xml.sax.helpers.DefaultHandler;
33

    
34

    
35

    
36
/**
37
 * This class handles deltaT replication checking.  Whenever this TimerTask
38
 * is fired it checks each server in xml_replication for updates and updates
39
 * the local db as needed.
40
 */
41
public class ReplicationHandler extends TimerTask
42
{
43
  int serverCheckCode = 1;
44
  MetaCatUtil util = new MetaCatUtil();
45
  Hashtable serverList = new Hashtable(); 
46
  Connection conn;
47
  PrintWriter out;
48
  
49
  public ReplicationHandler(PrintWriter o)
50
  {
51
    this.out = o;
52
  }
53
  
54
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
55
  {
56
    this.out = o;
57
    this.serverCheckCode = serverCheckCode;
58
  }
59
  
60
  /**
61
   * Method that implements TimerTask.run().  It runs whenever the timer is 
62
   * fired.
63
   */
64
  public void run()
65
  {
66
    //find out the last_checked time of each server in the server list and
67
    //send a query to each server to see if there are any documents in 
68
    //xml_documents with an update_date > last_checked
69
    try
70
    {
71
      conn = util.openDBConnection();
72
      serverList = buildServerList(conn);
73
      update(serverList, conn);
74
      conn.close();
75
    }
76
    catch (Exception e)
77
    {
78
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
79
    }
80
  }
81
  
82
  /**
83
   * Method that uses revision taging for replication instead of update_date.
84
   */
85
  private void update(Hashtable serverList, Connection conn)
86
  {
87
    /*
88
     Pseudo-algorithm
89
     - request a doc list from each server in xml_replication
90
     - check the rev number of each of those documents agains the 
91
       documents in the local database
92
     - pull any documents that have a lesser rev number on the local server
93
       from the remote server
94
     - delete any documents that still exist in the local xml_documents but
95
       are in the deletedDocuments tag of the remote host response.
96
     - update last_checked to keep track of the last time it was checked.
97
       (this info is theoretically not needed using this system but probably 
98
       should be kept anyway)
99
    */
100
    Enumeration keys;
101
    String server;
102
    String update;
103
    ReplMessageHandler message = new ReplMessageHandler();
104
    Vector responses = new Vector();
105
    PreparedStatement pstmt;
106
    ResultSet rs;
107
    boolean tablehasrows;
108
    boolean flag=false;
109
    String action = new String();
110
    XMLReader parser;
111
    URL u;
112
    
113
    //System.out.println("in update2");
114
    
115
    try
116
    {
117
      //System.out.println("init parser");
118
      parser = initParser(message);
119
      keys = serverList.keys();
120
      while(keys.hasMoreElements())
121
      {
122
        //System.out.println("get responses");
123
        server = (String)(keys.nextElement());
124
        MetacatReplication.replLog("full update started to: " + server);
125
        u = new URL("http://" + server + "?action=update");
126
        //System.out.println(u.toString());
127
        String result = MetacatReplication.getURLContent(u);
128
        //System.out.println(result);
129
        responses.add(result);
130
      }
131
      
132
      //System.out.println("responses: " + responses.toString());
133
      
134
      for(int i=0; i<responses.size(); i++)
135
      { //check each server for updated files
136
        //System.out.println("parsing responses");
137
        parser.parse(new InputSource(
138
                     new StringReader(
139
                     (String)(responses.elementAt(i)))));
140
        Vector v = new Vector(message.getUpdatesVect());
141
        //System.out.println("v: " + v.toString());
142
        Vector d = new Vector(message.getDeletesVect());
143
        //check the revs in u to see if there are any newer ones than
144
        //in the local DB.
145
        for(int j=0; j<v.size(); j++)
146
        {
147
          Vector w = new Vector((Vector)(v.elementAt(j)));
148
          //System.out.println("w: " + w.toString());
149
          String docid = (String)w.elementAt(0);
150
          //System.out.println("docid: " + docid);
151
          int rev = Integer.parseInt((String)w.elementAt(1));
152
          //System.out.println("rev: " + rev);
153
          String docServer = (String)w.elementAt(2);
154
          //System.out.println("docServer: " + docServer);
155
    
156
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
157
                                        "docid like '" + docid + "'");
158
          pstmt.execute();
159
          rs = pstmt.getResultSet();
160
          tablehasrows = rs.next();
161
          if(tablehasrows)
162
          { //check the revs for an update because this document is in the
163
            //local DB, it just might be out of date.
164
            int localrev = rs.getInt(1);
165
            if(localrev == rev)
166
            {
167
              flag = false;
168
            }
169
            else if(localrev < rev)
170
            {//this document needs to be updated so send an read request
171
              action = "UPDATE";
172
              flag = true;
173
            }
174
          }
175
          else
176
          { //insert this document as new because it is not in the local DB
177
            action = "INSERT";
178
            flag = true;
179
          }
180
          
181
          if(flag)
182
          { //if the document needs to be updated or inserted, this is executed
183
            u = new URL("http://" + docServer + "?action=read&docid=" +
184
                          docid);
185
            MetacatReplication.replLog("reading doc " + docid + " from " +
186
                                        docServer);
187
            //System.out.println(u.toString());
188
            String newxmldoc = MetacatReplication.getURLContent(u);
189
            DocInfoHandler dih = new DocInfoHandler();
190
            XMLReader docinfoParser = initParser(dih);
191
            URL docinfoUrl = new URL("http://" + docServer + 
192
                                   "?action=getdocumentinfo&docid=" +
193
                                   docid);
194
            //System.out.println(docinfoUrl.toString());
195
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
196
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
197
            Hashtable docinfoHash = dih.getDocInfo();
198
            int serverCode = MetacatReplication.getServerCode(docServer);
199
            //System.out.println("updating doc: " + docid + " action: "+ action);
200
            String newDocid = DocumentImpl.write(conn, 
201
                              new StringReader(newxmldoc),
202
                              null,
203
                              action, 
204
                              docid, 
205
                              (String)docinfoHash.get("user_owner"),
206
                              (String)docinfoHash.get("user_owner"), 
207
                              serverCode, 
208
                              true);
209
            MetacatReplication.replLog("wrote doc " + docid + " from " + 
210
                                        docServer);
211
          }
212
        }
213
        
214
        for(int k=0; k<d.size(); k++)
215
        { //delete the deleted documents;
216
          Vector w = new Vector((Vector)d.elementAt(k));
217
          String docid = (String)w.elementAt(0);
218
          if(!alreadyDeleted(docid, conn))
219
          {
220
            DocumentImpl.delete(conn, docid, null, null);
221
            //System.out.println("Document " + docid + " deleted.");
222
            MetacatReplication.replLog("doc " + docid + " deleted");
223
          }
224
        }
225
        
226
        keys = serverList.keys();
227
        while(keys.hasMoreElements())
228
        {
229
          server = (String)(keys.nextElement()); 
230
          URL dateurl = new URL("http://" + server + "?action=gettime");
231
          String datexml = MetacatReplication.getURLContent(dateurl);
232
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
233
          StringBuffer sql = new StringBuffer();
234
          sql.append("update xml_replication set last_checked = to_date('");
235
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
236
          sql.append("server like '").append(server).append("'");
237
          //System.out.println("sql: " + sql.toString());
238
          pstmt = conn.prepareStatement(sql.toString());
239
          pstmt.executeUpdate();
240
          //conn.commit();
241
          //System.out.println("last_checked updated to " + datestr + " on " +
242
          //                  server);
243
        }
244
      }
245
    }
246
    catch(Exception e)
247
    {
248
      System.out.println("error in update2: " + e.getMessage());
249
      e.printStackTrace(System.out);
250
    }
251
  }
252
  
253
  /**
254
   * Method that returns true if docid has already been "deleted" from metacat.
255
   * This method really implements a truth table for deleted documents
256
   * The table is (a document in one of the tables is represented by the X):
257
   * xml_docs      xml_revs      deleted?
258
   * ------------------------------------
259
   *   X             X             FALSE
260
   *   X             _             FALSE
261
   *   _             X             TRUE
262
   *   _             _             TRUE
263
   */
264
  private static boolean alreadyDeleted(String docid, Connection conn)
265
  {
266
    try
267
    {
268
      boolean xml_docs = false;
269
      boolean xml_revs = false;
270
      
271
      StringBuffer sb = new StringBuffer();
272
      sb.append("select docid from xml_revisions where docid like '");
273
      sb.append(docid).append("'");
274
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
275
      pstmt.execute();
276
      ResultSet rs = pstmt.getResultSet();
277
      boolean tablehasrows = rs.next();
278
      if(tablehasrows)
279
      {
280
        xml_revs = true;
281
      }
282
      
283
      sb = new StringBuffer();
284
      sb.append("select docid from xml_documents where docid like '");
285
      sb.append(docid).append("'");
286
      pstmt = conn.prepareStatement(sb.toString());
287
      pstmt.execute();
288
      rs = pstmt.getResultSet();
289
      tablehasrows = rs.next();
290
      if(tablehasrows)
291
      {
292
        xml_docs = true;
293
      }
294
      
295
      if(xml_docs && xml_revs)
296
      {
297
        return false;
298
      }
299
      else if(xml_docs && !xml_revs)
300
      {
301
        return false;
302
      }
303
      else if(!xml_docs && xml_revs)
304
      {
305
        return true;
306
      }
307
      else if(!xml_docs && !xml_revs)
308
      {
309
        return true;
310
      }
311
    }
312
    catch(Exception e)
313
    {
314
      System.out.println("error in alreadyDeleted: " + e.getMessage());
315
      e.printStackTrace(System.out);
316
    }
317
    return false;
318
  }
319
  
320
  /**
321
   * Checks to see if a document is already in the DB.  Returns
322
   * "UPDATE" if it is, "INSERT" if it isn't
323
   */
324
  private static String getAction(String docid)
325
  {
326
    try
327
    {
328
      MetaCatUtil util = new MetaCatUtil();
329
      StringBuffer sql = new StringBuffer();
330
      sql.append("select docid from xml_documents where docid like '");
331
      sql.append(docid).append("'");
332
      Connection conn = util.openDBConnection();
333
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
334
      pstmt.execute();
335
      ResultSet rs = pstmt.getResultSet();
336

    
337
      if(rs.next())
338
      {
339
        conn.close();
340
        return "UPDATE";
341
      }
342
      else
343
      {
344
        conn.close();
345
        return "INSERT";
346
      }
347
    }
348
    catch(Exception e)
349
    {
350
      System.out.println("error in replicationHandler.getAction: " + 
351
                          e.getMessage());
352
    }
353
    return "";
354
  }
355
  
356
  /**
357
   * Method to query xml_replication and build a hashtable of each server
358
   * and it's last update time.
359
   * @param conn a connection to the database
360
   */
361
  public static Hashtable buildServerList(Connection conn)
362
  {
363
    Hashtable sl = new Hashtable();
364
    PreparedStatement pstmt;   
365
    try
366
    {
367
      pstmt = conn.prepareStatement("select server, last_checked from " +
368
                                    "xml_replication");
369
      pstmt.execute();
370
      ResultSet rs = pstmt.getResultSet();
371
      boolean tableHasRows = rs.next();
372
      while(tableHasRows)
373
      {
374
        String server = rs.getString(1);
375
        String last_checked = rs.getString(2);
376
        if(!server.equals("localhost"))
377
        {
378
          sl.put(server, last_checked);
379
        }
380
        tableHasRows = rs.next();
381
      }
382
    }
383
    catch(Exception e)
384
    {
385
      System.out.println("error in replicationHandler.buildServerList(): " +
386
                         e.getMessage());
387
    }
388
    return sl;
389
  }
390
  
391
  /**
392
   * Method to initialize the message parser
393
   */
394
  public static XMLReader initParser(DefaultHandler dh)
395
          throws Exception
396
  {
397
    XMLReader parser = null;
398

    
399
    try {
400
      ContentHandler chandler = dh;
401

    
402
      // Get an instance of the parser
403
      MetaCatUtil util = new MetaCatUtil();
404
      String parserName = util.getOption("saxparser");
405
      parser = XMLReaderFactory.createXMLReader(parserName);
406

    
407
      // Turn off validation
408
      parser.setFeature("http://xml.org/sax/features/validation", false);
409
      
410
      parser.setContentHandler((ContentHandler)chandler);
411
      parser.setErrorHandler((ErrorHandler)chandler);
412

    
413
    } catch (Exception e) {
414
      throw e;
415
    }
416

    
417
    return parser;
418
  }
419
}
(36-36/37)