Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-12-21 09:03:27 -0800 (Thu, 21 Dec 2000) $'
11
 * '$Revision: 629 $'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import java.text.*;
22
import org.xml.sax.AttributeList;
23
import org.xml.sax.ContentHandler;
24
import org.xml.sax.DTDHandler;
25
import org.xml.sax.EntityResolver;
26
import org.xml.sax.ErrorHandler;
27
import org.xml.sax.InputSource;
28
import org.xml.sax.XMLReader;
29
import org.xml.sax.SAXException;
30
import org.xml.sax.SAXParseException;
31
import org.xml.sax.helpers.XMLReaderFactory;
32
import org.xml.sax.helpers.DefaultHandler;
33

    
34

    
35

    
36
/**
37
 * This class handles deltaT replication checking.  Whenever this TimerTask
38
 * is fired it checks each server in xml_replication for updates and updates
39
 * the local db as needed.
40
 */
41
public class ReplicationHandler extends TimerTask
42
{
43
  int serverCheckCode = 1;
44
  MetaCatUtil util = new MetaCatUtil();
45
  Hashtable serverList = new Hashtable(); 
46
  Connection conn;
47
  PrintWriter out;
48
  
49
  public ReplicationHandler(PrintWriter o)
50
  {
51
    this.out = o;
52
  }
53
  
54
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
55
  {
56
    this.out = o;
57
    this.serverCheckCode = serverCheckCode;
58
  }
59
  
60
  /**
61
   * Method that implements TimerTask.run().  It runs whenever the timer is 
62
   * fired.
63
   */
64
  public void run()
65
  {
66
    //find out the last_checked time of each server in the server list and
67
    //send a query to each server to see if there are any documents in 
68
    //xml_documents with an update_date > last_checked
69
    try
70
    {
71
      conn = util.openDBConnection();
72
      serverList = buildServerList(conn);
73
      update(serverList, conn);
74
      updateCatalog(serverList, conn);
75
      conn.close();
76
    }
77
    catch (Exception e)
78
    {
79
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
80
    }
81
  }
82
  
83
  /**
84
   * Method that uses revision taging for replication instead of update_date.
85
   */
86
  private void update(Hashtable serverList, Connection conn)
87
  {
88
    /*
89
     Pseudo-algorithm
90
     - request a doc list from each server in xml_replication
91
     - check the rev number of each of those documents agains the 
92
       documents in the local database
93
     - pull any documents that have a lesser rev number on the local server
94
       from the remote server
95
     - delete any documents that still exist in the local xml_documents but
96
       are in the deletedDocuments tag of the remote host response.
97
     - update last_checked to keep track of the last time it was checked.
98
       (this info is theoretically not needed using this system but probably 
99
       should be kept anyway)
100
    */
101
    Enumeration keys;
102
    String server;
103
    String update;
104
    ReplMessageHandler message = new ReplMessageHandler();
105
    Vector responses = new Vector();
106
    PreparedStatement pstmt;
107
    ResultSet rs;
108
    boolean tablehasrows;
109
    boolean flag=false;
110
    String action = new String();
111
    XMLReader parser;
112
    URL u;
113
    
114
    try
115
    {
116
      MetaCatUtil.debugMessage("init parser");
117
      parser = initParser(message);
118
      keys = serverList.keys();
119
      while(keys.hasMoreElements())
120
      {
121
        server = (String)(keys.nextElement());
122
        MetacatReplication.replLog("full update started to: " + server);
123
        u = new URL("http://" + server + "?action=update");
124
        System.out.println("Sending Message: " + u.toString());
125
        String result = MetacatReplication.getURLContent(u);
126
        responses.add(result);
127
      }
128
      
129
      //System.out.println("responses: " + responses.toString());
130
      
131
      for(int i=0; i<responses.size(); i++)
132
      { //check each server for updated files
133
        //System.out.println("parsing responses");
134
        parser.parse(new InputSource(
135
                     new StringReader(
136
                     (String)(responses.elementAt(i)))));
137
        Vector v = new Vector(message.getUpdatesVect());
138
        //System.out.println("v: " + v.toString());
139
        Vector d = new Vector(message.getDeletesVect());
140
        //check the revs in u to see if there are any newer ones than
141
        //in the local DB.
142
        for(int j=0; j<v.size(); j++)
143
        {
144
          Vector w = new Vector((Vector)(v.elementAt(j)));
145
          //System.out.println("w: " + w.toString());
146
          String docid = (String)w.elementAt(0);
147
          //System.out.println("docid: " + docid);
148
          int rev = Integer.parseInt((String)w.elementAt(1));
149
          //System.out.println("rev: " + rev);
150
          String docServer = (String)w.elementAt(2);
151
          //System.out.println("docServer: " + docServer);
152
    
153
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
154
                                        "docid like '" + docid + "'");
155
          pstmt.execute();
156
          rs = pstmt.getResultSet();
157
          tablehasrows = rs.next();
158
          if(tablehasrows)
159
          { //check the revs for an update because this document is in the
160
            //local DB, it just might be out of date.
161
            int localrev = rs.getInt(1);
162
            if(localrev == rev)
163
            {
164
              flag = false;
165
            }
166
            else if(localrev < rev)
167
            {//this document needs to be updated so send an read request
168
              action = "UPDATE";
169
              flag = true;
170
            }
171
          }
172
          else
173
          { //insert this document as new because it is not in the local DB
174
            action = "INSERT";
175
            flag = true;
176
          }
177
          
178
          if(flag)
179
          { //if the document needs to be updated or inserted, this is executed
180
            u = new URL("http://" + docServer + "?action=read&docid=" +
181
                          docid);
182
            System.out.println("Sending message: " + u.toString());
183
            String newxmldoc = MetacatReplication.getURLContent(u);
184
            DocInfoHandler dih = new DocInfoHandler();
185
            XMLReader docinfoParser = initParser(dih);
186
            URL docinfoUrl = new URL("http://" + docServer + 
187
                                   "?action=getdocumentinfo&docid=" +
188
                                   docid);
189
            System.out.println("Sending message: " + docinfoUrl.toString());
190
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
191
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
192
            Hashtable docinfoHash = dih.getDocInfo();
193
            int serverCode = MetacatReplication.getServerCode(docServer);
194
            System.out.println("updating doc: " + docid + " action: "+ action);
195
            try
196
            {
197
              String newDocid = DocumentImpl.write(conn, 
198
                              new StringReader(newxmldoc),
199
                              null,
200
                              action, 
201
                              docid, 
202
                              (String)docinfoHash.get("user_owner"),
203
                              (String)docinfoHash.get("user_owner"), 
204
                              serverCode, 
205
                              true);
206
              MetacatReplication.replLog("wrote doc " + docid + " from " + 
207
                                         docServer);
208
              System.out.println("wrote doc " + docid + " from " + 
209
                                 docServer);
210
            }
211
            catch(Exception e)
212
            {
213
              System.out.println("error writing document " + docid);
214
            }
215
          }
216
        }
217
        
218
        for(int k=0; k<d.size(); k++)
219
        { //delete the deleted documents;
220
          Vector w = new Vector((Vector)d.elementAt(k));
221
          String docid = (String)w.elementAt(0);
222
          if(!alreadyDeleted(docid, conn))
223
          {
224
            DocumentImpl.delete(conn, docid, null, null);
225
            System.out.println("Document " + docid + " deleted.");
226
            MetacatReplication.replLog("doc " + docid + " deleted");
227
          }
228
        }
229
        
230
        keys = serverList.keys();
231
        while(keys.hasMoreElements())
232
        {
233
          server = (String)(keys.nextElement()); 
234
          URL dateurl = new URL("http://" + server + "?action=gettime");
235
          String datexml = MetacatReplication.getURLContent(dateurl);
236
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
237
          StringBuffer sql = new StringBuffer();
238
          sql.append("update xml_replication set last_checked = to_date('");
239
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
240
          sql.append("server like '").append(server).append("'");
241
          //System.out.println("sql: " + sql.toString());
242
          pstmt = conn.prepareStatement(sql.toString());
243
          pstmt.executeUpdate();
244
          //conn.commit();
245
          System.out.println("last_checked updated to " + datestr + " on " +
246
                            server);
247
        }
248
      }
249
    }
250
    catch(Exception e)
251
    {
252
      System.out.println("error in update2: " + e.getMessage());
253
      e.printStackTrace(System.out);
254
    }
255
  }
