Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: tao $'
9
 *     '$Date: 2008-05-19 18:46:55 -0700 (Mon, 19 May 2008) $'
10
 * '$Revision: 3898 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat;
28

    
29
import edu.ucsb.nceas.dbadapter.AbstractDatabase;
30
import java.sql.*;
31
import java.util.*;
32
import java.util.Date;
33
import java.lang.Thread;
34
import java.io.*;
35
import java.net.*;
36
import java.text.*;
37

    
38
import org.apache.log4j.Logger;
39
import org.xml.sax.AttributeList;
40
import org.xml.sax.ContentHandler;
41
import org.xml.sax.DTDHandler;
42
import org.xml.sax.EntityResolver;
43
import org.xml.sax.ErrorHandler;
44
import org.xml.sax.InputSource;
45
import org.xml.sax.XMLReader;
46
import org.xml.sax.SAXException;
47
import org.xml.sax.SAXParseException;
48
import org.xml.sax.helpers.XMLReaderFactory;
49
import org.xml.sax.helpers.DefaultHandler;
50

    
51

    
52

    
53
/**
54
 * This class handles deltaT replication checking.  Whenever this TimerTask
55
 * is fired it checks each server in xml_replication for updates and updates
56
 * the local db as needed.
57
 */
58
public class ReplicationHandler extends TimerTask
59
{
60
  int serverCheckCode = 1;
61
  MetaCatUtil util = new MetaCatUtil();
62
  ReplicationServerList serverList = null;
63
  //PrintWriter out;
64
  private static final AbstractDatabase dbAdapter = MetaCatUtil.dbAdapter;
65
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
66
  private static int DOCINSERTNUMBER = 1;
67
  private static int DOCERRORNUMBER  = 1;
68
  private static int REVINSERTNUMBER = 1;
69
  private static int REVERRORNUMBER  = 1;
70
  public ReplicationHandler()
71
  {
72
    //this.out = o;
73
    serverList = new ReplicationServerList();
74
  }
75

    
76
  public ReplicationHandler(int serverCheckCode)
77
  {
78
    //this.out = o;
79
    this.serverCheckCode = serverCheckCode;
80
    serverList = new ReplicationServerList();
81
  }
82

    
83
  /**
84
   * Method that implements TimerTask.run().  It runs whenever the timer is
85
   * fired.
86
   */
87
  public void run()
88
  {
89
    //find out the last_checked time of each server in the server list and
90
    //send a query to each server to see if there are any documents in
91
    //xml_documents with an update_date > last_checked
92
    try
93
    {
94
      //if serverList is null, metacat don't need to replication
95
      if (serverList==null||serverList.isEmpty())
96
      {
97
        return;
98
      }
99
      updateCatalog();
100
      update();
101
      //conn.close();
102
    }//try
103
    catch (Exception e)
104
    {
105
      logMetacat.error("Error in replicationHandler.run():"
106
                                                    +e.getMessage());
107
      e.printStackTrace();
108
    }//catch
109
  }
110

    
111
  /**
112
   * Method that uses revision taging for replication instead of update_date.
113
   */
114
  private void update()
115
  {
116
    /*
117
     Pseudo-algorithm
118
     - request a doc list from each server in xml_replication
119
     - check the rev number of each of those documents agains the
120
       documents in the local database
121
     - pull any documents that have a lesser rev number on the local server
122
       from the remote server
123
     - delete any documents that still exist in the local xml_documents but
124
       are in the deletedDocuments tag of the remote host response.
125
     - update last_checked to keep track of the last time it was checked.
126
       (this info is theoretically not needed using this system but probably
127
       should be kept anyway)
128
    */
129

    
130
    ReplicationServer replServer = null; // Variable to store the
131
                                        // ReplicationServer got from
132
                                        // Server list
133
    String server = null; // Variable to store server name
134
    String update;
135
    Vector responses = new Vector();
136
    URL u;
137

    
138

    
139
    
140
    //Check for every server in server list to get updated list and put
141
    // them in to response
142
    for (int i=0; i<serverList.size(); i++)
143
    {
144
    	
145
        // Get ReplicationServer object from server list
146
        replServer = serverList.serverAt(i);
147
        // Get server name from ReplicationServer object
148
        server = replServer.getServerName().trim();
149
        String result = null;
150
        MetacatReplication.replLog("full update started to: " + server);
151
        // Send command to that server to get updated docid information
152
        try
153
        {
154
          u = new URL("https://" + server + "?server="
155
          +util.getLocalReplicationServerName()+"&action=update");
156
          logMetacat.info("Sending infomation " +u.toString());
157
          result = MetacatReplication.getURLContent(u);
158
        }
159
        catch (Exception e)
160
        {
161
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
162
                          "for server " + server + " because "+e.getMessage());
163
          logMetacat.error( "Failed to get updated doc list "+
164
                       "for server " + server + " because "+e.getMessage());
165
          continue;
166
        }
167

    
168
        logMetacat.info("docid: "+server+" "+result);
169
        //check if result have error or not, if has skip it.
170
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
171
        {
172
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
173
                          "for server " + server + " because "+result);
174
          logMetacat.info( "Failed to get updated doc list "+
175
                       "for server " + server + " because "+result);
176
          continue;
177
        }
178
        //Add result to vector
179
        responses.add(result);
180
    }
