Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: tao $'
10
 *     '$Date: 2005-11-01 17:01:21 -0800 (Tue, 01 Nov 2005) $'
11
 * '$Revision: 2713 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27

    
28
package edu.ucsb.nceas.metacat;
29

    
30
import edu.ucsb.nceas.dbadapter.AbstractDatabase;
31
import java.sql.*;
32
import java.util.*;
33
import java.util.Date;
34
import java.lang.Thread;
35
import java.io.*;
36
import java.net.*;
37
import java.text.*;
38

    
39
import org.apache.log4j.Logger;
40
import org.xml.sax.AttributeList;
41
import org.xml.sax.ContentHandler;
42
import org.xml.sax.DTDHandler;
43
import org.xml.sax.EntityResolver;
44
import org.xml.sax.ErrorHandler;
45
import org.xml.sax.InputSource;
46
import org.xml.sax.XMLReader;
47
import org.xml.sax.SAXException;
48
import org.xml.sax.SAXParseException;
49
import org.xml.sax.helpers.XMLReaderFactory;
50
import org.xml.sax.helpers.DefaultHandler;
51

    
52

    
53

    
54
/**
55
 * This class handles deltaT replication checking.  Whenever this TimerTask
56
 * is fired it checks each server in xml_replication for updates and updates
57
 * the local db as needed.
58
 */
59
public class ReplicationHandler extends TimerTask
60
{
61
  int serverCheckCode = 1;
62
  MetaCatUtil util = new MetaCatUtil();
63
  ReplicationServerList serverList = null;
64
  //PrintWriter out;
65
  private static final AbstractDatabase dbAdapter = MetaCatUtil.dbAdapter;
66
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
67
 
68
  
69
  public ReplicationHandler()
70
  {
71
    //this.out = o;
72
    serverList = new ReplicationServerList();
73
  }
74

    
75
  public ReplicationHandler(int serverCheckCode)
76
  {
77
    //this.out = o;
78
    this.serverCheckCode = serverCheckCode;
79
    serverList = new ReplicationServerList();
80
  }
81

    
82
  /**
83
   * Method that implements TimerTask.run().  It runs whenever the timer is
84
   * fired.
85
   */
86
  public void run()
87
  {
88
    //find out the last_checked time of each server in the server list and
89
    //send a query to each server to see if there are any documents in
90
    //xml_documents with an update_date > last_checked
91
    try
92
    {
93
      //if serverList is null, metacat don't need to replication
94
      if (serverList==null||serverList.isEmpty())
95
      {
96
        return;
97
      }
98
      updateCatalog();
99
      update();
100
      //conn.close();
101
    }//try
102
    catch (Exception e)
103
    {
104
      logMetacat.error("Error in replicationHandler.run():"
105
                                                    +e.getMessage());
106
      e.printStackTrace();
107
    }//catch
108
  }
109

    
110
  /**
111
   * Method that uses revision taging for replication instead of update_date.
112
   */
113
  private void update()
114
  {
115
    /*
116
     Pseudo-algorithm
117
     - request a doc list from each server in xml_replication
118
     - check the rev number of each of those documents agains the
119
       documents in the local database
120
     - pull any documents that have a lesser rev number on the local server
121
       from the remote server
122
     - delete any documents that still exist in the local xml_documents but
123
       are in the deletedDocuments tag of the remote host response.
124
     - update last_checked to keep track of the last time it was checked.
125
       (this info is theoretically not needed using this system but probably
126
       should be kept anyway)
127
    */
128

    
129
    ReplicationServer replServer = null; // Variable to store the
130
                                        // ReplicationServer got from
131
                                        // Server list
132
    String server = null; // Variable to store server name
133
    String update;
134
    ReplMessageHandler message = new ReplMessageHandler();
135
    Vector responses = new Vector();
136
    XMLReader parser;
137
    URL u;
138

    
139

    
140
    try
141
    {
142
      parser = initParser(message);
143
    }
144
    catch (Exception e)
145
    {
146
      MetacatReplication.replErrorLog("Failed to replicate becaue couldn't "+
147
                                 " initParser for message and" +e.getMessage());
148
      logMetacat.error("Failed to replicate becaue couldn't " +
149
                            " initParser for message and " +e.getMessage());
150
       // stop replication
151
       return;
152
    }
153
    //Check for every server in server list to get updated list and put
154
    // them in to response
155
    for (int i=0; i<serverList.size(); i++)
156
    {
157
        // Get ReplicationServer object from server list
158
        replServer = serverList.serverAt(i);
159
        // Get server name from ReplicationServer object
160
        server = replServer.getServerName().trim();
161
        String result = null;
162
        MetacatReplication.replLog("full update started to: " + server);
163
        // Send command to that server to get updated docid information
164
        try
165
        {
166
          u = new URL("https://" + server + "?server="
167
          +util.getLocalReplicationServerName()+"&action=update");
168
          logMetacat.info("Sending infomation " +u.toString());
169
          result = MetacatReplication.getURLContent(u);
170
        }
171
        catch (Exception e)
172
        {
173
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
174
                          "for server " + server + " because "+e.getMessage());
175
          logMetacat.error( "Failed to get updated doc list "+
176
                       "for server " + server + " because "+e.getMessage());
177
          continue;
178
        }
179

    
180
        logMetacat.info("docid: "+server+" "+result);
181
        //check if result have error or not, if has skip it.
182
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
183
        {
184
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
185
                          "for server " + server + " because "+result);
186
          logMetacat.info( "Failed to get updated doc list "+
187
                       "for server " + server + " because "+result);
188
          continue;
189
        }
190
        //Add result to vector
191
        responses.add(result);
192
    }
