Project

General

Profile

1 522 berkley
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12 669 jones
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 522 berkley
 */
27
28
package edu.ucsb.nceas.metacat;
29
30
import java.sql.*;
31
import java.util.*;
32
import java.lang.Thread;
33
import java.io.*;
34
import java.net.*;
35 543 berkley
import java.text.*;
36 522 berkley
import org.xml.sax.AttributeList;
37
import org.xml.sax.ContentHandler;
38
import org.xml.sax.DTDHandler;
39
import org.xml.sax.EntityResolver;
40
import org.xml.sax.ErrorHandler;
41
import org.xml.sax.InputSource;
42
import org.xml.sax.XMLReader;
43
import org.xml.sax.SAXException;
44
import org.xml.sax.SAXParseException;
45
import org.xml.sax.helpers.XMLReaderFactory;
46 561 berkley
import org.xml.sax.helpers.DefaultHandler;
47 522 berkley
48 561 berkley
49
50 522 berkley
/**
51
 * This class handles deltaT replication checking.  Whenever this TimerTask
52
 * is fired it checks each server in xml_replication for updates and updates
53
 * the local db as needed.
54
 */
55
public class ReplicationHandler extends TimerTask
56
{
57 573 berkley
  int serverCheckCode = 1;
58 522 berkley
  MetaCatUtil util = new MetaCatUtil();
59
  Hashtable serverList = new Hashtable();
60
  Connection conn;
61
  PrintWriter out;
62
63
  public ReplicationHandler(PrintWriter o)
64
  {
65
    this.out = o;
66
  }
67
68 573 berkley
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
69
  {
70
    this.out = o;
71
    this.serverCheckCode = serverCheckCode;
72
  }
73
74 522 berkley
  /**
75
   * Method that implements TimerTask.run().  It runs whenever the timer is
76
   * fired.
77
   */
78
  public void run()
79
  {
80
    //find out the last_checked time of each server in the server list and
81
    //send a query to each server to see if there are any documents in
82
    //xml_documents with an update_date > last_checked
83
    try
84
    {
85 683 berkley
      try
86
      { //this connection is prone to error for some reason so we
87
        //try to connect it three times before quiting.
88
        conn = util.openDBConnection();
89
      }
90
      catch(SQLException sqle)
91
      {
92
        try
93
        {
94
          conn = util.openDBConnection();
95
        }
96
        catch(SQLException sqlee)
97
        {
98
          try
99
          {
100
            conn = util.openDBConnection();
101
          }
102
          catch(SQLException sqleee)
103
          {
104
            System.out.println("error getting db connection in " +
105
                               "ReplicationHandler.run: " +
106
                               sqleee.getMessage());
107
          }
108
        }
109
      }
110 522 berkley
      serverList = buildServerList(conn);
111 697 bojilova
      updateCatalog(serverList);
112 683 berkley
      update(serverList);
113 533 berkley
      conn.close();
114 522 berkley
    }
115
    catch (Exception e)
116
    {
117 1011 tao
      //System.out.println("Error in replicationHandler.run():"+e.getMessage());
118 522 berkley
    }
119
  }
120
121
  /**
122 577 berkley
   * Method that uses revision taging for replication instead of update_date.
123
   */
124 683 berkley
  private void update(Hashtable serverList)
125 577 berkley
  {
126
    /*
127
     Pseudo-algorithm
128
     - request a doc list from each server in xml_replication
129
     - check the rev number of each of those documents agains the
130
       documents in the local database
131
     - pull any documents that have a lesser rev number on the local server
132
       from the remote server
133
     - delete any documents that still exist in the local xml_documents but
134
       are in the deletedDocuments tag of the remote host response.
135
     - update last_checked to keep track of the last time it was checked.
136
       (this info is theoretically not needed using this system but probably
137
       should be kept anyway)
138
    */
139 683 berkley
140
    Connection conn = null;
141
    try
142
    {
143
      try
144
      { //this connection is prone to error for some reason so we
145
        //try to connect it three times before quiting.
146
        conn = util.openDBConnection();
147
      }
148
      catch(SQLException sqle)
149
      {
150
        try
151
        {
152
          conn = util.openDBConnection();
153
        }
154
        catch(SQLException sqlee)
155
        {
156
          try
157
          {
158
            conn = util.openDBConnection();
159
          }
160
          catch(SQLException sqleee)
161
          {
162 1011 tao
            /*System.out.println("error getting db connection in " +
163 683 berkley
                               "ReplicationHandler.update: " +
164 1011 tao
                               sqleee.getMessage());*/
165 683 berkley
          }
166
        }
167
      }
168
    }
169
    catch(Exception e)
170
    {
171 1011 tao
      /*System.out.println("error in ReplicationHandler.update: " +
172
                          e.getMessage());*/
173 683 berkley
    }
174
175 577 berkley
    Enumeration keys;
176
    String server;
177
    String update;
178
    ReplMessageHandler message = new ReplMessageHandler();
179
    Vector responses = new Vector();
180 667 berkley
    PreparedStatement pstmt = null;
181 577 berkley
    ResultSet rs;
182
    boolean tablehasrows;
183
    boolean flag=false;
184
    String action = new String();
185
    XMLReader parser;
186
    URL u;
187
188
    try
189
    {
190 590 berkley
      MetaCatUtil.debugMessage("init parser");
191 577 berkley
      parser = initParser(message);
192
      keys = serverList.keys();
193
      while(keys.hasMoreElements())
194
      {
195
        server = (String)(keys.nextElement());
196 584 berkley
        MetacatReplication.replLog("full update started to: " + server);
197 1011 tao
        u = new URL("https://" + server + "?server="
198 1015 tao
        +util.getLocalReplicationServerName()+"&action=update");
199 1011 tao
        //System.out.println("Sending Message: " + u.toString());
200 577 berkley
        String result = MetacatReplication.getURLContent(u);
201
        responses.add(result);
202
      }
203
204 675 berkley
      //String srvr = util.getOption("servletpath");
205 577 berkley
206 675 berkley
      //System.out.println("responses (from srvr): " + responses.toString());
207
208 577 berkley
      for(int i=0; i<responses.size(); i++)
209
      { //check each server for updated files
210
        //System.out.println("parsing responses");
211
        parser.parse(new InputSource(
212
                     new StringReader(
213
                     (String)(responses.elementAt(i)))));
214
        Vector v = new Vector(message.getUpdatesVect());
215
        //System.out.println("v: " + v.toString());
216
        Vector d = new Vector(message.getDeletesVect());
217
        //check the revs in u to see if there are any newer ones than
218
        //in the local DB.
219
        for(int j=0; j<v.size(); j++)
220
        {
221
          Vector w = new Vector((Vector)(v.elementAt(j)));
222
          //System.out.println("w: " + w.toString());
223
          String docid = (String)w.elementAt(0);
224
          //System.out.println("docid: " + docid);
225
          int rev = Integer.parseInt((String)w.elementAt(1));
226
          //System.out.println("rev: " + rev);
227
          String docServer = (String)w.elementAt(2);
228
          //System.out.println("docServer: " + docServer);
229
230
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
231
                                        "docid like '" + docid + "'");
232
          pstmt.execute();
233
          rs = pstmt.getResultSet();
234
          tablehasrows = rs.next();
235
          if(tablehasrows)
236
          { //check the revs for an update because this document is in the
237
            //local DB, it just might be out of date.
238
            int localrev = rs.getInt(1);
239
            if(localrev == rev)
240
            {
241
              flag = false;
242
            }
243
            else if(localrev < rev)
244
            {//this document needs to be updated so send an read request
245
              action = "UPDATE";
246
              flag = true;
247
            }
248
          }
249
          else
250
          { //insert this document as new because it is not in the local DB
251
            action = "INSERT";
252
            flag = true;
253
          }
254
255
          if(flag)
256
          { //if the document needs to be updated or inserted, this is executed
257 1011 tao
            u = new URL("https://" + docServer + "?server="+
258 1015 tao
              util.getLocalReplicationServerName()+"&action=read&docid="+docid);
259 1011 tao
            //System.out.println("Sending message: " + u.toString());
260 577 berkley
            String newxmldoc = MetacatReplication.getURLContent(u);
261
            DocInfoHandler dih = new DocInfoHandler();
262
            XMLReader docinfoParser = initParser(dih);
263 837 bojilova
            URL docinfoUrl = new URL("https://" + docServer +
264 1015 tao
                  "?server="+util.getLocalReplicationServerName()+
265 1011 tao
                  "&action=getdocumentinfo&docid="+docid);
266
            //System.out.println("Sending message: " + docinfoUrl.toString());
267 577 berkley
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
268
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
269
            Hashtable docinfoHash = dih.getDocInfo();
270
            int serverCode = MetacatReplication.getServerCode(docServer);
271 1011 tao
            //System.out.println("updating doc: " + docid +" action: "+ action);
272
            //docid should include rev number too
273
            String accnum=docid+util.getOption("accNumSeparator")+
274
                                              (String)docinfoHash.get("rev");
275
            //System.out.println("accnum: "+accnum);
276 625 berkley
            try
277
            {
278
              String newDocid = DocumentImpl.write(conn,
279 577 berkley
                              new StringReader(newxmldoc),
280 697 bojilova
                              (String)docinfoHash.get("public_access"),
281
                              null,  /* the dtd text */
282 577 berkley
                              action,
283 1011 tao
                              accnum,
284 577 berkley
                              (String)docinfoHash.get("user_owner"),
285 804 bojilova
                              null, /* null for groups[] */
286 577 berkley
                              serverCode,
287 697 bojilova
                              true, /* override */
288
                              false /* validate */);
289 625 berkley
              MetacatReplication.replLog("wrote doc " + docid + " from " +
290
                                         docServer);
291 1011 tao
              /*System.out.println("wrote doc " + docid + " from " +
292
                                 docServer);*/
293 625 berkley
            }
294
            catch(Exception e)
295
            {
296 675 berkley
              System.out.println("error writing document in " +
297
                                 "ReplicationHandler.update: " + docid +
298
                                 ": " + e.getMessage());
299 625 berkley
            }
300 577 berkley
          }
301
        }
302
303
        for(int k=0; k<d.size(); k++)
304
        { //delete the deleted documents;
305
          Vector w = new Vector((Vector)d.elementAt(k));
306
          String docid = (String)w.elementAt(0);
307 579 berkley
          if(!alreadyDeleted(docid, conn))
308
          {
309
            DocumentImpl.delete(conn, docid, null, null);
310 595 berkley
            System.out.println("Document " + docid + " deleted.");
311 584 berkley
            MetacatReplication.replLog("doc " + docid + " deleted");
312 579 berkley
          }
313 577 berkley
        }
314
315
        keys = serverList.keys();
316
        while(keys.hasMoreElements())
317
        {
318
          server = (String)(keys.nextElement());
319 1011 tao
          URL dateurl = new URL("https://" + server + "?server="+
320 1015 tao
          util.getLocalReplicationServerName()+"&action=gettime");
321 577 berkley
          String datexml = MetacatReplication.getURLContent(dateurl);
322
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
323
          StringBuffer sql = new StringBuffer();
324
          sql.append("update xml_replication set last_checked = to_date('");
325
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
326
          sql.append("server like '").append(server).append("'");
327
          //System.out.println("sql: " + sql.toString());
328 667 berkley
          pstmt.close();
329 577 berkley
          pstmt = conn.prepareStatement(sql.toString());
330
          pstmt.executeUpdate();
331
          //conn.commit();
332 595 berkley
          System.out.println("last_checked updated to " + datestr + " on " +
333 590 berkley
                            server);
334 577 berkley
        }
335
      }
336
    }
337
    catch(Exception e)
338
    {
339 1011 tao
      /*System.out.println("error in ReplicationHandler.update: " +
340
                          e.getMessage());*/
341 577 berkley
      e.printStackTrace(System.out);
342
    }
343 667 berkley
    finally
344
    {
345
      try
346
      {
347
        pstmt.close();
348
      }
349
      catch(Exception ee) {}
350
    }
351 577 berkley
  }
