Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: tao $'
10
 *     '$Date: 2002-04-25 17:02:12 -0700 (Thu, 25 Apr 2002) $'
11
 * '$Revision: 1032 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27
 
28
package edu.ucsb.nceas.metacat;
29

    
30
import java.sql.*;
31
import java.util.*;
32
import java.lang.Thread; 
33
import java.io.*;
34
import java.net.*;
35
import java.text.*;
36
import org.xml.sax.AttributeList;
37
import org.xml.sax.ContentHandler;
38
import org.xml.sax.DTDHandler;
39
import org.xml.sax.EntityResolver;
40
import org.xml.sax.ErrorHandler;
41
import org.xml.sax.InputSource;
42
import org.xml.sax.XMLReader;
43
import org.xml.sax.SAXException;
44
import org.xml.sax.SAXParseException;
45
import org.xml.sax.helpers.XMLReaderFactory;
46
import org.xml.sax.helpers.DefaultHandler;
47

    
48

    
49

    
50
/**
51
 * This class handles deltaT replication checking.  Whenever this TimerTask
52
 * is fired it checks each server in xml_replication for updates and updates
53
 * the local db as needed.
54
 */
55
public class ReplicationHandler extends TimerTask
56
{
57
  int serverCheckCode = 1;
58
  MetaCatUtil util = new MetaCatUtil();
59
  Hashtable serverList = new Hashtable(); 
60
  Connection conn;
61
  PrintWriter out;
62
  
63
  public ReplicationHandler(PrintWriter o)
64
  {
65
    this.out = o;
66
  }
67
  
68
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
69
  {
70
    this.out = o;
71
    this.serverCheckCode = serverCheckCode;
72
  }
73
  
74
  /**
75
   * Method that implements TimerTask.run().  It runs whenever the timer is 
76
   * fired.
77
   */
78
  public void run()
79
  {
80
    //find out the last_checked time of each server in the server list and
81
    //send a query to each server to see if there are any documents in 
82
    //xml_documents with an update_date > last_checked
83
    try
84
    {
85
      try
86
      { //this connection is prone to error for some reason so we 
87
        //try to connect it three times before quiting.
88
        conn = util.openDBConnection();
89
      }
90
      catch(SQLException sqle)
91
      {
92
        try
93
        {
94
          conn = util.openDBConnection();
95
        }
96
        catch(SQLException sqlee)
97
        {
98
          try
99
          {
100
            conn = util.openDBConnection();
101
          }
102
          catch(SQLException sqleee)
103
          {
104
            System.out.println("error getting db connection in " + 
105
                               "ReplicationHandler.run: " +
106
                               sqleee.getMessage());
107
          }
108
        }
109
      }
110
      serverList = buildServerList(conn);
111
      //if serverList is null, metacat don't need to replication
112
      if (serverList==null||serverList.isEmpty())
113
      {
114
        return;
115
      }
116
      updateCatalog(serverList);
117
      update(serverList);
118
      conn.close();
119
    }
120
    catch (Exception e)
121
    {
122
      //System.out.println("Error in replicationHandler.run():"+e.getMessage());
123
    }
124
  }
125
  
126
  /**
127
   * Method that uses revision taging for replication instead of update_date.
128
   */
129
  private void update(Hashtable serverList)
130
  {
131
    /*
132
     Pseudo-algorithm
133
     - request a doc list from each server in xml_replication
134
     - check the rev number of each of those documents agains the 
135
       documents in the local database
136
     - pull any documents that have a lesser rev number on the local server
137
       from the remote server
138
     - delete any documents that still exist in the local xml_documents but
139
       are in the deletedDocuments tag of the remote host response.
140
     - update last_checked to keep track of the last time it was checked.
141
       (this info is theoretically not needed using this system but probably 
142
       should be kept anyway)
143
    */
144
    
145
    Connection conn = null;
146
    try
147
    {
148
      try
149
      { //this connection is prone to error for some reason so we 
150
        //try to connect it three times before quiting.
151
        conn = util.openDBConnection();
152
      }
153
      catch(SQLException sqle)
154
      {
155
        try
156
        {
157
          conn = util.openDBConnection();
158
        }
159
        catch(SQLException sqlee)
160
        {
161
          try
162
          {
163
            conn = util.openDBConnection();
164
          }
165
          catch(SQLException sqleee)
166
          {
167
            /*System.out.println("error getting db connection in " + 
168
                               "ReplicationHandler.update: " +
169
                               sqleee.getMessage());*/
170
          }
171
        }
172
      }
173
    }
174
    catch(Exception e)
175
    {
176
      /*System.out.println("error in ReplicationHandler.update: " + 
177
                          e.getMessage());*/
178
    }
179
    
180
    Enumeration keys;
181
    String server;
182
    String update;
183
    ReplMessageHandler message = new ReplMessageHandler();
184
    Vector responses = new Vector();
185
    PreparedStatement pstmt = null;
186
    ResultSet rs;
187
    boolean tablehasrows;
188
    boolean flag=false;
189
    String action = new String();
190
    XMLReader parser;
191
    URL u;
192
    
193
    try
194
    {
195
      MetaCatUtil.debugMessage("init parser");
196
      parser = initParser(message);
197
      keys = serverList.keys();
198
      while(keys.hasMoreElements())
199
      {
200
        server = (String)(keys.nextElement());
201
        MetacatReplication.replLog("full update started to: " + server);
202
        u = new URL("https://" + server + "?server="
203
        +util.getLocalReplicationServerName()+"&action=update");
204
        //System.out.println("Sending Message: " + u.toString());
205
        String result = MetacatReplication.getURLContent(u);
206
        responses.add(result);
207
      }
208
      
209
      //make sure that there is updated file list
210
      //If response is null, mecat don't need do anything
211
       if (responses==null||responses.isEmpty())
212
      {
213
        return;
214
      }
215
      
216
      //String srvr = util.getOption("servletpath");
217
      
218
      //System.out.println("responses (from srvr): " + responses.toString());
219
      
220
      for(int i=0; i<responses.size(); i++)
221
      { //check each server for updated files
222
        //System.out.println("parsing responses");
223
        parser.parse(new InputSource(
224
                     new StringReader(
225
                     (String)(responses.elementAt(i)))));
226
        Vector v = new Vector(message.getUpdatesVect());
227
        //System.out.println("v: " + v.toString());
228
        Vector d = new Vector(message.getDeletesVect());
229
        //check the revs in u to see if there are any newer ones than
230
        //in the local DB.
231
        for(int j=0; j<v.size(); j++)
232
        {
233
          Vector w = new Vector((Vector)(v.elementAt(j)));
234
          //System.out.println("w: " + w.toString());
235
          String docid = (String)w.elementAt(0);
236
          //System.out.println("docid: " + docid);
237
          int rev = Integer.parseInt((String)w.elementAt(1));
238
          //System.out.println("rev: " + rev);
239
          String docServer = (String)w.elementAt(2);
240
          //System.out.println("docServer: " + docServer);
241
    
242
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
243
                                        "docid like '" + docid + "'");
244
          pstmt.execute();
245
          rs = pstmt.getResultSet();
246
          tablehasrows = rs.next();
247
          if(tablehasrows)
248
          { //check the revs for an update because this document is in the
249
            //local DB, it just might be out of date.
250
            int localrev = rs.getInt(1);
251
            if(localrev == rev)
252
            {
253
              flag = false;
254
            }
255
            else if(localrev < rev)
256
            {//this document needs to be updated so send an read request
257
              action = "UPDATE";
258
              flag = true;
259
            }
260
          }
261
          else
262
          { //insert this document as new because it is not in the local DB
263
            action = "INSERT";
264
            flag = true;
265
          }
266
          
267
          if(flag)
268
          { //if the document needs to be updated or inserted, this is executed
269
            u = new URL("https://" + docServer + "?server="+
270
              util.getLocalReplicationServerName()+"&action=read&docid="+docid);
271
            //System.out.println("Sending message: " + u.toString());
272
            String newxmldoc = MetacatReplication.getURLContent(u);
273
            DocInfoHandler dih = new DocInfoHandler();
274
            XMLReader docinfoParser = initParser(dih);
275
            URL docinfoUrl = new URL("https://" + docServer + 
276
                  "?server="+util.getLocalReplicationServerName()+
277
                  "&action=getdocumentinfo&docid="+docid);
278
            //System.out.println("Sending message: " + docinfoUrl.toString());
279
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
280
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
281
            Hashtable docinfoHash = dih.getDocInfo();
282
            int serverCode = MetacatReplication.getServerCode(docServer);
283
            //System.out.println("updating doc: " + docid +" action: "+ action);
284
            //docid should include rev number too
285
            String accnum=docid+util.getOption("accNumSeparator")+
286
                                              (String)docinfoHash.get("rev");
287
            //System.out.println("accnum: "+accnum);
288
            try
289
            {
290
              String newDocid = DocumentImpl.write(conn, 
291
                              new StringReader(newxmldoc),
292
                              (String)docinfoHash.get("public_access"),
293
                              null,  /* the dtd text */
294
                              action, 
295
                              accnum, 
296
                              (String)docinfoHash.get("user_owner"),
297
                              null, /* null for groups[] */
298
                              serverCode, 
299
                              true, /* override */
300
                              false /* validate */);
301
              MetacatReplication.replLog("wrote doc " + docid + " from " + 
302
                                         docServer);
303
              /*System.out.println("wrote doc " + docid + " from " + 
304
                                 docServer);*/
305
            }
306
            catch(Exception e)
307
            {
308
              System.out.println("error writing document in " + 
309
                                 "ReplicationHandler.update: " + docid +
310
                                 ": " + e.getMessage());
311
            }
312
          }
313
        }
314
        
315
        for(int k=0; k<d.size(); k++)
316
        { //delete the deleted documents;
317
          Vector w = new Vector((Vector)d.elementAt(k));
318
          String docid = (String)w.elementAt(0);
319
          if(!alreadyDeleted(docid, conn))
320
          {
321
            DocumentImpl.delete(conn, docid, null, null);
322
            System.out.println("Document " + docid + " deleted.");
323
            MetacatReplication.replLog("doc " + docid + " deleted");
324
          }
325
        }
326
        
327
        keys = serverList.keys();
328
        while(keys.hasMoreElements())
329
        {
330
          server = (String)(keys.nextElement()); 
331
          URL dateurl = new URL("https://" + server + "?server="+
332
          util.getLocalReplicationServerName()+"&action=gettime");
333
          String datexml = MetacatReplication.getURLContent(dateurl);
334
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
335
          StringBuffer sql = new StringBuffer();
336
          sql.append("update xml_replication set last_checked = to_date('");
337
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
338
          sql.append("server like '").append(server).append("'");
339
          //System.out.println("sql: " + sql.toString());
340
          pstmt.close();
341
          pstmt = conn.prepareStatement(sql.toString());
342
          pstmt.executeUpdate();
343
          //conn.commit();
344
          System.out.println("last_checked updated to " + datestr + " on " +
345
                            server);
346
        }
347
      }
348
    }
349
    catch(Exception e)
350
    {
351
      /*System.out.println("error in ReplicationHandler.update: " + 
352
                          e.getMessage());*/
353
      e.printStackTrace(System.out);
354
    }
355
    finally
356
    {
357
      try
358
      {
359
        pstmt.close();
360
      }
361
      catch(Exception ee) {}
362
    }
363
  }
