Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: bojilova $'
10
 *     '$Date: 2001-02-07 14:07:17 -0800 (Wed, 07 Feb 2001) $'
11
 * '$Revision: 697 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27
 
28
package edu.ucsb.nceas.metacat;
29

    
30
import java.sql.*;
31
import java.util.*;
32
import java.lang.Thread; 
33
import java.io.*;
34
import java.net.*;
35
import java.text.*;
36
import org.xml.sax.AttributeList;
37
import org.xml.sax.ContentHandler;
38
import org.xml.sax.DTDHandler;
39
import org.xml.sax.EntityResolver;
40
import org.xml.sax.ErrorHandler;
41
import org.xml.sax.InputSource;
42
import org.xml.sax.XMLReader;
43
import org.xml.sax.SAXException;
44
import org.xml.sax.SAXParseException;
45
import org.xml.sax.helpers.XMLReaderFactory;
46
import org.xml.sax.helpers.DefaultHandler;
47

    
48

    
49

    
50
/**
51
 * This class handles deltaT replication checking.  Whenever this TimerTask
52
 * is fired it checks each server in xml_replication for updates and updates
53
 * the local db as needed.
54
 */
55
public class ReplicationHandler extends TimerTask
56
{
57
  int serverCheckCode = 1;
58
  MetaCatUtil util = new MetaCatUtil();
59
  Hashtable serverList = new Hashtable(); 
60
  Connection conn;
61
  PrintWriter out;
62
  
63
  public ReplicationHandler(PrintWriter o)
64
  {
65
    this.out = o;
66
  }
67
  
68
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
69
  {
70
    this.out = o;
71
    this.serverCheckCode = serverCheckCode;
72
  }
73
  
74
  /**
75
   * Method that implements TimerTask.run().  It runs whenever the timer is 
76
   * fired.
77
   */
78
  public void run()
79
  {
80
    //find out the last_checked time of each server in the server list and
81
    //send a query to each server to see if there are any documents in 
82
    //xml_documents with an update_date > last_checked
83
    try
84
    {
85
      try
86
      { //this connection is prone to error for some reason so we 
87
        //try to connect it three times before quiting.
88
        conn = util.openDBConnection();
89
      }
90
      catch(SQLException sqle)
91
      {
92
        try
93
        {
94
          conn = util.openDBConnection();
95
        }
96
        catch(SQLException sqlee)
97
        {
98
          try
99
          {
100
            conn = util.openDBConnection();
101
          }
102
          catch(SQLException sqleee)
103
          {
104
            System.out.println("error getting db connection in " + 
105
                               "ReplicationHandler.run: " +
106
                               sqleee.getMessage());
107
          }
108
        }
109
      }
110
      serverList = buildServerList(conn);
111
      updateCatalog(serverList);
112
      update(serverList);
113
      conn.close();
114
    }
115
    catch (Exception e)
116
    {
117
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
118
    }
119
  }
120
  
121
  /**
122
   * Method that uses revision taging for replication instead of update_date.
123
   */
124
  private void update(Hashtable serverList)
125
  {
126
    /*
127
     Pseudo-algorithm
128
     - request a doc list from each server in xml_replication
129
     - check the rev number of each of those documents agains the 
130
       documents in the local database
131
     - pull any documents that have a lesser rev number on the local server
132
       from the remote server
133
     - delete any documents that still exist in the local xml_documents but
134
       are in the deletedDocuments tag of the remote host response.
135
     - update last_checked to keep track of the last time it was checked.
136
       (this info is theoretically not needed using this system but probably 
137
       should be kept anyway)
138
    */
139
    
140
    Connection conn = null;
141
    try
142
    {
143
      try
144
      { //this connection is prone to error for some reason so we 
145
        //try to connect it three times before quiting.
146
        conn = util.openDBConnection();
147
      }
148
      catch(SQLException sqle)
149
      {
150
        try
151
        {
152
          conn = util.openDBConnection();
153
        }
154
        catch(SQLException sqlee)
155
        {
156
          try
157
          {
158
            conn = util.openDBConnection();
159
          }
160
          catch(SQLException sqleee)
161
          {
162
            System.out.println("error getting db connection in " + 
163
                               "ReplicationHandler.update: " +
164
                               sqleee.getMessage());
165
          }
166
        }
167
      }
168
    }
169
    catch(Exception e)
170
    {
171
      System.out.println("error in ReplicationHandler.update: " + 
172
                          e.getMessage());
173
    }
174
    
175
    Enumeration keys;
176
    String server;
177
    String update;
178
    ReplMessageHandler message = new ReplMessageHandler();
179
    Vector responses = new Vector();
180
    PreparedStatement pstmt = null;
181
    ResultSet rs;
182
    boolean tablehasrows;
183
    boolean flag=false;
184
    String action = new String();
185
    XMLReader parser;
186
    URL u;
187
    
188
    try
189
    {
190
      MetaCatUtil.debugMessage("init parser");
191
      parser = initParser(message);
192
      keys = serverList.keys();
193
      while(keys.hasMoreElements())
194
      {
195
        server = (String)(keys.nextElement());
196
        MetacatReplication.replLog("full update started to: " + server);
197
        u = new URL("http://" + server + "?action=update");
198
        System.out.println("Sending Message: " + u.toString());
199
        String result = MetacatReplication.getURLContent(u);
200
        responses.add(result);
201
      }
202
      
203
      //String srvr = util.getOption("servletpath");
204
      
205
      //System.out.println("responses (from srvr): " + responses.toString());
206
      
207
      for(int i=0; i<responses.size(); i++)
208
      { //check each server for updated files
209
        //System.out.println("parsing responses");
210
        parser.parse(new InputSource(
211
                     new StringReader(
212
                     (String)(responses.elementAt(i)))));
213
        Vector v = new Vector(message.getUpdatesVect());
214
        //System.out.println("v: " + v.toString());
215
        Vector d = new Vector(message.getDeletesVect());
216
        //check the revs in u to see if there are any newer ones than
217
        //in the local DB.
218
        for(int j=0; j<v.size(); j++)
219
        {
220
          Vector w = new Vector((Vector)(v.elementAt(j)));
221
          //System.out.println("w: " + w.toString());
222
          String docid = (String)w.elementAt(0);
223
          //System.out.println("docid: " + docid);
224
          int rev = Integer.parseInt((String)w.elementAt(1));
225
          //System.out.println("rev: " + rev);
226
          String docServer = (String)w.elementAt(2);
227
          //System.out.println("docServer: " + docServer);
228
    
229
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
230
                                        "docid like '" + docid + "'");
231
          pstmt.execute();
232
          rs = pstmt.getResultSet();
233
          tablehasrows = rs.next();
234
          if(tablehasrows)
235
          { //check the revs for an update because this document is in the
236
            //local DB, it just might be out of date.
237
            int localrev = rs.getInt(1);
238
            if(localrev == rev)
239
            {
240
              flag = false;
241
            }
242
            else if(localrev < rev)
243
            {//this document needs to be updated so send an read request
244
              action = "UPDATE";
245
              flag = true;
246
            }
247
          }
248
          else
249
          { //insert this document as new because it is not in the local DB
250
            action = "INSERT";
251
            flag = true;
252
          }
253
          
254
          if(flag)
255
          { //if the document needs to be updated or inserted, this is executed
256
            u = new URL("http://" + docServer + "?action=read&docid=" +
257
                          docid);
258
            System.out.println("Sending message: " + u.toString());
259
            String newxmldoc = MetacatReplication.getURLContent(u);
260
            DocInfoHandler dih = new DocInfoHandler();
261
            XMLReader docinfoParser = initParser(dih);
262
            URL docinfoUrl = new URL("http://" + docServer + 
263
                                   "?action=getdocumentinfo&docid=" +
264
                                   docid);
265
            System.out.println("Sending message: " + docinfoUrl.toString());
266
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
267
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
268
            Hashtable docinfoHash = dih.getDocInfo();
269
            int serverCode = MetacatReplication.getServerCode(docServer);
270
            System.out.println("updating doc: " + docid + " action: "+ action);
271
            try
272
            {
273
              String newDocid = DocumentImpl.write(conn, 
274
                              new StringReader(newxmldoc),
275
                              (String)docinfoHash.get("public_access"),
276
                              null,  /* the dtd text */
277
                              action, 
278
                              docid, 
279
                              (String)docinfoHash.get("user_owner"),
280
                              (String)docinfoHash.get("user_owner"), 
281
                              serverCode, 
282
                              true, /* override */
283
                              false /* validate */);
284
              MetacatReplication.replLog("wrote doc " + docid + " from " + 
285
                                         docServer);
286
              System.out.println("wrote doc " + docid + " from " + 
287
                                 docServer);
288
            }
289
            catch(Exception e)
290
            {
291
              System.out.println("error writing document in " + 
292
                                 "ReplicationHandler.update: " + docid +
293
                                 ": " + e.getMessage());
294
            }
295
          }
296
        }
297
        
298
        for(int k=0; k<d.size(); k++)
299
        { //delete the deleted documents;
300
          Vector w = new Vector((Vector)d.elementAt(k));
301
          String docid = (String)w.elementAt(0);
302
          if(!alreadyDeleted(docid, conn))
303
          {
304
            DocumentImpl.delete(conn, docid, null, null);
305
            System.out.println("Document " + docid + " deleted.");
306
            MetacatReplication.replLog("doc " + docid + " deleted");
307
          }
308
        }
309
        
310
        keys = serverList.keys();
311
        while(keys.hasMoreElements())
312
        {
313
          server = (String)(keys.nextElement()); 
314
          URL dateurl = new URL("http://" + server + "?action=gettime");
315
          String datexml = MetacatReplication.getURLContent(dateurl);
316
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
317
          StringBuffer sql = new StringBuffer();
318
          sql.append("update xml_replication set last_checked = to_date('");
319
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
320
          sql.append("server like '").append(server).append("'");
321
          //System.out.println("sql: " + sql.toString());
322
          pstmt.close();
323
          pstmt = conn.prepareStatement(sql.toString());
324
          pstmt.executeUpdate();
325
          //conn.commit();
326
          System.out.println("last_checked updated to " + datestr + " on " +
327
                            server);
328
        }
329
      }
330
    }
331
    catch(Exception e)
332
    {
333
      System.out.println("error in ReplicationHandler.update: " + 
334
                          e.getMessage());
335
      e.printStackTrace(System.out);
336
    }
337
    finally
338
    {
339
      try
340
      {
341
        pstmt.close();
342
      }
343
      catch(Exception ee) {}
344
    }
345
  }