181

    
182
    //make sure that there is updated file list
183
    //If response is null, metacat don't need do anything
184
    if (responses==null || responses.isEmpty())
185
    {
186
        MetacatReplication.replErrorLog("No updated doc list for "+
187
                           "every server and failed to replicate");
188
        logMetacat.info( "No updated doc list for "+
189
                           "every server and failed to replicate");
190
        return;
191
    }
192

    
193

    
194
    logMetacat.info("Responses from remote metacat about updated "+
195
                   "document information: "+ responses.toString());
196
    // go through response vector(it contains updated vector and delete vector
197
    for(int i=0; i<responses.size(); i++)
198
    {
199
    	XMLReader parser;
200
    	ReplMessageHandler message = new ReplMessageHandler();
201
    	try
202
        {
203
          parser = initParser(message);
204
        }
205
        catch (Exception e)
206
        {
207
          MetacatReplication.replErrorLog("Failed to replicate becaue couldn't "+
208
                                     " initParser for message and" +e.getMessage());
209
          logMetacat.error("Failed to replicate becaue couldn't " +
210
                                " initParser for message and " +e.getMessage());
211
           // stop replication
212
           return;
213
        }
214
    	
215
        try
216
        {
217
          parser.parse(new InputSource(
218
                     new StringReader(
219
                     (String)(responses.elementAt(i)))));
220
        }
221
        catch(Exception e)
222
        {
223
          MetacatReplication.replErrorLog("Couldn't parse one responses "+
224
                           "because "+ e.getMessage());
225
          logMetacat.error("Couldn't parse one responses "+
226
                                   "because "+ e.getMessage());
227
          continue;
228
        }
229
        //v is the list of updated documents
230
        Vector updateList = new Vector(message.getUpdatesVect());
231
        MetacatReplication.replLog("The document list size is "+updateList.size()+ " from "+message.getServerName());
232
        //System.out.println("v: " + v.toString());
233
        //d is the list of deleted documents
234
        Vector deleteList = new Vector(message.getDeletesVect());
235
        //System.out.println("d: " + d.toString());
236
        logMetacat.info("Update vector size: "+ updateList.size()+" from "+message.getServerName());
237
        logMetacat.info("Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
238
        MetacatReplication.replLog("The delete document list size is "+deleteList.size()+" from "+message.getServerName());
239
        // go though every element in updated document vector
240
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
241
        //handle deleted docs
242
        for(int k=0; k<deleteList.size(); k++)
243
        { //delete the deleted documents;
244
          Vector w = new Vector((Vector)deleteList.elementAt(k));
245
          String docId = (String)w.elementAt(0);
246
          try
247
          {
248
            handleDeleteSingleDocument(docId, server);
249
          }
250
          catch (Exception ee)
251
          {
252
            continue;
253
          }
254
        }//for delete docs
255
        
256
        // handle replicate doc in xml_revision
257
        Vector revisionList = new Vector(message.getRevisionsVect());
258
        MetacatReplication.replLog("The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
259
        logMetacat.info("The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
260
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
261
        DOCINSERTNUMBER = 1;
262
        DOCERRORNUMBER  = 1;
263
        REVINSERTNUMBER = 1;
264
        REVERRORNUMBER  = 1;
265
    }//for response
266

    
267
    //updated last_checked
268
    for (int i=0;i<serverList.size(); i++)
269
    {
270
       // Get ReplicationServer object from server list
271
       replServer = serverList.serverAt(i);
272
       try
273
       {
274
         updateLastCheckTimeForSingleServer(replServer);
275
       }
276
       catch(Exception e)
277
       {
278
         continue;
279
       }
280
    }//for
281

    
282
  }//update
283

    
284
  /* Handle replicate single xml document*/
285
  private void handleSingleXMLDocument(String remoteserver, String actions,
286
                                       String accNumber, String tableName)
287
               throws Exception
288
  {
289
    DBConnection dbConn = null;
290
    int serialNumber = -1;
291
    try
292
    {
293
      // Get DBConnection from pool
294
      dbConn=DBConnectionPool.
295
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
296
      serialNumber=dbConn.getCheckOutSerialNumber();
297
      //if the document needs to be updated or inserted, this is executed
298
      String readDocURLString = "https://" + remoteserver + "?server="+
299
              util.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
300
      readDocURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDocURLString);
301
      URL u = new URL(readDocURLString);
302

    
303
      // Get docid content
304
      String newxmldoc = MetacatReplication.getURLContent(u);
305
      // If couldn't get skip it
306
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
307
      {
308
         throw new Exception(newxmldoc);
309
      }
310
      //logMetacat.info("xml documnet:");
311
      //logMetacat.info(newxmldoc);
312

    
313
      // Try get the docid info from remote server
314
      DocInfoHandler dih = new DocInfoHandler();
315
      XMLReader docinfoParser = initParser(dih);
316
      String docInfoURLStr = "https://" + remoteserver +
317
                       "?server="+util.getLocalReplicationServerName()+
318
                       "&action=getdocumentinfo&docid="+accNumber;
319
      docInfoURLStr = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
320
      URL docinfoUrl = new URL(docInfoURLStr);
321
      logMetacat.info("Sending message: " +
322
                                                  docinfoUrl.toString());
323
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
324
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
325
      Hashtable docinfoHash = dih.getDocInfo();
326
      // Get home server of the docid
327
      String docHomeServer = (String)docinfoHash.get("home_server");
328
      logMetacat.info("doc home server in repl: "+docHomeServer);
329
      String createdDate = (String)docinfoHash.get("date_created");
330
      String updatedDate = (String)docinfoHash.get("date_updated");
331
      //docid should include rev number too
332
      /*String accnum=docId+util.getOption("accNumSeparator")+
333
                                              (String)docinfoHash.get("rev");*/
334
      logMetacat.info("docid in repl: "+accNumber);
335
      String docType = (String)docinfoHash.get("doctype");
336
      logMetacat.info("doctype in repl: "+docType);
337

    
338
      String parserBase = null;
339
      // this for eml2 and we need user eml2 parser
340
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
341
      {
342
         parserBase = DocumentImpl.EML200;
343
      }
344
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
345
      {
346
        parserBase = DocumentImpl.EML200;
347
      }
348
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
349
      {
350
        parserBase = DocumentImpl.EML210;
351
      }
352
      // Write the document into local host
353
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false);
354
      String newDocid = wrapper.writeReplication(dbConn,
355
                              new StringReader(newxmldoc),
356
                              (String)docinfoHash.get("public_access"),
357
                              null,  /* the dtd text */
358
                              actions,
359
                              accNumber,
360
                              (String)docinfoHash.get("user_owner"),
361
                              null, /* null for groups[] */
362
                              docHomeServer,
363
                              remoteserver, tableName, true,// true is for time replication 
364
                              createdDate,
365
                              updatedDate);
366
      logMetacat.info("Successfully replicated doc " + accNumber);
367
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
368
      {
369
        MetacatReplication.replLog("" +DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
370
                                     " into "+tableName + " from " +
371
                                         remoteserver);
372
        DOCINSERTNUMBER++;
373
      }
374
      else
375
      {
376
          MetacatReplication.replLog("" +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
377
                  " into "+tableName + " from " +
378
                      remoteserver);
379
          REVINSERTNUMBER++;
380
      }
381
      String ip = getIpFromURL(u);
382
      EventLog.getInstance().log(ip, MetacatReplication.REPLICATIONUSER, accNumber, actions);
383
      
384

    
385
    }//try
386
    catch(Exception e)
387
    {
388
        
389
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
390
        {
391
          MetacatReplication.replErrorLog("" +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
392
                                       " into "+tableName + " from " +
393
                                           remoteserver + " because "+e.getMessage());
394
          DOCERRORNUMBER++;
395
        }
396
        else
397
        {
398
            MetacatReplication.replErrorLog("" +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
399
                    " into "+tableName + " from " +
400
                        remoteserver +" because "+e.getMessage());
401
            REVERRORNUMBER++;
402
        }
403
       
404
      logMetacat.error("Failed to write doc " + accNumber +
405
                                      " into db because " +e.getMessage());
406
      throw e;
407
    }
408
    finally
409
    {
410
       //return DBConnection
411
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
412
    }//finally
413
  }
