Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: tao $'
10
 *     '$Date: 2002-04-16 18:21:57 -0700 (Tue, 16 Apr 2002) $'
11
 * '$Revision: 1015 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27
 
28
package edu.ucsb.nceas.metacat;
29

    
30
import java.sql.*;
31
import java.util.*;
32
import java.lang.Thread; 
33
import java.io.*;
34
import java.net.*;
35
import java.text.*;
36
import org.xml.sax.AttributeList;
37
import org.xml.sax.ContentHandler;
38
import org.xml.sax.DTDHandler;
39
import org.xml.sax.EntityResolver;
40
import org.xml.sax.ErrorHandler;
41
import org.xml.sax.InputSource;
42
import org.xml.sax.XMLReader;
43
import org.xml.sax.SAXException;
44
import org.xml.sax.SAXParseException;
45
import org.xml.sax.helpers.XMLReaderFactory;
46
import org.xml.sax.helpers.DefaultHandler;
47

    
48

    
49

    
50
/**
51
 * This class handles deltaT replication checking.  Whenever this TimerTask
52
 * is fired it checks each server in xml_replication for updates and updates
53
 * the local db as needed.
54
 */
55
public class ReplicationHandler extends TimerTask
56
{
57
  int serverCheckCode = 1;
58
  MetaCatUtil util = new MetaCatUtil();
59
  Hashtable serverList = new Hashtable(); 
60
  Connection conn;
61
  PrintWriter out;
62
  
63
  public ReplicationHandler(PrintWriter o)
64
  {
65
    this.out = o;
66
  }
67
  
68
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
69
  {
70
    this.out = o;
71
    this.serverCheckCode = serverCheckCode;
72
  }
73
  
74
  /**
75
   * Method that implements TimerTask.run().  It runs whenever the timer is 
76
   * fired.
77
   */
78
  public void run()
79
  {
80
    //find out the last_checked time of each server in the server list and
81
    //send a query to each server to see if there are any documents in 
82
    //xml_documents with an update_date > last_checked
83
    try
84
    {
85
      try
86
      { //this connection is prone to error for some reason so we 
87
        //try to connect it three times before quiting.
88
        conn = util.openDBConnection();
89
      }
90
      catch(SQLException sqle)
91
      {
92
        try
93
        {
94
          conn = util.openDBConnection();
95
        }
96
        catch(SQLException sqlee)
97
        {
98
          try
99
          {
100
            conn = util.openDBConnection();
101
          }
102
          catch(SQLException sqleee)
103
          {
104
            System.out.println("error getting db connection in " + 
105
                               "ReplicationHandler.run: " +
106
                               sqleee.getMessage());
107
          }
108
        }
109
      }
110
      serverList = buildServerList(conn);
111
      updateCatalog(serverList);
112
      update(serverList);
113
      conn.close();
114
    }
115
    catch (Exception e)
116
    {
117
      //System.out.println("Error in replicationHandler.run():"+e.getMessage());
118
    }
119
  }
120
  
121
  /**
122
   * Method that uses revision taging for replication instead of update_date.
123
   */
124
  private void update(Hashtable serverList)
125
  {
126
    /*
127
     Pseudo-algorithm
128
     - request a doc list from each server in xml_replication
129
     - check the rev number of each of those documents agains the 
130
       documents in the local database
131
     - pull any documents that have a lesser rev number on the local server
132
       from the remote server
133
     - delete any documents that still exist in the local xml_documents but
134
       are in the deletedDocuments tag of the remote host response.
135
     - update last_checked to keep track of the last time it was checked.
136
       (this info is theoretically not needed using this system but probably 
137
       should be kept anyway)
138
    */
139
    
140
    Connection conn = null;
141
    try
142
    {
143
      try
144
      { //this connection is prone to error for some reason so we 
145
        //try to connect it three times before quiting.
146
        conn = util.openDBConnection();
147
      }
148
      catch(SQLException sqle)
149
      {
150
        try
151
        {
152
          conn = util.openDBConnection();
153
        }
154
        catch(SQLException sqlee)
155
        {
156
          try
157
          {
158
            conn = util.openDBConnection();
159
          }
160
          catch(SQLException sqleee)
161
          {
162
            /*System.out.println("error getting db connection in " + 
163
                               "ReplicationHandler.update: " +
164
                               sqleee.getMessage());*/
165
          }
166
        }
167
      }
168
    }
169
    catch(Exception e)
170
    {
171
      /*System.out.println("error in ReplicationHandler.update: " + 
172
                          e.getMessage());*/
173
    }
174
    
175
    Enumeration keys;
176
    String server;
177
    String update;
178
    ReplMessageHandler message = new ReplMessageHandler();
179
    Vector responses = new Vector();
180
    PreparedStatement pstmt = null;
181
    ResultSet rs;
182
    boolean tablehasrows;
183
    boolean flag=false;
184
    String action = new String();
185
    XMLReader parser;
186
    URL u;
187
    
188
    try
189
    {
190
      MetaCatUtil.debugMessage("init parser");
191
      parser = initParser(message);
192
      keys = serverList.keys();
193
      while(keys.hasMoreElements())
194
      {
195
        server = (String)(keys.nextElement());
196
        MetacatReplication.replLog("full update started to: " + server);
197
        u = new URL("https://" + server + "?server="
198
        +util.getLocalReplicationServerName()+"&action=update");
199
        //System.out.println("Sending Message: " + u.toString());
200
        String result = MetacatReplication.getURLContent(u);
201
        responses.add(result);
202
      }
203
      
204
      //String srvr = util.getOption("servletpath");
205
      
206
      //System.out.println("responses (from srvr): " + responses.toString());
207
      
208
      for(int i=0; i<responses.size(); i++)
209
      { //check each server for updated files
210
        //System.out.println("parsing responses");
211
        parser.parse(new InputSource(
212
                     new StringReader(
213
                     (String)(responses.elementAt(i)))));
214
        Vector v = new Vector(message.getUpdatesVect());
215
        //System.out.println("v: " + v.toString());
216
        Vector d = new Vector(message.getDeletesVect());
217
        //check the revs in u to see if there are any newer ones than
218
        //in the local DB.
219
        for(int j=0; j<v.size(); j++)
220
        {
221
          Vector w = new Vector((Vector)(v.elementAt(j)));
222
          //System.out.println("w: " + w.toString());
223
          String docid = (String)w.elementAt(0);
224
          //System.out.println("docid: " + docid);
225
          int rev = Integer.parseInt((String)w.elementAt(1));
226
          //System.out.println("rev: " + rev);
227
          String docServer = (String)w.elementAt(2);
228
          //System.out.println("docServer: " + docServer);
229
    
230
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
231
                                        "docid like '" + docid + "'");
232
          pstmt.execute();
233
          rs = pstmt.getResultSet();
234
          tablehasrows = rs.next();
235
          if(tablehasrows)
236
          { //check the revs for an update because this document is in the
237
            //local DB, it just might be out of date.
238
            int localrev = rs.getInt(1);
239
            if(localrev == rev)
240
            {
241
              flag = false;
242
            }
243
            else if(localrev < rev)
244
            {//this document needs to be updated so send an read request
245
              action = "UPDATE";
246
              flag = true;
247
            }
248
          }
249
          else
250
          { //insert this document as new because it is not in the local DB
251
            action = "INSERT";
252
            flag = true;
253
          }
254
          
255
          if(flag)
256
          { //if the document needs to be updated or inserted, this is executed
257
            u = new URL("https://" + docServer + "?server="+
258
              util.getLocalReplicationServerName()+"&action=read&docid="+docid);
259
            //System.out.println("Sending message: " + u.toString());
260
            String newxmldoc = MetacatReplication.getURLContent(u);
261
            DocInfoHandler dih = new DocInfoHandler();
262
            XMLReader docinfoParser = initParser(dih);
263
            URL docinfoUrl = new URL("https://" + docServer + 
264
                  "?server="+util.getLocalReplicationServerName()+
265
                  "&action=getdocumentinfo&docid="+docid);
266
            //System.out.println("Sending message: " + docinfoUrl.toString());
267
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
268
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
269
            Hashtable docinfoHash = dih.getDocInfo();
270
            int serverCode = MetacatReplication.getServerCode(docServer);
271
            //System.out.println("updating doc: " + docid +" action: "+ action);
272
            //docid should include rev number too
273
            String accnum=docid+util.getOption("accNumSeparator")+
274
                                              (String)docinfoHash.get("rev");
275
            //System.out.println("accnum: "+accnum);
276
            try
277
            {
278
              String newDocid = DocumentImpl.write(conn, 
279
                              new StringReader(newxmldoc),
280
                              (String)docinfoHash.get("public_access"),
281
                              null,  /* the dtd text */
282
                              action, 
283
                              accnum, 
284
                              (String)docinfoHash.get("user_owner"),
285
                              null, /* null for groups[] */
286
                              serverCode, 
287
                              true, /* override */
288
                              false /* validate */);
289
              MetacatReplication.replLog("wrote doc " + docid + " from " + 
290
                                         docServer);
291
              /*System.out.println("wrote doc " + docid + " from " + 
292
                                 docServer);*/
293
            }
294
            catch(Exception e)
295
            {
296
              System.out.println("error writing document in " + 
297
                                 "ReplicationHandler.update: " + docid +
298
                                 ": " + e.getMessage());
299
            }
300
          }
301
        }
302
        
303
        for(int k=0; k<d.size(); k++)
304
        { //delete the deleted documents;
305
          Vector w = new Vector((Vector)d.elementAt(k));
306
          String docid = (String)w.elementAt(0);
307
          if(!alreadyDeleted(docid, conn))
308
          {
309
            DocumentImpl.delete(conn, docid, null, null);
310
            System.out.println("Document " + docid + " deleted.");
311
            MetacatReplication.replLog("doc " + docid + " deleted");
312
          }
313
        }
314
        
315
        keys = serverList.keys();
316
        while(keys.hasMoreElements())
317
        {
318
          server = (String)(keys.nextElement()); 
319
          URL dateurl = new URL("https://" + server + "?server="+
320
          util.getLocalReplicationServerName()+"&action=gettime");
321
          String datexml = MetacatReplication.getURLContent(dateurl);
322
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
323
          StringBuffer sql = new StringBuffer();
324
          sql.append("update xml_replication set last_checked = to_date('");
325
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
326
          sql.append("server like '").append(server).append("'");
327
          //System.out.println("sql: " + sql.toString());
328
          pstmt.close();
329
          pstmt = conn.prepareStatement(sql.toString());
330
          pstmt.executeUpdate();
331
          //conn.commit();
332
          System.out.println("last_checked updated to " + datestr + " on " +
333
                            server);
334
        }
335
      }
336
    }
337
    catch(Exception e)
338
    {
339
      /*System.out.println("error in ReplicationHandler.update: " + 
340
                          e.getMessage());*/
341
      e.printStackTrace(System.out);
342
    }
343
    finally
344
    {
345
      try
346
      {
347
        pstmt.close();
348
      }
349
      catch(Exception ee) {}
350
    }
351
  }
