Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: tao $'
10
 *     '$Date: 2005-11-15 16:59:45 -0800 (Tue, 15 Nov 2005) $'
11
 * '$Revision: 2740 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27

    
28
package edu.ucsb.nceas.metacat;
29

    
30
import edu.ucsb.nceas.dbadapter.AbstractDatabase;
31
import java.sql.*;
32
import java.util.*;
33
import java.util.Date;
34
import java.lang.Thread;
35
import java.io.*;
36
import java.net.*;
37
import java.text.*;
38

    
39
import org.apache.log4j.Logger;
40
import org.xml.sax.AttributeList;
41
import org.xml.sax.ContentHandler;
42
import org.xml.sax.DTDHandler;
43
import org.xml.sax.EntityResolver;
44
import org.xml.sax.ErrorHandler;
45
import org.xml.sax.InputSource;
46
import org.xml.sax.XMLReader;
47
import org.xml.sax.SAXException;
48
import org.xml.sax.SAXParseException;
49
import org.xml.sax.helpers.XMLReaderFactory;
50
import org.xml.sax.helpers.DefaultHandler;
51

    
52

    
53

    
54
/**
55
 * This class handles deltaT replication checking.  Whenever this TimerTask
56
 * is fired it checks each server in xml_replication for updates and updates
57
 * the local db as needed.
58
 */
59
public class ReplicationHandler extends TimerTask
60
{
61
  int serverCheckCode = 1;
62
  MetaCatUtil util = new MetaCatUtil();
63
  ReplicationServerList serverList = null;
64
  //PrintWriter out;
65
  private static final AbstractDatabase dbAdapter = MetaCatUtil.dbAdapter;
66
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
67
  private static int DOCINSERTNUMBER = 1;
68
  private static int DOCERRORNUMBER  = 1;
69
  private static int REVINSERTNUMBER = 1;
70
  private static int REVERRORNUMBER  = 1;
71
  public ReplicationHandler()
72
  {
73
    //this.out = o;
74
    serverList = new ReplicationServerList();
75
  }
76

    
77
  public ReplicationHandler(int serverCheckCode)
78
  {
79
    //this.out = o;
80
    this.serverCheckCode = serverCheckCode;
81
    serverList = new ReplicationServerList();
82
  }
83

    
84
  /**
85
   * Method that implements TimerTask.run().  It runs whenever the timer is
86
   * fired.
87
   */
88
  public void run()
89
  {
90
    //find out the last_checked time of each server in the server list and
91
    //send a query to each server to see if there are any documents in
92
    //xml_documents with an update_date > last_checked
93
    try
94
    {
95
      //if serverList is null, metacat don't need to replication
96
      if (serverList==null||serverList.isEmpty())
97
      {
98
        return;
99
      }
100
      updateCatalog();
101
      update();
102
      //conn.close();
103
    }//try
104
    catch (Exception e)
105
    {
106
      logMetacat.error("Error in replicationHandler.run():"
107
                                                    +e.getMessage());
108
      e.printStackTrace();
109
    }//catch
110
  }
111

    
112
  /**
113
   * Method that uses revision taging for replication instead of update_date.
114
   */
115
  private void update()
116
  {
117
    /*
118
     Pseudo-algorithm
119
     - request a doc list from each server in xml_replication
120
     - check the rev number of each of those documents agains the
121
       documents in the local database
122
     - pull any documents that have a lesser rev number on the local server
123
       from the remote server
124
     - delete any documents that still exist in the local xml_documents but
125
       are in the deletedDocuments tag of the remote host response.
126
     - update last_checked to keep track of the last time it was checked.
127
       (this info is theoretically not needed using this system but probably
128
       should be kept anyway)
129
    */
130

    
131
    ReplicationServer replServer = null; // Variable to store the
132
                                        // ReplicationServer got from
133
                                        // Server list
134
    String server = null; // Variable to store server name
135
    String update;
136
    ReplMessageHandler message = new ReplMessageHandler();
137
    Vector responses = new Vector();
138
    XMLReader parser;
139
    URL u;
140

    
141

    
142
    try
143
    {
144
      parser = initParser(message);
145
    }
146
    catch (Exception e)
147
    {
148
      MetacatReplication.replErrorLog("Failed to replicate becaue couldn't "+
149
                                 " initParser for message and" +e.getMessage());
150
      logMetacat.error("Failed to replicate becaue couldn't " +
151
                            " initParser for message and " +e.getMessage());
152
       // stop replication
153
       return;
154
    }
155
    //Check for every server in server list to get updated list and put
156
    // them in to response
157
    for (int i=0; i<serverList.size(); i++)
158
    {
159
        // Get ReplicationServer object from server list
160
        replServer = serverList.serverAt(i);
161
        // Get server name from ReplicationServer object
162
        server = replServer.getServerName().trim();
163
        String result = null;
164
        MetacatReplication.replLog("full update started to: " + server);
165
        // Send command to that server to get updated docid information
166
        try
167
        {
168
          u = new URL("https://" + server + "?server="
169
          +util.getLocalReplicationServerName()+"&action=update");
170
          logMetacat.info("Sending infomation " +u.toString());
171
          result = MetacatReplication.getURLContent(u);
172
        }
173
        catch (Exception e)
174
        {
175
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
176
                          "for server " + server + " because "+e.getMessage());
177
          logMetacat.error( "Failed to get updated doc list "+
178
                       "for server " + server + " because "+e.getMessage());
179
          continue;
180
        }
181

    
182
        logMetacat.info("docid: "+server+" "+result);
183
        //check if result have error or not, if has skip it.
184
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
185
        {
186
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
187
                          "for server " + server + " because "+result);
188
          logMetacat.info( "Failed to get updated doc list "+
189
                       "for server " + server + " because "+result);
190
          continue;
191
        }
192
        //Add result to vector
193
        responses.add(result);
194
    }