414

    
415

    
416

    
417
  /* Handle replicate single xml document*/
418
  private void handleSingleDataFile(String remoteserver, String actions,
419
                                    String accNumber, String tableName)
420
               throws Exception
421
  {
422
    logMetacat.info("Try to replicate data file: "+accNumber);
423
    DBConnection dbConn = null;
424
    int serialNumber = -1;
425
    try
426
    {
427
      // Get DBConnection from pool
428
      dbConn=DBConnectionPool.
429
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
430
      serialNumber=dbConn.getCheckOutSerialNumber();
431
      // Try get docid info from remote server
432
      DocInfoHandler dih = new DocInfoHandler();
433
      XMLReader docinfoParser = initParser(dih);
434
      String docInfoURLString = "https://" + remoteserver +
435
                  "?server="+util.getLocalReplicationServerName()+
436
                  "&action=getdocumentinfo&docid="+accNumber;
437
      docInfoURLString = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLString);
438
      URL docinfoUrl = new URL(docInfoURLString);
439

    
440
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
441
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
442
      Hashtable docinfoHash = dih.getDocInfo();
443
      // Get doicd owner
444
      String user = (String)docinfoHash.get("user_owner");
445
      // Get docid name (such as acl or dataset)
446
      String docName = (String)docinfoHash.get("docname");
447
      // Get doc type (eml public id)
448
      String docType = (String)docinfoHash.get("doctype");
449
      // Get docid home sever. it might be different to remoteserver
450
      // becuause of hub feature
451
      String docHomeServer = (String)docinfoHash.get("home_server");
452
      String createdDate = (String)docinfoHash.get("date_created");
453
      String updatedDate = (String)docinfoHash.get("date_updated");
454
      //docid should include rev number too
455
      /*String accnum=docId+util.getOption("accNumSeparator")+
456
                                              (String)docinfoHash.get("rev");*/
457

    
458

    
459
      String datafilePath = util.getOption("datafilepath");
460
      // Get data file content
461
      String readDataURLString = "https://" + remoteserver + "?server="+
462
                                        util.getLocalReplicationServerName()+
463
                                            "&action=readdata&docid="+accNumber;
464
      readDataURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDataURLString);
