Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-12-12 10:55:33 -0800 (Tue, 12 Dec 2000) $'
11
 * '$Revision: 595 $'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import java.text.*;
22
import org.xml.sax.AttributeList;
23
import org.xml.sax.ContentHandler;
24
import org.xml.sax.DTDHandler;
25
import org.xml.sax.EntityResolver;
26
import org.xml.sax.ErrorHandler;
27
import org.xml.sax.InputSource;
28
import org.xml.sax.XMLReader;
29
import org.xml.sax.SAXException;
30
import org.xml.sax.SAXParseException;
31
import org.xml.sax.helpers.XMLReaderFactory;
32
import org.xml.sax.helpers.DefaultHandler;
33

    
34

    
35

    
36
/**
37
 * This class handles deltaT replication checking.  Whenever this TimerTask
38
 * is fired it checks each server in xml_replication for updates and updates
39
 * the local db as needed.
40
 */
41
public class ReplicationHandler extends TimerTask
42
{
43
  int serverCheckCode = 1;
44
  MetaCatUtil util = new MetaCatUtil();
45
  Hashtable serverList = new Hashtable(); 
46
  Connection conn;
47
  PrintWriter out;
48
  
49
  public ReplicationHandler(PrintWriter o)
50
  {
51
    this.out = o;
52
  }
53
  
54
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
55
  {
56
    this.out = o;
57
    this.serverCheckCode = serverCheckCode;
58
  }
59
  
60
  /**
61
   * Method that implements TimerTask.run().  It runs whenever the timer is 
62
   * fired.
63
   */
64
  public void run()
65
  {
66
    //find out the last_checked time of each server in the server list and
67
    //send a query to each server to see if there are any documents in 
68
    //xml_documents with an update_date > last_checked
69
    try
70
    {
71
      conn = util.openDBConnection();
72
      serverList = buildServerList(conn);
73
      update(serverList, conn);
74
      updateCatalog(serverList, conn);
75
      conn.close();
76
    }
77
    catch (Exception e)
78
    {
79
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
80
    }
81
  }
82
  
83
  /**
84
   * Method that uses revision taging for replication instead of update_date.
85
   */
86
  private void update(Hashtable serverList, Connection conn)
87
  {
88
    /*
89
     Pseudo-algorithm
90
     - request a doc list from each server in xml_replication
91
     - check the rev number of each of those documents agains the 
92
       documents in the local database
93
     - pull any documents that have a lesser rev number on the local server
94
       from the remote server
95
     - delete any documents that still exist in the local xml_documents but
96
       are in the deletedDocuments tag of the remote host response.
97
     - update last_checked to keep track of the last time it was checked.
98
       (this info is theoretically not needed using this system but probably 
99
       should be kept anyway)
100
    */
101
    Enumeration keys;
102
    String server;
103
    String update;
104
    ReplMessageHandler message = new ReplMessageHandler();
105
    Vector responses = new Vector();
106
    PreparedStatement pstmt;
107
    ResultSet rs;
108
    boolean tablehasrows;
109
    boolean flag=false;
110
    String action = new String();
111
    XMLReader parser;
112
    URL u;
113
    
114
    try
115
    {
116
      MetaCatUtil.debugMessage("init parser");
117
      parser = initParser(message);
118
      keys = serverList.keys();
119
      while(keys.hasMoreElements())
120
      {
121
        server = (String)(keys.nextElement());
122
        MetacatReplication.replLog("full update started to: " + server);
123
        u = new URL("http://" + server + "?action=update");
124
        System.out.println("Sending Message: " + u.toString());
125
        String result = MetacatReplication.getURLContent(u);
126
        responses.add(result);
127
      }
128
      
129
      System.out.println("responses: " + responses.toString());
130
      
131
      for(int i=0; i<responses.size(); i++)
132
      { //check each server for updated files
133
        //System.out.println("parsing responses");
134
        parser.parse(new InputSource(
135
                     new StringReader(
136
                     (String)(responses.elementAt(i)))));
137
        Vector v = new Vector(message.getUpdatesVect());
138
        //System.out.println("v: " + v.toString());
139
        Vector d = new Vector(message.getDeletesVect());
140
        //check the revs in u to see if there are any newer ones than
141
        //in the local DB.
142
        for(int j=0; j<v.size(); j++)
143
        {
144
          Vector w = new Vector((Vector)(v.elementAt(j)));
145
          //System.out.println("w: " + w.toString());
146
          String docid = (String)w.elementAt(0);
147
          //System.out.println("docid: " + docid);
148
          int rev = Integer.parseInt((String)w.elementAt(1));
149
          //System.out.println("rev: " + rev);
150
          String docServer = (String)w.elementAt(2);
151
          //System.out.println("docServer: " + docServer);
152
    
153
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
154
                                        "docid like '" + docid + "'");
155
          pstmt.execute();
156
          rs = pstmt.getResultSet();
157
          tablehasrows = rs.next();
158
          if(tablehasrows)
159
          { //check the revs for an update because this document is in the
160
            //local DB, it just might be out of date.
161
            int localrev = rs.getInt(1);
162
            if(localrev == rev)
163
            {
164
              flag = false;
165
            }
166
            else if(localrev < rev)
167
            {//this document needs to be updated so send an read request
168
              action = "UPDATE";
169
              flag = true;
170
            }
171
          }
172
          else
173
          { //insert this document as new because it is not in the local DB
174
            action = "INSERT";
175
            flag = true;
176
          }
177
          
178
          if(flag)
179
          { //if the document needs to be updated or inserted, this is executed
180
            u = new URL("http://" + docServer + "?action=read&docid=" +
181
                          docid);
182
            System.out.println("Sending message: " + u.toString());
183
            String newxmldoc = MetacatReplication.getURLContent(u);
184
            DocInfoHandler dih = new DocInfoHandler();
185
            XMLReader docinfoParser = initParser(dih);
186
            URL docinfoUrl = new URL("http://" + docServer + 
187
                                   "?action=getdocumentinfo&docid=" +
188
                                   docid);
189
            System.out.println("Sending message: " + docinfoUrl.toString());
190
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
191
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
192
            Hashtable docinfoHash = dih.getDocInfo();
193
            int serverCode = MetacatReplication.getServerCode(docServer);
194
            MetaCatUtil.debugMessage("updating doc: " + docid + " action: "+ action);
195
            String newDocid = DocumentImpl.write(conn, 
196
                              new StringReader(newxmldoc),
197
                              null,
198
                              action, 
199
                              docid, 
200
                              (String)docinfoHash.get("user_owner"),
201
                              (String)docinfoHash.get("user_owner"), 
202
                              serverCode, 
203
                              true);
204
            MetacatReplication.replLog("wrote doc " + docid + " from " + 
205
                                        docServer);
206
            System.out.println("wrote doc " + docid + " from " + 
207
                                docServer);
208
          }
209
        }
210
        
211
        for(int k=0; k<d.size(); k++)
212
        { //delete the deleted documents;
213
          Vector w = new Vector((Vector)d.elementAt(k));
214
          String docid = (String)w.elementAt(0);
215
          if(!alreadyDeleted(docid, conn))
216
          {
217
            DocumentImpl.delete(conn, docid, null, null);
218
            System.out.println("Document " + docid + " deleted.");
219
            MetacatReplication.replLog("doc " + docid + " deleted");
220
          }
221
        }
222
        
223
        keys = serverList.keys();
224
        while(keys.hasMoreElements())
225
        {
226
          server = (String)(keys.nextElement()); 
227
          URL dateurl = new URL("http://" + server + "?action=gettime");
228
          String datexml = MetacatReplication.getURLContent(dateurl);
229
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
230
          StringBuffer sql = new StringBuffer();
231
          sql.append("update xml_replication set last_checked = to_date('");
232
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
233
          sql.append("server like '").append(server).append("'");
234
          //System.out.println("sql: " + sql.toString());
235
          pstmt = conn.prepareStatement(sql.toString());
236
          pstmt.executeUpdate();
237
          //conn.commit();
238
          System.out.println("last_checked updated to " + datestr + " on " +
239
                            server);
240
        }
241
      }
242
    }
243
    catch(Exception e)
244
    {
245
      System.out.println("error in update2: " + e.getMessage());
246
      e.printStackTrace(System.out);
247
    }
248
  }