195

    
196
    //make sure that there is updated file list
197
    //If response is null, metacat don't need do anything
198
    if (responses==null || responses.isEmpty())
199
    {
200
        MetacatReplication.replErrorLog("No updated doc list for "+
201
                           "every server and failed to replicate");
202
        logMetacat.info( "No updated doc list for "+
203
                           "every server and failed to replicate");
204
        return;
205
    }
206

    
207

    
208
    logMetacat.info("Responses from remote metacat about updated "+
209
                   "document information: "+ responses.toString());
210
    // go through response vector(it contains updated vector and delete vector
211
    for(int i=0; i<responses.size(); i++)
212
    {
213
        try
214
        {
215
          parser.parse(new InputSource(
216
                     new StringReader(
217
                     (String)(responses.elementAt(i)))));
218
        }
219
        catch(Exception e)
220
        {
221
          MetacatReplication.replErrorLog("Couldn't parse one responses "+
222
                           "because "+ e.getMessage());
223
          logMetacat.error("Couldn't parse one responses "+
224
                                   "because "+ e.getMessage());
225
          continue;
226
        }
227
        //v is the list of updated documents
228
        Vector updateList = new Vector(message.getUpdatesVect());
229
        MetacatReplication.replLog("The document list size is "+updateList.size());
230
        //System.out.println("v: " + v.toString());
231
        //d is the list of deleted documents
232
        Vector deleteList = new Vector(message.getDeletesVect());
233
        //System.out.println("d: " + d.toString());
234
        logMetacat.info("Update vector size: "+ updateList.size());
235
        logMetacat.info("Delete vector size: "+ deleteList.size());
236
        MetacatReplication.replLog("The delete document list size is "+deleteList.size());
237
        // go though every element in updated document vector
238
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
239
        //handle deleted docs
240
        for(int k=0; k<deleteList.size(); k++)
241
        { //delete the deleted documents;
242
          Vector w = new Vector((Vector)deleteList.elementAt(k));
243
          String docId = (String)w.elementAt(0);
244
          try
245
          {
246
            handleDeleteSingleDocument(docId, server);
247
          }
248
          catch (Exception ee)
249
          {
250
            continue;
251
          }
252
        }//for delete docs
253
        
254
        // handle replicate doc in xml_revision
255
        Vector revisionList = new Vector(message.getRevisionsVect());
256
        MetacatReplication.replLog("The revision document list size is "+revisionList.size());
257
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
258
        DOCINSERTNUMBER = 1;
259
        DOCERRORNUMBER  = 1;
260
        REVINSERTNUMBER = 1;
261
        REVERRORNUMBER  = 1;
262
    }//for response
263

    
264
    //updated last_checked
265
    for (int i=0;i<serverList.size(); i++)
266
    {
267
       // Get ReplicationServer object from server list
268
       replServer = serverList.serverAt(i);
269
       try
270
       {
271
         updateLastCheckTimeForSingleServer(replServer);
272
       }
273
       catch(Exception e)
274
       {
275
         continue;
276
       }
277
    }//for
278

    
279
  }//update
280

    
281
  /* Handle replicate single xml document*/
282
  private void handleSingleXMLDocument(String remoteserver, String actions,
283
                                       String accNumber, String tableName)
284
               throws Exception
