Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2001-01-18 15:15:21 -0800 (Thu, 18 Jan 2001) $'
11
 * '$Revision: 675 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27
 
28
package edu.ucsb.nceas.metacat;
29

    
30
import java.sql.*;
31
import java.util.*;
32
import java.lang.Thread; 
33
import java.io.*;
34
import java.net.*;
35
import java.text.*;
36
import org.xml.sax.AttributeList;
37
import org.xml.sax.ContentHandler;
38
import org.xml.sax.DTDHandler;
39
import org.xml.sax.EntityResolver;
40
import org.xml.sax.ErrorHandler;
41
import org.xml.sax.InputSource;
42
import org.xml.sax.XMLReader;
43
import org.xml.sax.SAXException;
44
import org.xml.sax.SAXParseException;
45
import org.xml.sax.helpers.XMLReaderFactory;
46
import org.xml.sax.helpers.DefaultHandler;
47

    
48

    
49

    
50
/**
51
 * This class handles deltaT replication checking.  Whenever this TimerTask
52
 * is fired it checks each server in xml_replication for updates and updates
53
 * the local db as needed.
54
 */
55
public class ReplicationHandler extends TimerTask
56
{
57
  int serverCheckCode = 1;
58
  MetaCatUtil util = new MetaCatUtil();
59
  Hashtable serverList = new Hashtable(); 
60
  Connection conn;
61
  PrintWriter out;
62
  
63
  public ReplicationHandler(PrintWriter o)
64
  {
65
    this.out = o;
66
  }
67
  
68
  public ReplicationHandler(PrintWriter o, int serverCheckCode)
69
  {
70
    this.out = o;
71
    this.serverCheckCode = serverCheckCode;
72
  }
73
  
74
  /**
75
   * Method that implements TimerTask.run().  It runs whenever the timer is 
76
   * fired.
77
   */
78
  public void run()
79
  {
80
    //find out the last_checked time of each server in the server list and
81
    //send a query to each server to see if there are any documents in 
82
    //xml_documents with an update_date > last_checked
83
    try
84
    {
85
      conn = util.openDBConnection();
86
      serverList = buildServerList(conn);
87
      update(serverList, conn);
88
      updateCatalog(serverList, conn);
89
      conn.close();
90
    }
91
    catch (Exception e)
92
    {
93
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
94
    }
95
  }
96
  
97
  /**
98
   * Method that uses revision taging for replication instead of update_date.
99
   */
100
  private void update(Hashtable serverList, Connection conn)
101
  {
102
    /*
103
     Pseudo-algorithm
104
     - request a doc list from each server in xml_replication
105
     - check the rev number of each of those documents agains the 
106
       documents in the local database
107
     - pull any documents that have a lesser rev number on the local server
108
       from the remote server
109
     - delete any documents that still exist in the local xml_documents but
110
       are in the deletedDocuments tag of the remote host response.
111
     - update last_checked to keep track of the last time it was checked.
112
       (this info is theoretically not needed using this system but probably 
113
       should be kept anyway)
114
    */
115
    Enumeration keys;
116
    String server;
117
    String update;
118
    ReplMessageHandler message = new ReplMessageHandler();
119
    Vector responses = new Vector();
120
    PreparedStatement pstmt = null;
121
    ResultSet rs;
122
    boolean tablehasrows;
123
    boolean flag=false;
124
    String action = new String();
125
    XMLReader parser;
126
    URL u;
127
    
128
    try
129
    {
130
      MetaCatUtil.debugMessage("init parser");
131
      parser = initParser(message);
132
      keys = serverList.keys();
133
      while(keys.hasMoreElements())
134
      {
135
        server = (String)(keys.nextElement());
136
        MetacatReplication.replLog("full update started to: " + server);
137
        u = new URL("http://" + server + "?action=update");
138
        System.out.println("Sending Message: " + u.toString());
139
        String result = MetacatReplication.getURLContent(u);
140
        responses.add(result);
141
      }
142
      
143
      //String srvr = util.getOption("servletpath");
144
      
145
      //System.out.println("responses (from srvr): " + responses.toString());
146
      
147
      for(int i=0; i<responses.size(); i++)
148
      { //check each server for updated files
149
        //System.out.println("parsing responses");
150
        parser.parse(new InputSource(
151
                     new StringReader(
152
                     (String)(responses.elementAt(i)))));
153
        Vector v = new Vector(message.getUpdatesVect());
154
        //System.out.println("v: " + v.toString());
155
        Vector d = new Vector(message.getDeletesVect());
156
        //check the revs in u to see if there are any newer ones than
157
        //in the local DB.
158
        for(int j=0; j<v.size(); j++)
159
        {
160
          Vector w = new Vector((Vector)(v.elementAt(j)));
161
          //System.out.println("w: " + w.toString());
162
          String docid = (String)w.elementAt(0);
163
          //System.out.println("docid: " + docid);
164
          int rev = Integer.parseInt((String)w.elementAt(1));
165
          //System.out.println("rev: " + rev);
166
          String docServer = (String)w.elementAt(2);
167
          //System.out.println("docServer: " + docServer);
168
    
169
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
170
                                        "docid like '" + docid + "'");
171
          pstmt.execute();
172
          rs = pstmt.getResultSet();
173
          tablehasrows = rs.next();
174
          if(tablehasrows)
175
          { //check the revs for an update because this document is in the
176
            //local DB, it just might be out of date.
177
            int localrev = rs.getInt(1);
178
            if(localrev == rev)
179
            {
180
              flag = false;
181
            }
182
            else if(localrev < rev)
183
            {//this document needs to be updated so send an read request
184
              action = "UPDATE";
185
              flag = true;
186
            }
187
          }
188
          else
189
          { //insert this document as new because it is not in the local DB
190
            action = "INSERT";
191
            flag = true;
192
          }
193
          
194
          if(flag)
195
          { //if the document needs to be updated or inserted, this is executed
196
            u = new URL("http://" + docServer + "?action=read&docid=" +
197
                          docid);
198
            System.out.println("Sending message: " + u.toString());
199
            String newxmldoc = MetacatReplication.getURLContent(u);
200
            DocInfoHandler dih = new DocInfoHandler();
201
            XMLReader docinfoParser = initParser(dih);
202
            URL docinfoUrl = new URL("http://" + docServer + 
203
                                   "?action=getdocumentinfo&docid=" +
204
                                   docid);
205
            System.out.println("Sending message: " + docinfoUrl.toString());
206
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
207
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
208
            Hashtable docinfoHash = dih.getDocInfo();
209
            int serverCode = MetacatReplication.getServerCode(docServer);
210
            System.out.println("updating doc: " + docid + " action: "+ action);
211
            try
212
            {
213
              String newDocid = DocumentImpl.write(conn, 
214
                              new StringReader(newxmldoc),
215
                              null,
216
                              action, 
217
                              docid, 
218
                              (String)docinfoHash.get("user_owner"),
219
                              (String)docinfoHash.get("user_owner"), 
220
                              serverCode, 
221
                              true);
222
              MetacatReplication.replLog("wrote doc " + docid + " from " + 
223
                                         docServer);
224
              System.out.println("wrote doc " + docid + " from " + 
225
                                 docServer);
226
            }
227
            catch(Exception e)
228
            {
229
              System.out.println("error writing document in " + 
230
                                 "ReplicationHandler.update: " + docid +
231
                                 ": " + e.getMessage());
232
            }
233
          }
234
        }
235
        
236
        for(int k=0; k<d.size(); k++)
237
        { //delete the deleted documents;
238
          Vector w = new Vector((Vector)d.elementAt(k));
239
          String docid = (String)w.elementAt(0);
240
          if(!alreadyDeleted(docid, conn))
241
          {
242
            DocumentImpl.delete(conn, docid, null, null);
243
            System.out.println("Document " + docid + " deleted.");
244
            MetacatReplication.replLog("doc " + docid + " deleted");
245
          }
246
        }
247
        
248
        keys = serverList.keys();
249
        while(keys.hasMoreElements())
250
        {
251
          server = (String)(keys.nextElement()); 
252
          URL dateurl = new URL("http://" + server + "?action=gettime");
253
          String datexml = MetacatReplication.getURLContent(dateurl);
254
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
255
          StringBuffer sql = new StringBuffer();
256
          sql.append("update xml_replication set last_checked = to_date('");
257
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
258
          sql.append("server like '").append(server).append("'");
259
          //System.out.println("sql: " + sql.toString());
260
          pstmt.close();
261
          pstmt = conn.prepareStatement(sql.toString());
262
          pstmt.executeUpdate();
263
          //conn.commit();
264
          System.out.println("last_checked updated to " + datestr + " on " +
265
                            server);
266
        }
267
      }
268
    }
269
    catch(Exception e)
270
    {
271
      System.out.println("error in ReplicationHandler.update: " + 
272
                          e.getMessage());
273
      e.printStackTrace(System.out);
274
    }
275
    finally
276
    {
277
      try
278
      {
279
        pstmt.close();
280
      }
281
      catch(Exception ee) {}
282
    }
283
  }