193

    
194
    //make sure that there is updated file list
195
    //If response is null, metacat don't need do anything
196
    if (responses==null || responses.isEmpty())
197
    {
198
        MetacatReplication.replErrorLog("No updated doc list for "+
199
                           "every server and failed to replicate");
200
        logMetacat.info( "No updated doc list for "+
201
                           "every server and failed to replicate");
202
        return;
203
    }
204

    
205

    
206
    logMetacat.info("Responses from remote metacat about updated "+
207
                   "document information: "+ responses.toString());
208
    // go through response vector(it contains updated vector and delete vector
209
    for(int i=0; i<responses.size(); i++)
210
    {
211
        try
212
        {
213
          parser.parse(new InputSource(
214
                     new StringReader(
215
                     (String)(responses.elementAt(i)))));
216
        }
217
        catch(Exception e)
218
        {
219
          MetacatReplication.replErrorLog("Couldn't parse one responses "+
220
                           "because "+ e.getMessage());
221
          logMetacat.error("Couldn't parse one responses "+
222
                                   "because "+ e.getMessage());
223
          continue;
224
        }
225
        //v is the list of updated documents
226
        Vector updateList = new Vector(message.getUpdatesVect());
227
        //System.out.println("v: " + v.toString());
228
        //d is the list of deleted documents
229
        Vector deleteList = new Vector(message.getDeletesVect());
230
        //System.out.println("d: " + d.toString());
231
        logMetacat.info("Update vector size: "+ updateList.size());
232
        logMetacat.info("Delete vector size: "+ deleteList.size());
233
        // go though every element in updated document vector
234
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
235
        //handle deleted docs
236
        for(int k=0; k<deleteList.size(); k++)
237
        { //delete the deleted documents;
238
          Vector w = new Vector((Vector)deleteList.elementAt(k));
239
          String docId = (String)w.elementAt(0);
240
          try
241
          {
242
            handleDeleteSingleDocument(docId, server);
243
          }
244
          catch (Exception ee)
245
          {
246
            continue;
247
          }
248
        }//for delete docs
249
        
250
        // handle replicate doc in xml_revision
251
        Vector revisionList = new Vector(message.getRevisionsVect());
252
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
253
    }//for response
254

    
255
    //updated last_checked
256
    for (int i=0;i<serverList.size(); i++)
257
    {
258
       // Get ReplicationServer object from server list
259
       replServer = serverList.serverAt(i);
260
       try
261
       {
262
         updateLastCheckTimeForSingleServer(replServer);
263
       }
264
       catch(Exception e)
265
       {
266
         continue;
267
       }
268
    }//for
269

    
270
  }//update
271

    
272
  /* Handle replicate single xml document*/
273
  private void handleSingleXMLDocument(String remoteserver, String actions,
274
                                       String accNumber, String tableName)
275
               throws Exception