285
  {
286
    DBConnection dbConn = null;
287
    int serialNumber = -1;
288
    try
289
    {
290
      // Get DBConnection from pool
291
      dbConn=DBConnectionPool.
292
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
293
      serialNumber=dbConn.getCheckOutSerialNumber();
294
      //if the document needs to be updated or inserted, this is executed
295
      String readDocURLString = "https://" + remoteserver + "?server="+
296
              util.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
297
      readDocURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDocURLString);
298
      URL u = new URL(readDocURLString);
299

    
300
      // Get docid content
301
      String newxmldoc = MetacatReplication.getURLContent(u);
302
      // If couldn't get skip it
303
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
304
      {
305
         throw new Exception(newxmldoc);
306
      }
307
      //logMetacat.info("xml documnet:");
308
      //logMetacat.info(newxmldoc);
309

    
310
      // Try get the docid info from remote server
311
      DocInfoHandler dih = new DocInfoHandler();
312
      XMLReader docinfoParser = initParser(dih);
313
      String docInfoURLStr = "https://" + remoteserver +
314
                       "?server="+util.getLocalReplicationServerName()+
315
                       "&action=getdocumentinfo&docid="+accNumber;
316
      docInfoURLStr = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
317
      URL docinfoUrl = new URL(docInfoURLStr);
318
      logMetacat.info("Sending message: " +
319
                                                  docinfoUrl.toString());
320
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
321
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
322
      Hashtable docinfoHash = dih.getDocInfo();
323
      // Get home server of the docid
324
      String docHomeServer = (String)docinfoHash.get("home_server");
325
      logMetacat.info("doc home server in repl: "+docHomeServer);
326
      String createdDate = (String)docinfoHash.get("date_created");
327
      String updatedDate = (String)docinfoHash.get("date_updated");
328
      //docid should include rev number too
329
      /*String accnum=docId+util.getOption("accNumSeparator")+
330
                                              (String)docinfoHash.get("rev");*/
331
      logMetacat.info("docid in repl: "+accNumber);
332
      String docType = (String)docinfoHash.get("doctype");
333
      logMetacat.info("doctype in repl: "+docType);
334

    
335
      String parserBase = null;
336
      // this for eml2 and we need user eml2 parser
337
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
338
      {
339
         parserBase = DocumentImpl.EML200;
340
      }
341
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
342
      {
343
        parserBase = DocumentImpl.EML200;
344
      }
345
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
346
      {
347
        parserBase = DocumentImpl.EML210;
348
      }
349
      // Write the document into local host
350
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false);
351
      String newDocid = wrapper.writeReplication(dbConn,
352
                              new StringReader(newxmldoc),
353
                              (String)docinfoHash.get("public_access"),
354
                              null,  /* the dtd text */
355
                              actions,
356
                              accNumber,
357
                              (String)docinfoHash.get("user_owner"),
358
                              null, /* null for groups[] */
359
                              docHomeServer,
360
                              remoteserver, tableName, true,// true is for time replication 
361
                              createdDate,
362
                              updatedDate);
363
      logMetacat.info("Successfully replicated doc " + accNumber);
364
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
365
      {
366
        MetacatReplication.replLog("" +DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
367
                                     " into "+tableName + " from " +
368
                                         remoteserver);
369
        DOCINSERTNUMBER++;
370
      }
371
      else
372
      {
373
          MetacatReplication.replLog("" +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
374
                  " into "+tableName + " from " +
375
                      remoteserver);
376
          REVINSERTNUMBER++;
377
      }
378
      
379

    
380
    }//try
381
    catch(Exception e)
382
    {
383
        
384
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
385
        {
386
          MetacatReplication.replErrorLog("" +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
387
                                       " into "+tableName + " from " +
388
                                           remoteserver + " because "+e.getMessage());
389
          DOCERRORNUMBER++;
390
        }
391
        else
392
        {
393
            MetacatReplication.replErrorLog("" +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
394
                    " into "+tableName + " from " +
395
                        remoteserver +" because "+e.getMessage());
396
            REVERRORNUMBER++;
397
        }
398
       
399
      logMetacat.error("Failed to write doc " + accNumber +
400
                                      " into db because " +e.getMessage());
401
      throw e;
402
    }
403
    finally
404
    {
405
       //return DBConnection
406
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
407
    }//finally
408
  }
409

    
410

    
411

    
412
  /* Handle replicate single xml document*/
413
  private void handleSingleDataFile(String remoteserver, String actions,
414
                                    String accNumber, String tableName)
415
               throws Exception
416
  {
417
    logMetacat.info("Try to replicate data file: "+accNumber);
418
    DBConnection dbConn = null;
419
    int serialNumber = -1;
420
    try
421
    {
422
      // Get DBConnection from pool
423
      dbConn=DBConnectionPool.
424
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
425
      serialNumber=dbConn.getCheckOutSerialNumber();
426
      // Try get docid info from remote server
427
      DocInfoHandler dih = new DocInfoHandler();
428
      XMLReader docinfoParser = initParser(dih);
429
      String docInfoURLString = "https://" + remoteserver +
430
                  "?server="+util.getLocalReplicationServerName()+
431
                  "&action=getdocumentinfo&docid="+accNumber;
432
      docInfoURLString = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLString);
433
      URL docinfoUrl = new URL(docInfoURLString);
434

    
435
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
436
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
437
      Hashtable docinfoHash = dih.getDocInfo();
438
      // Get doicd owner
439
      String user = (String)docinfoHash.get("user_owner");
440
      // Get docid name (such as acl or dataset)
441
      String docName = (String)docinfoHash.get("docname");
442
      // Get doc type (eml public id)
443
      String docType = (String)docinfoHash.get("doctype");
444
      // Get docid home sever. it might be different to remoteserver
445
      // becuause of hub feature
446
      String docHomeServer = (String)docinfoHash.get("home_server");
447
      String createdDate = (String)docinfoHash.get("date_created");
448
      String updatedDate = (String)docinfoHash.get("date_updated");
449
      //docid should include rev number too
450
      /*String accnum=docId+util.getOption("accNumSeparator")+
451
                                              (String)docinfoHash.get("rev");*/
452

    
453

    
454
      String datafilePath = util.getOption("datafilepath");
455
      // Get data file content
456
      String readDataURLString = "https://" + remoteserver + "?server="+
457
                                        util.getLocalReplicationServerName()+
458
                                            "&action=readdata&docid="+accNumber;
459
      readDataURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDataURLString);