352
  
353
  /**
354
   * updates xml_catalog with entries from other servers.
355
   */
356
  private void updateCatalog(Hashtable serverList)
357
  {
358
    Connection conn = null;
359
    try
360
    {
361
      try
362
      { //this connection is prone to error for some reason so we 
363
        //try to connect it three times before quiting.
364
        conn = util.openDBConnection();
365
      }
366
      catch(SQLException sqle)
367
      {
368
        try
369
        {
370
          conn = util.openDBConnection();
371
        }
372
        catch(SQLException sqlee)
373
        {
374
          try
375
          {
376
            conn = util.openDBConnection();
377
          }
378
          catch(SQLException sqleee)
379
          {
380
            System.out.println("error getting db connection in " + 
381
                               "ReplicationHandler.update: " +
382
                               sqleee.getMessage());
383
          }
384
        }
385
      }
386
      String server;
387
      Enumeration keys = serverList.keys();
388
      while(keys.hasMoreElements())
389
      { //go through each server
390
        server = (String)(keys.nextElement());
391
        URL u = new URL("https://" + server + "?server="+
392
        util.getLocalReplicationServerName()+"&action=getcatalog");
393
        //System.out.println("sending message " + u.toString());
394
        String catxml = MetacatReplication.getURLContent(u);
395
        //System.out.println("catxml: " + catxml);
396
        CatalogMessageHandler cmh = new CatalogMessageHandler();
397
        XMLReader catparser = initParser(cmh);
398
        catparser.parse(new InputSource(new StringReader(catxml)));
399
        //parse the returned catalog xml and put it into a vector
400
        Vector remoteCatalog = cmh.getCatalogVect();
401
        
402
        String localcatxml = MetacatReplication.getCatalogXML();
403
        //System.out.println("localcatxml: " + localcatxml);
404
        cmh = new CatalogMessageHandler();
405
        catparser = initParser(cmh);
406
        catparser.parse(new InputSource(new StringReader(localcatxml)));
407
        Vector localCatalog = cmh.getCatalogVect();
408
        
409
        //now we have the catalog from the remote server and this local server
410
        //we now need to compare the two and merge the differences.
411
        //the comparison is base on the public_id fields which is the 4th
412
        //entry in each row vector.
413
        Vector publicId = new Vector();
414
        for(int i=0; i<localCatalog.size(); i++)
415
        {
416
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
417
          //System.out.println("v1: " + v.toString());
418
          publicId.add(new String((String)v.elementAt(3)));
419
          //System.out.println("adding " + (String)v.elementAt(3));
420
        }
421
        
422
        for(int i=0; i<remoteCatalog.size(); i++)
423
        {
424
          Vector v = (Vector)remoteCatalog.elementAt(i);
425
          //System.out.println("v2: " + v.toString());
426
          //System.out.println("i: " + i);
427
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
428
          //System.out.println("publicID: " + publicId.toString());
429
          //System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
430
          if(!publicId.contains(v.elementAt(3)))
431
          { //so we don't have this public id in our local table so we need to
432
            //add it.
433
            //System.out.println("in if");
434
            StringBuffer sql = new StringBuffer();
435
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
436
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
437
            sql.append("?,?)");
438
            //System.out.println("sql: " + sql.toString());
439
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
440
            pstmt.setString(1, (String)v.elementAt(0));
441
            pstmt.setString(2, (String)v.elementAt(1));
442
            pstmt.setString(3, (String)v.elementAt(2));
443
            pstmt.setString(4, (String)v.elementAt(3));
444
            pstmt.setString(5, (String)v.elementAt(4));
445
            pstmt.execute();
446
            pstmt.close();
447
          }
448
        }
449
      }
450
    }
451
    catch(Exception e)
452
    {
453
      System.out.println("error in ReplicationHandler.updateCatalog: " + 
454
                          e.getMessage());
455
      e.printStackTrace(System.out);
456
    }
457
  }
