Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-12-07 15:11:29 -0800 (Thu, 07 Dec 2000) $'
11
 * '$Revision: 590 $'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import java.text.*;
22
import org.xml.sax.AttributeList;
23
import org.xml.sax.ContentHandler;
24
import org.xml.sax.DTDHandler;
25
import org.xml.sax.EntityResolver;
26
import org.xml.sax.ErrorHandler;
27
import org.xml.sax.InputSource;
28
import org.xml.sax.XMLReader;
29
import org.xml.sax.SAXException;
30
import org.xml.sax.SAXParseException;
31
import org.xml.sax.helpers.XMLReaderFactory;
32
import org.xml.sax.helpers.DefaultHandler;
33

    
34

    
35

    
36
/**
37
 * This class handles deltaT replication checking.  Whenever this TimerTask
38
 * is fired it checks each server in xml_replication for updates and updates
39
 * the local db as needed.
40
 */
41
public class ReplicationHandler extends TimerTask
42
{
43
  int serverCheckCode = 1;
44
  MetaCatUtil util = new MetaCatUtil();
45
  Hashtable serverList = new Hashtable(); 
46
  Connection conn;
47
  PrintWriter out;
48
  
49
  public ReplicationHandler(PrintWriter o)
50
  {
51
    this.out = o;
52
  }
53
  
54
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
55
  {
56
    this.out = o;
57
    this.serverCheckCode = serverCheckCode;
58
  }
59
  
60
  /**
61
   * Method that implements TimerTask.run().  It runs whenever the timer is 
62
   * fired.
63
   */
64
  public void run()
65
  {
66
    //find out the last_checked time of each server in the server list and
67
    //send a query to each server to see if there are any documents in 
68
    //xml_documents with an update_date > last_checked
69
    try
70
    {
71
      conn = util.openDBConnection();
72
      serverList = buildServerList(conn);
73
      update(serverList, conn);
74
      updateCatalog(serverList, conn);
75
      conn.close();
76
    }
77
    catch (Exception e)
78
    {
79
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
80
    }
81
  }
82
  
83
  /**
84
   * Method that uses revision taging for replication instead of update_date.
85
   */
86
  private void update(Hashtable serverList, Connection conn)
87
  {
88
    /*
89
     Pseudo-algorithm
90
     - request a doc list from each server in xml_replication
91
     - check the rev number of each of those documents agains the 
92
       documents in the local database
93
     - pull any documents that have a lesser rev number on the local server
94
       from the remote server
95
     - delete any documents that still exist in the local xml_documents but
96
       are in the deletedDocuments tag of the remote host response.
97
     - update last_checked to keep track of the last time it was checked.
98
       (this info is theoretically not needed using this system but probably 
99
       should be kept anyway)
100
    */
101
    Enumeration keys;
102
    String server;
103
    String update;
104
    ReplMessageHandler message = new ReplMessageHandler();
105
    Vector responses = new Vector();
106
    PreparedStatement pstmt;
107
    ResultSet rs;
108
    boolean tablehasrows;
109
    boolean flag=false;
110
    String action = new String();
111
    XMLReader parser;
112
    URL u;
113
    
114
    //System.out.println("in update2");
115
    
116
    try
117
    {
118
      MetaCatUtil.debugMessage("init parser");
119
      parser = initParser(message);
120
      keys = serverList.keys();
121
      while(keys.hasMoreElements())
122
      {
123
        MetaCatUtil.debugMessage("get responses");
124
        server = (String)(keys.nextElement());
125
        MetacatReplication.replLog("full update started to: " + server);
126
        u = new URL("http://" + server + "?action=update");
127
        MetaCatUtil.debugMessage(u.toString());
128
        String result = MetacatReplication.getURLContent(u);
129
        MetaCatUtil.debugMessage(result);
130
        responses.add(result);
131
      }
132
      
133
      MetaCatUtil.debugMessage("responses: " + responses.toString());
134
      
135
      for(int i=0; i<responses.size(); i++)
136
      { //check each server for updated files
137
        //System.out.println("parsing responses");
138
        parser.parse(new InputSource(
139
                     new StringReader(
140
                     (String)(responses.elementAt(i)))));
141
        Vector v = new Vector(message.getUpdatesVect());
142
        //System.out.println("v: " + v.toString());
143
        Vector d = new Vector(message.getDeletesVect());
144
        //check the revs in u to see if there are any newer ones than
145
        //in the local DB.
146
        for(int j=0; j<v.size(); j++)
147
        {
148
          Vector w = new Vector((Vector)(v.elementAt(j)));
149
          //System.out.println("w: " + w.toString());
150
          String docid = (String)w.elementAt(0);
151
          //System.out.println("docid: " + docid);
152
          int rev = Integer.parseInt((String)w.elementAt(1));
153
          //System.out.println("rev: " + rev);
154
          String docServer = (String)w.elementAt(2);
155
          //System.out.println("docServer: " + docServer);
156
    
157
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
158
                                        "docid like '" + docid + "'");
159
          pstmt.execute();
160
          rs = pstmt.getResultSet();
161
          tablehasrows = rs.next();
162
          if(tablehasrows)
163
          { //check the revs for an update because this document is in the
164
            //local DB, it just might be out of date.
165
            int localrev = rs.getInt(1);
166
            if(localrev == rev)
167
            {
168
              flag = false;
169
            }
170
            else if(localrev < rev)
171
            {//this document needs to be updated so send an read request
172
              action = "UPDATE";
173
              flag = true;
174
            }
175
          }
176
          else
177
          { //insert this document as new because it is not in the local DB
178
            action = "INSERT";
179
            flag = true;
180
          }
181
          
182
          if(flag)
183
          { //if the document needs to be updated or inserted, this is executed
184
            u = new URL("http://" + docServer + "?action=read&docid=" +
185
                          docid);
186
            MetacatReplication.replLog("reading doc " + docid + " from " +
187
                                        docServer);
188
            //System.out.println(u.toString());
189
            String newxmldoc = MetacatReplication.getURLContent(u);
190
            DocInfoHandler dih = new DocInfoHandler();
191
            XMLReader docinfoParser = initParser(dih);
192
            URL docinfoUrl = new URL("http://" + docServer + 
193
                                   "?action=getdocumentinfo&docid=" +
194
                                   docid);
195
            //System.out.println(docinfoUrl.toString());
196
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
197
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
198
            Hashtable docinfoHash = dih.getDocInfo();
199
            int serverCode = MetacatReplication.getServerCode(docServer);
200
            MetaCatUtil.debugMessage("updating doc: " + docid + " action: "+ action);
201
            String newDocid = DocumentImpl.write(conn, 
202
                              new StringReader(newxmldoc),
203
                              null,
204
                              action, 
205
                              docid, 
206
                              (String)docinfoHash.get("user_owner"),
207
                              (String)docinfoHash.get("user_owner"), 
208
                              serverCode, 
209
                              true);
210
            MetacatReplication.replLog("wrote doc " + docid + " from " + 
211
                                        docServer);
212
          }
213
        }
214
        
215
        for(int k=0; k<d.size(); k++)
216
        { //delete the deleted documents;
217
          Vector w = new Vector((Vector)d.elementAt(k));
218
          String docid = (String)w.elementAt(0);
219
          if(!alreadyDeleted(docid, conn))
220
          {
221
            DocumentImpl.delete(conn, docid, null, null);
222
            MetaCatUtil.debugMessage("Document " + docid + " deleted.");
223
            MetacatReplication.replLog("doc " + docid + " deleted");
224
          }
225
        }
226
        
227
        keys = serverList.keys();
228
        while(keys.hasMoreElements())
229
        {
230
          server = (String)(keys.nextElement()); 
231
          URL dateurl = new URL("http://" + server + "?action=gettime");
232
          String datexml = MetacatReplication.getURLContent(dateurl);
233
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
234
          StringBuffer sql = new StringBuffer();
235
          sql.append("update xml_replication set last_checked = to_date('");
236
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
237
          sql.append("server like '").append(server).append("'");
238
          //System.out.println("sql: " + sql.toString());
239
          pstmt = conn.prepareStatement(sql.toString());
240
          pstmt.executeUpdate();
241
          //conn.commit();
242
          MetaCatUtil.debugMessage("last_checked updated to " + datestr + " on " +
243
                            server);
244
        }
245
      }
246
    }
247
    catch(Exception e)
248
    {
249
      System.out.println("error in update2: " + e.getMessage());
250
      e.printStackTrace(System.out);
251
    }
252
  }