465
      URL u = new URL(readDataURLString);
466
      InputStream input = u.openStream();
467
      //register data file into xml_documents table and wite data file
468
      //into file system
469
      if ( input != null)
470
      {
471
        DocumentImpl.writeDataFileInReplication(input,
472
                                                datafilePath,
473
                                                docName,docType,
474
                                                accNumber, user,
475
                                                docHomeServer,
476
                                                remoteserver,
477
                                                tableName,
478
                                                true, //true means timed replication
479
                                                createdDate,
480
                                                updatedDate);
481
                                         
482
        logMetacat.info("Successfully to write datafile " + accNumber);
483
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
484
                                    remoteserver);*/
485
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
486
        {
487
          MetacatReplication.replLog("" +DOCINSERTNUMBER + " Wrote data file" + accNumber +
488
                                       " into "+tableName + " from " +
489
                                           remoteserver);
490
          DOCINSERTNUMBER++;
491
        }
492
        else
493
        {
494
            MetacatReplication.replLog("" +REVINSERTNUMBER + " Wrote data file" + accNumber +
495
                    " into "+tableName + " from " +
496
                        remoteserver);
497
            REVINSERTNUMBER++;
498
        }
499
        String ip = getIpFromURL(u);
500
        EventLog.getInstance().log(ip, MetacatReplication.REPLICATIONUSER, accNumber, actions);
501
        
502
      }//if
503
      else
504
      {
505
         logMetacat.info("Couldn't open the data file: " + accNumber);
506
         throw new Exception("Couldn't open the data file: " + accNumber);
507
      }//else
508

    
509
    }//try
510
    catch(Exception e)
511
    {
512
      /*MetacatReplication.replErrorLog("Failed to try wrote datafile " + accNumber +
513
                                      " because " +e.getMessage());*/
514
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
515
      {
516
        MetacatReplication.replErrorLog("" +DOCERRORNUMBER + " Failed to write data file " + accNumber +
517
                                     " into "+tableName + " from " +
518
                                         remoteserver + " because "+e.getMessage());
519
        DOCERRORNUMBER++;
520
      }
521
      else
522
      {
523
          MetacatReplication.replErrorLog("" +REVERRORNUMBER + " Failed to write data file" + accNumber +
524
                  " into "+tableName + " from " +
525
                      remoteserver +" because "+e.getMessage());
526
          REVERRORNUMBER++;
527
      }
528
      logMetacat.error("Failed to try wrote datafile " + accNumber +
529
                                      " because " +e.getMessage());
530
      throw e;
531
    }
532
    finally
533
    {
534
       //return DBConnection
535
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
536
    }//finally
537
  }
538

    
539

    
540

    
541
  /* Handle delete single document*/
542
  private void handleDeleteSingleDocument(String docId, String notifyServer)
543
               throws Exception
544
  {
545
    logMetacat.info("Try delete doc: "+docId);
546
    DBConnection dbConn = null;
547
    int serialNumber = -1;
548
    try
549
    {
550
      // Get DBConnection from pool
551
      dbConn=DBConnectionPool.
552
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
553
      serialNumber=dbConn.getCheckOutSerialNumber();
554
      if(!alreadyDeleted(docId))
555
      {
556

    
557
         //because delete method docid should have rev number
558
         //so we just add one for it. This rev number is no sence.
559
         String accnum=docId+util.getOption("accNumSeparator")+"1";
560
         //System.out.println("accnum: "+accnum);
561
         DocumentImpl.delete(accnum, null, null, notifyServer);
562
         logMetacat.info("Successfully deleted doc " + docId);
563
         MetacatReplication.replLog("Doc " + docId + " deleted");
564
         URL u = new URL("https://"+notifyServer);
565
         String ip = getIpFromURL(u);
566
         EventLog.getInstance().log(ip, MetacatReplication.REPLICATIONUSER, docId, "delete");
567
      }
568

    
569
    }//try
570
    catch(Exception e)
571
    {
572
      MetacatReplication.replErrorLog("Failed to delete doc " + docId +
573
                                      " in db because " +e.getMessage());
574
      logMetacat.error("Failed to delete doc " + docId +
575
                                 " in db because because " + e.getMessage());
576
      throw e;
577
    }
578
    finally
579
    {
580
       //return DBConnection
581
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
582
    }//finally
583
  }
584

    
585
  /* Handle updateLastCheckTimForSingleServer*/
586
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
587
                                                  throws Exception
