Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: tao $'
9
 *     '$Date: 2007-04-13 11:49:52 -0700 (Fri, 13 Apr 2007) $'
10
 * '$Revision: 3234 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat;
28

    
29
import edu.ucsb.nceas.dbadapter.AbstractDatabase;
30
import java.sql.*;
31
import java.util.*;
32
import java.util.Date;
33
import java.lang.Thread;
34
import java.io.*;
35
import java.net.*;
36
import java.text.*;
37

    
38
import org.apache.log4j.Logger;
39
import org.xml.sax.AttributeList;
40
import org.xml.sax.ContentHandler;
41
import org.xml.sax.DTDHandler;
42
import org.xml.sax.EntityResolver;
43
import org.xml.sax.ErrorHandler;
44
import org.xml.sax.InputSource;
45
import org.xml.sax.XMLReader;
46
import org.xml.sax.SAXException;
47
import org.xml.sax.SAXParseException;
48
import org.xml.sax.helpers.XMLReaderFactory;
49
import org.xml.sax.helpers.DefaultHandler;
50

    
51

    
52

    
53
/**
54
 * This class handles deltaT replication checking.  Whenever this TimerTask
55
 * is fired it checks each server in xml_replication for updates and updates
56
 * the local db as needed.
57
 */
58
public class ReplicationHandler extends TimerTask
59
{
60
  int serverCheckCode = 1;
61
  MetaCatUtil util = new MetaCatUtil();
62
  ReplicationServerList serverList = null;
63
  //PrintWriter out;
64
  private static final AbstractDatabase dbAdapter = MetaCatUtil.dbAdapter;
65
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
66
  private static int DOCINSERTNUMBER = 1;
67
  private static int DOCERRORNUMBER  = 1;
68
  private static int REVINSERTNUMBER = 1;
69
  private static int REVERRORNUMBER  = 1;
70
  public ReplicationHandler()
71
  {
72
    //this.out = o;
73
    serverList = new ReplicationServerList();
74
  }
75

    
76
  public ReplicationHandler(int serverCheckCode)
77
  {
78
    //this.out = o;
79
    this.serverCheckCode = serverCheckCode;
80
    serverList = new ReplicationServerList();
81
  }
82

    
83
  /**
84
   * Method that implements TimerTask.run().  It runs whenever the timer is
85
   * fired.
86
   */
87
  public void run()
88
  {
89
    //find out the last_checked time of each server in the server list and
90
    //send a query to each server to see if there are any documents in
91
    //xml_documents with an update_date > last_checked
92
    try
93
    {
94
      //if serverList is null, metacat don't need to replication
95
      if (serverList==null||serverList.isEmpty())
96
      {
97
        return;
98
      }
99
      updateCatalog();
100
      update();
101
      //conn.close();
102
    }//try
103
    catch (Exception e)
104
    {
105
      logMetacat.error("Error in replicationHandler.run():"
106
                                                    +e.getMessage());
107
      e.printStackTrace();
108
    }//catch
109
  }
110

    
111
  /**
112
   * Method that uses revision taging for replication instead of update_date.
113
   */
114
  private void update()
115
  {
116
    /*
117
     Pseudo-algorithm
118
     - request a doc list from each server in xml_replication
119
     - check the rev number of each of those documents agains the
120
       documents in the local database
121
     - pull any documents that have a lesser rev number on the local server
122
       from the remote server
123
     - delete any documents that still exist in the local xml_documents but
124
       are in the deletedDocuments tag of the remote host response.
125
     - update last_checked to keep track of the last time it was checked.
126
       (this info is theoretically not needed using this system but probably
127
       should be kept anyway)
128
    */
129

    
130
    ReplicationServer replServer = null; // Variable to store the
131
                                        // ReplicationServer got from
132
                                        // Server list
133
    String server = null; // Variable to store server name
134
    String update;
135
    ReplMessageHandler message = new ReplMessageHandler();
136
    Vector responses = new Vector();
137
    XMLReader parser;
138
    URL u;
139

    
140

    
141
    try
142
    {
143
      parser = initParser(message);
144
    }
145
    catch (Exception e)
146
    {
147
      MetacatReplication.replErrorLog("Failed to replicate becaue couldn't "+
148
                                 " initParser for message and" +e.getMessage());
149
      logMetacat.error("Failed to replicate becaue couldn't " +
150
                            " initParser for message and " +e.getMessage());
151
       // stop replication
152
       return;
153
    }
154
    //Check for every server in server list to get updated list and put
155
    // them in to response
156
    for (int i=0; i<serverList.size(); i++)
157
    {
158
        // Get ReplicationServer object from server list
159
        replServer = serverList.serverAt(i);
160
        // Get server name from ReplicationServer object
161
        server = replServer.getServerName().trim();
162
        String result = null;
163
        MetacatReplication.replLog("full update started to: " + server);
164
        // Send command to that server to get updated docid information
165
        try
166
        {
167
          u = new URL("https://" + server + "?server="
168
          +util.getLocalReplicationServerName()+"&action=update");
169
          logMetacat.info("Sending infomation " +u.toString());
170
          result = MetacatReplication.getURLContent(u);
171
        }
172
        catch (Exception e)
173
        {
174
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
175
                          "for server " + server + " because "+e.getMessage());
176
          logMetacat.error( "Failed to get updated doc list "+
177
                       "for server " + server + " because "+e.getMessage());
178
          continue;
179
        }
180

    
181
        logMetacat.info("docid: "+server+" "+result);
182
        //check if result have error or not, if has skip it.
183
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
184
        {
185
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
186
                          "for server " + server + " because "+result);
187
          logMetacat.info( "Failed to get updated doc list "+
188
                       "for server " + server + " because "+result);
189
          continue;
190
        }
191
        //Add result to vector
192
        responses.add(result);
193
    }
