Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: jones $'
10
 *     '$Date: 2001-01-18 11:52:00 -0800 (Thu, 18 Jan 2001) $'
11
 * '$Revision: 669 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27
 
28
package edu.ucsb.nceas.metacat;
29

    
30
import java.sql.*;
31
import java.util.*;
32
import java.lang.Thread; 
33
import java.io.*;
34
import java.net.*;
35
import java.text.*;
36
import org.xml.sax.AttributeList;
37
import org.xml.sax.ContentHandler;
38
import org.xml.sax.DTDHandler;
39
import org.xml.sax.EntityResolver;
40
import org.xml.sax.ErrorHandler;
41
import org.xml.sax.InputSource;
42
import org.xml.sax.XMLReader;
43
import org.xml.sax.SAXException;
44
import org.xml.sax.SAXParseException;
45
import org.xml.sax.helpers.XMLReaderFactory;
46
import org.xml.sax.helpers.DefaultHandler;
47

    
48

    
49

    
50
/**
51
 * This class handles deltaT replication checking.  Whenever this TimerTask
52
 * is fired it checks each server in xml_replication for updates and updates
53
 * the local db as needed.
54
 */
55
public class ReplicationHandler extends TimerTask
56
{
57
  int serverCheckCode = 1;
58
  MetaCatUtil util = new MetaCatUtil();
59
  Hashtable serverList = new Hashtable(); 
60
  Connection conn;
61
  PrintWriter out;
62
  
63
  public ReplicationHandler(PrintWriter o)
64
  {
65
    this.out = o;
66
  }
67
  
68
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
69
  {
70
    this.out = o;
71
    this.serverCheckCode = serverCheckCode;
72
  }
73
  
74
  /**
75
   * Method that implements TimerTask.run().  It runs whenever the timer is 
76
   * fired.
77
   */
78
  public void run()
79
  {
80
    //find out the last_checked time of each server in the server list and
81
    //send a query to each server to see if there are any documents in 
82
    //xml_documents with an update_date > last_checked
83
    try
84
    {
85
      conn = util.openDBConnection();
86
      serverList = buildServerList(conn);
87
      update(serverList, conn);
88
      updateCatalog(serverList, conn);
89
      conn.close();
90
    }
91
    catch (Exception e)
92
    {
93
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
94
    }
95
  }
96
  
97
  /**
98
   * Method that uses revision taging for replication instead of update_date.
99
   */
100
  private void update(Hashtable serverList, Connection conn)
101
  {
102
    /*
103
     Pseudo-algorithm
104
     - request a doc list from each server in xml_replication
105
     - check the rev number of each of those documents agains the 
106
       documents in the local database
107
     - pull any documents that have a lesser rev number on the local server
108
       from the remote server
109
     - delete any documents that still exist in the local xml_documents but
110
       are in the deletedDocuments tag of the remote host response.
111
     - update last_checked to keep track of the last time it was checked.
112
       (this info is theoretically not needed using this system but probably 
113
       should be kept anyway)
114
    */
115
    Enumeration keys;
116
    String server;
117
    String update;
118
    ReplMessageHandler message = new ReplMessageHandler();
119
    Vector responses = new Vector();
120
    PreparedStatement pstmt = null;
121
    ResultSet rs;
122
    boolean tablehasrows;
123
    boolean flag=false;
124
    String action = new String();
125
    XMLReader parser;
126
    URL u;
127
    
128
    try
129
    {
130
      MetaCatUtil.debugMessage("init parser");
131
      parser = initParser(message);
132
      keys = serverList.keys();
133
      while(keys.hasMoreElements())
134
      {
135
        server = (String)(keys.nextElement());
136
        MetacatReplication.replLog("full update started to: " + server);
137
        u = new URL("http://" + server + "?action=update");
138
        System.out.println("Sending Message: " + u.toString());
139
        String result = MetacatReplication.getURLContent(u);
140
        responses.add(result);
141
      }
142
      
143
      //System.out.println("responses: " + responses.toString());
144
      
145
      for(int i=0; i<responses.size(); i++)
146
      { //check each server for updated files
147
        //System.out.println("parsing responses");
148
        parser.parse(new InputSource(
149
                     new StringReader(
150
                     (String)(responses.elementAt(i)))));
151
        Vector v = new Vector(message.getUpdatesVect());
152
        //System.out.println("v: " + v.toString());
153
        Vector d = new Vector(message.getDeletesVect());
154
        //check the revs in u to see if there are any newer ones than
155
        //in the local DB.
156
        for(int j=0; j<v.size(); j++)
157
        {
158
          Vector w = new Vector((Vector)(v.elementAt(j)));
159
          //System.out.println("w: " + w.toString());
160
          String docid = (String)w.elementAt(0);
161
          //System.out.println("docid: " + docid);
162
          int rev = Integer.parseInt((String)w.elementAt(1));
163
          //System.out.println("rev: " + rev);
164
          String docServer = (String)w.elementAt(2);
165
          //System.out.println("docServer: " + docServer);
166
    
167
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
168
                                        "docid like '" + docid + "'");
169
          pstmt.execute();
170
          rs = pstmt.getResultSet();
171
          tablehasrows = rs.next();
172
          if(tablehasrows)
173
          { //check the revs for an update because this document is in the
174
            //local DB, it just might be out of date.
175
            int localrev = rs.getInt(1);
176
            if(localrev == rev)
177
            {
178
              flag = false;
179
            }
180
            else if(localrev < rev)
181
            {//this document needs to be updated so send an read request
182
              action = "UPDATE";
183
              flag = true;
184
            }
185
          }
186
          else
187
          { //insert this document as new because it is not in the local DB
188
            action = "INSERT";
189
            flag = true;
190
          }
191
          
192
          if(flag)
193
          { //if the document needs to be updated or inserted, this is executed
194
            u = new URL("http://" + docServer + "?action=read&docid=" +
195
                          docid);
196
            System.out.println("Sending message: " + u.toString());
197
            String newxmldoc = MetacatReplication.getURLContent(u);
198
            DocInfoHandler dih = new DocInfoHandler();
199
            XMLReader docinfoParser = initParser(dih);
200
            URL docinfoUrl = new URL("http://" + docServer + 
201
                                   "?action=getdocumentinfo&docid=" +
202
                                   docid);
203
            System.out.println("Sending message: " + docinfoUrl.toString());
204
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
205
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
206
            Hashtable docinfoHash = dih.getDocInfo();
207
            int serverCode = MetacatReplication.getServerCode(docServer);
208
            System.out.println("updating doc: " + docid + " action: "+ action);
209
            try
210
            {
211
              String newDocid = DocumentImpl.write(conn, 
212
                              new StringReader(newxmldoc),
213
                              null,
214
                              action, 
215
                              docid, 
216
                              (String)docinfoHash.get("user_owner"),
217
                              (String)docinfoHash.get("user_owner"), 
218
                              serverCode, 
219
                              true);
220
              MetacatReplication.replLog("wrote doc " + docid + " from " + 
221
                                         docServer);
222
              System.out.println("wrote doc " + docid + " from " + 
223
                                 docServer);
224
            }
225
            catch(Exception e)
226
            {
227
              System.out.println("error writing document " + docid);
228
            }
229
          }
230
        }
231
        
232
        for(int k=0; k<d.size(); k++)
233
        { //delete the deleted documents;
234
          Vector w = new Vector((Vector)d.elementAt(k));
235
          String docid = (String)w.elementAt(0);
236
          if(!alreadyDeleted(docid, conn))
237
          {
238
            DocumentImpl.delete(conn, docid, null, null);
239
            System.out.println("Document " + docid + " deleted.");
240
            MetacatReplication.replLog("doc " + docid + " deleted");
241
          }
242
        }
243
        
244
        keys = serverList.keys();
245
        while(keys.hasMoreElements())
246
        {
247
          server = (String)(keys.nextElement()); 
248
          URL dateurl = new URL("http://" + server + "?action=gettime");
249
          String datexml = MetacatReplication.getURLContent(dateurl);
250
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
251
          StringBuffer sql = new StringBuffer();
252
          sql.append("update xml_replication set last_checked = to_date('");
253
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
254
          sql.append("server like '").append(server).append("'");
255
          //System.out.println("sql: " + sql.toString());
256
          pstmt.close();
257
          pstmt = conn.prepareStatement(sql.toString());
258
          pstmt.executeUpdate();
259
          //conn.commit();
260
          System.out.println("last_checked updated to " + datestr + " on " +
261
                            server);
262
        }
263
      }
264
    }
265
    catch(Exception e)
266
    {
267
      System.out.println("error in update2: " + e.getMessage());
268
      e.printStackTrace(System.out);
269
    }
270
    finally
271
    {
272
      try
273
      {
274
        pstmt.close();
275
      }
276
      catch(Exception ee) {}
277
    }
278
  }