284
  
285
  /**
286
   * updates xml_catalog with entries from other servers.
287
   */
288
  private void updateCatalog(Hashtable serverList, Connection conn)
289
  {
290
    try
291
    {
292
      String server;
293
      Enumeration keys = serverList.keys();
294
      while(keys.hasMoreElements())
295
      { //go through each server
296
        server = (String)(keys.nextElement());
297
        URL u = new URL("http://" + server + "?action=getcatalog");
298
        System.out.println("sending message " + u.toString());
299
        String catxml = MetacatReplication.getURLContent(u);
300
        //System.out.println("catxml: " + catxml);
301
        CatalogMessageHandler cmh = new CatalogMessageHandler();
302
        XMLReader catparser = initParser(cmh);
303
        catparser.parse(new InputSource(new StringReader(catxml)));
304
        //parse the returned catalog xml and put it into a vector
305
        Vector remoteCatalog = cmh.getCatalogVect();
306
        
307
        String localcatxml = MetacatReplication.getCatalogXML();
308
        //System.out.println("localcatxml: " + localcatxml);
309
        cmh = new CatalogMessageHandler();
310
        catparser = initParser(cmh);
311
        catparser.parse(new InputSource(new StringReader(localcatxml)));
312
        Vector localCatalog = cmh.getCatalogVect();
313
        
314
        //now we have the catalog from the remote server and this local server
315
        //we now need to compare the two and merge the differences.
316
        //the comparison is base on the public_id fields which is the 4th
317
        //entry in each row vector.
318
        Vector publicId = new Vector();
319
        for(int i=0; i<localCatalog.size(); i++)
320
        {
321
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
322
          //System.out.println("v1: " + v.toString());
323
          publicId.add(new String((String)v.elementAt(3)));
324
          //System.out.println("adding " + (String)v.elementAt(3));
325
        }
326
        
327
        for(int i=0; i<remoteCatalog.size(); i++)
328
        {
329
          Vector v = (Vector)remoteCatalog.elementAt(i);
330
          //System.out.println("v2: " + v.toString());
331
          //System.out.println("i: " + i);
332
          //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
333
          //System.out.println("publicID: " + publicId.toString());
334
          //System.out.println("v.elementAt(3): " + (String)v.elementAt(3));
335
          if(!publicId.contains(v.elementAt(3)))
336
          { //so we don't have this public id in our local table so we need to
337
            //add it.
338
            //System.out.println("in if");
339
            StringBuffer sql = new StringBuffer();
340
            sql.append("insert into xml_catalog (entry_type, source_doctype, ");
341
            sql.append("target_doctype, public_id, system_id) values (?,?,?,");
342
            sql.append("?,?)");
343
            //System.out.println("sql: " + sql.toString());
344
            PreparedStatement pstmt = conn.prepareStatement(sql.toString());
345
            pstmt.setString(1, (String)v.elementAt(0));
346
            pstmt.setString(2, (String)v.elementAt(1));
347
            pstmt.setString(3, (String)v.elementAt(2));
348
            pstmt.setString(4, (String)v.elementAt(3));
349
            pstmt.setString(5, (String)v.elementAt(4));
350
            pstmt.execute();
351
            pstmt.close();
352
          }
353
        }
354
      }
355
    }
356
    catch(Exception e)
357
    {
358
      System.out.println("error in ReplicationHandler.updateCatalog: " + 
359
                          e.getMessage());
360
      e.printStackTrace(System.out);
361
    }
362
  }