276
  {
277
    DBConnection dbConn = null;
278
    int serialNumber = -1;
279
    try
280
    {
281
      // Get DBConnection from pool
282
      dbConn=DBConnectionPool.
283
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
284
      serialNumber=dbConn.getCheckOutSerialNumber();
285
      //if the document needs to be updated or inserted, this is executed
286
      String readDocURLString = "https://" + remoteserver + "?server="+
287
              util.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
288
      readDocURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDocURLString);
289
      URL u = new URL(readDocURLString);
290

    
291
      // Get docid content
292
      String newxmldoc = MetacatReplication.getURLContent(u);
293
      // If couldn't get skip it
294
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
295
      {
296
         throw new Exception(newxmldoc);
297
      }
298
      //logMetacat.info("xml documnet:");
299
      //logMetacat.info(newxmldoc);
300

    
301
      // Try get the docid info from remote server
302
      DocInfoHandler dih = new DocInfoHandler();
303
      XMLReader docinfoParser = initParser(dih);
304
      String docInfoURLStr = "https://" + remoteserver +
305
                       "?server="+util.getLocalReplicationServerName()+
306
                       "&action=getdocumentinfo&docid="+accNumber;
307
      docInfoURLStr = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
308
      URL docinfoUrl = new URL(docInfoURLStr);
309
      logMetacat.info("Sending message: " +
310
                                                  docinfoUrl.toString());
311
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
312
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
313
      Hashtable docinfoHash = dih.getDocInfo();
314
      // Get home server of the docid
315
      String docHomeServer = (String)docinfoHash.get("home_server");
316
      logMetacat.info("doc home server in repl: "+docHomeServer);
317
      String createdDate = (String)docinfoHash.get("date_created");
318
      String updatedDate = (String)docinfoHash.get("date_updated");
319
      //docid should include rev number too
320
      /*String accnum=docId+util.getOption("accNumSeparator")+
321
                                              (String)docinfoHash.get("rev");*/
322
      logMetacat.info("docid in repl: "+accNumber);
323
      String docType = (String)docinfoHash.get("doctype");
324
      logMetacat.info("doctype in repl: "+docType);
325

    
326
      String parserBase = null;
327
      // this for eml2 and we need user eml2 parser
328
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
329
      {
330
         parserBase = DocumentImpl.EML200;
331
      }
332
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
333
      {
334
        parserBase = DocumentImpl.EML200;
335
      }
336
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
337
      {
338
        parserBase = DocumentImpl.EML210;
339
      }
340
      // Write the document into local host
341
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false);
342
      String newDocid = wrapper.writeReplication(dbConn,
343
                              new StringReader(newxmldoc),
344
                              (String)docinfoHash.get("public_access"),
345
                              null,  /* the dtd text */
346
                              actions,
347
                              accNumber,
348
                              (String)docinfoHash.get("user_owner"),
349
                              null, /* null for groups[] */
350
                              docHomeServer,
351
                              remoteserver, tableName, true,// true is for time replication 
352
                              createdDate,
353
                              updatedDate);
354
      logMetacat.info("Successfully replicated doc " + accNumber);
355
      MetacatReplication.replLog("wrote doc " + accNumber + " from " +
356
                                         remoteserver);
357

    
358
    }//try
359
    catch(Exception e)
360
    {
361
      MetacatReplication.replErrorLog("Failed to write doc " + accNumber +
362
                                      " into db because " +e.getMessage());
363
      logMetacat.error("Failed to write doc " + accNumber +
364
                                      " into db because " +e.getMessage());
365
      throw e;
366
    }
367
    finally
368
    {
369
       //return DBConnection
370
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
371
    }//finally
372
  }
373

    
374

    
375

    
376
  /* Handle replicate single xml document*/
377
  private void handleSingleDataFile(String remoteserver, String actions,
378
                                    String accNumber, String tableName)
379
               throws Exception
380
  {
381
    logMetacat.info("Try to replicate data file: "+accNumber);
382
    DBConnection dbConn = null;
383
    int serialNumber = -1;
384
    try
385
    {
386
      // Get DBConnection from pool
387
      dbConn=DBConnectionPool.
388
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
389
      serialNumber=dbConn.getCheckOutSerialNumber();
390
      // Try get docid info from remote server
391
      DocInfoHandler dih = new DocInfoHandler();
392
      XMLReader docinfoParser = initParser(dih);
393
      String docInfoURLString = "https://" + remoteserver +
394
                  "?server="+util.getLocalReplicationServerName()+
395
                  "&action=getdocumentinfo&docid="+accNumber;
396
      docInfoURLString = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLString);
397
      URL docinfoUrl = new URL(docInfoURLString);
398

    
399
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
400
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
401
      Hashtable docinfoHash = dih.getDocInfo();
402
      // Get doicd owner
403
      String user = (String)docinfoHash.get("user_owner");
404
      // Get docid name (such as acl or dataset)
405
      String docName = (String)docinfoHash.get("docname");
406
      // Get doc type (eml public id)
407
      String docType = (String)docinfoHash.get("doctype");
408
      // Get docid home sever. it might be different to remoteserver
409
      // becuause of hub feature
410
      String docHomeServer = (String)docinfoHash.get("home_server");
411
      String createdDate = (String)docinfoHash.get("date_created");
412
      String updatedDate = (String)docinfoHash.get("date_updated");
413
      //docid should include rev number too
414
      /*String accnum=docId+util.getOption("accNumSeparator")+
415
                                              (String)docinfoHash.get("rev");*/
416

    
417

    
418
      String datafilePath = util.getOption("datafilepath");
419
      // Get data file content
420
      String readDataURLString = "https://" + remoteserver + "?server="+
421
                                        util.getLocalReplicationServerName()+
422
                                            "&action=readdata&docid="+accNumber;
423
      readDataURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDataURLString);