279
  
280
  /**
281
   * updates xml_catalog with entries from other servers.
282
   */
283
  private void updateCatalog(Hashtable serverList, Connection conn)
284
  {
285
    System.out.println("in updateCatalog");
286
    try
287
    {
288
      String server;
289
      Enumeration keys = serverList.keys();
290
      while(keys.hasMoreElements())
291
      { //go through each server
292
        server = (String)(keys.nextElement());
293
        URL u = new URL("http://" + server + "?action=getcatalog");
294
        System.out.println("sending message " + u.toString());
295
        String catxml = MetacatReplication.getURLContent(u);
296
        //System.out.println("catxml: " + catxml);
297
        CatalogMessageHandler cmh = new CatalogMessageHandler();
298
        XMLReader catparser = initParser(cmh);
299
        catparser.parse(new InputSource(new StringReader(catxml)));
300
        //parse the returned catalog xml and put it into a vector
301
        Vector remoteCatalog = cmh.getCatalogVect();
302
        
303
        String localcatxml = MetacatReplication.getCatalogXML();
304
        //System.out.println("localcatxml: " + localcatxml);
305
        cmh = new CatalogMessageHandler();
306
        catparser = initParser(cmh);
307
        catparser.parse(new InputSource(new StringReader(localcatxml)));
308
        Vector localCatalog = cmh.getCatalogVect();
309
        
310
        //now we have the catalog from the remote server and this local server
311
        //we now need to compare the two and merge the differences.
312
        //the comparison is base on the public_id fields which is the 4th
313
        //entry in each row vector.
314
        Vector publicId = new Vector();
315
        for(int i=0; i<localCatalog.size(); i++)
316
        {
317
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
318
          //System.out.println("v1: " + v.toString());
319
          publicId.add(new String((String)v.elementAt(3)));
320
          //System.out.println("adding " + (String)v.elementAt(3));
321
        }
322
        
323
        for(int i=0; i<remoteCatalog.size(); i++)
324
        {
325
          Vector v = (Vector)remoteCatalog.elementAt(i);
326
          //System.out.println("v2: " + v.toString());
327
          //System.out.println("i: " + i);
328
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
329
          //System.out.println("publicID: " + publicId.toString());
330
          //System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
331
          if(!publicId.contains(v.elementAt(3)))
332
          { //so we don't have this public id in our local table so we need to
333
            //add it.
334
            //System.out.println("in if");
335
            StringBuffer sql = new StringBuffer();
336
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
337
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
338
            sql.append("?,?)");
339
            //System.out.println("sql: " + sql.toString());
340
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
341
            pstmt.setString(1, (String)v.elementAt(0));
342
            pstmt.setString(2, (String)v.elementAt(1));
343
            pstmt.setString(3, (String)v.elementAt(2));
344
            pstmt.setString(4, (String)v.elementAt(3));
345
            pstmt.setString(5, (String)v.elementAt(4));
346
            pstmt.execute();
347
            pstmt.close();
348
          }
349
        }
350
      }
351
    }
352
    catch(Exception e)
353
    {
354
      System.out.println("error in updateCatalog: " + e.getMessage());
355
      e.printStackTrace(System.out);
356
    }
357
  }