352
353
  /**
354 590 berkley
   * updates xml_catalog with entries from other servers.
355
   */
356 683 berkley
  private void updateCatalog(Hashtable serverList)
357 590 berkley
  {
358 683 berkley
    Connection conn = null;
359 590 berkley
    try
360
    {
361 683 berkley
      try
362
      { //this connection is prone to error for some reason so we
363
        //try to connect it three times before quiting.
364
        conn = util.openDBConnection();
365
      }
366
      catch(SQLException sqle)
367
      {
368
        try
369
        {
370
          conn = util.openDBConnection();
371
        }
372
        catch(SQLException sqlee)
373
        {
374
          try
375
          {
376
            conn = util.openDBConnection();
377
          }
378
          catch(SQLException sqleee)
379
          {
380
            System.out.println("error getting db connection in " +
381
                               "ReplicationHandler.update: " +
382
                               sqleee.getMessage());
383
          }
384
        }
385
      }
386 590 berkley
      String server;
387
      Enumeration keys = serverList.keys();
388
      while(keys.hasMoreElements())
389
      { //go through each server
390
        server = (String)(keys.nextElement());
391 1011 tao
        URL u = new URL("https://" + server + "?server="+
392 1015 tao
        util.getLocalReplicationServerName()+"&action=getcatalog");
393
        //System.out.println("sending message " + u.toString());
394 590 berkley
        String catxml = MetacatReplication.getURLContent(u);
395 595 berkley
        //System.out.println("catxml: " + catxml);
396 590 berkley
        CatalogMessageHandler cmh = new CatalogMessageHandler();
397
        XMLReader catparser = initParser(cmh);
398
        catparser.parse(new InputSource(new StringReader(catxml)));
399
        //parse the returned catalog xml and put it into a vector
400
        Vector remoteCatalog = cmh.getCatalogVect();
401
402
        String localcatxml = MetacatReplication.getCatalogXML();
403 595 berkley
        //System.out.println("localcatxml: " + localcatxml);
404 590 berkley
        cmh = new CatalogMessageHandler();
405
        catparser = initParser(cmh);
406
        catparser.parse(new InputSource(new StringReader(localcatxml)));
407
        Vector localCatalog = cmh.getCatalogVect();
408
409
        //now we have the catalog from the remote server and this local server
410
        //we now need to compare the two and merge the differences.
411
        //the comparison is base on the public_id fields which is the 4th
412
        //entry in each row vector.
413
        Vector publicId = new Vector();
414
        for(int i=0; i<localCatalog.size(); i++)
415
        {
416
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
417 595 berkley
          //System.out.println("v1: " + v.toString());
418 590 berkley
          publicId.add(new String((String)v.elementAt(3)));
419 595 berkley
          //System.out.println("adding " + (String)v.elementAt(3));
420 590 berkley
        }
421
422
        for(int i=0; i<remoteCatalog.size(); i++)
423
        {
424
          Vector v = (Vector)remoteCatalog.elementAt(i);
425 595 berkley
          //System.out.println("v2: " + v.toString());
426 590 berkley
          //System.out.println("i: " + i);
427
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
428 595 berkley
          //System.out.println("publicID: " + publicId.toString());
429
          //System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
430 590 berkley
          if(!publicId.contains(v.elementAt(3)))
431
          { //so we don't have this public id in our local table so we need to
432
            //add it.
433 595 berkley
            //System.out.println("in if");
434 590 berkley
            StringBuffer sql = new StringBuffer();
435
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
436
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
437
            sql.append("?,?)");
438 595 berkley
            //System.out.println("sql: " + sql.toString());
439 590 berkley
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
440
            pstmt.setString(1, (String)v.elementAt(0));
441
            pstmt.setString(2, (String)v.elementAt(1));
442
            pstmt.setString(3, (String)v.elementAt(2));
443
            pstmt.setString(4, (String)v.elementAt(3));
444
            pstmt.setString(5, (String)v.elementAt(4));
445
            pstmt.execute();
446 667 berkley
            pstmt.close();
447 590 berkley
          }
448
        }
449
      }
450
    }
451
    catch(Exception e)
452
    {
453 675 berkley
      System.out.println("error in ReplicationHandler.updateCatalog: " +
454
                          e.getMessage());
455 590 berkley
      e.printStackTrace(System.out);
456
    }
457
  }