256
  
257
  /**
258
   * updates xml_catalog with entries from other servers.
259
   */
260
  private void updateCatalog(Hashtable serverList, Connection conn)
261
  {
262
    System.out.println("in updateCatalog");
263
    try
264
    {
265
      String server;
266
      Enumeration keys = serverList.keys();
267
      while(keys.hasMoreElements())
268
      { //go through each server
269
        server = (String)(keys.nextElement());
270
        URL u = new URL("http://" + server + "?action=getcatalog");
271
        System.out.println("sending message " + u.toString());
272
        String catxml = MetacatReplication.getURLContent(u);
273
        //System.out.println("catxml: " + catxml);
274
        CatalogMessageHandler cmh = new CatalogMessageHandler();
275
        XMLReader catparser = initParser(cmh);
276
        catparser.parse(new InputSource(new StringReader(catxml)));
277
        //parse the returned catalog xml and put it into a vector
278
        Vector remoteCatalog = cmh.getCatalogVect();
279
        
280
        String localcatxml = MetacatReplication.getCatalogXML();
281
        //System.out.println("localcatxml: " + localcatxml);
282
        cmh = new CatalogMessageHandler();
283
        catparser = initParser(cmh);
284
        catparser.parse(new InputSource(new StringReader(localcatxml)));
285
        Vector localCatalog = cmh.getCatalogVect();
286
        
287
        //now we have the catalog from the remote server and this local server
288
        //we now need to compare the two and merge the differences.
289
        //the comparison is base on the public_id fields which is the 4th
290
        //entry in each row vector.
291
        Vector publicId = new Vector();
292
        for(int i=0; i<localCatalog.size(); i++)
293
        {
294
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
295
          //System.out.println("v1: " + v.toString());
296
          publicId.add(new String((String)v.elementAt(3)));
297
          //System.out.println("adding " + (String)v.elementAt(3));
298
        }
299
        
300
        for(int i=0; i<remoteCatalog.size(); i++)
301
        {
302
          Vector v = (Vector)remoteCatalog.elementAt(i);
303
          //System.out.println("v2: " + v.toString());
304
          //System.out.println("i: " + i);
305
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
306
          //System.out.println("publicID: " + publicId.toString());
307
          //System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
308
          if(!publicId.contains(v.elementAt(3)))
309
          { //so we don't have this public id in our local table so we need to
310
            //add it.
311
            //System.out.println("in if");
312
            StringBuffer sql = new StringBuffer();
313
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
314
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
315
            sql.append("?,?)");
316
            //System.out.println("sql: " + sql.toString());
317
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
318
            pstmt.setString(1, (String)v.elementAt(0));
319
            pstmt.setString(2, (String)v.elementAt(1));
320
            pstmt.setString(3, (String)v.elementAt(2));
321
            pstmt.setString(4, (String)v.elementAt(3));
322
            pstmt.setString(5, (String)v.elementAt(4));
323
            pstmt.execute();
324
          }
325
        }
326
      }
327
    }
328
    catch(Exception e)
329
    {
330
      System.out.println("error in updateCatalog: " + e.getMessage());
331
      e.printStackTrace(System.out);
332
    }
333
  }