588
  {
589
    String server = repServer.getServerName();
590
    DBConnection dbConn = null;
591
    int serialNumber = -1;
592
    PreparedStatement pstmt = null;
593
    try
594
    {
595
      // Get DBConnection from pool
596
      dbConn=DBConnectionPool.
597
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
598
      serialNumber=dbConn.getCheckOutSerialNumber();
599

    
600
      logMetacat.info("Try to update last_check for server: "+server);
601
      // Get time from remote server
602
      URL dateurl = new URL("https://" + server + "?server="+
603
      util.getLocalReplicationServerName()+"&action=gettime");
604
      String datexml = MetacatReplication.getURLContent(dateurl);
605
      logMetacat.info("datexml: "+datexml);
606
      if (datexml!=null && !datexml.equals(""))
607
      {
608
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
609
         StringBuffer sql = new StringBuffer();
610
         /*sql.append("update xml_replication set last_checked = to_date('");
611
         sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
612
         sql.append("server like '").append(server).append("'");*/
613
         sql.append("update xml_replication set last_checked = ");
614
         sql.append(dbAdapter.toDate(datestr, "MM/DD/YY HH24:MI:SS"));
615
         sql.append(" where server like '").append(server).append("'");
616
         pstmt = dbConn.prepareStatement(sql.toString());
617

    
618
         pstmt.executeUpdate();
619
         dbConn.commit();
620
         pstmt.close();
621
         logMetacat.info("last_checked updated to "+datestr+" on "
622
                                      + server);
623
      }//if
624
      else
625
      {
626

    
627
         logMetacat.info("Failed to update last_checked for server "  +
628
                                  server + " in db because couldn't get time "
629
                                  );
630
         throw new Exception("Couldn't get time for server "+ server);
631
      }
632

    
633
    }//try
634
    catch(Exception e)
635
    {
636

    
637
      logMetacat.error("Failed to update last_checked for server " +
638
                                server + " in db because because " +
639
                                e.getMessage());
640
      throw e;
641
    }
642
    finally
643
    {
644
       //return DBConnection
645
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
646
    }//finally
647
  }
648

    
649

    
650

    
651
  /**
652
   * updates xml_catalog with entries from other servers.
653
   */
654
  private void updateCatalog()
655
  {
656
    logMetacat.info("Start of updateCatalog");
657
    // ReplicationServer object in server list
658
    ReplicationServer replServer = null;
659
    PreparedStatement pstmt = null;
660
    String server = null;
661

    
662

    
663
    // Go through each ReplicationServer object in sererlist
664
    for (int j=0; j<serverList.size(); j++)
665
    {
666
      Vector remoteCatalog = new Vector();
667
      Vector publicId = new Vector();
668
      try
669
      {
670
        // Get ReplicationServer object from server list
671
        replServer = serverList.serverAt(j);
672
        // Get server name from the ReplicationServer object
673
        server = replServer.getServerName();
674
        // Try to get catalog
675
        URL u = new URL("https://" + server + "?server="+
676
        util.getLocalReplicationServerName()+"&action=getcatalog");
677
        logMetacat.info("sending message " + u.toString());
678
        String catxml = MetacatReplication.getURLContent(u);
679

    
680
        // Make sure there are not error, no empty string
681
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
682
        {
683
          throw new Exception("Couldn't get catalog list form server " +server);
684
        }
685
        logMetacat.info("catxml: " + catxml);
686
        CatalogMessageHandler cmh = new CatalogMessageHandler();
687
        XMLReader catparser = initParser(cmh);
688
        catparser.parse(new InputSource(new StringReader(catxml)));
689
        //parse the returned catalog xml and put it into a vector
690
        remoteCatalog = cmh.getCatalogVect();
691

    
692
        // Makse sure remoteCatalog is not empty
693
        if (remoteCatalog.isEmpty())
694
        {
695
          throw new Exception("Couldn't get catalog list form server " +server);
696
        }
697

    
698
        String localcatxml = MetacatReplication.getCatalogXML();
699

    
700
        // Make sure local catalog is no empty
701
        if (localcatxml==null||localcatxml.equals(""))
702
        {
703
          throw new Exception("Couldn't get catalog list form server " +server);
704
        }
705

    
706
        cmh = new CatalogMessageHandler();
707
        catparser = initParser(cmh);
708
        catparser.parse(new InputSource(new StringReader(localcatxml)));
709
        Vector localCatalog = cmh.getCatalogVect();
710

    
711
        //now we have the catalog from the remote server and this local server
712
        //we now need to compare the two and merge the differences.
713
        //the comparison is base on the public_id fields which is the 4th
714
        //entry in each row vector.
715
        publicId = new Vector();
716
        for(int i=0; i<localCatalog.size(); i++)
717
        {
718
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
719
          logMetacat.info("v1: " + v.toString());
720
          publicId.add(new String((String)v.elementAt(3)));
721
          //System.out.println("adding " + (String)v.elementAt(3));
722
        }
723
      }//try
724
      catch (Exception e)
725
      {
726
        MetacatReplication.replErrorLog("Failed to update catalog for server "+
727
                                    server + " because " +e.getMessage());
728
        logMetacat.error("Failed to update catalog for server "+
729
                                    server + " because " +e.getMessage());
730
      }//catch
731

    
732
      for(int i=0; i<remoteCatalog.size(); i++)
733
      {
734
         // DConnection
735
        DBConnection dbConn = null;
736
        // DBConnection checkout serial number
737
        int serialNumber = -1;
738
        try
739
        {
740
            dbConn=DBConnectionPool.
741
                  getDBConnection("ReplicationHandler.updateCatalog");
742
            serialNumber=dbConn.getCheckOutSerialNumber();
743
            Vector v = (Vector)remoteCatalog.elementAt(i);
744
            //System.out.println("v2: " + v.toString());
745
            //System.out.println("i: " + i);
746
            //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
747
            //System.out.println("publicID: " + publicId.toString());
748
            logMetacat.info
749
                              ("v.elementAt(3): " + (String)v.elementAt(3));
750
           if(!publicId.contains(v.elementAt(3)))
751
           { //so we don't have this public id in our local table so we need to
752
             //add it.
753
             //System.out.println("in if");
754
             StringBuffer sql = new StringBuffer();
755
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
756
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
757
             sql.append("?,?)");
758
             //System.out.println("sql: " + sql.toString());
759
             pstmt = dbConn.prepareStatement(sql.toString());
760
             pstmt.setString(1, (String)v.elementAt(0));
761
             pstmt.setString(2, (String)v.elementAt(1));
762
             pstmt.setString(3, (String)v.elementAt(2));
763
             pstmt.setString(4, (String)v.elementAt(3));
764
             pstmt.setString(5, (String)v.elementAt(4));
765
             pstmt.execute();
766
             pstmt.close();
767
             MetacatReplication.replLog("Success fully to insert new publicid "+
768
                               (String)v.elementAt(3) + " from server"+server);
769
             logMetacat.info("Success fully to insert new publicid "+
770
                             (String)v.elementAt(3) + " from server" +server);
771
           }
772
        }
773
        catch(Exception e)
774
        {
775
           MetacatReplication.replErrorLog("Failed to update catalog for server "+
776
                                    server + " because " +e.getMessage());
777
           logMetacat.error("Failed to update catalog for server "+
778
                                    server + " because " +e.getMessage());
779
        }//catch
780
        finally
781
        {
782
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
783
        }//finall
784
      }//for remote catalog
785
    }//for server list
786
    logMetacat.info("End of updateCatalog");
787
  }