249
  
250
  /**
251
   * updates xml_catalog with entries from other servers.
252
   */
253
  private void updateCatalog(Hashtable serverList, Connection conn)
254
  {
255
    System.out.println("in updateCatalog");
256
    try
257
    {
258
      String server;
259
      Enumeration keys = serverList.keys();
260
      while(keys.hasMoreElements())
261
      { //go through each server
262
        server = (String)(keys.nextElement());
263
        URL u = new URL("http://" + server + "?action=getcatalog");
264
        System.out.println("sending message " + u.toString());
265
        String catxml = MetacatReplication.getURLContent(u);
266
        //System.out.println("catxml: " + catxml);
267
        CatalogMessageHandler cmh = new CatalogMessageHandler();
268
        XMLReader catparser = initParser(cmh);
269
        catparser.parse(new InputSource(new StringReader(catxml)));
270
        //parse the returned catalog xml and put it into a vector
271
        Vector remoteCatalog = cmh.getCatalogVect();
272
        
273
        String localcatxml = MetacatReplication.getCatalogXML();
274
        //System.out.println("localcatxml: " + localcatxml);
275
        cmh = new CatalogMessageHandler();
276
        catparser = initParser(cmh);
277
        catparser.parse(new InputSource(new StringReader(localcatxml)));
278
        Vector localCatalog = cmh.getCatalogVect();
279
        
280
        //now we have the catalog from the remote server and this local server
281
        //we now need to compare the two and merge the differences.
282
        //the comparison is base on the public_id fields which is the 4th
283
        //entry in each row vector.
284
        Vector publicId = new Vector();
285
        for(int i=0; i<localCatalog.size(); i++)
286
        {
287
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
288
          //System.out.println("v1: " + v.toString());
289
          publicId.add(new String((String)v.elementAt(3)));
290
          //System.out.println("adding " + (String)v.elementAt(3));
291
        }
292
        
293
        for(int i=0; i<remoteCatalog.size(); i++)
294
        {
295
          Vector v = (Vector)remoteCatalog.elementAt(i);
296
          //System.out.println("v2: " + v.toString());
297
          //System.out.println("i: " + i);
298
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
299
          //System.out.println("publicID: " + publicId.toString());
300
          //System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
301
          if(!publicId.contains(v.elementAt(3)))
302
          { //so we don't have this public id in our local table so we need to
303
            //add it.
304
            //System.out.println("in if");
305
            StringBuffer sql = new StringBuffer();
306
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
307
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
308
            sql.append("?,?)");
309
            //System.out.println("sql: " + sql.toString());
310
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
311
            pstmt.setString(1, (String)v.elementAt(0));
312
            pstmt.setString(2, (String)v.elementAt(1));
313
            pstmt.setString(3, (String)v.elementAt(2));
314
            pstmt.setString(4, (String)v.elementAt(3));
315
            pstmt.setString(5, (String)v.elementAt(4));
316
            pstmt.execute();
317
          }
318
        }
319
      }
320
    }
321
    catch(Exception e)
322
    {
323
      System.out.println("error in updateCatalog: " + e.getMessage());
324
      e.printStackTrace(System.out);
325
    }
326
  }