358
  
359
  /**
360
   * Method that returns true if docid has already been "deleted" from metacat.
361
   * This method really implements a truth table for deleted documents
362
   * The table is (a docid in one of the tables is represented by the X):
363
   * xml_docs      xml_revs      deleted?
364
   * ------------------------------------
365
   *   X             X             FALSE
366
   *   X             _             FALSE
367
   *   _             X             TRUE
368
   *   _             _             TRUE
369
   */
370
  private static boolean alreadyDeleted(String docid, Connection conn)
371
  {
372
    try
373
    {
374
      boolean xml_docs = false;
375
      boolean xml_revs = false;
376
      
377
      StringBuffer sb = new StringBuffer();
378
      sb.append("select docid from xml_revisions where docid like '");
379
      sb.append(docid).append("'");
380
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
381
      pstmt.execute();
382
      ResultSet rs = pstmt.getResultSet();
383
      boolean tablehasrows = rs.next();
384
      if(tablehasrows)
385
      {
386
        xml_revs = true;
387
      }
388
      
389
      sb = new StringBuffer();
390
      sb.append("select docid from xml_documents where docid like '");
391
      sb.append(docid).append("'");
392
      pstmt.close();
393
      pstmt = conn.prepareStatement(sb.toString());
394
      pstmt.execute();
395
      rs = pstmt.getResultSet();
396
      tablehasrows = rs.next();
397
      pstmt.close();
398
      if(tablehasrows)
399
      {
400
        xml_docs = true;
401
      }
402
      
403
      if(xml_docs && xml_revs)
404
      {
405
        return false;
406
      }
407
      else if(xml_docs && !xml_revs)
408
      {
409
        return false;
410
      }
411
      else if(!xml_docs && xml_revs)
412
      {
413
        return true;
414
      }
415
      else if(!xml_docs && !xml_revs)
416
      {
417
        return true;
418
      }
419
    }
420
    catch(Exception e)
421
    {
422
      System.out.println("error in alreadyDeleted: " + e.getMessage());
423
      e.printStackTrace(System.out);
424
    }
425
    finally
426
    {
427
      
428
    }
429
    return false;
430
  }