334
  
335
  /**
336
   * Method that returns true if docid has already been "deleted" from metacat.
337
   * This method really implements a truth table for deleted documents
338
   * The table is (a docid in one of the tables is represented by the X):
339
   * xml_docs      xml_revs      deleted?
340
   * ------------------------------------
341
   *   X             X             FALSE
342
   *   X             _             FALSE
343
   *   _             X             TRUE
344
   *   _             _             TRUE
345
   */
346
  private static boolean alreadyDeleted(String docid, Connection conn)
347
  {
348
    try
349
    {
350
      boolean xml_docs = false;
351
      boolean xml_revs = false;
352
      
353
      StringBuffer sb = new StringBuffer();
354
      sb.append("select docid from xml_revisions where docid like '");
355
      sb.append(docid).append("'");
356
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
357
      pstmt.execute();
358
      ResultSet rs = pstmt.getResultSet();
359
      boolean tablehasrows = rs.next();
360
      if(tablehasrows)
361
      {
362
        xml_revs = true;
363
      }
364
      
365
      sb = new StringBuffer();
366
      sb.append("select docid from xml_documents where docid like '");
367
      sb.append(docid).append("'");
368
      pstmt = conn.prepareStatement(sb.toString());
369
      pstmt.execute();
370
      rs = pstmt.getResultSet();
371
      tablehasrows = rs.next();
372
      if(tablehasrows)
373
      {
374
        xml_docs = true;
375
      }
376
      
377
      if(xml_docs && xml_revs)
378
      {
379
        return false;
380
      }
381
      else if(xml_docs && !xml_revs)
382
      {
383
        return false;
384
      }
385
      else if(!xml_docs && xml_revs)
386
      {
387
        return true;
388
      }
389
      else if(!xml_docs && !xml_revs)
390
      {
391
        return true;
392
      }
393
    }
394
    catch(Exception e)
395
    {
396
      System.out.println("error in alreadyDeleted: " + e.getMessage());
397
      e.printStackTrace(System.out);
398
    }
399
    return false;
400
  }