194

    
195
    //make sure that there is updated file list
196
    //If response is null, metacat don't need do anything
197
    if (responses==null || responses.isEmpty())
198
    {
199
        MetacatReplication.replErrorLog("No updated doc list for "+
200
                           "every server and failed to replicate");
201
        logMetacat.info( "No updated doc list for "+
202
                           "every server and failed to replicate");
203
        return;
204
    }
205

    
206

    
207
    logMetacat.info("Responses from remote metacat about updated "+
208
                   "document information: "+ responses.toString());
209
    // go through response vector(it contains updated vector and delete vector
210
    for(int i=0; i<responses.size(); i++)
211
    {
212
        try
213
        {
214
          parser.parse(new InputSource(
215
                     new StringReader(
216
                     (String)(responses.elementAt(i)))));
217
        }
218
        catch(Exception e)
219
        {
220
          MetacatReplication.replErrorLog("Couldn't parse one responses "+
221
                           "because "+ e.getMessage());
222
          logMetacat.error("Couldn't parse one responses "+
223
                                   "because "+ e.getMessage());
224
          continue;
225
        }
226
        //v is the list of updated documents
227
        Vector updateList = new Vector(message.getUpdatesVect());
228
        MetacatReplication.replLog("The document list size is "+updateList.size());
229
        //System.out.println("v: " + v.toString());
230
        //d is the list of deleted documents
231
        Vector deleteList = new Vector(message.getDeletesVect());
232
        //System.out.println("d: " + d.toString());
233
        logMetacat.info("Update vector size: "+ updateList.size());
234
        logMetacat.info("Delete vector size: "+ deleteList.size());
235
        MetacatReplication.replLog("The delete document list size is "+deleteList.size());
236
        // go though every element in updated document vector
237
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
238
        //handle deleted docs
239
        for(int k=0; k<deleteList.size(); k++)
240
        { //delete the deleted documents;
241
          Vector w = new Vector((Vector)deleteList.elementAt(k));
242
          String docId = (String)w.elementAt(0);
243
          try
244
          {
245
            handleDeleteSingleDocument(docId, server);
246
          }
247
          catch (Exception ee)
248
          {
249
            continue;
250
          }
251
        }//for delete docs
252
        
253
        // handle replicate doc in xml_revision
254
        Vector revisionList = new Vector(message.getRevisionsVect());
255
        MetacatReplication.replLog("The revision document list size is "+revisionList.size());
256
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
257
        DOCINSERTNUMBER = 1;
258
        DOCERRORNUMBER  = 1;
259
        REVINSERTNUMBER = 1;
260
        REVERRORNUMBER  = 1;
261
    }//for response
262

    
263
    //updated last_checked
264
    for (int i=0;i<serverList.size(); i++)
265
    {
266
       // Get ReplicationServer object from server list
267
       replServer = serverList.serverAt(i);
268
       try
269
       {
270
         updateLastCheckTimeForSingleServer(replServer);
271
       }
272
       catch(Exception e)
273
       {
274
         continue;
275
       }
276
    }//for
277

    
278
  }//update
279

    
280
  /* Handle replicate single xml document*/
281
  private void handleSingleXMLDocument(String remoteserver, String actions,
282
                                       String accNumber, String tableName)
283
               throws Exception