424
      URL u = new URL(readDataURLString);
425
      InputStream input = u.openStream();
426
      //register data file into xml_documents table and wite data file
427
      //into file system
428
      if ( input != null)
429
      {
430
        DocumentImpl.writeDataFileInReplication(input,
431
                                                datafilePath,
432
                                                docName,docType,
433
                                                accNumber, user,
434
                                                docHomeServer,
435
                                                remoteserver,
436
                                                tableName,
437
                                                true, //true means timed replication
438
                                                createdDate,
439
                                                updatedDate);
440
                                         
441
        logMetacat.info("Successfully to write datafile " + accNumber);
442
        MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
443
                                    remoteserver);
444
      }//if
445
      else
446
      {
447
         logMetacat.info("Couldn't open the data file: " + accNumber);
448
         throw new Exception("Couldn't open the data file: " + accNumber);
449
      }//else
450

    
451
    }//try
452
    catch(Exception e)
453
    {
454
      MetacatReplication.replErrorLog("Failed to try wrote datafile " + accNumber +
455
                                      " because " +e.getMessage());
456
      logMetacat.error("Failed to try wrote datafile " + accNumber +
457
                                      " because " +e.getMessage());
458
      throw e;
459
    }
460
    finally
461
    {
462
       //return DBConnection
463
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
464
    }//finally
465
  }
466

    
467

    
468

    
469
  /* Handle delete single document*/
470
  private void handleDeleteSingleDocument(String docId, String notifyServer)
471
               throws Exception
472
  {
473
    logMetacat.info("Try delete doc: "+docId);
474
    DBConnection dbConn = null;
475
    int serialNumber = -1;
476
    try
477
    {
478
      // Get DBConnection from pool
479
      dbConn=DBConnectionPool.
480
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
481
      serialNumber=dbConn.getCheckOutSerialNumber();
482
      if(!alreadyDeleted(docId))
483
      {
484

    
485
         //because delete method docid should have rev number
486
         //so we just add one for it. This rev number is no sence.
487
         String accnum=docId+util.getOption("accNumSeparator")+"1";
488
         //System.out.println("accnum: "+accnum);
489
         DocumentImpl.delete(accnum, null, null, notifyServer);
490
         logMetacat.info("Successfully deleted doc " + docId);
491
         MetacatReplication.replLog("Doc " + docId + " deleted");
492
      }
493

    
494
    }//try
495
    catch(Exception e)
496
    {
497
      MetacatReplication.replErrorLog("Failed to delete doc " + docId +
498
                                      " in db because " +e.getMessage());
499
      logMetacat.error("Failed to delete doc " + docId +
500
                                 " in db because because " + e.getMessage());
501
      throw e;
502
    }
503
    finally
504
    {
505
       //return DBConnection
506
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
507
    }//finally
508
  }
509

    
510
  /* Handle updateLastCheckTimForSingleServer*/
511
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
512
                                                  throws Exception
513
  {
514
    String server = repServer.getServerName();
515
    DBConnection dbConn = null;
516
    int serialNumber = -1;
517
    PreparedStatement pstmt = null;
518
    try
519
    {
520
      // Get DBConnection from pool
521
      dbConn=DBConnectionPool.
522
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
523
      serialNumber=dbConn.getCheckOutSerialNumber();
524

    
525
      logMetacat.info("Try to update last_check for server: "+server);
526
      // Get time from remote server
527
      URL dateurl = new URL("https://" + server + "?server="+
528
      util.getLocalReplicationServerName()+"&action=gettime");
529
      String datexml = MetacatReplication.getURLContent(dateurl);
530
      logMetacat.info("datexml: "+datexml);
531
      if (datexml!=null && !datexml.equals(""))
532
      {
533
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
534
         StringBuffer sql = new StringBuffer();
535
         /*sql.append("update xml_replication set last_checked = to_date('");
536
         sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
537
         sql.append("server like '").append(server).append("'");*/
538
         sql.append("update xml_replication set last_checked = ");
539
         sql.append(dbAdapter.toDate(datestr, "MM/DD/YY HH24:MI:SS"));
540
         sql.append(" where server like '").append(server).append("'");
541
         pstmt = dbConn.prepareStatement(sql.toString());
542

    
543
         pstmt.executeUpdate();
544
         dbConn.commit();
545
         pstmt.close();
546
         logMetacat.info("last_checked updated to "+datestr+" on "
547
                                      + server);
548
      }//if
549
      else
550
      {
551

    
552
         logMetacat.info("Failed to update last_checked for server "  +
553
                                  server + " in db because couldn't get time "
554
                                  );
555
         throw new Exception("Couldn't get time for server "+ server);
556
      }
557

    
558
    }//try
559
    catch(Exception e)
560
    {
561

    
562
      logMetacat.error("Failed to update last_checked for server " +
563
                                server + " in db because because " +
564
                                e.getMessage());
565
      throw e;
566
    }
567
    finally
568
    {
569
       //return DBConnection
570
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
571
    }//finally
572
  }