401
  
402
  /**
403
   * Checks to see if a document is already in the DB.  Returns
404
   * "UPDATE" if it is, "INSERT" if it isn't
405
   */
406
  private static String getAction(String docid)
407
  {
408
    try
409
    {
410
      MetaCatUtil util = new MetaCatUtil();
411
      StringBuffer sql = new StringBuffer();
412
      sql.append("select docid from xml_documents where docid like '");
413
      sql.append(docid).append("'");
414
      Connection conn = util.openDBConnection();
415
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
416
      pstmt.execute();
417
      ResultSet rs = pstmt.getResultSet();
418

    
419
      if(rs.next())
420
      {
421
        conn.close();
422
        return "UPDATE";
423
      }
424
      else
425
      {
426
        conn.close();
427
        return "INSERT";
428
      }
429
    }
430
    catch(Exception e)
431
    {
432
      System.out.println("error in replicationHandler.getAction: " + 
433
                          e.getMessage());
434
    }
435
    return "";
436
  }
437
  
438
  /**
439
   * Method to query xml_replication and build a hashtable of each server
440
   * and it's last update time.
441
   * @param conn a connection to the database
442
   */
443
  public static Hashtable buildServerList(Connection conn)
444
  {
445
    Hashtable sl = new Hashtable();
446
    PreparedStatement pstmt;   
447
    try
448
    {
449
      pstmt = conn.prepareStatement("select server, last_checked, replicate " +
450
                                    "from xml_replication");
451
      pstmt.execute();
452
      ResultSet rs = pstmt.getResultSet();
453
      boolean tableHasRows = rs.next();
454
      while(tableHasRows)
455
      {
456
        if(rs.getInt(3) == 1)
457
        {//only put the server in the list if the replicate flag is true
458
          String server = rs.getString(1);
459
          String last_checked = rs.getString(2);
460
          if(!server.equals("localhost"))
461
          {
462
            sl.put(server, last_checked);
463
          }
464
        }
465
        tableHasRows = rs.next();   
466
      }
467
    }
468
    catch(Exception e)
469
    {
470
      System.out.println("error in replicationHandler.buildServerList(): " +
471
                         e.getMessage());
472
    }
473
    return sl;
474
  }
475
  
476
  /**
477
   * Method to initialize the message parser
478
   */
479
  public static XMLReader initParser(DefaultHandler dh)
480
          throws Exception
481
  {
482
    XMLReader parser = null;
483

    
484
    try {
485
      ContentHandler chandler = dh;
486

    
487
      // Get an instance of the parser
488
      MetaCatUtil util = new MetaCatUtil();
489
      String parserName = util.getOption("saxparser");
490
      parser = XMLReaderFactory.createXMLReader(parserName);
491

    
492
      // Turn off validation
493
      parser.setFeature("http://xml.org/sax/features/validation", false);
494
      
495
      parser.setContentHandler((ContentHandler)chandler);
496
      parser.setErrorHandler((ErrorHandler)chandler);
497

    
498
    } catch (Exception e) {
499
      throw e;
500
    }
501

    
502
    return parser;
503
  }
504
}
(41-41/42)