284
  {
285
    DBConnection dbConn = null;
286
    int serialNumber = -1;
287
    try
288
    {
289
      // Get DBConnection from pool
290
      dbConn=DBConnectionPool.
291
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
292
      serialNumber=dbConn.getCheckOutSerialNumber();
293
      //if the document needs to be updated or inserted, this is executed
294
      String readDocURLString = "https://" + remoteserver + "?server="+
295
              util.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
296
      readDocURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDocURLString);
297
      URL u = new URL(readDocURLString);
298

    
299
      // Get docid content
300
      String newxmldoc = MetacatReplication.getURLContent(u);
301
      // If couldn't get skip it
302
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
303
      {
304
         throw new Exception(newxmldoc);
305
      }
306
      //logMetacat.info("xml documnet:");
307
      //logMetacat.info(newxmldoc);
308

    
309
      // Try get the docid info from remote server
310
      DocInfoHandler dih = new DocInfoHandler();
311
      XMLReader docinfoParser = initParser(dih);
312
      String docInfoURLStr = "https://" + remoteserver +
313
                       "?server="+util.getLocalReplicationServerName()+
314
                       "&action=getdocumentinfo&docid="+accNumber;
315
      docInfoURLStr = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
316
      URL docinfoUrl = new URL(docInfoURLStr);
317
      logMetacat.info("Sending message: " +
318
                                                  docinfoUrl.toString());
319
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
320
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
321
      Hashtable docinfoHash = dih.getDocInfo();
322
      // Get home server of the docid
323
      String docHomeServer = (String)docinfoHash.get("home_server");
324
      logMetacat.info("doc home server in repl: "+docHomeServer);
325
      String createdDate = (String)docinfoHash.get("date_created");
326
      String updatedDate = (String)docinfoHash.get("date_updated");
327
      //docid should include rev number too
328
      /*String accnum=docId+util.getOption("accNumSeparator")+
329
                                              (String)docinfoHash.get("rev");*/
330
      logMetacat.info("docid in repl: "+accNumber);
331
      String docType = (String)docinfoHash.get("doctype");
332
      logMetacat.info("doctype in repl: "+docType);
333

    
334
      String parserBase = null;
335
      // this for eml2 and we need user eml2 parser
336
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
337
      {
338
         parserBase = DocumentImpl.EML200;
339
      }
340
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
341
      {
342
        parserBase = DocumentImpl.EML200;
343
      }
344
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
345
      {
346
        parserBase = DocumentImpl.EML210;
347
      }
348
      // Write the document into local host
349
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false);
350
      String newDocid = wrapper.writeReplication(dbConn,
351
                              new StringReader(newxmldoc),
352
                              (String)docinfoHash.get("public_access"),
353
                              null,  /* the dtd text */
354
                              actions,
355
                              accNumber,
356
                              (String)docinfoHash.get("user_owner"),
357
                              null, /* null for groups[] */
358
                              docHomeServer,
359
                              remoteserver, tableName, true,// true is for time replication 
360
                              createdDate,
361
                              updatedDate);
362
      logMetacat.info("Successfully replicated doc " + accNumber);
363
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
364
      {
365
        MetacatReplication.replLog("" +DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
366
                                     " into "+tableName + " from " +
367
                                         remoteserver);
368
        DOCINSERTNUMBER++;
369
      }
370
      else
371
      {
372
          MetacatReplication.replLog("" +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
373
                  " into "+tableName + " from " +
374
                      remoteserver);
375
          REVINSERTNUMBER++;
376
      }
377
      String ip = getIpFromURL(u);
378
      EventLog.getInstance().log(ip, MetacatReplication.REPLICATIONUSER, accNumber, actions);
379
      
380

    
381
    }//try
382
    catch(Exception e)
383
    {
384
        
385
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
386
        {
387
          MetacatReplication.replErrorLog("" +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
388
                                       " into "+tableName + " from " +
389
                                           remoteserver + " because "+e.getMessage());
390
          DOCERRORNUMBER++;
391
        }
392
        else
393
        {
394
            MetacatReplication.replErrorLog("" +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
395
                    " into "+tableName + " from " +
396
                        remoteserver +" because "+e.getMessage());
397
            REVERRORNUMBER++;
398
        }
399
       
400
      logMetacat.error("Failed to write doc " + accNumber +
401
                                      " into db because " +e.getMessage());
402
      throw e;
403
    }
404
    finally
405
    {
406
       //return DBConnection
407
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
408
    }//finally
409
  }
410

    
411

    
412

    
413
  /* Handle replicate single xml document*/
414
  private void handleSingleDataFile(String remoteserver, String actions,
415
                                    String accNumber, String tableName)
416
               throws Exception
417
  {
418
    logMetacat.info("Try to replicate data file: "+accNumber);
419
    DBConnection dbConn = null;
420
    int serialNumber = -1;
421
    try
422
    {
423
      // Get DBConnection from pool
424
      dbConn=DBConnectionPool.
425
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
426
      serialNumber=dbConn.getCheckOutSerialNumber();
427
      // Try get docid info from remote server
428
      DocInfoHandler dih = new DocInfoHandler();
429
      XMLReader docinfoParser = initParser(dih);
430
      String docInfoURLString = "https://" + remoteserver +
431
                  "?server="+util.getLocalReplicationServerName()+
432
                  "&action=getdocumentinfo&docid="+accNumber;
433
      docInfoURLString = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLString);
434
      URL docinfoUrl = new URL(docInfoURLString);
435

    
436
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
437
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
438
      Hashtable docinfoHash = dih.getDocInfo();
439
      // Get doicd owner
440
      String user = (String)docinfoHash.get("user_owner");
441
      // Get docid name (such as acl or dataset)
442
      String docName = (String)docinfoHash.get("docname");
443
      // Get doc type (eml public id)
444
      String docType = (String)docinfoHash.get("doctype");
445
      // Get docid home sever. it might be different to remoteserver
446
      // becuause of hub feature
447
      String docHomeServer = (String)docinfoHash.get("home_server");
448
      String createdDate = (String)docinfoHash.get("date_created");
449
      String updatedDate = (String)docinfoHash.get("date_updated");
450
      //docid should include rev number too
451
      /*String accnum=docId+util.getOption("accNumSeparator")+
452
                                              (String)docinfoHash.get("rev");*/
453

    
454

    
455
      String datafilePath = util.getOption("datafilepath");
456
      // Get data file content
457
      String readDataURLString = "https://" + remoteserver + "?server="+
458
                                        util.getLocalReplicationServerName()+
459
                                            "&action=readdata&docid="+accNumber;
460
      readDataURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDataURLString);