460
      URL u = new URL(readDataURLString);
461
      InputStream input = u.openStream();
462
      //register data file into xml_documents table and wite data file
463
      //into file system
464
      if ( input != null)
465
      {
466
        DocumentImpl.writeDataFileInReplication(input,
467
                                                datafilePath,
468
                                                docName,docType,
469
                                                accNumber, user,
470
                                                docHomeServer,
471
                                                remoteserver,
472
                                                tableName,
473
                                                true, //true means timed replication
474
                                                createdDate,
475
                                                updatedDate);
476
                                         
477
        logMetacat.info("Successfully to write datafile " + accNumber);
478
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
479
                                    remoteserver);*/
480
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
481
        {
482
          MetacatReplication.replLog("" +DOCINSERTNUMBER + " Wrote data file" + accNumber +
483
                                       " into "+tableName + " from " +
484
                                           remoteserver);
485
          DOCINSERTNUMBER++;
486
        }
487
        else
488
        {
489
            MetacatReplication.replLog("" +REVINSERTNUMBER + " Wrote data file" + accNumber +
490
                    " into "+tableName + " from " +
491
                        remoteserver);
492
            REVINSERTNUMBER++;
493
        }
494
        
495
      }//if
496
      else
497
      {
498
         logMetacat.info("Couldn't open the data file: " + accNumber);
499
         throw new Exception("Couldn't open the data file: " + accNumber);
500
      }//else
501

    
502
    }//try
503
    catch(Exception e)
504
    {
505
      /*MetacatReplication.replErrorLog("Failed to try wrote datafile " + accNumber +
506
                                      " because " +e.getMessage());*/
507
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
508
      {
509
        MetacatReplication.replErrorLog("" +DOCERRORNUMBER + " Failed to write data file " + accNumber +
510
                                     " into "+tableName + " from " +
511
                                         remoteserver + " because "+e.getMessage());
512
        DOCERRORNUMBER++;
513
      }
514
      else
515
      {
516
          MetacatReplication.replErrorLog("" +REVERRORNUMBER + " Failed to write data file" + accNumber +
517
                  " into "+tableName + " from " +
518
                      remoteserver +" because "+e.getMessage());
519
          REVERRORNUMBER++;
520
      }
521
      logMetacat.error("Failed to try wrote datafile " + accNumber +
522
                                      " because " +e.getMessage());
523
      throw e;
524
    }
525
    finally
526
    {
527
       //return DBConnection
528
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
529
    }//finally
530
  }
531

    
532

    
533

    
534
  /* Handle delete single document*/
535
  private void handleDeleteSingleDocument(String docId, String notifyServer)
536
               throws Exception
537
  {
538
    logMetacat.info("Try delete doc: "+docId);
539
    DBConnection dbConn = null;
540
    int serialNumber = -1;
541
    try
542
    {
543
      // Get DBConnection from pool
544
      dbConn=DBConnectionPool.
545
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
546
      serialNumber=dbConn.getCheckOutSerialNumber();
547
      if(!alreadyDeleted(docId))
548
      {
549

    
550
         //because delete method docid should have rev number
551
         //so we just add one for it. This rev number is no sence.
552
         String accnum=docId+util.getOption("accNumSeparator")+"1";
553
         //System.out.println("accnum: "+accnum);
554
         DocumentImpl.delete(accnum, null, null, notifyServer);
555
         logMetacat.info("Successfully deleted doc " + docId);
556
         MetacatReplication.replLog("Doc " + docId + " deleted");
557
      }
558

    
559
    }//try
560
    catch(Exception e)
561
    {
562
      MetacatReplication.replErrorLog("Failed to delete doc " + docId +
563
                                      " in db because " +e.getMessage());
564
      logMetacat.error("Failed to delete doc " + docId +
565
                                 " in db because because " + e.getMessage());
566
      throw e;
567
    }
568
    finally
569
    {
570
       //return DBConnection
571
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
572
    }//finally
573
  }
574

    
575
  /* Handle updateLastCheckTimForSingleServer*/
576
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
577
                                                  throws Exception