458
  
459
  /**
460
   * Method that returns true if docid has already been "deleted" from metacat.
461
   * This method really implements a truth table for deleted documents
462
   * The table is (a docid in one of the tables is represented by the X):
463
   * xml_docs      xml_revs      deleted?
464
   * ------------------------------------
465
   *   X             X             FALSE
466
   *   X             _             FALSE
467
   *   _             X             TRUE
468
   *   _             _             TRUE
469
   */
470
  private static boolean alreadyDeleted(String docid, Connection conn)
471
  {
472
    try
473
    {
474
      boolean xml_docs = false;
475
      boolean xml_revs = false;
476
      
477
      StringBuffer sb = new StringBuffer();
478
      sb.append("select docid from xml_revisions where docid like '");
479
      sb.append(docid).append("'");
480
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
481
      pstmt.execute();
482
      ResultSet rs = pstmt.getResultSet();
483
      boolean tablehasrows = rs.next();
484
      if(tablehasrows)
485
      {
486
        xml_revs = true;
487
      }
488
      
489
      sb = new StringBuffer();
490
      sb.append("select docid from xml_documents where docid like '");
491
      sb.append(docid).append("'");
492
      pstmt.close();
493
      pstmt = conn.prepareStatement(sb.toString());
494
      pstmt.execute();
495
      rs = pstmt.getResultSet();
496
      tablehasrows = rs.next();
497
      pstmt.close();
498
      if(tablehasrows)
499
      {
500
        xml_docs = true;
501
      }
502
      
503
      if(xml_docs && xml_revs)
504
      {
505
        return false;
506
      }
507
      else if(xml_docs && !xml_revs)
508
      {
509
        return false;
510
      }
511
      else if(!xml_docs && xml_revs)
512
      {
513
        return true;
514
      }
515
      else if(!xml_docs && !xml_revs)
516
      {
517
        return true;
518
      }
519
    }
520
    catch(Exception e)
521
    {
522
      System.out.println("error in ReplicationHandler.alreadyDeleted: " + 
523
                          e.getMessage());
524
      e.printStackTrace(System.out);
525
    }
526
    finally
527
    {
528
      
529
    }
530
    return false;
531
  }