346
  
347
  /**
348
   * updates xml_catalog with entries from other servers.
349
   */
350
  private void updateCatalog(Hashtable serverList)
351
  {
352
    Connection conn = null;
353
    try
354
    {
355
      try
356
      { //this connection is prone to error for some reason so we 
357
        //try to connect it three times before quiting.
358
        conn = util.openDBConnection();
359
      }
360
      catch(SQLException sqle)
361
      {
362
        try
363
        {
364
          conn = util.openDBConnection();
365
        }
366
        catch(SQLException sqlee)
367
        {
368
          try
369
          {
370
            conn = util.openDBConnection();
371
          }
372
          catch(SQLException sqleee)
373
          {
374
            System.out.println("error getting db connection in " + 
375
                               "ReplicationHandler.update: " +
376
                               sqleee.getMessage());
377
          }
378
        }
379
      }
380
      String server;
381
      Enumeration keys = serverList.keys();
382
      while(keys.hasMoreElements())
383
      { //go through each server
384
        server = (String)(keys.nextElement());
385
        URL u = new URL("http://" + server + "?action=getcatalog");
386
        System.out.println("sending message " + u.toString());
387
        String catxml = MetacatReplication.getURLContent(u);
388
        //System.out.println("catxml: " + catxml);
389
        CatalogMessageHandler cmh = new CatalogMessageHandler();
390
        XMLReader catparser = initParser(cmh);
391
        catparser.parse(new InputSource(new StringReader(catxml)));
392
        //parse the returned catalog xml and put it into a vector
393
        Vector remoteCatalog = cmh.getCatalogVect();
394
        
395
        String localcatxml = MetacatReplication.getCatalogXML();
396
        //System.out.println("localcatxml: " + localcatxml);
397
        cmh = new CatalogMessageHandler();
398
        catparser = initParser(cmh);
399
        catparser.parse(new InputSource(new StringReader(localcatxml)));
400
        Vector localCatalog = cmh.getCatalogVect();
401
        
402
        //now we have the catalog from the remote server and this local server
403
        //we now need to compare the two and merge the differences.
404
        //the comparison is base on the public_id fields which is the 4th
405
        //entry in each row vector.
406
        Vector publicId = new Vector();
407
        for(int i=0; i<localCatalog.size(); i++)
408
        {
409
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
410
          //System.out.println("v1: " + v.toString());
411
          publicId.add(new String((String)v.elementAt(3)));
412
          //System.out.println("adding " + (String)v.elementAt(3));
413
        }
414
        
415
        for(int i=0; i<remoteCatalog.size(); i++)
416
        {
417
          Vector v = (Vector)remoteCatalog.elementAt(i);
418
          //System.out.println("v2: " + v.toString());
419
          //System.out.println("i: " + i);
420
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
421
          //System.out.println("publicID: " + publicId.toString());
422
          //System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
423
          if(!publicId.contains(v.elementAt(3)))
424
          { //so we don't have this public id in our local table so we need to
425
            //add it.
426
            //System.out.println("in if");
427
            StringBuffer sql = new StringBuffer();
428
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
429
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
430
            sql.append("?,?)");
431
            //System.out.println("sql: " + sql.toString());
432
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
433
            pstmt.setString(1, (String)v.elementAt(0));
434
            pstmt.setString(2, (String)v.elementAt(1));
435
            pstmt.setString(3, (String)v.elementAt(2));
436
            pstmt.setString(4, (String)v.elementAt(3));
437
            pstmt.setString(5, (String)v.elementAt(4));
438
            pstmt.execute();
439
            pstmt.close();
440
          }
441
        }
442
      }
443
    }
444
    catch(Exception e)
445
    {
446
      System.out.println("error in ReplicationHandler.updateCatalog: " + 
447
                          e.getMessage());
448
      e.printStackTrace(System.out);
449
    }
450
  }