458
459
  /**
460 579 berkley
   * Method that returns true if docid has already been "deleted" from metacat.
461 582 berkley
   * This method really implements a truth table for deleted documents
462 590 berkley
   * The table is (a docid in one of the tables is represented by the X):
463 582 berkley
   * xml_docs      xml_revs      deleted?
464
   * ------------------------------------
465
   *   X             X             FALSE
466
   *   X             _             FALSE
467
   *   _             X             TRUE
468
   *   _             _             TRUE
469 579 berkley
   */
470
  private static boolean alreadyDeleted(String docid, Connection conn)
471
  {
472
    try
473
    {
474 582 berkley
      boolean xml_docs = false;
475
      boolean xml_revs = false;
476
477 579 berkley
      StringBuffer sb = new StringBuffer();
478 582 berkley
      sb.append("select docid from xml_revisions where docid like '");
479 579 berkley
      sb.append(docid).append("'");
480
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
481
      pstmt.execute();
482
      ResultSet rs = pstmt.getResultSet();
483
      boolean tablehasrows = rs.next();
484
      if(tablehasrows)
485
      {
486 582 berkley
        xml_revs = true;
487
      }
488
489
      sb = new StringBuffer();
490
      sb.append("select docid from xml_documents where docid like '");
491
      sb.append(docid).append("'");
492 667 berkley
      pstmt.close();
493 582 berkley
      pstmt = conn.prepareStatement(sb.toString());
494
      pstmt.execute();
495
      rs = pstmt.getResultSet();
496
      tablehasrows = rs.next();
497 667 berkley
      pstmt.close();
498 582 berkley
      if(tablehasrows)
499
      {
500
        xml_docs = true;
501
      }
502
503
      if(xml_docs && xml_revs)
504
      {
505
        return false;
506
      }
507
      else if(xml_docs && !xml_revs)
508
      {
509
        return false;
510
      }
511
      else if(!xml_docs && xml_revs)
512
      {
513 579 berkley
        return true;
514
      }
515 582 berkley
      else if(!xml_docs && !xml_revs)
516
      {
517
        return true;
518
      }
519 579 berkley
    }
520
    catch(Exception e)
521
    {
522 675 berkley
      System.out.println("error in ReplicationHandler.alreadyDeleted: " +
523
                          e.getMessage());
524 579 berkley
      e.printStackTrace(System.out);
525
    }
526 667 berkley
    finally
527
    {
528
529
    }
530 579 berkley
    return false;
531
  }