788

    
789
  /**
790
   * Method that returns true if docid has already been "deleted" from metacat.
791
   * This method really implements a truth table for deleted documents
792
   * The table is (a docid in one of the tables is represented by the X):
793
   * xml_docs      xml_revs      deleted?
794
   * ------------------------------------
795
   *   X             X             FALSE
796
   *   X             _             FALSE
797
   *   _             X             TRUE
798
   *   _             _             TRUE
799
   */
800
  private static boolean alreadyDeleted(String docid) throws Exception
801
  {
802
    DBConnection dbConn = null;
803
    int serialNumber = -1;
804
    PreparedStatement pstmt = null;
805
    try
806
    {
807
      dbConn=DBConnectionPool.
808
                  getDBConnection("ReplicationHandler.alreadyDeleted");
809
      serialNumber=dbConn.getCheckOutSerialNumber();
810
      boolean xml_docs = false;
811
      boolean xml_revs = false;
812

    
813
      StringBuffer sb = new StringBuffer();
814
      sb.append("select docid from xml_revisions where docid like '");
815
      sb.append(docid).append("'");
816
      pstmt = dbConn.prepareStatement(sb.toString());
817
      pstmt.execute();
818
      ResultSet rs = pstmt.getResultSet();
819
      boolean tablehasrows = rs.next();
820
      if(tablehasrows)
821
      {
822
        xml_revs = true;
823
      }
824

    
825
      sb = new StringBuffer();
826
      sb.append("select docid from xml_documents where docid like '");
827
      sb.append(docid).append("'");
828
      pstmt.close();
829
      pstmt = dbConn.prepareStatement(sb.toString());
830
      //increase usage count
831
      dbConn.increaseUsageCount(1);
832
      pstmt.execute();
833
      rs = pstmt.getResultSet();
834
      tablehasrows = rs.next();
835
      pstmt.close();
836
      if(tablehasrows)
837
      {
838
        xml_docs = true;
839
      }
840

    
841
      if(xml_docs && xml_revs)
842
      {
843
        return false;
844
      }
845
      else if(xml_docs && !xml_revs)
846
      {
847
        return false;
848
      }
849
      else if(!xml_docs && xml_revs)
850
      {
851
        return true;
852
      }
853
      else if(!xml_docs && !xml_revs)
854
      {
855
        return true;
856
      }
857
    }
858
    catch(Exception e)
859
    {
860
      logMetacat.error("error in ReplicationHandler.alreadyDeleted: " +
861
                          e.getMessage());
862
      throw e;
863
    }
864
    finally
865
    {
866
      try
867
      {
868
        pstmt.close();
869
      }//try
870
      catch (SQLException ee)
871
      {
872
        logMetacat.error("Error in replicationHandler.alreadyDeleted "+
873
                          "to close pstmt: "+ee.getMessage());
874
        throw ee;
875
      }//catch
876
      finally
877
      {
878
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
879
      }//finally
880
    }//finally
881
    return false;
882
  }
883

    
884

    
885
  /**
886
   * Method to initialize the message parser
887
   */
888
  public static XMLReader initParser(DefaultHandler dh)
889
          throws Exception