363
  
364
  /**
365
   * Method that returns true if docid has already been "deleted" from metacat.
366
   * This method really implements a truth table for deleted documents
367
   * The table is (a docid in one of the tables is represented by the X):
368
   * xml_docs      xml_revs      deleted?
369
   * ------------------------------------
370
   *   X             X             FALSE
371
   *   X             _             FALSE
372
   *   _             X             TRUE
373
   *   _             _             TRUE
374
   */
375
  private static boolean alreadyDeleted(String docid, Connection conn)
376
  {
377
    try
378
    {
379
      boolean xml_docs = false;
380
      boolean xml_revs = false;
381
      
382
      StringBuffer sb = new StringBuffer();
383
      sb.append("select docid from xml_revisions where docid like '");
384
      sb.append(docid).append("'");
385
      PreparedStatement pstmt = conn.prepareStatement(sb.toString());
386
      pstmt.execute();
387
      ResultSet rs = pstmt.getResultSet();
388
      boolean tablehasrows = rs.next();
389
      if(tablehasrows)
390
      {
391
        xml_revs = true;
392
      }
393
      
394
      sb = new StringBuffer();
395
      sb.append("select docid from xml_documents where docid like '");
396
      sb.append(docid).append("'");
397
      pstmt.close();
398
      pstmt = conn.prepareStatement(sb.toString());
399
      pstmt.execute();
400
      rs = pstmt.getResultSet();
401
      tablehasrows = rs.next();
402
      pstmt.close();
403
      if(tablehasrows)
404
      {
405
        xml_docs = true;
406
      }
407
      
408
      if(xml_docs && xml_revs)
409
      {
410
        return false;
411
      }
412
      else if(xml_docs && !xml_revs)
413
      {
414
        return false;
415
      }
416
      else if(!xml_docs && xml_revs)
417
      {
418
        return true;
419
      }
420
      else if(!xml_docs && !xml_revs)
421
      {
422
        return true;
423
      }
424
    }
425
    catch(Exception e)
426
    {
427
      System.out.println("error in ReplicationHandler.alreadyDeleted: " + 
428
                          e.getMessage());
429
      e.printStackTrace(System.out);
430
    }
431
    finally
432
    {
433
      
434
    }
435
    return false;
436
  }