461
      URL u = new URL(readDataURLString);
462
      InputStream input = u.openStream();
463
      //register data file into xml_documents table and wite data file
464
      //into file system
465
      if ( input != null)
466
      {
467
        DocumentImpl.writeDataFileInReplication(input,
468
                                                datafilePath,
469
                                                docName,docType,
470
                                                accNumber, user,
471
                                                docHomeServer,
472
                                                remoteserver,
473
                                                tableName,
474
                                                true, //true means timed replication
475
                                                createdDate,
476
                                                updatedDate);
477
                                         
478
        logMetacat.info("Successfully to write datafile " + accNumber);
479
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
480
                                    remoteserver);*/
481
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
482
        {
483
          MetacatReplication.replLog("" +DOCINSERTNUMBER + " Wrote data file" + accNumber +
484
                                       " into "+tableName + " from " +
485
                                           remoteserver);
486
          DOCINSERTNUMBER++;
487
        }
488
        else
489
        {
490
            MetacatReplication.replLog("" +REVINSERTNUMBER + " Wrote data file" + accNumber +
491
                    " into "+tableName + " from " +
492
                        remoteserver);
493
            REVINSERTNUMBER++;
494
        }
495
        String ip = getIpFromURL(u);
496
        EventLog.getInstance().log(ip, MetacatReplication.REPLICATIONUSER, accNumber, actions);
497
        
498
      }//if
499
      else
500
      {
501
         logMetacat.info("Couldn't open the data file: " + accNumber);
502
         throw new Exception("Couldn't open the data file: " + accNumber);
503
      }//else
504

    
505
    }//try
506
    catch(Exception e)
507
    {
508
      /*MetacatReplication.replErrorLog("Failed to try wrote datafile " + accNumber +
509
                                      " because " +e.getMessage());*/
510
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
511
      {
512
        MetacatReplication.replErrorLog("" +DOCERRORNUMBER + " Failed to write data file " + accNumber +
513
                                     " into "+tableName + " from " +
514
                                         remoteserver + " because "+e.getMessage());
515
        DOCERRORNUMBER++;
516
      }
517
      else
518
      {
519
          MetacatReplication.replErrorLog("" +REVERRORNUMBER + " Failed to write data file" + accNumber +
520
                  " into "+tableName + " from " +
521
                      remoteserver +" because "+e.getMessage());
522
          REVERRORNUMBER++;
523
      }
524
      logMetacat.error("Failed to try wrote datafile " + accNumber +
525
                                      " because " +e.getMessage());
526
      throw e;
527
    }
528
    finally
529
    {
530
       //return DBConnection
531
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
532
    }//finally
533
  }
534

    
535

    
536

    
537
  /* Handle delete single document*/
538
  private void handleDeleteSingleDocument(String docId, String notifyServer)
539
               throws Exception
540
  {
541
    logMetacat.info("Try delete doc: "+docId);
542
    DBConnection dbConn = null;
543
    int serialNumber = -1;
544
    try
545
    {
546
      // Get DBConnection from pool
547
      dbConn=DBConnectionPool.
548
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
549
      serialNumber=dbConn.getCheckOutSerialNumber();
550
      if(!alreadyDeleted(docId))
551
      {
552

    
553
         //because delete method docid should have rev number
554
         //so we just add one for it. This rev number is no sence.
555
         String accnum=docId+util.getOption("accNumSeparator")+"1";
556
         //System.out.println("accnum: "+accnum);
557
         DocumentImpl.delete(accnum, null, null, notifyServer);
558
         logMetacat.info("Successfully deleted doc " + docId);
559
         MetacatReplication.replLog("Doc " + docId + " deleted");
560
         URL u = new URL("https://"+notifyServer);
561
         String ip = getIpFromURL(u);
562
         EventLog.getInstance().log(ip, MetacatReplication.REPLICATIONUSER, docId, "delete");
563
      }
564

    
565
    }//try
566
    catch(Exception e)
567
    {
568
      MetacatReplication.replErrorLog("Failed to delete doc " + docId +
569
                                      " in db because " +e.getMessage());
570
      logMetacat.error("Failed to delete doc " + docId +
571
                                 " in db because because " + e.getMessage());
572
      throw e;
573
    }
574
    finally
575
    {
576
       //return DBConnection
577
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
578
    }//finally
579
  }
580

    
581
  /* Handle updateLastCheckTimForSingleServer*/
582
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
583
                                                  throws Exception