890
  {
891
    XMLReader parser = null;
892

    
893
    try {
894
      ContentHandler chandler = dh;
895

    
896
      // Get an instance of the parser
897
      MetaCatUtil util = new MetaCatUtil();
898
      String parserName = util.getOption("saxparser");
899
      parser = XMLReaderFactory.createXMLReader(parserName);
900

    
901
      // Turn off validation
902
      parser.setFeature("http://xml.org/sax/features/validation", false);
903

    
904
      parser.setContentHandler((ContentHandler)chandler);
905
      parser.setErrorHandler((ErrorHandler)chandler);
906

    
907
    } catch (Exception e) {
908
      throw e;
909
    }
910

    
911
    return parser;
912
  }
913

    
914
  /**
915
   * This method will combinate given time string(in short format) to
916
   * current date. If the given time (e.g 10:00 AM) passed the current time
917
   * (e.g 2:00 PM Aug 21, 2005), then the time will set to second day,
918
   * 10:00 AM Aug 22, 2005. If the given time (e.g 10:00 AM) haven't passed
919
   * the current time (e.g 8:00 AM Aug 21, 2005) The time will set to be
920
   * 10:00 AM Aug 21, 2005.
921
   * @param givenTime  the format should be "10:00 AM " or "2:00 PM"
922
   * @return
923
   * @throws Exception
924
   */
925
  public static Date combinateCurrentDateAndGivenTime(String givenTime) throws Exception
926
  {
927
     Date givenDate = parseTime(givenTime);
928
     Date newDate = null;
929
     Date now = new Date();
930
     String currentTimeString = getTimeString(now);
931
     Date currentTime = parseTime(currentTimeString); 
932
     if ( currentTime.getTime() >= givenDate.getTime())
933
     {
934
        logMetacat.info("Today already pass the given time, we should set it as tomorrow");
935
        String dateAndTime = getDateString(now) + " " + givenTime;
936
        Date combinationDate = parseDateTime(dateAndTime);
937
        // new date should plus 24 hours to make is the second day
938
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
939
     }
940
     else
941
     {
942
         logMetacat.info("Today haven't pass the given time, we should it as today");
943
         String dateAndTime = getDateString(now) + " " + givenTime;
944
         newDate = parseDateTime(dateAndTime);
945
     }
946
     logMetacat.warn("final setting time is "+ newDate.toString());
947
     return newDate;
948
  }
949

    
950
  /*
951
   * parse a given string to Time in short format.
952
   * For example, given time is 10:00 AM, the date will be return as
953
   * Jan 1 1970, 10:00 AM
954
   */
955
  private static Date parseTime(String timeString) throws Exception
956
  {
957
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
958
    Date time = format.parse(timeString); 
959
    logMetacat.info("Date string is after parse a time string "
960
                              +time.toString());
961
    return time;
962

    
963
  }
964
  
965
  /*
966
   * Pasre a given string to date and time. Date format is long and time
967
   * format is short.
968
   */
969
  private static Date parseDateTime(String timeString) throws Exception
970
  {
971
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
972
    Date time = format.parse(timeString);
973
    logMetacat.info("Date string is after parse a time string "+
974
                             time.toString());
975
    return time;
976
  }
977
  
978
  /*
979
   * Get a date string from a Date object. The date format will be long
980
   */
981
  private static String getDateString(Date now)
982
  {
983
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
984
     String s = df.format(now);
985
     logMetacat.info("Today is " + s);
986
     return s;
987
  }
988
  
989
  /*
990
   * Get a time string from a Date object, the time format will be short
991
   */
992
  private static String getTimeString(Date now)
993
  {
994
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
995
     String s = df.format(now);
996
     logMetacat.info("Time is " + s);
997
     return s;
998
  }
999
  
1000
  
1001
  /*
1002
   * This method will go through the docid list both in xml_Documents table
1003
   * and in xml_revisions table
1004
   * @author tao
1005
   */
1006
   private void handleDocList(Vector docList, String tableName)
1007
   {
1008
       boolean dataFile=false;
1009
       for(int j=0; j<docList.size(); j++)
1010
       {
1011
         //initial dataFile is false
1012
         dataFile=false;
1013
         //w is information for one document, information contain
1014
         //docid, rev, server or datafile.
1015
         Vector w = new Vector((Vector)(docList.elementAt(j)));
1016
         //Check if the vector w contain "datafile"
1017
         //If it has, this document is data file
1018
         if (w.contains((String)util.getOption("datafileflag")))
1019
         {
1020
           dataFile=true;
1021
         }
1022
         //System.out.println("w: " + w.toString());
1023
         // Get docid
1024
         String docid = (String)w.elementAt(0);
1025
         logMetacat.info("docid: " + docid);
1026
         // Get revision number
1027
         int rev = Integer.parseInt((String)w.elementAt(1));
1028
         logMetacat.info("rev: " + rev);
1029
         // Get remote server name (it is may not be doc homeserver because
1030
         // the new hub feature
1031
         String remoteServer = (String)w.elementAt(2);
1032
         remoteServer = remoteServer.trim();
1033

    
1034
         try
1035
         {
1036
             if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
1037
             {
1038
                 handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1039
             }
1040
             else if (tableName.equals(DocumentImpl.REVISIONTABLE))
1041
             {
1042
                 handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1043
             }
1044
             else
1045
             {
1046
                 continue;
1047
             }
1048
                 
1049
         }
1050
         catch (Exception e)
1051
         {
1052
             logMetacat.error("error to handle update doc in "+ 
1053
                     tableName +" in time replication"+e.getMessage());
1054
             continue;
1055
         }
1056
        
1057
       }//for update docs
1058

    
1059
   }
