Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2001-01-19 15:20:58 -0800 (Fri, 19 Jan 2001) $'
11
 * '$Revision: 683 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27
 
28
package edu.ucsb.nceas.metacat;
29

    
30
import java.sql.*;
31
import java.util.*;
32
import java.lang.Thread; 
33
import java.io.*;
34
import java.net.*;
35
import java.text.*;
36
import org.xml.sax.AttributeList;
37
import org.xml.sax.ContentHandler;
38
import org.xml.sax.DTDHandler;
39
import org.xml.sax.EntityResolver;
40
import org.xml.sax.ErrorHandler;
41
import org.xml.sax.InputSource;
42
import org.xml.sax.XMLReader;
43
import org.xml.sax.SAXException;
44
import org.xml.sax.SAXParseException;
45
import org.xml.sax.helpers.XMLReaderFactory;
46
import org.xml.sax.helpers.DefaultHandler;
47

    
48

    
49

    
50
/**
51
 * This class handles deltaT replication checking.  Whenever this TimerTask
52
 * is fired it checks each server in xml_replication for updates and updates
53
 * the local db as needed.
54
 */
55
public class ReplicationHandler extends TimerTask
56
{
57
  int serverCheckCode = 1;
58
  MetaCatUtil util = new MetaCatUtil();
59
  Hashtable serverList = new Hashtable(); 
60
  Connection conn;
61
  PrintWriter out;
62
  
63
  public ReplicationHandler(PrintWriter o)
64
  {
65
    this.out = o;
66
  }
67
  
68
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
69
  {
70
    this.out = o;
71
    this.serverCheckCode = serverCheckCode;
72
  }
73
  
74
  /**
75
   * Method that implements TimerTask.run().  It runs whenever the timer is 
76
   * fired.
77
   */
78
  public void run()
79
  {
80
    //find out the last_checked time of each server in the server list and
81
    //send a query to each server to see if there are any documents in 
82
    //xml_documents with an update_date > last_checked
83
    try
84
    {
85
      try
86
      { //this connection is prone to error for some reason so we 
87
        //try to connect it three times before quiting.
88
        conn = util.openDBConnection();
89
      }
90
      catch(SQLException sqle)
91
      {
92
        try
93
        {
94
          conn = util.openDBConnection();
95
        }
96
        catch(SQLException sqlee)
97
        {
98
          try
99
          {
100
            conn = util.openDBConnection();
101
          }
102
          catch(SQLException sqleee)
103
          {
104
            System.out.println("error getting db connection in " + 
105
                               "ReplicationHandler.run: " +
106
                               sqleee.getMessage());
107
          }
108
        }
109
      }
110
      serverList = buildServerList(conn);
111
      update(serverList);
112
      updateCatalog(serverList);
113
      conn.close();
114
    }
115
    catch (Exception e)
116
    {
117
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
118
    }
119
  }
120
  
121
  /**
122
   * Method that uses revision taging for replication instead of update_date.
123
   */
124
  private void update(Hashtable serverList)
125
  {
126
    /*
127
     Pseudo-algorithm
128
     - request a doc list from each server in xml_replication
129
     - check the rev number of each of those documents agains the 
130
       documents in the local database
131
     - pull any documents that have a lesser rev number on the local server
132
       from the remote server
133
     - delete any documents that still exist in the local xml_documents but
134
       are in the deletedDocuments tag of the remote host response.
135
     - update last_checked to keep track of the last time it was checked.
136
       (this info is theoretically not needed using this system but probably 
137
       should be kept anyway)
138
    */
139
    
140
    Connection conn = null;
141
    try
142
    {
143
      try
144
      { //this connection is prone to error for some reason so we 
145
        //try to connect it three times before quiting.
146
        conn = util.openDBConnection();
147
      }
148
      catch(SQLException sqle)
149
      {
150
        try
151
        {
152
          conn = util.openDBConnection();
153
        }
154
        catch(SQLException sqlee)
155
        {
156
          try
157
          {
158
            conn = util.openDBConnection();
159
          }
160
          catch(SQLException sqleee)
161
          {
162
            System.out.println("error getting db connection in " + 
163
                               "ReplicationHandler.update: " +
164
                               sqleee.getMessage());
165
          }
166
        }
167
      }
168
    }
169
    catch(Exception e)
170
    {
171
      System.out.println("error in ReplicationHandler.update: " + 
172
                          e.getMessage());
173
    }
174
    
175
    Enumeration keys;
176
    String server;
177
    String update;
178
    ReplMessageHandler message = new ReplMessageHandler();
179
    Vector responses = new Vector();
180
    PreparedStatement pstmt = null;
181
    ResultSet rs;
182
    boolean tablehasrows;
183
    boolean flag=false;
184
    String action = new String();
185
    XMLReader parser;
186
    URL u;
187
    
188
    try
189
    {
190
      MetaCatUtil.debugMessage("init parser");
191
      parser = initParser(message);
192
      keys = serverList.keys();
193
      while(keys.hasMoreElements())
194
      {
195
        server = (String)(keys.nextElement());
196
        MetacatReplication.replLog("full update started to: " + server);
197
        u = new URL("http://" + server + "?action=update");
198
        System.out.println("Sending Message: " + u.toString());
199
        String result = MetacatReplication.getURLContent(u);
200
        responses.add(result);
201
      }
202
      
203
      //String srvr = util.getOption("servletpath");
204
      
205
      //System.out.println("responses (from srvr): " + responses.toString());
206
      
207
      for(int i=0; i<responses.size(); i++)
208
      { //check each server for updated files
209
        //System.out.println("parsing responses");
210
        parser.parse(new InputSource(
211
                     new StringReader(
212
                     (String)(responses.elementAt(i)))));
213
        Vector v = new Vector(message.getUpdatesVect());
214
        //System.out.println("v: " + v.toString());
215
        Vector d = new Vector(message.getDeletesVect());
216
        //check the revs in u to see if there are any newer ones than
217
        //in the local DB.
218
        for(int j=0; j<v.size(); j++)
219
        {
220
          Vector w = new Vector((Vector)(v.elementAt(j)));
221
          //System.out.println("w: " + w.toString());
222
          String docid = (String)w.elementAt(0);
223
          //System.out.println("docid: " + docid);
224
          int rev = Integer.parseInt((String)w.elementAt(1));
225
          //System.out.println("rev: " + rev);
226
          String docServer = (String)w.elementAt(2);
227
          //System.out.println("docServer: " + docServer);
228
    
229
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
230
                                        "docid like '" + docid + "'");
231
          pstmt.execute();
232
          rs = pstmt.getResultSet();
233
          tablehasrows = rs.next();
234
          if(tablehasrows)
235
          { //check the revs for an update because this document is in the
236
            //local DB, it just might be out of date.
237
            int localrev = rs.getInt(1);
238
            if(localrev == rev)
239
            {
240
              flag = false;
241
            }
242
            else if(localrev < rev)
243
            {//this document needs to be updated so send an read request
244
              action = "UPDATE";
245
              flag = true;
246
            }
247
          }
248
          else
249
          { //insert this document as new because it is not in the local DB
250
            action = "INSERT";
251
            flag = true;
252
          }
253
          
254
          if(flag)
255
          { //if the document needs to be updated or inserted, this is executed
256
            u = new URL("http://" + docServer + "?action=read&docid=" +
257
                          docid);
258
            System.out.println("Sending message: " + u.toString());
259
            String newxmldoc = MetacatReplication.getURLContent(u);
260
            DocInfoHandler dih = new DocInfoHandler();
261
            XMLReader docinfoParser = initParser(dih);
262
            URL docinfoUrl = new URL("http://" + docServer + 
263
                                   "?action=getdocumentinfo&docid=" +
264
                                   docid);
265
            System.out.println("Sending message: " + docinfoUrl.toString());
266
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
267
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
268
            Hashtable docinfoHash = dih.getDocInfo();
269
            int serverCode = MetacatReplication.getServerCode(docServer);
270
            System.out.println("updating doc: " + docid + " action: "+ action);
271
            try
272
            {
273
              String newDocid = DocumentImpl.write(conn, 
274
                              new StringReader(newxmldoc),
275
                              null,
276
                              action, 
277
                              docid, 
278
                              (String)docinfoHash.get("user_owner"),
279
                              (String)docinfoHash.get("user_owner"), 
280
                              serverCode, 
281
                              true);
282
              MetacatReplication.replLog("wrote doc " + docid + " from " + 
283
                                         docServer);
284
              System.out.println("wrote doc " + docid + " from " + 
285
                                 docServer);
286
            }
287
            catch(Exception e)
288
            {
289
              System.out.println("error writing document in " + 
290
                                 "ReplicationHandler.update: " + docid +
291
                                 ": " + e.getMessage());
292
            }
293
          }
294
        }
295
        
296
        for(int k=0; k<d.size(); k++)
297
        { //delete the deleted documents;
298
          Vector w = new Vector((Vector)d.elementAt(k));
299
          String docid = (String)w.elementAt(0);
300
          if(!alreadyDeleted(docid, conn))
301
          {
302
            DocumentImpl.delete(conn, docid, null, null);
303
            System.out.println("Document " + docid + " deleted.");
304
            MetacatReplication.replLog("doc " + docid + " deleted");
305
          }
306
        }
307
        
308
        keys = serverList.keys();
309
        while(keys.hasMoreElements())
310
        {
311
          server = (String)(keys.nextElement()); 
312
          URL dateurl = new URL("http://" + server + "?action=gettime");
313
          String datexml = MetacatReplication.getURLContent(dateurl);
314
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
315
          StringBuffer sql = new StringBuffer();
316
          sql.append("update xml_replication set last_checked = to_date('");
317
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
318
          sql.append("server like '").append(server).append("'");
319
          //System.out.println("sql: " + sql.toString());
320
          pstmt.close();
321
          pstmt = conn.prepareStatement(sql.toString());
322
          pstmt.executeUpdate();
323
          //conn.commit();
324
          System.out.println("last_checked updated to " + datestr + " on " +
325
                            server);
326
        }
327
      }
328
    }
329
    catch(Exception e)
330
    {
331
      System.out.println("error in ReplicationHandler.update: " + 
332
                          e.getMessage());
333
      e.printStackTrace(System.out);
334
    }
335
    finally
336
    {
337
      try
338
      {
339
        pstmt.close();
340
      }
341
      catch(Exception ee) {}
342
    }
343
  }