451
  
452
  /**
453
   * Method that returns true if docid has already been "deleted" from metacat.
454
   * This method really implements a truth table for deleted documents
455
   * The table is (a docid in one of the tables is represented by the X):
456
   * xml_docs      xml_revs      deleted?
457
   * ------------------------------------
458
   *   X             X             FALSE
459
   *   X             _             FALSE
460
   *   _             X             TRUE
461
   *   _             _             TRUE
462
   */
463
  private static boolean alreadyDeleted(String docid, Connection conn)
464
  {
465
    try
466
    {
467
      boolean xml_docs = false;
468
      boolean xml_revs = false;
469
      
470
      StringBuffer sb = new StringBuffer();
471
      sb.append("select docid from xml_revisions where docid like '");
472
      sb.append(docid).append("'");
473
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
474
      pstmt.execute();
475
      ResultSet rs = pstmt.getResultSet();
476
      boolean tablehasrows = rs.next();
477
      if(tablehasrows)
478
      {
479
        xml_revs = true;
480
      }
481
      
482
      sb = new StringBuffer();
483
      sb.append("select docid from xml_documents where docid like '");
484
      sb.append(docid).append("'");
485
      pstmt.close();
486
      pstmt = conn.prepareStatement(sb.toString());
487
      pstmt.execute();
488
      rs = pstmt.getResultSet();
489
      tablehasrows = rs.next();
490
      pstmt.close();
491
      if(tablehasrows)
492
      {
493
        xml_docs = true;
494
      }
495
      
496
      if(xml_docs && xml_revs)
497
      {
498
        return false;
499
      }
500
      else if(xml_docs && !xml_revs)
501
      {
502
        return false;
503
      }
504
      else if(!xml_docs && xml_revs)
505
      {
506
        return true;
507
      }
508
      else if(!xml_docs && !xml_revs)
509
      {
510
        return true;
511
      }
512
    }
513
    catch(Exception e)
514
    {
515
      System.out.println("error in ReplicationHandler.alreadyDeleted: " + 
516
                          e.getMessage());
517
      e.printStackTrace(System.out);
518
    }
519
    finally
520
    {
521
      
522
    }
523
    return false;
524
  }