327
  
328
  /**
329
   * Method that returns true if docid has already been "deleted" from metacat.
330
   * This method really implements a truth table for deleted documents
331
   * The table is (a docid in one of the tables is represented by the X):
332
   * xml_docs      xml_revs      deleted?
333
   * ------------------------------------
334
   *   X             X             FALSE
335
   *   X             _             FALSE
336
   *   _             X             TRUE
337
   *   _             _             TRUE
338
   */
339
  private static boolean alreadyDeleted(String docid, Connection conn)
340
  {
341
    try
342
    {
343
      boolean xml_docs = false;
344
      boolean xml_revs = false;
345
      
346
      StringBuffer sb = new StringBuffer();
347
      sb.append("select docid from xml_revisions where docid like '");
348
      sb.append(docid).append("'");
349
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
350
      pstmt.execute();
351
      ResultSet rs = pstmt.getResultSet();
352
      boolean tablehasrows = rs.next();
353
      if(tablehasrows)
354
      {
355
        xml_revs = true;
356
      }
357
      
358
      sb = new StringBuffer();
359
      sb.append("select docid from xml_documents where docid like '");
360
      sb.append(docid).append("'");
361
      pstmt = conn.prepareStatement(sb.toString());
362
      pstmt.execute();
363
      rs = pstmt.getResultSet();
364
      tablehasrows = rs.next();
365
      if(tablehasrows)
366
      {
367
        xml_docs = true;
368
      }
369
      
370
      if(xml_docs && xml_revs)
371
      {
372
        return false;
373
      }
374
      else if(xml_docs && !xml_revs)
375
      {
376
        return false;
377
      }
378
      else if(!xml_docs && xml_revs)
379
      {
380
        return true;
381
      }
382
      else if(!xml_docs && !xml_revs)
383
      {
384
        return true;
385
      }
386
    }
387
    catch(Exception e)
388
    {
389
      System.out.println("error in alreadyDeleted: " + e.getMessage());
390
      e.printStackTrace(System.out);
391
    }
392
    return false;
393
  }