364
  
365
  /**
366
   * updates xml_catalog with entries from other servers.
367
   */
368
  private void updateCatalog(Hashtable serverList)
369
  {
370
    Connection conn = null;
371
    try
372
    {
373
      try
374
      { //this connection is prone to error for some reason so we 
375
        //try to connect it three times before quiting.
376
        conn = util.openDBConnection();
377
      }
378
      catch(SQLException sqle)
379
      {
380
        try
381
        {
382
          conn = util.openDBConnection();
383
        }
384
        catch(SQLException sqlee)
385
        {
386
          try
387
          {
388
            conn = util.openDBConnection();
389
          }
390
          catch(SQLException sqleee)
391
          {
392
            System.out.println("error getting db connection in " + 
393
                               "ReplicationHandler.update: " +
394
                               sqleee.getMessage());
395
          }
396
        }
397
      }
398
      String server;
399
      Enumeration keys = serverList.keys();
400
      while(keys.hasMoreElements())
401
      { //go through each server
402
        server = (String)(keys.nextElement());
403
        URL u = new URL("https://" + server + "?server="+
404
        util.getLocalReplicationServerName()+"&action=getcatalog");
405
        //System.out.println("sending message " + u.toString());
406
        String catxml = MetacatReplication.getURLContent(u);
407
        //System.out.println("catxml: " + catxml);
408
        CatalogMessageHandler cmh = new CatalogMessageHandler();
409
        XMLReader catparser = initParser(cmh);
410
        catparser.parse(new InputSource(new StringReader(catxml)));
411
        //parse the returned catalog xml and put it into a vector
412
        Vector remoteCatalog = cmh.getCatalogVect();
413
        
414
        String localcatxml = MetacatReplication.getCatalogXML();
415
        //System.out.println("localcatxml: " + localcatxml);
416
        cmh = new CatalogMessageHandler();
417
        catparser = initParser(cmh);
418
        catparser.parse(new InputSource(new StringReader(localcatxml)));
419
        Vector localCatalog = cmh.getCatalogVect();
420
        
421
        //now we have the catalog from the remote server and this local server
422
        //we now need to compare the two and merge the differences.
423
        //the comparison is base on the public_id fields which is the 4th
424
        //entry in each row vector.
425
        Vector publicId = new Vector();
426
        for(int i=0; i<localCatalog.size(); i++)
427
        {
428
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
429
          //System.out.println("v1: " + v.toString());
430
          publicId.add(new String((String)v.elementAt(3)));
431
          //System.out.println("adding " + (String)v.elementAt(3));
432
        }
433
        
434
        for(int i=0; i<remoteCatalog.size(); i++)
435
        {
436
          Vector v = (Vector)remoteCatalog.elementAt(i);
437
          //System.out.println("v2: " + v.toString());
438
          //System.out.println("i: " + i);
439
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
440
          //System.out.println("publicID: " + publicId.toString());
441
          //System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
442
          if(!publicId.contains(v.elementAt(3)))
443
          { //so we don't have this public id in our local table so we need to
444
            //add it.
445
            //System.out.println("in if");
446
            StringBuffer sql = new StringBuffer();
447
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
448
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
449
            sql.append("?,?)");
450
            //System.out.println("sql: " + sql.toString());
451
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
452
            pstmt.setString(1, (String)v.elementAt(0));
453
            pstmt.setString(2, (String)v.elementAt(1));
454
            pstmt.setString(3, (String)v.elementAt(2));
455
            pstmt.setString(4, (String)v.elementAt(3));
456
            pstmt.setString(5, (String)v.elementAt(4));
457
            pstmt.execute();
458
            pstmt.close();
459
          }
460
        }
461
      }
462
    }
463
    catch(Exception e)
464
    {
465
      System.out.println("error in ReplicationHandler.updateCatalog: " + 
466
                          e.getMessage());
467
      e.printStackTrace(System.out);
468
    }
469
  }