253
  
254
  /**
255
   * updates xml_catalog with entries from other servers.
256
   */
257
  private void updateCatalog(Hashtable serverList, Connection conn)
258
  {
259
    System.out.println("in updateCatalog");
260
    try
261
    {
262
      String server;
263
      Enumeration keys = serverList.keys();
264
      while(keys.hasMoreElements())
265
      { //go through each server
266
        server = (String)(keys.nextElement());
267
        System.out.println("server: " + server);
268
        URL u = new URL("http://" + server + "?action=getcatalog");
269
        String catxml = MetacatReplication.getURLContent(u);
270
        System.out.println("catxml: " + catxml);
271
        CatalogMessageHandler cmh = new CatalogMessageHandler();
272
        XMLReader catparser = initParser(cmh);
273
        catparser.parse(new InputSource(new StringReader(catxml)));
274
        //parse the returned catalog xml and put it into a vector
275
        Vector remoteCatalog = cmh.getCatalogVect();
276
        
277
        String localcatxml = MetacatReplication.getCatalogXML();
278
        System.out.println("localcatxml: " + localcatxml);
279
        cmh = new CatalogMessageHandler();
280
        catparser = initParser(cmh);
281
        catparser.parse(new InputSource(new StringReader(localcatxml)));
282
        Vector localCatalog = cmh.getCatalogVect();
283
        
284
        //now we have the catalog from the remote server and this local server
285
        //we now need to compare the two and merge the differences.
286
        //the comparison is base on the public_id fields which is the 4th
287
        //entry in each row vector.
288
        Vector publicId = new Vector();
289
        for(int i=0; i<localCatalog.size(); i++)
290
        {
291
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
292
          System.out.println("v1: " + v.toString());
293
          publicId.add(new String((String)v.elementAt(3)));
294
          System.out.println("adding " + (String)v.elementAt(3));
295
        }
296
        
297
        for(int i=0; i<remoteCatalog.size(); i++)
298
        {
299
          Vector v = (Vector)remoteCatalog.elementAt(i);
300
          System.out.println("v2: " + v.toString());
301
          //System.out.println("i: " + i);
302
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
303
          System.out.println("publicID: " + publicId.toString());
304
          System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
305
          if(!publicId.contains(v.elementAt(3)))
306
          { //so we don't have this public id in our local table so we need to
307
            //add it.
308
            System.out.println("in if");
309
            StringBuffer sql = new StringBuffer();
310
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
311
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
312
            sql.append("?,?)");
313
            System.out.println("sql: " + sql.toString());
314
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
315
            pstmt.setString(1, (String)v.elementAt(0));
316
            pstmt.setString(2, (String)v.elementAt(1));
317
            pstmt.setString(3, (String)v.elementAt(2));
318
            pstmt.setString(4, (String)v.elementAt(3));
319
            pstmt.setString(5, (String)v.elementAt(4));
320
            pstmt.execute();
321
          }
322
        }
323
      }
324
    }
325
    catch(Exception e)
326
    {
327
      System.out.println("error in updateCatalog: " + e.getMessage());
328
      e.printStackTrace(System.out);
329
    }
330
  }