584
  {
585
    String server = repServer.getServerName();
586
    DBConnection dbConn = null;
587
    int serialNumber = -1;
588
    PreparedStatement pstmt = null;
589
    try
590
    {
591
      // Get DBConnection from pool
592
      dbConn=DBConnectionPool.
593
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
594
      serialNumber=dbConn.getCheckOutSerialNumber();
595

    
596
      logMetacat.info("Try to update last_check for server: "+server);
597
      // Get time from remote server
598
      URL dateurl = new URL("https://" + server + "?server="+
599
      util.getLocalReplicationServerName()+"&action=gettime");
600
      String datexml = MetacatReplication.getURLContent(dateurl);
601
      logMetacat.info("datexml: "+datexml);
602
      if (datexml!=null && !datexml.equals(""))
603
      {
604
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
605
         StringBuffer sql = new StringBuffer();
606
         /*sql.append("update xml_replication set last_checked = to_date('");
607
         sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
608
         sql.append("server like '").append(server).append("'");*/
609
         sql.append("update xml_replication set last_checked = ");
610
         sql.append(dbAdapter.toDate(datestr, "MM/DD/YY HH24:MI:SS"));
611
         sql.append(" where server like '").append(server).append("'");
612
         pstmt = dbConn.prepareStatement(sql.toString());
613

    
614
         pstmt.executeUpdate();
615
         dbConn.commit();
616
         pstmt.close();
617
         logMetacat.info("last_checked updated to "+datestr+" on "
618
                                      + server);
619
      }//if
620
      else
621
      {
622

    
623
         logMetacat.info("Failed to update last_checked for server "  +
624
                                  server + " in db because couldn't get time "
625
                                  );
626
         throw new Exception("Couldn't get time for server "+ server);
627
      }
628

    
629
    }//try
630
    catch(Exception e)
631
    {
632

    
633
      logMetacat.error("Failed to update last_checked for server " +
634
                                server + " in db because because " +
635
                                e.getMessage());
636
      throw e;
637
    }
638
    finally
639
    {
640
       //return DBConnection
641
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
642
    }//finally
643
  }
644

    
645

    
646

    
647
  /**
648
   * updates xml_catalog with entries from other servers.
649
   */
650
  private void updateCatalog()
651
  {
652
    logMetacat.info("Start of updateCatalog");
653
    // ReplicationServer object in server list
654
    ReplicationServer replServer = null;
655
    PreparedStatement pstmt = null;
656
    String server = null;
657

    
658

    
659
    // Go through each ReplicationServer object in sererlist
660
    for (int j=0; j<serverList.size(); j++)
661
    {
662
      Vector remoteCatalog = new Vector();
663
      Vector publicId = new Vector();
664
      try
665
      {
666
        // Get ReplicationServer object from server list
667
        replServer = serverList.serverAt(j);
668
        // Get server name from the ReplicationServer object
669
        server = replServer.getServerName();
670
        // Try to get catalog
671
        URL u = new URL("https://" + server + "?server="+
672
        util.getLocalReplicationServerName()+"&action=getcatalog");
673
        logMetacat.info("sending message " + u.toString());
674
        String catxml = MetacatReplication.getURLContent(u);
675

    
676
        // Make sure there are not error, no empty string
677
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
678
        {
679
          throw new Exception("Couldn't get catalog list form server " +server);
680
        }
681
        logMetacat.info("catxml: " + catxml);
682
        CatalogMessageHandler cmh = new CatalogMessageHandler();
683
        XMLReader catparser = initParser(cmh);
684
        catparser.parse(new InputSource(new StringReader(catxml)));
685
        //parse the returned catalog xml and put it into a vector
686
        remoteCatalog = cmh.getCatalogVect();
687

    
688
        // Makse sure remoteCatalog is not empty
689
        if (remoteCatalog.isEmpty())
690
        {
691
          throw new Exception("Couldn't get catalog list form server " +server);
692
        }
693

    
694
        String localcatxml = MetacatReplication.getCatalogXML();
695

    
696
        // Make sure local catalog is no empty
697
        if (localcatxml==null||localcatxml.equals(""))
698
        {
699
          throw new Exception("Couldn't get catalog list form server " +server);
700
        }
701

    
702
        cmh = new CatalogMessageHandler();
703
        catparser = initParser(cmh);
704
        catparser.parse(new InputSource(new StringReader(localcatxml)));
705
        Vector localCatalog = cmh.getCatalogVect();
706

    
707
        //now we have the catalog from the remote server and this local server
708
        //we now need to compare the two and merge the differences.
709
        //the comparison is base on the public_id fields which is the 4th
710
        //entry in each row vector.
711
        publicId = new Vector();
712
        for(int i=0; i<localCatalog.size(); i++)
713
        {
714
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
715
          logMetacat.info("v1: " + v.toString());
716
          publicId.add(new String((String)v.elementAt(3)));
717
          //System.out.println("adding " + (String)v.elementAt(3));
718
        }
719
      }//try
720
      catch (Exception e)
721
      {
722
        MetacatReplication.replErrorLog("Failed to update catalog for server "+
723
                                    server + " because " +e.getMessage());
724
        logMetacat.error("Failed to update catalog for server "+
725
                                    server + " because " +e.getMessage());
726
      }//catch
727

    
728
      for(int i=0; i<remoteCatalog.size(); i++)
729
      {
730
         // DConnection
731
        DBConnection dbConn = null;
732
        // DBConnection checkout serial number
733
        int serialNumber = -1;
734
        try
735
        {
736
            dbConn=DBConnectionPool.
737
                  getDBConnection("ReplicationHandler.updateCatalog");
738
            serialNumber=dbConn.getCheckOutSerialNumber();
739
            Vector v = (Vector)remoteCatalog.elementAt(i);
740
            //System.out.println("v2: " + v.toString());
741
            //System.out.println("i: " + i);
742
            //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
743
            //System.out.println("publicID: " + publicId.toString());
744
            logMetacat.info
745
                              ("v.elementAt(3): " + (String)v.elementAt(3));
746
           if(!publicId.contains(v.elementAt(3)))
747
           { //so we don't have this public id in our local table so we need to
748
             //add it.
749
             //System.out.println("in if");
750
             StringBuffer sql = new StringBuffer();
751
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
752
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
753
             sql.append("?,?)");
754
             //System.out.println("sql: " + sql.toString());
755
             pstmt = dbConn.prepareStatement(sql.toString());
756
             pstmt.setString(1, (String)v.elementAt(0));
757
             pstmt.setString(2, (String)v.elementAt(1));
758
             pstmt.setString(3, (String)v.elementAt(2));
759
             pstmt.setString(4, (String)v.elementAt(3));
760
             pstmt.setString(5, (String)v.elementAt(4));
761
             pstmt.execute();
762
             pstmt.close();
763
             MetacatReplication.replLog("Success fully to insert new publicid "+
764
                               (String)v.elementAt(3) + " from server"+server);
765
             logMetacat.info("Success fully to insert new publicid "+
766
                             (String)v.elementAt(3) + " from server" +server);
767
           }
768
        }
769
        catch(Exception e)
770
        {
771
           MetacatReplication.replErrorLog("Failed to update catalog for server "+
772
                                    server + " because " +e.getMessage());
773
           logMetacat.error("Failed to update catalog for server "+
774
                                    server + " because " +e.getMessage());
775
        }//catch
776
        finally
777
        {
778
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
779
        }//finall
780
      }//for remote catalog
781
    }//for server list
782
    logMetacat.info("End of updateCatalog");
783
  }