470
  
471
  /**
472
   * Method that returns true if docid has already been "deleted" from metacat.
473
   * This method really implements a truth table for deleted documents
474
   * The table is (a docid in one of the tables is represented by the X):
475
   * xml_docs      xml_revs      deleted?
476
   * ------------------------------------
477
   *   X             X             FALSE
478
   *   X             _             FALSE
479
   *   _             X             TRUE
480
   *   _             _             TRUE
481
   */
482
  private static boolean alreadyDeleted(String docid, Connection conn)
483
  {
484
    try
485
    {
486
      boolean xml_docs = false;
487
      boolean xml_revs = false;
488
      
489
      StringBuffer sb = new StringBuffer();
490
      sb.append("select docid from xml_revisions where docid like '");
491
      sb.append(docid).append("'");
492
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
493
      pstmt.execute();
494
      ResultSet rs = pstmt.getResultSet();
495
      boolean tablehasrows = rs.next();
496
      if(tablehasrows)
497
      {
498
        xml_revs = true;
499
      }
500
      
501
      sb = new StringBuffer();
502
      sb.append("select docid from xml_documents where docid like '");
503
      sb.append(docid).append("'");
504
      pstmt.close();
505
      pstmt = conn.prepareStatement(sb.toString());
506
      pstmt.execute();
507
      rs = pstmt.getResultSet();
508
      tablehasrows = rs.next();
509
      pstmt.close();
510
      if(tablehasrows)
511
      {
512
        xml_docs = true;
513
      }
514
      
515
      if(xml_docs && xml_revs)
516
      {
517
        return false;
518
      }
519
      else if(xml_docs && !xml_revs)
520
      {
521
        return false;
522
      }
523
      else if(!xml_docs && xml_revs)
524
      {
525
        return true;
526
      }
527
      else if(!xml_docs && !xml_revs)
528
      {
529
        return true;
530
      }
531
    }
532
    catch(Exception e)
533
    {
534
      System.out.println("error in ReplicationHandler.alreadyDeleted: " + 
535
                          e.getMessage());
536
      e.printStackTrace(System.out);
537
    }
538
    finally
539
    {
540
      
541
    }
542
    return false;
543
  }