573

    
574

    
575

    
576
  /**
577
   * updates xml_catalog with entries from other servers.
578
   */
579
  private void updateCatalog()
580
  {
581
    logMetacat.info("Start of updateCatalog");
582
    // ReplicationServer object in server list
583
    ReplicationServer replServer = null;
584
    PreparedStatement pstmt = null;
585
    String server = null;
586

    
587

    
588
    // Go through each ReplicationServer object in sererlist
589
    for (int j=0; j<serverList.size(); j++)
590
    {
591
      Vector remoteCatalog = new Vector();
592
      Vector publicId = new Vector();
593
      try
594
      {
595
        // Get ReplicationServer object from server list
596
        replServer = serverList.serverAt(j);
597
        // Get server name from the ReplicationServer object
598
        server = replServer.getServerName();
599
        // Try to get catalog
600
        URL u = new URL("https://" + server + "?server="+
601
        util.getLocalReplicationServerName()+"&action=getcatalog");
602
        logMetacat.info("sending message " + u.toString());
603
        String catxml = MetacatReplication.getURLContent(u);
604

    
605
        // Make sure there are not error, no empty string
606
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
607
        {
608
          throw new Exception("Couldn't get catalog list form server " +server);
609
        }
610
        logMetacat.info("catxml: " + catxml);
611
        CatalogMessageHandler cmh = new CatalogMessageHandler();
612
        XMLReader catparser = initParser(cmh);
613
        catparser.parse(new InputSource(new StringReader(catxml)));
614
        //parse the returned catalog xml and put it into a vector
615
        remoteCatalog = cmh.getCatalogVect();
616

    
617
        // Makse sure remoteCatalog is not empty
618
        if (remoteCatalog.isEmpty())
619
        {
620
          throw new Exception("Couldn't get catalog list form server " +server);
621
        }
622

    
623
        String localcatxml = MetacatReplication.getCatalogXML();
624

    
625
        // Make sure local catalog is no empty
626
        if (localcatxml==null||localcatxml.equals(""))
627
        {
628
          throw new Exception("Couldn't get catalog list form server " +server);
629
        }
630

    
631
        cmh = new CatalogMessageHandler();
632
        catparser = initParser(cmh);
633
        catparser.parse(new InputSource(new StringReader(localcatxml)));
634
        Vector localCatalog = cmh.getCatalogVect();
635

    
636
        //now we have the catalog from the remote server and this local server
637
        //we now need to compare the two and merge the differences.
638
        //the comparison is base on the public_id fields which is the 4th
639
        //entry in each row vector.
640
        publicId = new Vector();
641
        for(int i=0; i<localCatalog.size(); i++)
642
        {
643
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
644
          logMetacat.info("v1: " + v.toString());
645
          publicId.add(new String((String)v.elementAt(3)));
646
          //System.out.println("adding " + (String)v.elementAt(3));
647
        }
648
      }//try
649
      catch (Exception e)
650
      {
651
        MetacatReplication.replErrorLog("Failed to update catalog for server "+
652
                                    server + " because " +e.getMessage());
653
        logMetacat.error("Failed to update catalog for server "+
654
                                    server + " because " +e.getMessage());
655
      }//catch
656

    
657
      for(int i=0; i<remoteCatalog.size(); i++)
658
      {
659
         // DConnection
660
        DBConnection dbConn = null;
661
        // DBConnection checkout serial number
662
        int serialNumber = -1;
663
        try
664
        {
665
            dbConn=DBConnectionPool.
666
                  getDBConnection("ReplicationHandler.updateCatalog");
667
            serialNumber=dbConn.getCheckOutSerialNumber();
668
            Vector v = (Vector)remoteCatalog.elementAt(i);
669
            //System.out.println("v2: " + v.toString());
670
            //System.out.println("i: " + i);
671
            //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
672
            //System.out.println("publicID: " + publicId.toString());
673
            logMetacat.info
674
                              ("v.elementAt(3): " + (String)v.elementAt(3));
675
           if(!publicId.contains(v.elementAt(3)))
676
           { //so we don't have this public id in our local table so we need to
677
             //add it.
678
             //System.out.println("in if");
679
             StringBuffer sql = new StringBuffer();
680
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
681
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
682
             sql.append("?,?)");
683
             //System.out.println("sql: " + sql.toString());
684
             pstmt = dbConn.prepareStatement(sql.toString());
685
             pstmt.setString(1, (String)v.elementAt(0));
686
             pstmt.setString(2, (String)v.elementAt(1));
687
             pstmt.setString(3, (String)v.elementAt(2));
688
             pstmt.setString(4, (String)v.elementAt(3));
689
             pstmt.setString(5, (String)v.elementAt(4));
690
             pstmt.execute();
691
             pstmt.close();
692
             MetacatReplication.replLog("Success fully to insert new publicid "+
693
                               (String)v.elementAt(3) + " from server"+server);
694
             logMetacat.info("Success fully to insert new publicid "+
695
                             (String)v.elementAt(3) + " from server" +server);
696
           }
697
        }
698
        catch(Exception e)
699
        {
700
           MetacatReplication.replErrorLog("Failed to update catalog for server "+
701
                                    server + " because " +e.getMessage());
702
           logMetacat.error("Failed to update catalog for server "+
703
                                    server + " because " +e.getMessage());
704
        }//catch
705
        finally
706
        {
707
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
708
        }//finall
709
      }//for remote catalog
710
    }//for server list
711
    logMetacat.info("End of updateCatalog");
712
  }