344
  
345
  /**
346
   * updates xml_catalog with entries from other servers.
347
   */
348
  private void updateCatalog(Hashtable serverList)
349
  {
350
    Connection conn = null;
351
    try
352
    {
353
      try
354
      { //this connection is prone to error for some reason so we 
355
        //try to connect it three times before quiting.
356
        conn = util.openDBConnection();
357
      }
358
      catch(SQLException sqle)
359
      {
360
        try
361
        {
362
          conn = util.openDBConnection();
363
        }
364
        catch(SQLException sqlee)
365
        {
366
          try
367
          {
368
            conn = util.openDBConnection();
369
          }
370
          catch(SQLException sqleee)
371
          {
372
            System.out.println("error getting db connection in " + 
373
                               "ReplicationHandler.update: " +
374
                               sqleee.getMessage());
375
          }
376
        }
377
      }
378
      String server;
379
      Enumeration keys = serverList.keys();
380
      while(keys.hasMoreElements())
381
      { //go through each server
382
        server = (String)(keys.nextElement());
383
        URL u = new URL("http://" + server + "?action=getcatalog");
384
        System.out.println("sending message " + u.toString());
385
        String catxml = MetacatReplication.getURLContent(u);
386
        //System.out.println("catxml: " + catxml);
387
        CatalogMessageHandler cmh = new CatalogMessageHandler();
388
        XMLReader catparser = initParser(cmh);
389
        catparser.parse(new InputSource(new StringReader(catxml)));
390
        //parse the returned catalog xml and put it into a vector
391
        Vector remoteCatalog = cmh.getCatalogVect();
392
        
393
        String localcatxml = MetacatReplication.getCatalogXML();
394
        //System.out.println("localcatxml: " + localcatxml);
395
        cmh = new CatalogMessageHandler();
396
        catparser = initParser(cmh);
397
        catparser.parse(new InputSource(new StringReader(localcatxml)));
398
        Vector localCatalog = cmh.getCatalogVect();
399
        
400
        //now we have the catalog from the remote server and this local server
401
        //we now need to compare the two and merge the differences.
402
        //the comparison is base on the public_id fields which is the 4th
403
        //entry in each row vector.
404
        Vector publicId = new Vector();
405
        for(int i=0; i<localCatalog.size(); i++)
406
        {
407
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
408
          //System.out.println("v1: " + v.toString());
409
          publicId.add(new String((String)v.elementAt(3)));
410
          //System.out.println("adding " + (String)v.elementAt(3));
411
        }
412
        
413
        for(int i=0; i<remoteCatalog.size(); i++)
414
        {
415
          Vector v = (Vector)remoteCatalog.elementAt(i);
416
          //System.out.println("v2: " + v.toString());
417
          //System.out.println("i: " + i);
418
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
419
          //System.out.println("publicID: " + publicId.toString());
420
          //System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
421
          if(!publicId.contains(v.elementAt(3)))
422
          { //so we don't have this public id in our local table so we need to
423
            //add it.
424
            //System.out.println("in if");
425
            StringBuffer sql = new StringBuffer();
426
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
427
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
428
            sql.append("?,?)");
429
            //System.out.println("sql: " + sql.toString());
430
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
431
            pstmt.setString(1, (String)v.elementAt(0));
432
            pstmt.setString(2, (String)v.elementAt(1));
433
            pstmt.setString(3, (String)v.elementAt(2));
434
            pstmt.setString(4, (String)v.elementAt(3));
435
            pstmt.setString(5, (String)v.elementAt(4));
436
            pstmt.execute();
437
            pstmt.close();
438
          }
439
        }
440
      }
441
    }
442
    catch(Exception e)
443
    {
444
      System.out.println("error in ReplicationHandler.updateCatalog: " + 
445
                          e.getMessage());
446
      e.printStackTrace(System.out);
447
    }
448
  }