578
  {
579
    String server = repServer.getServerName();
580
    DBConnection dbConn = null;
581
    int serialNumber = -1;
582
    PreparedStatement pstmt = null;
583
    try
584
    {
585
      // Get DBConnection from pool
586
      dbConn=DBConnectionPool.
587
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
588
      serialNumber=dbConn.getCheckOutSerialNumber();
589

    
590
      logMetacat.info("Try to update last_check for server: "+server);
591
      // Get time from remote server
592
      URL dateurl = new URL("https://" + server + "?server="+
593
      util.getLocalReplicationServerName()+"&action=gettime");
594
      String datexml = MetacatReplication.getURLContent(dateurl);
595
      logMetacat.info("datexml: "+datexml);
596
      if (datexml!=null && !datexml.equals(""))
597
      {
598
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
599
         StringBuffer sql = new StringBuffer();
600
         /*sql.append("update xml_replication set last_checked = to_date('");
601
         sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
602
         sql.append("server like '").append(server).append("'");*/
603
         sql.append("update xml_replication set last_checked = ");
604
         sql.append(dbAdapter.toDate(datestr, "MM/DD/YY HH24:MI:SS"));
605
         sql.append(" where server like '").append(server).append("'");
606
         pstmt = dbConn.prepareStatement(sql.toString());
607

    
608
         pstmt.executeUpdate();
609
         dbConn.commit();
610
         pstmt.close();
611
         logMetacat.info("last_checked updated to "+datestr+" on "
612
                                      + server);
613
      }//if
614
      else
615
      {
616

    
617
         logMetacat.info("Failed to update last_checked for server "  +
618
                                  server + " in db because couldn't get time "
619
                                  );
620
         throw new Exception("Couldn't get time for server "+ server);
621
      }
622

    
623
    }//try
624
    catch(Exception e)
625
    {
626

    
627
      logMetacat.error("Failed to update last_checked for server " +
628
                                server + " in db because because " +
629
                                e.getMessage());
630
      throw e;
631
    }
632
    finally
633
    {
634
       //return DBConnection
635
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
636
    }//finally
637
  }
638

    
639

    
640

    
641
  /**
642
   * updates xml_catalog with entries from other servers.
643
   */
644
  private void updateCatalog()
645
  {
646
    logMetacat.info("Start of updateCatalog");
647
    // ReplicationServer object in server list
648
    ReplicationServer replServer = null;
649
    PreparedStatement pstmt = null;
650
    String server = null;
651

    
652

    
653
    // Go through each ReplicationServer object in sererlist
654
    for (int j=0; j<serverList.size(); j++)
655
    {
656
      Vector remoteCatalog = new Vector();
657
      Vector publicId = new Vector();
658
      try
659
      {
660
        // Get ReplicationServer object from server list
661
        replServer = serverList.serverAt(j);
662
        // Get server name from the ReplicationServer object
663
        server = replServer.getServerName();
664
        // Try to get catalog
665
        URL u = new URL("https://" + server + "?server="+
666
        util.getLocalReplicationServerName()+"&action=getcatalog");
667
        logMetacat.info("sending message " + u.toString());
668
        String catxml = MetacatReplication.getURLContent(u);
669

    
670
        // Make sure there are not error, no empty string
671
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
672
        {
673
          throw new Exception("Couldn't get catalog list form server " +server);
674
        }
675
        logMetacat.info("catxml: " + catxml);
676
        CatalogMessageHandler cmh = new CatalogMessageHandler();
677
        XMLReader catparser = initParser(cmh);
678
        catparser.parse(new InputSource(new StringReader(catxml)));
679
        //parse the returned catalog xml and put it into a vector
680
        remoteCatalog = cmh.getCatalogVect();
681

    
682
        // Makse sure remoteCatalog is not empty
683
        if (remoteCatalog.isEmpty())
684
        {
685
          throw new Exception("Couldn't get catalog list form server " +server);
686
        }
687

    
688
        String localcatxml = MetacatReplication.getCatalogXML();
689

    
690
        // Make sure local catalog is no empty
691
        if (localcatxml==null||localcatxml.equals(""))
692
        {
693
          throw new Exception("Couldn't get catalog list form server " +server);
694
        }
695

    
696
        cmh = new CatalogMessageHandler();
697
        catparser = initParser(cmh);
698
        catparser.parse(new InputSource(new StringReader(localcatxml)));
699
        Vector localCatalog = cmh.getCatalogVect();
700

    
701
        //now we have the catalog from the remote server and this local server
702
        //we now need to compare the two and merge the differences.
703
        //the comparison is base on the public_id fields which is the 4th
704
        //entry in each row vector.
705
        publicId = new Vector();
706
        for(int i=0; i<localCatalog.size(); i++)
707
        {
708
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
709
          logMetacat.info("v1: " + v.toString());
710
          publicId.add(new String((String)v.elementAt(3)));
711
          //System.out.println("adding " + (String)v.elementAt(3));
712
        }
713
      }//try
714
      catch (Exception e)
715
      {
716
        MetacatReplication.replErrorLog("Failed to update catalog for server "+
717
                                    server + " because " +e.getMessage());
718
        logMetacat.error("Failed to update catalog for server "+
719
                                    server + " because " +e.getMessage());
720
      }//catch
721

    
722
      for(int i=0; i<remoteCatalog.size(); i++)
723
      {
724
         // DConnection
725
        DBConnection dbConn = null;
726
        // DBConnection checkout serial number
727
        int serialNumber = -1;
728
        try
729
        {
730
            dbConn=DBConnectionPool.
731
                  getDBConnection("ReplicationHandler.updateCatalog");
732
            serialNumber=dbConn.getCheckOutSerialNumber();
733
            Vector v = (Vector)remoteCatalog.elementAt(i);
734
            //System.out.println("v2: " + v.toString());
735
            //System.out.println("i: " + i);
736
            //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
737
            //System.out.println("publicID: " + publicId.toString());
738
            logMetacat.info
739
                              ("v.elementAt(3): " + (String)v.elementAt(3));
740
           if(!publicId.contains(v.elementAt(3)))
741
           { //so we don't have this public id in our local table so we need to
742
             //add it.
743
             //System.out.println("in if");
744
             StringBuffer sql = new StringBuffer();
745
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
746
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
747
             sql.append("?,?)");
748
             //System.out.println("sql: " + sql.toString());
749
             pstmt = dbConn.prepareStatement(sql.toString());
750
             pstmt.setString(1, (String)v.elementAt(0));
751
             pstmt.setString(2, (String)v.elementAt(1));
752
             pstmt.setString(3, (String)v.elementAt(2));
753
             pstmt.setString(4, (String)v.elementAt(3));
754
             pstmt.setString(5, (String)v.elementAt(4));
755
             pstmt.execute();
756
             pstmt.close();
757
             MetacatReplication.replLog("Success fully to insert new publicid "+
758
                               (String)v.elementAt(3) + " from server"+server);
759
             logMetacat.info("Success fully to insert new publicid "+
760
                             (String)v.elementAt(3) + " from server" +server);
761
           }
762
        }
763
        catch(Exception e)
764
        {
765
           MetacatReplication.replErrorLog("Failed to update catalog for server "+
766
                                    server + " because " +e.getMessage());
767
           logMetacat.error("Failed to update catalog for server "+
768
                                    server + " because " +e.getMessage());
769
        }//catch
770
        finally
771
        {
772
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
773
        }//finall
774
      }//for remote catalog
775
    }//for server list
776
    logMetacat.info("End of updateCatalog");
777
  }