713

    
714
  /**
715
   * Method that returns true if docid has already been "deleted" from metacat.
716
   * This method really implements a truth table for deleted documents
717
   * The table is (a docid in one of the tables is represented by the X):
718
   * xml_docs      xml_revs      deleted?
719
   * ------------------------------------
720
   *   X             X             FALSE
721
   *   X             _             FALSE
722
   *   _             X             TRUE
723
   *   _             _             TRUE
724
   */
725
  private static boolean alreadyDeleted(String docid) throws Exception
726
  {
727
    DBConnection dbConn = null;
728
    int serialNumber = -1;
729
    PreparedStatement pstmt = null;
730
    try
731
    {
732
      dbConn=DBConnectionPool.
733
                  getDBConnection("ReplicationHandler.alreadyDeleted");
734
      serialNumber=dbConn.getCheckOutSerialNumber();
735
      boolean xml_docs = false;
736
      boolean xml_revs = false;
737

    
738
      StringBuffer sb = new StringBuffer();
739
      sb.append("select docid from xml_revisions where docid like '");
740
      sb.append(docid).append("'");
741
      pstmt = dbConn.prepareStatement(sb.toString());
742
      pstmt.execute();
743
      ResultSet rs = pstmt.getResultSet();
744
      boolean tablehasrows = rs.next();
745
      if(tablehasrows)
746
      {
747
        xml_revs = true;
748
      }
749

    
750
      sb = new StringBuffer();
751
      sb.append("select docid from xml_documents where docid like '");
752
      sb.append(docid).append("'");
753
      pstmt.close();
754
      pstmt = dbConn.prepareStatement(sb.toString());
755
      //increase usage count
756
      dbConn.increaseUsageCount(1);
757
      pstmt.execute();
758
      rs = pstmt.getResultSet();
759
      tablehasrows = rs.next();
760
      pstmt.close();
761
      if(tablehasrows)
762
      {
763
        xml_docs = true;
764
      }
765

    
766
      if(xml_docs && xml_revs)
767
      {
768
        return false;
769
      }
770
      else if(xml_docs && !xml_revs)
771
      {
772
        return false;
773
      }
774
      else if(!xml_docs && xml_revs)
775
      {
776
        return true;
777
      }
778
      else if(!xml_docs && !xml_revs)
779
      {
780
        return true;
781
      }
782
    }
783
    catch(Exception e)
784
    {
785
      logMetacat.error("error in ReplicationHandler.alreadyDeleted: " +
786
                          e.getMessage());
787
      throw e;
788
    }
789
    finally
790
    {
791
      try
792
      {
793
        pstmt.close();
794
      }//try
795
      catch (SQLException ee)
796
      {
797
        logMetacat.error("Error in replicationHandler.alreadyDeleted "+
798
                          "to close pstmt: "+ee.getMessage());
799
        throw ee;
800
      }//catch
801
      finally
802
      {
803
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
804
      }//finally
805
    }//finally
806
    return false;
807
  }
808

    
809

    
810
  /**
811
   * Method to initialize the message parser
812
   */
813
  public static XMLReader initParser(DefaultHandler dh)
814
          throws Exception