449
  
450
  /**
451
   * Method that returns true if docid has already been "deleted" from metacat.
452
   * This method really implements a truth table for deleted documents
453
   * The table is (a docid in one of the tables is represented by the X):
454
   * xml_docs      xml_revs      deleted?
455
   * ------------------------------------
456
   *   X             X             FALSE
457
   *   X             _             FALSE
458
   *   _             X             TRUE
459
   *   _             _             TRUE
460
   */
461
  private static boolean alreadyDeleted(String docid, Connection conn)
462
  {
463
    try
464
    {
465
      boolean xml_docs = false;
466
      boolean xml_revs = false;
467
      
468
      StringBuffer sb = new StringBuffer();
469
      sb.append("select docid from xml_revisions where docid like '");
470
      sb.append(docid).append("'");
471
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
472
      pstmt.execute();
473
      ResultSet rs = pstmt.getResultSet();
474
      boolean tablehasrows = rs.next();
475
      if(tablehasrows)
476
      {
477
        xml_revs = true;
478
      }
479
      
480
      sb = new StringBuffer();
481
      sb.append("select docid from xml_documents where docid like '");
482
      sb.append(docid).append("'");
483
      pstmt.close();
484
      pstmt = conn.prepareStatement(sb.toString());
485
      pstmt.execute();
486
      rs = pstmt.getResultSet();
487
      tablehasrows = rs.next();
488
      pstmt.close();
489
      if(tablehasrows)
490
      {
491
        xml_docs = true;
492
      }
493
      
494
      if(xml_docs && xml_revs)
495
      {
496
        return false;
497
      }
498
      else if(xml_docs && !xml_revs)
499
      {
500
        return false;
501
      }
502
      else if(!xml_docs && xml_revs)
503
      {
504
        return true;
505
      }
506
      else if(!xml_docs && !xml_revs)
507
      {
508
        return true;
509
      }
510
    }
511
    catch(Exception e)
512
    {
513
      System.out.println("error in ReplicationHandler.alreadyDeleted: " + 
514
                          e.getMessage());
515
      e.printStackTrace(System.out);
516
    }
517
    finally
518
    {
519
      
520
    }
521
    return false;
522
  }