1060
   
1061
   /*
1062
    * This method will handle doc in xml_documents table.
1063
    */
1064
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1065
                                        throws Exception
1066
   {
1067
       // compare the update rev and local rev to see what need happen
1068
       int localrev = -1;
1069
       String action = null;
1070
       boolean flag = false;
1071
       try
1072
       {
1073
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1074
       }
1075
       catch (SQLException e)
1076
       {
1077
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1078
                                " be found because " + e.getMessage());
1079
         MetacatReplication.replErrorLog(""+DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1080
                 "written because error happend to find it's local revision");
1081
         DOCERRORNUMBER++;
1082
         throw new Exception (e.getMessage());
1083
       }
1084
       logMetacat.info("Local rev for docid "+ docid + " is "+
1085
                               localrev);
1086

    
1087
       //check the revs for an update because this document is in the
1088
       //local DB, it might be out of date.
1089
       if (localrev == -1)
1090
       {
1091
          // check if the revision is in the revision table
1092
         Vector localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1093
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1094
         {
1095
             // this version was deleted, so don't need replicate
1096
             flag = false;
1097
         }
1098
         else
1099
         {
1100
           //insert this document as new because it is not in the local DB
1101
           action = "INSERT";
1102
           flag = true;
1103
         }
1104
       }
1105
       else
1106
       {
1107
         if(localrev == rev)
1108
         {
1109
           // Local meatacat has the same rev to remote host, don't need
1110
           // update and flag set false
1111
           flag = false;
1112
         }
1113
         else if(localrev < rev)
1114
         {
1115
           //this document needs to be updated so send an read request
1116
           action = "UPDATE";
1117
           flag = true;
1118
         }
1119
       }
1120
       
1121
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1122
       // this is non-data file
1123
       if(flag && !dataFile)
1124
       {
1125
         try
1126
         {
1127
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1128
         }
1129
         catch(Exception e)
1130
         {
1131
           // skip this document
1132
           throw e;
1133
         }
1134
       }//if for non-data file
1135

    
1136
        // this is for data file
1137
       if(flag && dataFile)
1138
       {
1139
         try
1140
         {
1141
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1142
         }
1143
         catch(Exception e)
1144
         {
1145
           // skip this datafile
1146
           throw e;
1147
         }
1148

    
1149
       }//for datafile
1150
   }
1151
   
1152
   /*
1153
    * This method will handle doc in xml_documents table.
1154
    */
1155
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1156
                                        throws Exception
1157
   {
1158
       // compare the update rev and local rev to see what need happen
1159
       logMetacat.info("In handle repliation revsion table");
1160
       logMetacat.info("the docid is "+ docid);
1161
       logMetacat.info("The rev is "+rev);
1162
       Vector localrev = null;
1163
       String action = "INSERT";
1164
       boolean flag = false;
1165
       try
1166
       {
1167
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1168
       }
1169
       catch (SQLException e)
1170
       {
1171
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1172
                                " be found because " + e.getMessage());
1173
         MetacatReplication.replErrorLog(""+REVERRORNUMBER+" Docid "+ docid + " could not be "+
1174
                 "written because error happend to find it's local revision");
1175
         REVERRORNUMBER++;
1176
         throw new Exception (e.getMessage());
1177
       }
1178
       logMetacat.info("rev list in xml_revision table for docid "+ docid + " is "+
1179
                               localrev.toString());
1180
       
1181
       // if the rev is not in the xml_revision, we need insert it
1182
       if (!localrev.contains(new Integer(rev)))
1183
       {
1184
           flag = true;    
1185
       }
1186
     
1187
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1188
       // this is non-data file
1189
       if(flag && !dataFile)
1190
       {
1191
         try
1192
         {
1193
           
1194
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1195
         }
1196
         catch(Exception e)
1197
         {
1198
           // skip this document
1199
           throw e;
1200
         }
1201
       }//if for non-data file
1202

    
1203
        // this is for data file
1204
       if(flag && dataFile)
1205
       {
1206
         try
1207
         {
1208
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1209
         }
1210
         catch(Exception e)
1211
         {
1212
           // skip this datafile
1213
           throw e;
1214
         }
1215

    
1216
       }//for datafile
1217
   }
1218
   
1219
   /*
1220
    * Return a ip address for given url
1221
    */
1222
   private String getIpFromURL(URL url)
1223
   {
1224
	   String ip = null;
1225
	   try
1226
	   {
1227
	      InetAddress address = InetAddress.getByName(url.getHost());
1228
	      ip = address.getHostAddress();
1229
	   }
1230
	   catch(UnknownHostException e)
1231
	   {
1232
		   logMetacat.error("Error in get ip address for host: "
1233
                   +e.getMessage());
1234
	   }
1235

    
1236
	   return ip;
1237
   }
1238
  
1239
}
1240

    
(59-59/66)