437
  
438
  /**
439
   * Checks to see if a document is already in the DB.  Returns
440
   * "UPDATE" if it is, "INSERT" if it isn't
441
   */
442
  private static String getAction(String docid)
443
  {
444
    try
445
    {
446
      MetaCatUtil util = new MetaCatUtil();
447
      StringBuffer sql = new StringBuffer();
448
      sql.append("select docid from xml_documents where docid like '");
449
      sql.append(docid).append("'");
450
      Connection conn = util.openDBConnection();
451
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
452
      pstmt.execute();
453
      ResultSet rs = pstmt.getResultSet();
454

    
455
      if(rs.next())
456
      {
457
        pstmt.close();
458
        conn.close();
459
        return "UPDATE";
460
      }
461
      else
462
      {
463
        pstmt.close();
464
        conn.close();
465
        return "INSERT";
466
      }
467
    }
468
    catch(Exception e)
469
    {
470
      System.out.println("error in replicationHandler.getAction: " + 
471
                          e.getMessage());
472
    }
473
    return "";
474
  }
475
  
476
  /**
477
   * Method to query xml_replication and build a hashtable of each server
478
   * and it's last update time.
479
   * @param conn a connection to the database
480
   */
481
  public static Hashtable buildServerList(Connection conn)
482
  {
483
    Hashtable sl = new Hashtable();
484
    PreparedStatement pstmt;   
485
    try
486
    {
487
      pstmt = conn.prepareStatement("select server, last_checked, replicate " +
488
                                    "from xml_replication");
489
      pstmt.execute();
490
      ResultSet rs = pstmt.getResultSet();
491
      boolean tableHasRows = rs.next();
492
      while(tableHasRows)
493
      {
494
        if(rs.getInt(3) == 1)
495
        {//only put the server in the list if the replicate flag is true
496
          String server = rs.getString(1);
497
          String last_checked = rs.getString(2);
498
          if(!server.equals("localhost"))
499
          {
500
            sl.put(server, last_checked);
501
          }
502
        }
503
        tableHasRows = rs.next();   
504
      }
505
      pstmt.close();
506
    }
507
    catch(Exception e)
508
    {
509
      System.out.println("error in replicationHandler.buildServerList(): " +
510
                         e.getMessage());
511
    }
512
    return sl;
513
  }
514
  
515
  /**
516
   * Method to initialize the message parser
517
   */
518
  public static XMLReader initParser(DefaultHandler dh)
519
          throws Exception
520
  {
521
    XMLReader parser = null;
522

    
523
    try {
524
      ContentHandler chandler = dh;
525

    
526
      // Get an instance of the parser
527
      MetaCatUtil util = new MetaCatUtil();
528
      String parserName = util.getOption("saxparser");
529
      parser = XMLReaderFactory.createXMLReader(parserName);
530

    
531
      // Turn off validation
532
      parser.setFeature("http://xml.org/sax/features/validation", false);
533
      
534
      parser.setContentHandler((ContentHandler)chandler);
535
      parser.setErrorHandler((ErrorHandler)chandler);
536

    
537
    } catch (Exception e) {
538
      throw e;
539
    }
540

    
541
    return parser;
542
  }
543
}
(42-42/43)