784

    
785
  /**
786
   * Method that returns true if docid has already been "deleted" from metacat.
787
   * This method really implements a truth table for deleted documents
788
   * The table is (a docid in one of the tables is represented by the X):
789
   * xml_docs      xml_revs      deleted?
790
   * ------------------------------------
791
   *   X             X             FALSE
792
   *   X             _             FALSE
793
   *   _             X             TRUE
794
   *   _             _             TRUE
795
   */
796
  private static boolean alreadyDeleted(String docid) throws Exception
797
  {
798
    DBConnection dbConn = null;
799
    int serialNumber = -1;
800
    PreparedStatement pstmt = null;
801
    try
802
    {
803
      dbConn=DBConnectionPool.
804
                  getDBConnection("ReplicationHandler.alreadyDeleted");
805
      serialNumber=dbConn.getCheckOutSerialNumber();
806
      boolean xml_docs = false;
807
      boolean xml_revs = false;
808

    
809
      StringBuffer sb = new StringBuffer();
810
      sb.append("select docid from xml_revisions where docid like '");
811
      sb.append(docid).append("'");
812
      pstmt = dbConn.prepareStatement(sb.toString());
813
      pstmt.execute();
814
      ResultSet rs = pstmt.getResultSet();
815
      boolean tablehasrows = rs.next();
816
      if(tablehasrows)
817
      {
818
        xml_revs = true;
819
      }
820

    
821
      sb = new StringBuffer();
822
      sb.append("select docid from xml_documents where docid like '");
823
      sb.append(docid).append("'");
824
      pstmt.close();
825
      pstmt = dbConn.prepareStatement(sb.toString());
826
      //increase usage count
827
      dbConn.increaseUsageCount(1);
828
      pstmt.execute();
829
      rs = pstmt.getResultSet();
830
      tablehasrows = rs.next();
831
      pstmt.close();
832
      if(tablehasrows)
833
      {
834
        xml_docs = true;
835
      }
836

    
837
      if(xml_docs && xml_revs)
838
      {
839
        return false;
840
      }
841
      else if(xml_docs && !xml_revs)
842
      {
843
        return false;
844
      }
845
      else if(!xml_docs && xml_revs)
846
      {
847
        return true;
848
      }
849
      else if(!xml_docs && !xml_revs)
850
      {
851
        return true;
852
      }
853
    }
854
    catch(Exception e)
855
    {
856
      logMetacat.error("error in ReplicationHandler.alreadyDeleted: " +
857
                          e.getMessage());
858
      throw e;
859
    }
860
    finally
861
    {
862
      try
863
      {
864
        pstmt.close();
865
      }//try
866
      catch (SQLException ee)
867
      {
868
        logMetacat.error("Error in replicationHandler.alreadyDeleted "+
869
                          "to close pstmt: "+ee.getMessage());
870
        throw ee;
871
      }//catch
872
      finally
873
      {
874
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
875
      }//finally
876
    }//finally
877
    return false;
878
  }
879

    
880

    
881
  /**
882
   * Method to initialize the message parser
883
   */
884
  public static XMLReader initParser(DefaultHandler dh)
885
          throws Exception