815
  {
816
    XMLReader parser = null;
817

    
818
    try {
819
      ContentHandler chandler = dh;
820

    
821
      // Get an instance of the parser
822
      MetaCatUtil util = new MetaCatUtil();
823
      String parserName = util.getOption("saxparser");
824
      parser = XMLReaderFactory.createXMLReader(parserName);
825

    
826
      // Turn off validation
827
      parser.setFeature("http://xml.org/sax/features/validation", false);
828

    
829
      parser.setContentHandler((ContentHandler)chandler);
830
      parser.setErrorHandler((ErrorHandler)chandler);
831

    
832
    } catch (Exception e) {
833
      throw e;
834
    }
835

    
836
    return parser;
837
  }
838

    
839
  /**
840
   * This method will combinate given time string(in short format) to
841
   * current date. If the given time (e.g 10:00 AM) passed the current time
842
   * (e.g 2:00 PM Aug 21, 2005), then the time will set to second day,
843
   * 10:00 AM Aug 22, 2005. If the given time (e.g 10:00 AM) haven't passed
844
   * the current time (e.g 8:00 AM Aug 21, 2005) The time will set to be
845
   * 10:00 AM Aug 21, 2005.
846
   * @param givenTime  the format should be "10:00 AM " or "2:00 PM"
847
   * @return
848
   * @throws Exception
849
   */
850
  public static Date combinateCurrentDateAndGivenTime(String givenTime) throws Exception
851
  {
852
     Date givenDate = parseTime(givenTime);
853
     Date newDate = null;
854
     Date now = new Date();
855
     String currentTimeString = getTimeString(now);
856
     Date currentTime = parseTime(currentTimeString); 
857
     if ( currentTime.getTime() >= givenDate.getTime())
858
     {
859
        logMetacat.info("Today already pass the given time, we should set it as tomorrow");
860
        String dateAndTime = getDateString(now) + " " + givenTime;
861
        Date combinationDate = parseDateTime(dateAndTime);
862
        // new date should plus 24 hours to make is the second day
863
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
864
     }
865
     else
866
     {
867
         logMetacat.info("Today haven't pass the given time, we should it as today");
868
         String dateAndTime = getDateString(now) + " " + givenTime;
869
         newDate = parseDateTime(dateAndTime);
870
     }
871
     logMetacat.warn("final setting time is "+ newDate.toString());
872
     return newDate;
873
  }
874

    
875
  /*
876
   * parse a given string to Time in short format.
877
   * For example, given time is 10:00 AM, the date will be return as
878
   * Jan 1 1970, 10:00 AM
879
   */
880
  private static Date parseTime(String timeString) throws Exception
881
  {
882
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
883
    Date time = format.parse(timeString); 
884
    logMetacat.info("Date string is after parse a time string "
885
                              +time.toString());
886
    return time;
887

    
888
  }
889
  
890
  /*
891
   * Pasre a given string to date and time. Date format is long and time
892
   * format is short.
893
   */
894
  private static Date parseDateTime(String timeString) throws Exception
895
  {
896
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
897
    Date time = format.parse(timeString);
898
    logMetacat.info("Date string is after parse a time string "+
899
                             time.toString());
900
    return time;
901
  }
902
  
903
  /*
904
   * Get a date string from a Date object. The date format will be long
905
   */
906
  private static String getDateString(Date now)
907
  {
908
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
909
     String s = df.format(now);
910
     logMetacat.info("Today is " + s);
911
     return s;
912
  }
913
  
914
  /*
915
   * Get a time string from a Date object, the time format will be short
916
   */
917
  private static String getTimeString(Date now)
918
  {
919
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
920
     String s = df.format(now);
921
     logMetacat.info("Time is " + s);
922
     return s;
923
  }
924
  
925
  
926
  /*
927
   * This method will go through the docid list both in xml_Documents table
928
   * and in xml_revisions table
929
   * @author tao
930
   */
931
   private void handleDocList(Vector docList, String tableName)
932
   {
933
       boolean dataFile=false;
934
       for(int j=0; j<docList.size(); j++)
935
       {
936
         //initial dataFile is false
937
         dataFile=false;
938
         //w is information for one document, information contain
939
         //docid, rev, server or datafile.
940
         Vector w = new Vector((Vector)(docList.elementAt(j)));
941
         //Check if the vector w contain "datafile"
942
         //If it has, this document is data file
943
         if (w.contains((String)util.getOption("datafileflag")))
944
         {
945
           dataFile=true;
946
         }
947
         //System.out.println("w: " + w.toString());
948
         // Get docid
949
         String docid = (String)w.elementAt(0);
950
         logMetacat.info("docid: " + docid);
951
         // Get revision number
952
         int rev = Integer.parseInt((String)w.elementAt(1));
953
         logMetacat.info("rev: " + rev);
954
         // Get remote server name (it is may not be doc homeserver because
955
         // the new hub feature
956
         String remoteServer = (String)w.elementAt(2);
957
         remoteServer = remoteServer.trim();
958

    
959
         try
960
         {
961
             if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
962
             {
963
                 handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
964
             }
965
             else if (tableName.equals(DocumentImpl.REVISIONTABLE))
966
             {
967
                 handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
968
             }
969
             else
970
             {
971
                 continue;
972
             }
973
                 
974
         }
975
         catch (Exception e)
976
         {
977
             logMetacat.error("error to handle update doc in "+ 
978
                     tableName +" in time replication"+e.getMessage());
979
             continue;
980
         }
981
        
982
       }//for update docs
983

    
984
   }