532
  
533
  /**
534
   * Checks to see if a document is already in the DB.  Returns
535
   * "UPDATE" if it is, "INSERT" if it isn't
536
   */
537
  private static String getAction(String docid)
538
  {
539
    try
540
    {
541
      MetaCatUtil util = new MetaCatUtil();
542
      StringBuffer sql = new StringBuffer();
543
      sql.append("select docid from xml_documents where docid like '");
544
      sql.append(docid).append("'");
545
      Connection conn = util.openDBConnection();
546
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
547
      pstmt.execute();
548
      ResultSet rs = pstmt.getResultSet();
549

    
550
      if(rs.next())
551
      {
552
        pstmt.close();
553
        conn.close();
554
        return "UPDATE";
555
      }
556
      else
557
      {
558
        pstmt.close();
559
        conn.close();
560
        return "INSERT";
561
      }
562
    }
563
    catch(Exception e)
564
    {
565
      System.out.println("error in replicationHandler.getAction: " + 
566
                          e.getMessage());
567
    }
568
    return "";
569
  }
570
  
571
  /**
572
   * Method to query xml_replication and build a hashtable of each server
573
   * and it's last update time.
574
   * @param conn a connection to the database
575
   */
576
  public static Hashtable buildServerList(Connection conn)
577
  {
578
    Hashtable sl = new Hashtable();
579
    PreparedStatement pstmt;   
580
    try
581
    {
582
      pstmt = conn.prepareStatement("select server, last_checked, replicate " +
583
                                    "from xml_replication");
584
      pstmt.execute();
585
      ResultSet rs = pstmt.getResultSet();
586
      boolean tableHasRows = rs.next();
587
      while(tableHasRows)
588
      {
589
        if(rs.getInt(3) == 1)
590
        {//only put the server in the list if the replicate flag is true
591
          String server = rs.getString(1);
592
          String last_checked = rs.getString(2);
593
          if(!server.equals("localhost"))
594
          {
595
            sl.put(server, last_checked);
596
          }
597
        }
598
        tableHasRows = rs.next();   
599
      }
600
      pstmt.close();
601
    }
602
    catch(Exception e)
603
    {
604
      System.out.println("error in replicationHandler.buildServerList(): " +
605
                         e.getMessage());
606
    }
607
    return sl;
608
  }
609
  
610
  /**
611
   * Method to initialize the message parser
612
   */
613
  public static XMLReader initParser(DefaultHandler dh)
614
          throws Exception
615
  {
616
    XMLReader parser = null;
617

    
618
    try {
619
      ContentHandler chandler = dh;
620

    
621
      // Get an instance of the parser
622
      MetaCatUtil util = new MetaCatUtil();
623
      String parserName = util.getOption("saxparser");
624
      parser = XMLReaderFactory.createXMLReader(parserName);
625

    
626
      // Turn off validation
627
      parser.setFeature("http://xml.org/sax/features/validation", false);
628
      
629
      parser.setContentHandler((ContentHandler)chandler);
630
      parser.setErrorHandler((ErrorHandler)chandler);
631

    
632
    } catch (Exception e) {
633
      throw e;
634
    }
635

    
636
    return parser;
637
  }
638
  
639
 
640
}
641
   
(39-39/40)