532
533
  /**
534 533 berkley
   * Checks to see if a document is already in the DB.  Returns
535
   * "UPDATE" if it is, "INSERT" if it isn't
536
   */
537
  private static String getAction(String docid)
538
  {
539
    try
540
    {
541
      MetaCatUtil util = new MetaCatUtil();
542
      StringBuffer sql = new StringBuffer();
543
      sql.append("select docid from xml_documents where docid like '");
544
      sql.append(docid).append("'");
545
      Connection conn = util.openDBConnection();
546
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
547
      pstmt.execute();
548
      ResultSet rs = pstmt.getResultSet();
549
550
      if(rs.next())
551
      {
552 667 berkley
        pstmt.close();
553 533 berkley
        conn.close();
554
        return "UPDATE";
555
      }
556
      else
557
      {
558 667 berkley
        pstmt.close();
559 533 berkley
        conn.close();
560
        return "INSERT";
561
      }
562
    }
563
    catch(Exception e)
564
    {
565
      System.out.println("error in replicationHandler.getAction: " +
566
                          e.getMessage());
567
    }
568
    return "";
569
  }
570
571
  /**
572 522 berkley
   * Method to query xml_replication and build a hashtable of each server
573
   * and it's last update time.
574
   * @param conn a connection to the database
575
   */