778

    
779
  /**
780
   * Method that returns true if docid has already been "deleted" from metacat.
781
   * This method really implements a truth table for deleted documents
782
   * The table is (a docid in one of the tables is represented by the X):
783
   * xml_docs      xml_revs      deleted?
784
   * ------------------------------------
785
   *   X             X             FALSE
786
   *   X             _             FALSE
787
   *   _             X             TRUE
788
   *   _             _             TRUE
789
   */
790
  private static boolean alreadyDeleted(String docid) throws Exception
791
  {
792
    DBConnection dbConn = null;
793
    int serialNumber = -1;
794
    PreparedStatement pstmt = null;
795
    try
796
    {
797
      dbConn=DBConnectionPool.
798
                  getDBConnection("ReplicationHandler.alreadyDeleted");
799
      serialNumber=dbConn.getCheckOutSerialNumber();
800
      boolean xml_docs = false;
801
      boolean xml_revs = false;
802

    
803
      StringBuffer sb = new StringBuffer();
804
      sb.append("select docid from xml_revisions where docid like '");
805
      sb.append(docid).append("'");
806
      pstmt = dbConn.prepareStatement(sb.toString());
807
      pstmt.execute();
808
      ResultSet rs = pstmt.getResultSet();
809
      boolean tablehasrows = rs.next();
810
      if(tablehasrows)
811
      {
812
        xml_revs = true;
813
      }
814

    
815
      sb = new StringBuffer();
816
      sb.append("select docid from xml_documents where docid like '");
817
      sb.append(docid).append("'");
818
      pstmt.close();
819
      pstmt = dbConn.prepareStatement(sb.toString());
820
      //increase usage count
821
      dbConn.increaseUsageCount(1);
822
      pstmt.execute();
823
      rs = pstmt.getResultSet();
824
      tablehasrows = rs.next();
825
      pstmt.close();
826
      if(tablehasrows)
827
      {
828
        xml_docs = true;
829
      }
830

    
831
      if(xml_docs && xml_revs)
832
      {
833
        return false;
834
      }
835
      else if(xml_docs && !xml_revs)
836
      {
837
        return false;
838
      }
839
      else if(!xml_docs && xml_revs)
840
      {
841
        return true;
842
      }
843
      else if(!xml_docs && !xml_revs)
844
      {
845
        return true;
846
      }
847
    }
848
    catch(Exception e)
849
    {
850
      logMetacat.error("error in ReplicationHandler.alreadyDeleted: " +
851
                          e.getMessage());
852
      throw e;
853
    }
854
    finally
855
    {
856
      try
857
      {
858
        pstmt.close();
859
      }//try
860
      catch (SQLException ee)
861
      {
862
        logMetacat.error("Error in replicationHandler.alreadyDeleted "+
863
                          "to close pstmt: "+ee.getMessage());
864
        throw ee;
865
      }//catch
866
      finally
867
      {
868
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
869
      }//finally
870
    }//finally
871
    return false;
872
  }