886
  {
887
    XMLReader parser = null;
888

    
889
    try {
890
      ContentHandler chandler = dh;
891

    
892
      // Get an instance of the parser
893
      MetaCatUtil util = new MetaCatUtil();
894
      String parserName = util.getOption("saxparser");
895
      parser = XMLReaderFactory.createXMLReader(parserName);
896

    
897
      // Turn off validation
898
      parser.setFeature("http://xml.org/sax/features/validation", false);
899

    
900
      parser.setContentHandler((ContentHandler)chandler);
901
      parser.setErrorHandler((ErrorHandler)chandler);
902

    
903
    } catch (Exception e) {
904
      throw e;
905
    }
906

    
907
    return parser;
908
  }
909

    
910
  /**
911
   * This method will combinate given time string(in short format) to
912
   * current date. If the given time (e.g 10:00 AM) passed the current time
913
   * (e.g 2:00 PM Aug 21, 2005), then the time will set to second day,
914
   * 10:00 AM Aug 22, 2005. If the given time (e.g 10:00 AM) haven't passed
915
   * the current time (e.g 8:00 AM Aug 21, 2005) The time will set to be
916
   * 10:00 AM Aug 21, 2005.
917
   * @param givenTime  the format should be "10:00 AM " or "2:00 PM"
918
   * @return
919
   * @throws Exception
920
   */
921
  public static Date combinateCurrentDateAndGivenTime(String givenTime) throws Exception
922
  {
923
     Date givenDate = parseTime(givenTime);
924
     Date newDate = null;
925
     Date now = new Date();
926
     String currentTimeString = getTimeString(now);
927
     Date currentTime = parseTime(currentTimeString); 
928
     if ( currentTime.getTime() >= givenDate.getTime())
929
     {
930
        logMetacat.info("Today already pass the given time, we should set it as tomorrow");
931
        String dateAndTime = getDateString(now) + " " + givenTime;
932
        Date combinationDate = parseDateTime(dateAndTime);
933
        // new date should plus 24 hours to make is the second day
934
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
935
     }
936
     else
937
     {
938
         logMetacat.info("Today haven't pass the given time, we should it as today");
939
         String dateAndTime = getDateString(now) + " " + givenTime;
940
         newDate = parseDateTime(dateAndTime);
941
     }
942
     logMetacat.warn("final setting time is "+ newDate.toString());
943
     return newDate;
944
  }
945

    
946
  /*
947
   * parse a given string to Time in short format.
948
   * For example, given time is 10:00 AM, the date will be return as
949
   * Jan 1 1970, 10:00 AM
950
   */
951
  private static Date parseTime(String timeString) throws Exception
952
  {
953
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
954
    Date time = format.parse(timeString); 
955
    logMetacat.info("Date string is after parse a time string "
956
                              +time.toString());
957
    return time;
958

    
959
  }
960
  
961
  /*
962
   * Pasre a given string to date and time. Date format is long and time
963
   * format is short.
964
   */
965
  private static Date parseDateTime(String timeString) throws Exception
966
  {
967
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
968
    Date time = format.parse(timeString);
969
    logMetacat.info("Date string is after parse a time string "+
970
                             time.toString());
971
    return time;
972
  }
973
  
974
  /*
975
   * Get a date string from a Date object. The date format will be long
976
   */
977
  private static String getDateString(Date now)
978
  {
979
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
980
     String s = df.format(now);
981
     logMetacat.info("Today is " + s);
982
     return s;
983
  }
984
  
985
  /*
986
   * Get a time string from a Date object, the time format will be short
987
   */
988
  private static String getTimeString(Date now)
989
  {
990
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
991
     String s = df.format(now);
992
     logMetacat.info("Time is " + s);
993
     return s;
994
  }
995
  
996
  
997
  /*
998
   * This method will go through the docid list both in xml_Documents table
999
   * and in xml_revisions table
1000
   * @author tao
1001
   */
1002
   private void handleDocList(Vector docList, String tableName)
1003
   {
1004
       boolean dataFile=false;
1005
       for(int j=0; j<docList.size(); j++)
1006
       {
1007
         //initial dataFile is false
1008
         dataFile=false;
1009
         //w is information for one document, information contain
1010
         //docid, rev, server or datafile.
1011
         Vector w = new Vector((Vector)(docList.elementAt(j)));
1012
         //Check if the vector w contain "datafile"
1013
         //If it has, this document is data file
1014
         if (w.contains((String)util.getOption("datafileflag")))
1015
         {
1016
           dataFile=true;
1017
         }
1018
         //System.out.println("w: " + w.toString());
1019
         // Get docid
1020
         String docid = (String)w.elementAt(0);
1021
         logMetacat.info("docid: " + docid);
1022
         // Get revision number
1023
         int rev = Integer.parseInt((String)w.elementAt(1));
1024
         logMetacat.info("rev: " + rev);
1025
         // Get remote server name (it is may not be doc homeserver because
1026
         // the new hub feature
1027
         String remoteServer = (String)w.elementAt(2);
1028
         remoteServer = remoteServer.trim();
1029

    
1030
         try
1031
         {
1032
             if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
1033
             {
1034
                 handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1035
             }
1036
             else if (tableName.equals(DocumentImpl.REVISIONTABLE))
1037
             {
1038
                 handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1039
             }
1040
             else
1041
             {
1042
                 continue;
1043
             }
1044
                 
1045
         }
1046
         catch (Exception e)
1047
         {
1048
             logMetacat.error("error to handle update doc in "+ 
1049
                     tableName +" in time replication"+e.getMessage());
1050
             continue;
1051
         }
1052
        
1053
       }//for update docs
1054

    
1055
   }