331
  
332
  /**
333
   * Method that returns true if docid has already been "deleted" from metacat.
334
   * This method really implements a truth table for deleted documents
335
   * The table is (a docid in one of the tables is represented by the X):
336
   * xml_docs      xml_revs      deleted?
337
   * ------------------------------------
338
   *   X             X             FALSE
339
   *   X             _             FALSE
340
   *   _             X             TRUE
341
   *   _             _             TRUE
342
   */
343
  private static boolean alreadyDeleted(String docid, Connection conn)
344
  {
345
    try
346
    {
347
      boolean xml_docs = false;
348
      boolean xml_revs = false;
349
      
350
      StringBuffer sb = new StringBuffer();
351
      sb.append("select docid from xml_revisions where docid like '");
352
      sb.append(docid).append("'");
353
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
354
      pstmt.execute();
355
      ResultSet rs = pstmt.getResultSet();
356
      boolean tablehasrows = rs.next();
357
      if(tablehasrows)
358
      {
359
        xml_revs = true;
360
      }
361
      
362
      sb = new StringBuffer();
363
      sb.append("select docid from xml_documents where docid like '");
364
      sb.append(docid).append("'");
365
      pstmt = conn.prepareStatement(sb.toString());
366
      pstmt.execute();
367
      rs = pstmt.getResultSet();
368
      tablehasrows = rs.next();
369
      if(tablehasrows)
370
      {
371
        xml_docs = true;
372
      }
373
      
374
      if(xml_docs && xml_revs)
375
      {
376
        return false;
377
      }
378
      else if(xml_docs && !xml_revs)
379
      {
380
        return false;
381
      }
382
      else if(!xml_docs && xml_revs)
383
      {
384
        return true;
385
      }
386
      else if(!xml_docs && !xml_revs)
387
      {
388
        return true;
389
      }
390
    }
391
    catch(Exception e)
392
    {
393
      System.out.println("error in alreadyDeleted: " + e.getMessage());
394
      e.printStackTrace(System.out);
395
    }
396
    return false;
397
  }