873

    
874

    
875
  /**
876
   * Method to initialize the message parser
877
   */
878
  public static XMLReader initParser(DefaultHandler dh)
879
          throws Exception
880
  {
881
    XMLReader parser = null;
882

    
883
    try {
884
      ContentHandler chandler = dh;
885

    
886
      // Get an instance of the parser
887
      MetaCatUtil util = new MetaCatUtil();
888
      String parserName = util.getOption("saxparser");
889
      parser = XMLReaderFactory.createXMLReader(parserName);
890

    
891
      // Turn off validation
892
      parser.setFeature("http://xml.org/sax/features/validation", false);
893

    
894
      parser.setContentHandler((ContentHandler)chandler);
895
      parser.setErrorHandler((ErrorHandler)chandler);
896

    
897
    } catch (Exception e) {
898
      throw e;
899
    }
900

    
901
    return parser;
902
  }
903

    
904
  /**
905
   * This method will combinate given time string(in short format) to
906
   * current date. If the given time (e.g 10:00 AM) passed the current time
907
   * (e.g 2:00 PM Aug 21, 2005), then the time will set to second day,
908
   * 10:00 AM Aug 22, 2005. If the given time (e.g 10:00 AM) haven't passed
909
   * the current time (e.g 8:00 AM Aug 21, 2005) The time will set to be
910
   * 10:00 AM Aug 21, 2005.
911
   * @param givenTime  the format should be "10:00 AM " or "2:00 PM"
912
   * @return
913
   * @throws Exception
914
   */
915
  public static Date combinateCurrentDateAndGivenTime(String givenTime) throws Exception
916
  {
917
     Date givenDate = parseTime(givenTime);
918
     Date newDate = null;
919
     Date now = new Date();
920
     String currentTimeString = getTimeString(now);
921
     Date currentTime = parseTime(currentTimeString); 
922
     if ( currentTime.getTime() >= givenDate.getTime())
923
     {
924
        logMetacat.info("Today already pass the given time, we should set it as tomorrow");
925
        String dateAndTime = getDateString(now) + " " + givenTime;
926
        Date combinationDate = parseDateTime(dateAndTime);
927
        // new date should plus 24 hours to make is the second day
928
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
929
     }
930
     else
931
     {
932
         logMetacat.info("Today haven't pass the given time, we should it as today");
933
         String dateAndTime = getDateString(now) + " " + givenTime;
934
         newDate = parseDateTime(dateAndTime);
935
     }
936
     logMetacat.warn("final setting time is "+ newDate.toString());
937
     return newDate;
938
  }
939

    
940
  /*
941
   * parse a given string to Time in short format.
942
   * For example, given time is 10:00 AM, the date will be return as
943
   * Jan 1 1970, 10:00 AM
944
   */
945
  private static Date parseTime(String timeString) throws Exception
946
  {
947
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
948
    Date time = format.parse(timeString); 
949
    logMetacat.info("Date string is after parse a time string "
950
                              +time.toString());
951
    return time;
952

    
953
  }
954
  
955
  /*
956
   * Pasre a given string to date and time. Date format is long and time
957
   * format is short.
958
   */
959
  private static Date parseDateTime(String timeString) throws Exception
960
  {
961
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
962
    Date time = format.parse(timeString);
963
    logMetacat.info("Date string is after parse a time string "+
964
                             time.toString());
965
    return time;
966
  }
967
  
968
  /*
969
   * Get a date string from a Date object. The date format will be long
970
   */
971
  private static String getDateString(Date now)
972
  {
973
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
974
     String s = df.format(now);
975
     logMetacat.info("Today is " + s);
976
     return s;
977
  }
978
  
979
  /*
980
   * Get a time string from a Date object, the time format will be short
981
   */
982
  private static String getTimeString(Date now)
983
  {
984
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
985
     String s = df.format(now);
986
     logMetacat.info("Time is " + s);
987
     return s;
988
  }
989
  
990
  
991
  /*
992
   * This method will go through the docid list both in xml_Documents table
993
   * and in xml_revisions table
994
   * @author tao
995
   */
996
   private void handleDocList(Vector docList, String tableName)
997
   {
998
       boolean dataFile=false;
999
       for(int j=0; j<docList.size(); j++)
1000
       {
1001
         //initial dataFile is false
1002
         dataFile=false;
1003
         //w is information for one document, information contain
1004
         //docid, rev, server or datafile.
1005
         Vector w = new Vector((Vector)(docList.elementAt(j)));
1006
         //Check if the vector w contain "datafile"
1007
         //If it has, this document is data file
1008
         if (w.contains((String)util.getOption("datafileflag")))
1009
         {
1010
           dataFile=true;
1011
         }
1012
         //System.out.println("w: " + w.toString());
1013
         // Get docid
1014
         String docid = (String)w.elementAt(0);
1015
         logMetacat.info("docid: " + docid);
1016
         // Get revision number
1017
         int rev = Integer.parseInt((String)w.elementAt(1));
1018
         logMetacat.info("rev: " + rev);
1019
         // Get remote server name (it is may not be doc homeserver because
1020
         // the new hub feature
1021
         String remoteServer = (String)w.elementAt(2);
1022
         remoteServer = remoteServer.trim();
1023

    
1024
         try
1025
         {
1026
             if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
1027
             {
1028
                 handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1029
             }
1030
             else if (tableName.equals(DocumentImpl.REVISIONTABLE))
1031
             {
1032
                 handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1033
             }
1034
             else
1035
             {
1036
                 continue;
1037
             }
1038
                 
1039
         }
1040
         catch (Exception e)
1041
         {
1042
             logMetacat.error("error to handle update doc in "+ 
1043
                     tableName +" in time replication"+e.getMessage());
1044
             continue;
1045
         }
1046
        
1047
       }//for update docs
1048

    
1049
   }