1056
   
1057
   /*
1058
    * This method will handle doc in xml_documents table.
1059
    */
1060
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1061
                                        throws Exception
1062
   {
1063
       // compare the update rev and local rev to see what need happen
1064
       int localrev = -1;
1065
       String action = null;
1066
       boolean flag = false;
1067
       try
1068
       {
1069
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1070
       }
1071
       catch (SQLException e)
1072
       {
1073
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1074
                                " be found because " + e.getMessage());
1075
         MetacatReplication.replErrorLog(""+DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1076
                 "written because error happend to find it's local revision");
1077
         DOCERRORNUMBER++;
1078
         throw new Exception (e.getMessage());
1079
       }
1080
       logMetacat.info("Local rev for docid "+ docid + " is "+
1081
                               localrev);
1082

    
1083
       //check the revs for an update because this document is in the
1084
       //local DB, it might be out of date.
1085
       if (localrev == -1)
1086
       {
1087
          // check if the revision is in the revision table
1088
         Vector localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1089
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1090
         {
1091
             // this version was deleted, so don't need replicate
1092
             flag = false;
1093
         }
1094
         else
1095
         {
1096
           //insert this document as new because it is not in the local DB
1097
           action = "INSERT";
1098
           flag = true;
1099
         }
1100
       }
1101
       else
1102
       {
1103
         if(localrev == rev)
1104
         {
1105
           // Local meatacat has the same rev to remote host, don't need
1106
           // update and flag set false
1107
           flag = false;
1108
         }
1109
         else if(localrev < rev)
1110
         {
1111
           //this document needs to be updated so send an read request
1112
           action = "UPDATE";
1113
           flag = true;
1114
         }
1115
       }
1116
       
1117
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1118
       // this is non-data file
1119
       if(flag && !dataFile)
1120
       {
1121
         try
1122
         {
1123
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1124
         }
1125
         catch(Exception e)
1126
         {
1127
           // skip this document
1128
           throw e;
1129
         }
1130
       }//if for non-data file
1131

    
1132
        // this is for data file
1133
       if(flag && dataFile)
1134
       {
1135
         try
1136
         {
1137
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1138
         }
1139
         catch(Exception e)
1140
         {
1141
           // skip this datafile
1142
           throw e;
1143
         }
1144

    
1145
       }//for datafile
1146
   }
1147
   
1148
   /*
1149
    * This method will handle doc in xml_documents table.
1150
    */
1151
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1152
                                        throws Exception
1153
   {
1154
       // compare the update rev and local rev to see what need happen
1155
       logMetacat.info("In handle repliation revsion table");
1156
       logMetacat.info("the docid is "+ docid);
1157
       logMetacat.info("The rev is "+rev);
1158
       Vector localrev = null;
1159
       String action = "INSERT";
1160
       boolean flag = false;
1161
       try
1162
       {
1163
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1164
       }
1165
       catch (SQLException e)
1166
       {
1167
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1168
                                " be found because " + e.getMessage());
1169
         MetacatReplication.replErrorLog(""+REVERRORNUMBER+" Docid "+ docid + " could not be "+
1170
                 "written because error happend to find it's local revision");
1171
         REVERRORNUMBER++;
1172
         throw new Exception (e.getMessage());
1173
       }
1174
       logMetacat.info("rev list in xml_revision table for docid "+ docid + " is "+
1175
                               localrev.toString());
1176
       
1177
       // if the rev is not in the xml_revision, we need insert it
1178
       if (!localrev.contains(new Integer(rev)))
1179
       {
1180
           flag = true;    
1181
       }
1182
     
1183
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1184
       // this is non-data file
1185
       if(flag && !dataFile)
1186
       {
1187
         try
1188
         {
1189
           
1190
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1191
         }
1192
         catch(Exception e)
1193
         {
1194
           // skip this document
1195
           throw e;
1196
         }
1197
       }//if for non-data file
1198

    
1199
        // this is for data file
1200
       if(flag && dataFile)
1201
       {
1202
         try
1203
         {
1204
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1205
         }
1206
         catch(Exception e)
1207
         {
1208
           // skip this datafile
1209
           throw e;
1210
         }
1211

    
1212
       }//for datafile
1213
   }
1214
   
1215
   /*
1216
    * Return a ip address for given url
1217
    */
1218
   private String getIpFromURL(URL url)
1219
   {
1220
	   String ip = null;
1221
	   try
1222
	   {
1223
	      InetAddress address = InetAddress.getByName(url.getHost());
1224
	      ip = address.getHostAddress();
1225
	   }
1226
	   catch(UnknownHostException e)
1227
	   {
1228
		   logMetacat.error("Error in get ip address for host: "
1229
                   +e.getMessage());
1230
	   }
1231

    
1232
	   return ip;
1233
   }
1234
  
1235
}
1236

    
(60-60/66)