398
  
399
  /**
400
   * Checks to see if a document is already in the DB.  Returns
401
   * "UPDATE" if it is, "INSERT" if it isn't
402
   */
403
  private static String getAction(String docid)
404
  {
405
    try
406
    {
407
      MetaCatUtil util = new MetaCatUtil();
408
      StringBuffer sql = new StringBuffer();
409
      sql.append("select docid from xml_documents where docid like '");
410
      sql.append(docid).append("'");
411
      Connection conn = util.openDBConnection();
412
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
413
      pstmt.execute();
414
      ResultSet rs = pstmt.getResultSet();
415

    
416
      if(rs.next())
417
      {
418
        conn.close();
419
        return "UPDATE";
420
      }
421
      else
422
      {
423
        conn.close();
424
        return "INSERT";
425
      }
426
    }
427
    catch(Exception e)
428
    {
429
      System.out.println("error in replicationHandler.getAction: " + 
430
                          e.getMessage());
431
    }
432
    return "";
433
  }
434
  
435
  /**
436
   * Method to query xml_replication and build a hashtable of each server
437
   * and it's last update time.
438
   * @param conn a connection to the database
439
   */
440
  public static Hashtable buildServerList(Connection conn)
441
  {
442
    Hashtable sl = new Hashtable();
443
    PreparedStatement pstmt;   
444
    try
445
    {
446
      pstmt = conn.prepareStatement("select server, last_checked from " +
447
                                    "xml_replication");
448
      pstmt.execute();
449
      ResultSet rs = pstmt.getResultSet();
450
      boolean tableHasRows = rs.next();
451
      while(tableHasRows)
452
      {
453
        String server = rs.getString(1);
454
        String last_checked = rs.getString(2);
455
        if(!server.equals("localhost"))
456
        {
457
          sl.put(server, last_checked);
458
        }
459
        tableHasRows = rs.next();
460
      }
461
    }
462
    catch(Exception e)
463
    {
464
      System.out.println("error in replicationHandler.buildServerList(): " +
465
                         e.getMessage());
466
    }
467
    return sl;
468
  }
469
  
470
  /**
471
   * Method to initialize the message parser
472
   */
473
  public static XMLReader initParser(DefaultHandler dh)
474
          throws Exception
475
  {
476
    XMLReader parser = null;
477

    
478
    try {
479
      ContentHandler chandler = dh;
480

    
481
      // Get an instance of the parser
482
      MetaCatUtil util = new MetaCatUtil();
483
      String parserName = util.getOption("saxparser");
484
      parser = XMLReaderFactory.createXMLReader(parserName);
485

    
486
      // Turn off validation
487
      parser.setFeature("http://xml.org/sax/features/validation", false);
488
      
489
      parser.setContentHandler((ContentHandler)chandler);
490
      parser.setErrorHandler((ErrorHandler)chandler);
491

    
492
    } catch (Exception e) {
493
      throw e;
494
    }
495

    
496
    return parser;
497
  }
498
}
(37-37/38)