525
  
526
  /**
527
   * Checks to see if a document is already in the DB.  Returns
528
   * "UPDATE" if it is, "INSERT" if it isn't
529
   */
530
  private static String getAction(String docid)
531
  {
532
    try
533
    {
534
      MetaCatUtil util = new MetaCatUtil();
535
      StringBuffer sql = new StringBuffer();
536
      sql.append("select docid from xml_documents where docid like '");
537
      sql.append(docid).append("'");
538
      Connection conn = util.openDBConnection();
539
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
540
      pstmt.execute();
541
      ResultSet rs = pstmt.getResultSet();
542

    
543
      if(rs.next())
544
      {
545
        pstmt.close();
546
        conn.close();
547
        return "UPDATE";
548
      }
549
      else
550
      {
551
        pstmt.close();
552
        conn.close();
553
        return "INSERT";
554
      }
555
    }
556
    catch(Exception e)
557
    {
558
      System.out.println("error in replicationHandler.getAction: " + 
559
                          e.getMessage());
560
    }
561
    return "";
562
  }
563
  
564
  /**
565
   * Method to query xml_replication and build a hashtable of each server
566
   * and it's last update time.
567
   * @param conn a connection to the database
568
   */
569
  public static Hashtable buildServerList(Connection conn)
570
  {
571
    Hashtable sl = new Hashtable();
572
    PreparedStatement pstmt;   
573
    try
574
    {
575
      pstmt = conn.prepareStatement("select server, last_checked, replicate " +
576
                                    "from xml_replication");
577
      pstmt.execute();
578
      ResultSet rs = pstmt.getResultSet();
579
      boolean tableHasRows = rs.next();
580
      while(tableHasRows)
581
      {
582
        if(rs.getInt(3) == 1)
583
        {//only put the server in the list if the replicate flag is true
584
          String server = rs.getString(1);
585
          String last_checked = rs.getString(2);
586
          if(!server.equals("localhost"))
587
          {
588
            sl.put(server, last_checked);
589
          }
590
        }
591
        tableHasRows = rs.next();   
592
      }
593
      pstmt.close();
594
    }
595
    catch(Exception e)
596
    {
597
      System.out.println("error in replicationHandler.buildServerList(): " +
598
                         e.getMessage());
599
    }
600
    return sl;
601
  }
602
  
603
  /**
604
   * Method to initialize the message parser
605
   */
606
  public static XMLReader initParser(DefaultHandler dh)
607
          throws Exception
608
  {
609
    XMLReader parser = null;
610

    
611
    try {
612
      ContentHandler chandler = dh;
613

    
614
      // Get an instance of the parser
615
      MetaCatUtil util = new MetaCatUtil();
616
      String parserName = util.getOption("saxparser");
617
      parser = XMLReaderFactory.createXMLReader(parserName);
618

    
619
      // Turn off validation
620
      parser.setFeature("http://xml.org/sax/features/validation", false);
621
      
622
      parser.setContentHandler((ContentHandler)chandler);
623
      parser.setErrorHandler((ErrorHandler)chandler);
624

    
625
    } catch (Exception e) {
626
      throw e;
627
    }
628

    
629
    return parser;
630
  }
631
}
(42-42/43)