1050
   
1051
   /*
1052
    * This method will handle doc in xml_documents table.
1053
    */
1054
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1055
                                        throws Exception
1056
   {
1057
       // compare the update rev and local rev to see what need happen
1058
       int localrev = -1;
1059
       String action = null;
1060
       boolean flag = false;
1061
       try
1062
       {
1063
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1064
       }
1065
       catch (SQLException e)
1066
       {
1067
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1068
                                " be found because " + e.getMessage());
1069
         MetacatReplication.replErrorLog(""+DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1070
                 "written because error happend to find it's local revision");
1071
         DOCERRORNUMBER++;
1072
         throw new Exception (e.getMessage());
1073
       }
1074
       logMetacat.info("Local rev for docid "+ docid + " is "+
1075
                               localrev);
1076

    
1077
       //check the revs for an update because this document is in the
1078
       //local DB, it might be out of date.
1079
       if (localrev == -1)
1080
       {
1081
          // check if the revision is in the revision table
1082
         Vector localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1083
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1084
         {
1085
             // this version was deleted, so don't need replicate
1086
             flag = false;
1087
         }
1088
         else
1089
         {
1090
           //insert this document as new because it is not in the local DB
1091
           action = "INSERT";
1092
           flag = true;
1093
         }
1094
       }
1095
       else
1096
       {
1097
         if(localrev == rev)
1098
         {
1099
           // Local meatacat has the same rev to remote host, don't need
1100
           // update and flag set false
1101
           flag = false;
1102
         }
1103
         else if(localrev < rev)
1104
         {
1105
           //this document needs to be updated so send an read request
1106
           action = "UPDATE";
1107
           flag = true;
1108
         }
1109
       }
1110
       
1111
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1112
       // this is non-data file
1113
       if(flag && !dataFile)
1114
       {
1115
         try
1116
         {
1117
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1118
         }
1119
         catch(Exception e)
1120
         {
1121
           // skip this document
1122
           throw e;
1123
         }
1124
       }//if for non-data file
1125

    
1126
        // this is for data file
1127
       if(flag && dataFile)
1128
       {
1129
         try
1130
         {
1131
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1132
         }
1133
         catch(Exception e)
1134
         {
1135
           // skip this datafile
1136
           throw e;
1137
         }
1138

    
1139
       }//for datafile
1140
   }
1141
   
1142
   /*
1143
    * This method will handle doc in xml_documents table.
1144
    */
1145
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1146
                                        throws Exception
1147
   {
1148
       // compare the update rev and local rev to see what need happen
1149
       logMetacat.info("In handle repliation revsion table");
1150
       logMetacat.info("the docid is "+ docid);
1151
       logMetacat.info("The rev is "+rev);
1152
       Vector localrev = null;
1153
       String action = "INSERT";
1154
       boolean flag = false;
1155
       try
1156
       {
1157
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1158
       }
1159
       catch (SQLException e)
1160
       {
1161
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1162
                                " be found because " + e.getMessage());
1163
         MetacatReplication.replErrorLog(""+REVERRORNUMBER+" Docid "+ docid + " could not be "+
1164
                 "written because error happend to find it's local revision");
1165
         REVERRORNUMBER++;
1166
         throw new Exception (e.getMessage());
1167
       }
1168
       logMetacat.info("rev list in xml_revision table for docid "+ docid + " is "+
1169
                               localrev.toString());
1170
       
1171
       // if the rev is not in the xml_revision, we need insert it
1172
       if (!localrev.contains(new Integer(rev)))
1173
       {
1174
           flag = true;    
1175
       }
1176
     
1177
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1178
       // this is non-data file
1179
       if(flag && !dataFile)
1180
       {
1181
         try
1182
         {
1183
           
1184
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1185
         }
1186
         catch(Exception e)
1187
         {
1188
           // skip this document
1189
           throw e;
1190
         }
1191
       }//if for non-data file
1192

    
1193
        // this is for data file
1194
       if(flag && dataFile)
1195
       {
1196
         try
1197
         {
1198
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1199
         }
1200
         catch(Exception e)
1201
         {
1202
           // skip this datafile
1203
           throw e;
1204
         }
1205

    
1206
       }//for datafile
1207
   }
1208
  
1209
}
1210

    
(59-59/65)