544
  
545
  /**
546
   * Checks to see if a document is already in the DB.  Returns
547
   * "UPDATE" if it is, "INSERT" if it isn't
548
   */
549
  private static String getAction(String docid)
550
  {
551
    try
552
    {
553
      MetaCatUtil util = new MetaCatUtil();
554
      StringBuffer sql = new StringBuffer();
555
      sql.append("select docid from xml_documents where docid like '");
556
      sql.append(docid).append("'");
557
      Connection conn = util.openDBConnection();
558
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
559
      pstmt.execute();
560
      ResultSet rs = pstmt.getResultSet();
561

    
562
      if(rs.next())
563
      {
564
        pstmt.close();
565
        conn.close();
566
        return "UPDATE";
567
      }
568
      else
569
      {
570
        pstmt.close();
571
        conn.close();
572
        return "INSERT";
573
      }
574
    }
575
    catch(Exception e)
576
    {
577
      System.out.println("error in replicationHandler.getAction: " + 
578
                          e.getMessage());
579
    }
580
    return "";
581
  }
582
  
583
  /**
584
   * Method to query xml_replication and build a hashtable of each server
585
   * and it's last update time.
586
   * @param conn a connection to the database
587
   */
588
  public static Hashtable buildServerList(Connection conn)
589
  {
590
    Hashtable sl = new Hashtable();
591
    PreparedStatement pstmt;   
592
    try
593
    {
594
      pstmt = conn.prepareStatement("select server, last_checked, replicate " +
595
                                    "from xml_replication");
596
      pstmt.execute();
597
      ResultSet rs = pstmt.getResultSet();
598
      boolean tableHasRows = rs.next();
599
      while(tableHasRows)
600
      {
601
        if(rs.getInt(3) == 1)
602
        {//only put the server in the list if the replicate flag is true
603
          String server = rs.getString(1);
604
          String last_checked = rs.getString(2);
605
          if(!server.equals("localhost"))
606
          {
607
            sl.put(server, last_checked);
608
          }
609
        }
610
        tableHasRows = rs.next();   
611
      }
612
      pstmt.close();
613
    }
614
    catch(Exception e)
615
    {
616
      System.out.println("error in replicationHandler.buildServerList(): " +
617
                         e.getMessage());
618
    }
619
    return sl;
620
  }
621
  
622
  /**
623
   * Method to initialize the message parser
624
   */
625
  public static XMLReader initParser(DefaultHandler dh)
626
          throws Exception
627
  {
628
    XMLReader parser = null;
629

    
630
    try {
631
      ContentHandler chandler = dh;
632

    
633
      // Get an instance of the parser
634
      MetaCatUtil util = new MetaCatUtil();
635
      String parserName = util.getOption("saxparser");
636
      parser = XMLReaderFactory.createXMLReader(parserName);
637

    
638
      // Turn off validation
639
      parser.setFeature("http://xml.org/sax/features/validation", false);
640
      
641
      parser.setContentHandler((ContentHandler)chandler);
642
      parser.setErrorHandler((ErrorHandler)chandler);
643

    
644
    } catch (Exception e) {
645
      throw e;
646
    }
647

    
648
    return parser;
649
  }
650
  
651
 
652
}
653
   
(40-40/41)