576 577 berkley
  public static Hashtable buildServerList(Connection conn)
577 522 berkley
  {
578
    Hashtable sl = new Hashtable();
579
    PreparedStatement pstmt;
580
    try
581
    {
582 629 berkley
      pstmt = conn.prepareStatement("select server, last_checked, replicate " +
583
                                    "from xml_replication");
584 522 berkley
      pstmt.execute();
585
      ResultSet rs = pstmt.getResultSet();
586
      boolean tableHasRows = rs.next();
587
      while(tableHasRows)
588
      {
589 629 berkley
        if(rs.getInt(3) == 1)
590
        {//only put the server in the list if the replicate flag is true
591
          String server = rs.getString(1);
592
          String last_checked = rs.getString(2);
593
          if(!server.equals("localhost"))
594
          {
595
            sl.put(server, last_checked);
596
          }
597 549 berkley
        }
598 629 berkley
        tableHasRows = rs.next();
599 522 berkley
      }
600 667 berkley
      pstmt.close();
601 522 berkley
    }
602
    catch(Exception e)
603
    {
604
      System.out.println("error in replicationHandler.buildServerList(): " +
605
                         e.getMessage());
606
    }
607
    return sl;
608
  }
609 574 berkley
610
  /**
611
   * Method to initialize the message parser
612
   */
613
  public static XMLReader initParser(DefaultHandler dh)
614
          throws Exception
615
  {
616
    XMLReader parser = null;
617
618
    try {
619
      ContentHandler chandler = dh;
620
621
      // Get an instance of the parser
622
      MetaCatUtil util = new MetaCatUtil();
623
      String parserName = util.getOption("saxparser");
624
      parser = XMLReaderFactory.createXMLReader(parserName);
625
626
      // Turn off validation
627
      parser.setFeature("http://xml.org/sax/features/validation", false);
628
629
      parser.setContentHandler((ContentHandler)chandler);
630
      parser.setErrorHandler((ErrorHandler)chandler);
631
632
    } catch (Exception e) {
633
      throw e;
634
    }
635
636
    return parser;
637
  }
638 1011 tao
639 1015 tao
640 522 berkley
}