431
  
432
  /**
433
   * Checks to see if a document is already in the DB.  Returns
434
   * "UPDATE" if it is, "INSERT" if it isn't
435
   */
436
  private static String getAction(String docid)
437
  {
438
    try
439
    {
440
      MetaCatUtil util = new MetaCatUtil();
441
      StringBuffer sql = new StringBuffer();
442
      sql.append("select docid from xml_documents where docid like '");
443
      sql.append(docid).append("'");
444
      Connection conn = util.openDBConnection();
445
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
446
      pstmt.execute();
447
      ResultSet rs = pstmt.getResultSet();
448

    
449
      if(rs.next())
450
      {
451
        pstmt.close();
452
        conn.close();
453
        return "UPDATE";
454
      }
455
      else
456
      {
457
        pstmt.close();
458
        conn.close();
459
        return "INSERT";
460
      }
461
    }
462
    catch(Exception e)
463
    {
464
      System.out.println("error in replicationHandler.getAction: " + 
465
                          e.getMessage());
466
    }
467
    return "";
468
  }
469
  
470
  /**
471
   * Method to query xml_replication and build a hashtable of each server
472
   * and it's last update time.
473
   * @param conn a connection to the database
474
   */
475
  public static Hashtable buildServerList(Connection conn)
476
  {
477
    Hashtable sl = new Hashtable();
478
    PreparedStatement pstmt;   
479
    try
480
    {
481
      pstmt = conn.prepareStatement("select server, last_checked, replicate " +
482
                                    "from xml_replication");
483
      pstmt.execute();
484
      ResultSet rs = pstmt.getResultSet();
485
      boolean tableHasRows = rs.next();
486
      while(tableHasRows)
487
      {
488
        if(rs.getInt(3) == 1)
489
        {//only put the server in the list if the replicate flag is true
490
          String server = rs.getString(1);
491
          String last_checked = rs.getString(2);
492
          if(!server.equals("localhost"))
493
          {
494
            sl.put(server, last_checked);
495
          }
496
        }
497
        tableHasRows = rs.next();   
498
      }
499
      pstmt.close();
500
    }
501
    catch(Exception e)
502
    {
503
      System.out.println("error in replicationHandler.buildServerList(): " +
504
                         e.getMessage());
505
    }
506
    return sl;
507
  }
508
  
509
  /**
510
   * Method to initialize the message parser
511
   */
512
  public static XMLReader initParser(DefaultHandler dh)
513
          throws Exception
514
  {
515
    XMLReader parser = null;
516

    
517
    try {
518
      ContentHandler chandler = dh;
519

    
520
      // Get an instance of the parser
521
      MetaCatUtil util = new MetaCatUtil();
522
      String parserName = util.getOption("saxparser");
523
      parser = XMLReaderFactory.createXMLReader(parserName);
524

    
525
      // Turn off validation
526
      parser.setFeature("http://xml.org/sax/features/validation", false);
527
      
528
      parser.setContentHandler((ContentHandler)chandler);
529
      parser.setErrorHandler((ErrorHandler)chandler);
530

    
531
    } catch (Exception e) {
532
      throw e;
533
    }
534

    
535
    return parser;
536
  }
537
}
(42-42/43)