394
  
395
  /**
396
   * Checks to see if a document is already in the DB.  Returns
397
   * "UPDATE" if it is, "INSERT" if it isn't
398
   */
399
  private static String getAction(String docid)
400
  {
401
    try
402
    {
403
      MetaCatUtil util = new MetaCatUtil();
404
      StringBuffer sql = new StringBuffer();
405
      sql.append("select docid from xml_documents where docid like '");
406
      sql.append(docid).append("'");
407
      Connection conn = util.openDBConnection();
408
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
409
      pstmt.execute();
410
      ResultSet rs = pstmt.getResultSet();
411

    
412
      if(rs.next())
413
      {
414
        conn.close();
415
        return "UPDATE";
416
      }
417
      else
418
      {
419
        conn.close();
420
        return "INSERT";
421
      }
422
    }
423
    catch(Exception e)
424
    {
425
      System.out.println("error in replicationHandler.getAction: " + 
426
                          e.getMessage());
427
    }
428
    return "";
429
  }
430
  
431
  /**
432
   * Method to query xml_replication and build a hashtable of each server
433
   * and it's last update time.
434
   * @param conn a connection to the database
435
   */
436
  public static Hashtable buildServerList(Connection conn)
437
  {
438
    Hashtable sl = new Hashtable();
439
    PreparedStatement pstmt;   
440
    try
441
    {
442
      pstmt = conn.prepareStatement("select server, last_checked from " +
443
                                    "xml_replication");
444
      pstmt.execute();
445
      ResultSet rs = pstmt.getResultSet();
446
      boolean tableHasRows = rs.next();
447
      while(tableHasRows)
448
      {
449
        String server = rs.getString(1);
450
        String last_checked = rs.getString(2);
451
        if(!server.equals("localhost"))
452
        {
453
          sl.put(server, last_checked);
454
        }
455
        tableHasRows = rs.next();
456
      }
457
    }
458
    catch(Exception e)
459
    {
460
      System.out.println("error in replicationHandler.buildServerList(): " +
461
                         e.getMessage());
462
    }
463
    return sl;
464
  }
465
  
466
  /**
467
   * Method to initialize the message parser
468
   */
469
  public static XMLReader initParser(DefaultHandler dh)
470
          throws Exception
471
  {
472
    XMLReader parser = null;
473

    
474
    try {
475
      ContentHandler chandler = dh;
476

    
477
      // Get an instance of the parser
478
      MetaCatUtil util = new MetaCatUtil();
479
      String parserName = util.getOption("saxparser");
480
      parser = XMLReaderFactory.createXMLReader(parserName);
481

    
482
      // Turn off validation
483
      parser.setFeature("http://xml.org/sax/features/validation", false);
484
      
485
      parser.setContentHandler((ContentHandler)chandler);
486
      parser.setErrorHandler((ErrorHandler)chandler);
487

    
488
    } catch (Exception e) {
489
      throw e;
490
    }
491

    
492
    return parser;
493
  }
494
}
(37-37/38)