523
  
524
  /**
525
   * Checks to see if a document is already in the DB.  Returns
526
   * "UPDATE" if it is, "INSERT" if it isn't
527
   */
528
  private static String getAction(String docid)
529
  {
530
    try
531
    {
532
      MetaCatUtil util = new MetaCatUtil();
533
      StringBuffer sql = new StringBuffer();
534
      sql.append("select docid from xml_documents where docid like '");
535
      sql.append(docid).append("'");
536
      Connection conn = util.openDBConnection();
537
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
538
      pstmt.execute();
539
      ResultSet rs = pstmt.getResultSet();
540

    
541
      if(rs.next())
542
      {
543
        pstmt.close();
544
        conn.close();
545
        return "UPDATE";
546
      }
547
      else
548
      {
549
        pstmt.close();
550
        conn.close();
551
        return "INSERT";
552
      }
553
    }
554
    catch(Exception e)
555
    {
556
      System.out.println("error in replicationHandler.getAction: " + 
557
                          e.getMessage());
558
    }
559
    return "";
560
  }
561
  
562
  /**
563
   * Method to query xml_replication and build a hashtable of each server
564
   * and it's last update time.
565
   * @param conn a connection to the database
566
   */
567
  public static Hashtable buildServerList(Connection conn)
568
  {
569
    Hashtable sl = new Hashtable();
570
    PreparedStatement pstmt;   
571
    try
572
    {
573
      pstmt = conn.prepareStatement("select server, last_checked, replicate " +
574
                                    "from xml_replication");
575
      pstmt.execute();
576
      ResultSet rs = pstmt.getResultSet();
577
      boolean tableHasRows = rs.next();
578
      while(tableHasRows)
579
      {
580
        if(rs.getInt(3) == 1)
581
        {//only put the server in the list if the replicate flag is true
582
          String server = rs.getString(1);
583
          String last_checked = rs.getString(2);
584
          if(!server.equals("localhost"))
585
          {
586
            sl.put(server, last_checked);
587
          }
588
        }
589
        tableHasRows = rs.next();   
590
      }
591
      pstmt.close();
592
    }
593
    catch(Exception e)
594
    {
595
      System.out.println("error in replicationHandler.buildServerList(): " +
596
                         e.getMessage());
597
    }
598
    return sl;
599
  }
600
  
601
  /**
602
   * Method to initialize the message parser
603
   */
604
  public static XMLReader initParser(DefaultHandler dh)
605
          throws Exception
606
  {
607
    XMLReader parser = null;
608

    
609
    try {
610
      ContentHandler chandler = dh;
611

    
612
      // Get an instance of the parser
613
      MetaCatUtil util = new MetaCatUtil();
614
      String parserName = util.getOption("saxparser");
615
      parser = XMLReaderFactory.createXMLReader(parserName);
616

    
617
      // Turn off validation
618
      parser.setFeature("http://xml.org/sax/features/validation", false);
619
      
620
      parser.setContentHandler((ContentHandler)chandler);
621
      parser.setErrorHandler((ErrorHandler)chandler);
622

    
623
    } catch (Exception e) {
624
      throw e;
625
    }
626

    
627
    return parser;
628
  }
629
}
(42-42/43)