985
   
986
   /*
987
    * This method will handle doc in xml_documents table.
988
    */
989
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
990
                                        throws Exception
991
   {
992
       // compare the update rev and local rev to see what need happen
993
       int localrev = -1;
994
       String action = null;
995
       boolean flag = false;
996
       try
997
       {
998
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
999
       }
1000
       catch (SQLException e)
1001
       {
1002
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1003
                                " be found because " + e.getMessage());
1004
         MetacatReplication.replErrorLog("Docid "+ docid + " could not be "+
1005
                 "written because error happend to find it's local revision");
1006
         throw new Exception (e.getMessage());
1007
       }
1008
       logMetacat.info("Local rev for docid "+ docid + " is "+
1009
                               localrev);
1010

    
1011
       //check the revs for an update because this document is in the
1012
       //local DB, it might be out of date.
1013
       if (localrev == -1)
1014
       {
1015
          // check if the revision is in the revision table
1016
         Vector localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1017
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1018
         {
1019
             // this version was deleted, so don't need replicate
1020
             flag = false;
1021
         }
1022
         else
1023
         {
1024
           //insert this document as new because it is not in the local DB
1025
           action = "INSERT";
1026
           flag = true;
1027
         }
1028
       }
1029
       else
1030
       {
1031
         if(localrev == rev)
1032
         {
1033
           // Local meatacat has the same rev to remote host, don't need
1034
           // update and flag set false
1035
           flag = false;
1036
         }
1037
         else if(localrev < rev)
1038
         {
1039
           //this document needs to be updated so send an read request
1040
           action = "UPDATE";
1041
           flag = true;
1042
         }
1043
       }
1044
       
1045
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1046
       // this is non-data file
1047
       if(flag && !dataFile)
1048
       {
1049
         try
1050
         {
1051
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1052
         }
1053
         catch(Exception e)
1054
         {
1055
           // skip this document
1056
           throw e;
1057
         }
1058
       }//if for non-data file
1059

    
1060
        // this is for data file
1061
       if(flag && dataFile)
1062
       {
1063
         try
1064
         {
1065
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1066
         }
1067
         catch(Exception e)
1068
         {
1069
           // skip this datafile
1070
           throw e;
1071
         }
1072

    
1073
       }//for datafile
1074
   }
1075
   
1076
   /*
1077
    * This method will handle doc in xml_documents table.
1078
    */
1079
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1080
                                        throws Exception
1081
   {
1082
       // compare the update rev and local rev to see what need happen
1083
       logMetacat.info("In handle repliation revsion table");
1084
       logMetacat.info("the docid is "+ docid);
1085
       logMetacat.info("The rev is "+rev);
1086
       Vector localrev = null;
1087
       String action = "INSERT";
1088
       boolean flag = false;
1089
       try
1090
       {
1091
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1092
       }
1093
       catch (SQLException e)
1094
       {
1095
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1096
                                " be found because " + e.getMessage());
1097
         MetacatReplication.replErrorLog("Docid "+ docid + " could not be "+
1098
                 "written because error happend to find it's local revision");
1099
         throw new Exception (e.getMessage());
1100
       }
1101
       logMetacat.info("rev list in xml_revision table for docid "+ docid + " is "+
1102
                               localrev.toString());
1103
       
1104
       // if the rev is not in the xml_revision, we need insert it
1105
       if (!localrev.contains(new Integer(rev)))
1106
       {
1107
           flag = true;    
1108
       }
1109
     
1110
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1111
       // this is non-data file
1112
       if(flag && !dataFile)
1113
       {
1114
         try
1115
         {
1116
           
1117
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1118
         }
1119
         catch(Exception e)
1120
         {
1121
           // skip this document
1122
           throw e;
1123
         }
1124
       }//if for non-data file
1125

    
1126
        // this is for data file
1127
       if(flag && dataFile)
1128
       {
1129
         try
1130
         {
1131
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1132
         }
1133
         catch(Exception e)
1134
         {
1135
           // skip this datafile
1136
           throw e;
1137
         }
1138

    
1139
       }//for datafile
1140
   }
1141
  
1142
}
1143

    
(57-57/63)