Project

General

Profile

1 522 berkley
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12 669 jones
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 522 berkley
 */
27 2286 tao
28 522 berkley
package edu.ucsb.nceas.metacat;
29
30 1751 tao
import edu.ucsb.nceas.dbadapter.AbstractDatabase;
31 522 berkley
import java.sql.*;
32
import java.util.*;
33 2555 tao
import java.util.Date;
34 2286 tao
import java.lang.Thread;
35 522 berkley
import java.io.*;
36
import java.net.*;
37 543 berkley
import java.text.*;
38 2663 sgarg
39
import org.apache.log4j.Logger;
40 522 berkley
import org.xml.sax.AttributeList;
41
import org.xml.sax.ContentHandler;
42
import org.xml.sax.DTDHandler;
43
import org.xml.sax.EntityResolver;
44
import org.xml.sax.ErrorHandler;
45
import org.xml.sax.InputSource;
46
import org.xml.sax.XMLReader;
47
import org.xml.sax.SAXException;
48
import org.xml.sax.SAXParseException;
49
import org.xml.sax.helpers.XMLReaderFactory;
50 561 berkley
import org.xml.sax.helpers.DefaultHandler;
51 522 berkley
52 561 berkley
53
54 522 berkley
/**
55
 * This class handles deltaT replication checking.  Whenever this TimerTask
56
 * is fired it checks each server in xml_replication for updates and updates
57
 * the local db as needed.
58
 */
59
public class ReplicationHandler extends TimerTask
60
{
61 573 berkley
  int serverCheckCode = 1;
62 522 berkley
  MetaCatUtil util = new MetaCatUtil();
63 2286 tao
  ReplicationServerList serverList = null;
64 2573 tao
  //PrintWriter out;
65 1751 tao
  private static final AbstractDatabase dbAdapter = MetaCatUtil.dbAdapter;
66 2663 sgarg
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
67 2608 tao
68
69 2573 tao
  public ReplicationHandler()
70 522 berkley
  {
71 2573 tao
    //this.out = o;
72 1292 tao
    serverList = new ReplicationServerList();
73 522 berkley
  }
74 2286 tao
75 2573 tao
  public ReplicationHandler(int serverCheckCode)
76 573 berkley
  {
77 2573 tao
    //this.out = o;
78 573 berkley
    this.serverCheckCode = serverCheckCode;
79 1292 tao
    serverList = new ReplicationServerList();
80 573 berkley
  }
81 2286 tao
82 522 berkley
  /**
83 2286 tao
   * Method that implements TimerTask.run().  It runs whenever the timer is
84 522 berkley
   * fired.
85
   */
86
  public void run()
87
  {
88
    //find out the last_checked time of each server in the server list and
89 2286 tao
    //send a query to each server to see if there are any documents in
90 522 berkley
    //xml_documents with an update_date > last_checked
91
    try
92
    {
93 1032 tao
      //if serverList is null, metacat don't need to replication
94
      if (serverList==null||serverList.isEmpty())
95
      {
96
        return;
97
      }
98 1292 tao
      updateCatalog();
99
      update();
100 1217 tao
      //conn.close();
101
    }//try
102 522 berkley
    catch (Exception e)
103
    {
104 2663 sgarg
      logMetacat.error("Error in replicationHandler.run():"
105
                                                    +e.getMessage());
106 1292 tao
      e.printStackTrace();
107 1217 tao
    }//catch
108 522 berkley
  }
109 2286 tao
110 522 berkley
  /**
111 577 berkley
   * Method that uses revision taging for replication instead of update_date.
112
   */
113 1292 tao
  private void update()
114 577 berkley
  {
115
    /*
116
     Pseudo-algorithm
117
     - request a doc list from each server in xml_replication
118 2286 tao
     - check the rev number of each of those documents agains the
119 577 berkley
       documents in the local database
120
     - pull any documents that have a lesser rev number on the local server
121
       from the remote server
122
     - delete any documents that still exist in the local xml_documents but
123
       are in the deletedDocuments tag of the remote host response.
124
     - update last_checked to keep track of the last time it was checked.
125 2286 tao
       (this info is theoretically not needed using this system but probably
126 577 berkley
       should be kept anyway)
127
    */
128 2286 tao
129
    ReplicationServer replServer = null; // Variable to store the
130
                                        // ReplicationServer got from
131 1292 tao
                                        // Server list
132 2298 tao
    String server = null; // Variable to store server name
133 577 berkley
    String update;
134
    ReplMessageHandler message = new ReplMessageHandler();
135
    Vector responses = new Vector();
136
    XMLReader parser;
137
    URL u;
138 2286 tao
139
140 577 berkley
    try
141
    {
142
      parser = initParser(message);
143 1585 tao
    }
144
    catch (Exception e)
145
    {
146
      MetacatReplication.replErrorLog("Failed to replicate becaue couldn't "+
147
                                 " initParser for message and" +e.getMessage());
148 2663 sgarg
      logMetacat.error("Failed to replicate becaue couldn't " +
149
                            " initParser for message and " +e.getMessage());
150 1585 tao
       // stop replication
151
       return;
152
    }
153
    //Check for every server in server list to get updated list and put
154
    // them in to response
155
    for (int i=0; i<serverList.size(); i++)
156
    {
157 1292 tao
        // Get ReplicationServer object from server list
158
        replServer = serverList.serverAt(i);
159
        // Get server name from ReplicationServer object
160 2578 tao
        server = replServer.getServerName().trim();
161 1585 tao
        String result = null;
162 584 berkley
        MetacatReplication.replLog("full update started to: " + server);
163 1292 tao
        // Send command to that server to get updated docid information
164
        try
165
        {
166 1585 tao
          u = new URL("https://" + server + "?server="
167
          +util.getLocalReplicationServerName()+"&action=update");
168 2663 sgarg
          logMetacat.info("Sending infomation " +u.toString());
169 1292 tao
          result = MetacatReplication.getURLContent(u);
170 1585 tao
        }
171 1292 tao
        catch (Exception e)
172
        {
173 1585 tao
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
174
                          "for server " + server + " because "+e.getMessage());
175 2663 sgarg
          logMetacat.error( "Failed to get updated doc list "+
176
                       "for server " + server + " because "+e.getMessage());
177 1292 tao
          continue;
178 1585 tao
        }
179 2286 tao
180 2663 sgarg
        logMetacat.info("docid: "+server+" "+result);
181 1292 tao
        //check if result have error or not, if has skip it.
182 1609 tao
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
183 1102 tao
        {
184 1585 tao
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
185
                          "for server " + server + " because "+result);
186 2663 sgarg
          logMetacat.info( "Failed to get updated doc list "+
187
                       "for server " + server + " because "+result);
188 1102 tao
          continue;
189
        }
190 1585 tao
        //Add result to vector
191 577 berkley
        responses.add(result);
192 1585 tao
    }
193 2286 tao
194 1585 tao
    //make sure that there is updated file list
195
    //If response is null, metacat don't need do anything
196
    if (responses==null || responses.isEmpty())
197
    {
198
        MetacatReplication.replErrorLog("No updated doc list for "+
199
                           "every server and failed to replicate");
200 2663 sgarg
        logMetacat.info( "No updated doc list for "+
201
                           "every server and failed to replicate");
202 1032 tao
        return;
203 1585 tao
    }
204 2286 tao
205
206 2663 sgarg
    logMetacat.info("Responses from remote metacat about updated "+
207
                   "document information: "+ responses.toString());
208 1585 tao
    // go through response vector(it contains updated vector and delete vector
209
    for(int i=0; i<responses.size(); i++)
210 2286 tao
    {
211 1585 tao
        try
212
        {
213
          parser.parse(new InputSource(
214 577 berkley
                     new StringReader(
215
                     (String)(responses.elementAt(i)))));
216 1585 tao
        }
217
        catch(Exception e)
218
        {
219
          MetacatReplication.replErrorLog("Couldn't parse one responses "+
220
                           "because "+ e.getMessage());
221 2663 sgarg
          logMetacat.error("Couldn't parse one responses "+
222
                                   "because "+ e.getMessage());
223 1585 tao
          continue;
224
        }
225 1037 tao
        //v is the list of updated documents
226 1585 tao
        Vector updateList = new Vector(message.getUpdatesVect());
227 577 berkley
        //System.out.println("v: " + v.toString());
228 1037 tao
        //d is the list of deleted documents
229 1585 tao
        Vector deleteList = new Vector(message.getDeletesVect());
230 1037 tao
        //System.out.println("d: " + d.toString());
231 2663 sgarg
        logMetacat.info("Update vector size: "+ updateList.size());
232
        logMetacat.info("Delete vector size: "+ deleteList.size());
233 1585 tao
        // go though every element in updated document vector
234 2608 tao
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
235 1037 tao
        //handle deleted docs
236 1585 tao
        for(int k=0; k<deleteList.size(); k++)
237 577 berkley
        { //delete the deleted documents;
238 1585 tao
          Vector w = new Vector((Vector)deleteList.elementAt(k));
239
          String docId = (String)w.elementAt(0);
240
          try
241 579 berkley
          {
242 2298 tao
            handleDeleteSingleDocument(docId, server);
243 579 berkley
          }
244 1585 tao
          catch (Exception ee)
245
          {
246
            continue;
247
          }
248 1037 tao
        }//for delete docs
249 2608 tao
250
        // handle replicate doc in xml_revision
251
        Vector revisionList = new Vector(message.getRevisionsVect());
252
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
253 1585 tao
    }//for response
254 2286 tao
255 1585 tao
    //updated last_checked
256
    for (int i=0;i<serverList.size(); i++)
257
    {
258
       // Get ReplicationServer object from server list
259
       replServer = serverList.serverAt(i);
260
       try
261
       {
262
         updateLastCheckTimeForSingleServer(replServer);
263
       }
264
       catch(Exception e)
265
       {
266
         continue;
267
       }
268
    }//for
269 2286 tao
270 1585 tao
  }//update
271 2286 tao
272 1585 tao
  /* Handle replicate single xml document*/
273 2286 tao
  private void handleSingleXMLDocument(String remoteserver, String actions,
274 2641 tao
                                       String accNumber, String tableName)
275 1585 tao
               throws Exception
276
  {
277
    DBConnection dbConn = null;
278
    int serialNumber = -1;
279
    try
280
    {
281
      // Get DBConnection from pool
282
      dbConn=DBConnectionPool.
283
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
284
      serialNumber=dbConn.getCheckOutSerialNumber();
285
      //if the document needs to be updated or inserted, this is executed
286 1600 tao
      String readDocURLString = "https://" + remoteserver + "?server="+
287 2641 tao
              util.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
288 1600 tao
      readDocURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDocURLString);
289
      URL u = new URL(readDocURLString);
290 2286 tao
291 1585 tao
      // Get docid content
292
      String newxmldoc = MetacatReplication.getURLContent(u);
293
      // If couldn't get skip it
294 1609 tao
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
295 1292 tao
      {
296 1585 tao
         throw new Exception(newxmldoc);
297
      }
298 2663 sgarg
      //logMetacat.info("xml documnet:");
299
      //logMetacat.info(newxmldoc);
300 2286 tao
301 1585 tao
      // Try get the docid info from remote server
302
      DocInfoHandler dih = new DocInfoHandler();
303
      XMLReader docinfoParser = initParser(dih);
304 2286 tao
      String docInfoURLStr = "https://" + remoteserver +
305 1585 tao
                       "?server="+util.getLocalReplicationServerName()+
306 2641 tao
                       "&action=getdocumentinfo&docid="+accNumber;
307 1600 tao
      docInfoURLStr = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
308
      URL docinfoUrl = new URL(docInfoURLStr);
309 2663 sgarg
      logMetacat.info("Sending message: " +
310
                                                  docinfoUrl.toString());
311 1585 tao
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
312
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
313
      Hashtable docinfoHash = dih.getDocInfo();
314
      // Get home server of the docid
315
      String docHomeServer = (String)docinfoHash.get("home_server");
316 2663 sgarg
      logMetacat.info("doc home server in repl: "+docHomeServer);
317 2624 tao
      String createdDate = (String)docinfoHash.get("date_created");
318
      String updatedDate = (String)docinfoHash.get("date_updated");
319 1585 tao
      //docid should include rev number too
320 2641 tao
      /*String accnum=docId+util.getOption("accNumSeparator")+
321
                                              (String)docinfoHash.get("rev");*/
322 2663 sgarg
      logMetacat.info("docid in repl: "+accNumber);
323 1585 tao
      String docType = (String)docinfoHash.get("doctype");
324 2663 sgarg
      logMetacat.info("doctype in repl: "+docType);
325 2286 tao
326 1585 tao
      String parserBase = null;
327
      // this for eml2 and we need user eml2 parser
328 2169 sgarg
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
329 1585 tao
      {
330 2163 tao
         parserBase = DocumentImpl.EML200;
331 1585 tao
      }
332 2286 tao
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
333
      {
334
        parserBase = DocumentImpl.EML200;
335
      }
336
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
337
      {
338
        parserBase = DocumentImpl.EML210;
339
      }
340 1585 tao
      // Write the document into local host
341
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false);
342 2286 tao
      String newDocid = wrapper.writeReplication(dbConn,
343 1585 tao
                              new StringReader(newxmldoc),
344
                              (String)docinfoHash.get("public_access"),
345
                              null,  /* the dtd text */
346 2286 tao
                              actions,
347 2641 tao
                              accNumber,
348 1585 tao
                              (String)docinfoHash.get("user_owner"),
349
                              null, /* null for groups[] */
350 2286 tao
                              docHomeServer,
351 2624 tao
                              remoteserver, tableName, true,// true is for time replication
352
                              createdDate,
353
                              updatedDate);
354 2663 sgarg
      logMetacat.info("Successfully replicated doc " + accNumber);
355 2641 tao
      MetacatReplication.replLog("wrote doc " + accNumber + " from " +
356 1585 tao
                                         remoteserver);
357 2286 tao
358
    }//try
359 577 berkley
    catch(Exception e)
360
    {
361 2641 tao
      MetacatReplication.replErrorLog("Failed to write doc " + accNumber +
362 1585 tao
                                      " into db because " +e.getMessage());
363 2663 sgarg
      logMetacat.error("Failed to write doc " + accNumber +
364
                                      " into db because " +e.getMessage());
365 1585 tao
      throw e;
366 577 berkley
    }
367 667 berkley
    finally
368
    {
369 1585 tao
       //return DBConnection
370
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
371
    }//finally
372
  }
373 2286 tao
374
375
376 1585 tao
  /* Handle replicate single xml document*/
377 2286 tao
  private void handleSingleDataFile(String remoteserver, String actions,
378 2641 tao
                                    String accNumber, String tableName)
379 1585 tao
               throws Exception
380
  {
381 2663 sgarg
    logMetacat.info("Try to replicate data file: "+accNumber);
382 1585 tao
    DBConnection dbConn = null;
383
    int serialNumber = -1;
384
    try
385
    {
386
      // Get DBConnection from pool
387
      dbConn=DBConnectionPool.
388
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
389
      serialNumber=dbConn.getCheckOutSerialNumber();
390
      // Try get docid info from remote server
391
      DocInfoHandler dih = new DocInfoHandler();
392
      XMLReader docinfoParser = initParser(dih);
393 2286 tao
      String docInfoURLString = "https://" + remoteserver +
394 1585 tao
                  "?server="+util.getLocalReplicationServerName()+
395 2641 tao
                  "&action=getdocumentinfo&docid="+accNumber;
396 1600 tao
      docInfoURLString = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLString);
397
      URL docinfoUrl = new URL(docInfoURLString);
398 2286 tao
399 1585 tao
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
400
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
401
      Hashtable docinfoHash = dih.getDocInfo();
402
      // Get doicd owner
403
      String user = (String)docinfoHash.get("user_owner");
404
      // Get docid name (such as acl or dataset)
405
      String docName = (String)docinfoHash.get("docname");
406 2286 tao
      // Get doc type (eml public id)
407 1585 tao
      String docType = (String)docinfoHash.get("doctype");
408
      // Get docid home sever. it might be different to remoteserver
409
      // becuause of hub feature
410
      String docHomeServer = (String)docinfoHash.get("home_server");
411 2624 tao
      String createdDate = (String)docinfoHash.get("date_created");
412
      String updatedDate = (String)docinfoHash.get("date_updated");
413 1585 tao
      //docid should include rev number too
414 2641 tao
      /*String accnum=docId+util.getOption("accNumSeparator")+
415
                                              (String)docinfoHash.get("rev");*/
416 2286 tao
417
418 1585 tao
      String datafilePath = util.getOption("datafilepath");
419
      // Get data file content
420 1600 tao
      String readDataURLString = "https://" + remoteserver + "?server="+
421 1585 tao
                                        util.getLocalReplicationServerName()+
422 2641 tao
                                            "&action=readdata&docid="+accNumber;
423 1600 tao
      readDataURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDataURLString);
424
      URL u = new URL(readDataURLString);
425 2286 tao
      InputStream input = u.openStream();
426 1585 tao
      //register data file into xml_documents table and wite data file
427
      //into file system
428 1600 tao
      if ( input != null)
429 667 berkley
      {
430 2286 tao
        DocumentImpl.writeDataFileInReplication(input,
431 2608 tao
                                                datafilePath,
432
                                                docName,docType,
433 2641 tao
                                                accNumber, user,
434 2608 tao
                                                docHomeServer,
435
                                                remoteserver,
436
                                                tableName,
437 2624 tao
                                                true, //true means timed replication
438
                                                createdDate,
439
                                                updatedDate);
440
441 2663 sgarg
        logMetacat.info("Successfully to write datafile " + accNumber);
442 2641 tao
        MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
443 1585 tao
                                    remoteserver);
444
      }//if
445
      else
446 1217 tao
      {
447 2663 sgarg
         logMetacat.info("Couldn't open the data file: " + accNumber);
448 2641 tao
         throw new Exception("Couldn't open the data file: " + accNumber);
449 1585 tao
      }//else
450 2286 tao
451
    }//try
452 1585 tao
    catch(Exception e)
453
    {
454 2641 tao
      MetacatReplication.replErrorLog("Failed to try wrote datafile " + accNumber +
455 1585 tao
                                      " because " +e.getMessage());
456 2663 sgarg
      logMetacat.error("Failed to try wrote datafile " + accNumber +
457
                                      " because " +e.getMessage());
458 1585 tao
      throw e;
459
    }
460
    finally
461
    {
462
       //return DBConnection
463
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
464
    }//finally
465
  }
466 2286 tao
467
468
469 1585 tao
  /* Handle delete single document*/
470 2298 tao
  private void handleDeleteSingleDocument(String docId, String notifyServer)
471 1585 tao
               throws Exception
472
  {
473 2663 sgarg
    logMetacat.info("Try delete doc: "+docId);
474 1585 tao
    DBConnection dbConn = null;
475
    int serialNumber = -1;
476
    try
477
    {
478
      // Get DBConnection from pool
479
      dbConn=DBConnectionPool.
480
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
481
      serialNumber=dbConn.getCheckOutSerialNumber();
482
      if(!alreadyDeleted(docId))
483 1217 tao
      {
484 2286 tao
485 1585 tao
         //because delete method docid should have rev number
486
         //so we just add one for it. This rev number is no sence.
487
         String accnum=docId+util.getOption("accNumSeparator")+"1";
488
         //System.out.println("accnum: "+accnum);
489 2298 tao
         DocumentImpl.delete(accnum, null, null, notifyServer);
490 2663 sgarg
         logMetacat.info("Successfully deleted doc " + docId);
491 1585 tao
         MetacatReplication.replLog("Doc " + docId + " deleted");
492
      }
493 2286 tao
494
    }//try
495 1585 tao
    catch(Exception e)
496
    {
497
      MetacatReplication.replErrorLog("Failed to delete doc " + docId +
498
                                      " in db because " +e.getMessage());
499 2663 sgarg
      logMetacat.error("Failed to delete doc " + docId +
500
                                 " in db because because " + e.getMessage());
501 1585 tao
      throw e;
502
    }
503
    finally
504
    {
505
       //return DBConnection
506
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
507 1037 tao
    }//finally
508 1585 tao
  }
509 2286 tao
510 1585 tao
  /* Handle updateLastCheckTimForSingleServer*/
511 2286 tao
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
512 1585 tao
                                                  throws Exception
513 590 berkley
  {
514 1585 tao
    String server = repServer.getServerName();
515 1217 tao
    DBConnection dbConn = null;
516
    int serialNumber = -1;
517
    PreparedStatement pstmt = null;
518 590 berkley
    try
519
    {
520 1585 tao
      // Get DBConnection from pool
521 1217 tao
      dbConn=DBConnectionPool.
522 1585 tao
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
523 1217 tao
      serialNumber=dbConn.getCheckOutSerialNumber();
524 2286 tao
525 2663 sgarg
      logMetacat.info("Try to update last_check for server: "+server);
526 1585 tao
      // Get time from remote server
527
      URL dateurl = new URL("https://" + server + "?server="+
528
      util.getLocalReplicationServerName()+"&action=gettime");
529
      String datexml = MetacatReplication.getURLContent(dateurl);
530 2663 sgarg
      logMetacat.info("datexml: "+datexml);
531 1585 tao
      if (datexml!=null && !datexml.equals(""))
532
      {
533
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
534
         StringBuffer sql = new StringBuffer();
535 1751 tao
         /*sql.append("update xml_replication set last_checked = to_date('");
536 1585 tao
         sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
537 1751 tao
         sql.append("server like '").append(server).append("'");*/
538
         sql.append("update xml_replication set last_checked = ");
539 1753 tao
         sql.append(dbAdapter.toDate(datestr, "MM/DD/YY HH24:MI:SS"));
540 1751 tao
         sql.append(" where server like '").append(server).append("'");
541 1585 tao
         pstmt = dbConn.prepareStatement(sql.toString());
542 2286 tao
543 1585 tao
         pstmt.executeUpdate();
544
         dbConn.commit();
545
         pstmt.close();
546 2663 sgarg
         logMetacat.info("last_checked updated to "+datestr+" on "
547
                                      + server);
548 1585 tao
      }//if
549
      else
550
      {
551 2286 tao
552 2663 sgarg
         logMetacat.info("Failed to update last_checked for server "  +
553 2286 tao
                                  server + " in db because couldn't get time "
554 2663 sgarg
                                  );
555 1585 tao
         throw new Exception("Couldn't get time for server "+ server);
556
      }
557 2286 tao
558
    }//try
559 1585 tao
    catch(Exception e)
560
    {
561 2286 tao
562 2663 sgarg
      logMetacat.error("Failed to update last_checked for server " +
563 1585 tao
                                server + " in db because because " +
564 2663 sgarg
                                e.getMessage());
565 1585 tao
      throw e;
566
    }
567
    finally
568
    {
569
       //return DBConnection
570
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
571
    }//finally
572
  }
573 2286 tao
574
575
576 1585 tao
  /**
577
   * updates xml_catalog with entries from other servers.
578
   */
579
  private void updateCatalog()
580
  {
581 2663 sgarg
    logMetacat.info("Start of updateCatalog");
582 1585 tao
    // ReplicationServer object in server list
583
    ReplicationServer replServer = null;
584
    PreparedStatement pstmt = null;
585
    String server = null;
586 2286 tao
587
588 1585 tao
    // Go through each ReplicationServer object in sererlist
589
    for (int j=0; j<serverList.size(); j++)
590 2286 tao
    {
591 1585 tao
      Vector remoteCatalog = new Vector();
592
      Vector publicId = new Vector();
593
      try
594
      {
595 1292 tao
        // Get ReplicationServer object from server list
596
        replServer = serverList.serverAt(j);
597
        // Get server name from the ReplicationServer object
598
        server = replServer.getServerName();
599
        // Try to get catalog
600 1011 tao
        URL u = new URL("https://" + server + "?server="+
601 1015 tao
        util.getLocalReplicationServerName()+"&action=getcatalog");
602 2663 sgarg
        logMetacat.info("sending message " + u.toString());
603 590 berkley
        String catxml = MetacatReplication.getURLContent(u);
604 2286 tao
605 1292 tao
        // Make sure there are not error, no empty string
606
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
607
        {
608 1585 tao
          throw new Exception("Couldn't get catalog list form server " +server);
609 1292 tao
        }
610 2663 sgarg
        logMetacat.info("catxml: " + catxml);
611 590 berkley
        CatalogMessageHandler cmh = new CatalogMessageHandler();
612
        XMLReader catparser = initParser(cmh);
613
        catparser.parse(new InputSource(new StringReader(catxml)));
614
        //parse the returned catalog xml and put it into a vector
615 1585 tao
        remoteCatalog = cmh.getCatalogVect();
616 2286 tao
617 1292 tao
        // Makse sure remoteCatalog is not empty
618
        if (remoteCatalog.isEmpty())
619
        {
620 1585 tao
          throw new Exception("Couldn't get catalog list form server " +server);
621 1292 tao
        }
622 2286 tao
623 590 berkley
        String localcatxml = MetacatReplication.getCatalogXML();
624 2286 tao
625 1292 tao
        // Make sure local catalog is no empty
626
        if (localcatxml==null||localcatxml.equals(""))
627
        {
628 1585 tao
          throw new Exception("Couldn't get catalog list form server " +server);
629 1292 tao
        }
630 2286 tao
631 590 berkley
        cmh = new CatalogMessageHandler();
632
        catparser = initParser(cmh);
633
        catparser.parse(new InputSource(new StringReader(localcatxml)));
634
        Vector localCatalog = cmh.getCatalogVect();
635 2286 tao
636 590 berkley
        //now we have the catalog from the remote server and this local server
637
        //we now need to compare the two and merge the differences.
638
        //the comparison is base on the public_id fields which is the 4th
639
        //entry in each row vector.
640 1585 tao
        publicId = new Vector();
641 590 berkley
        for(int i=0; i<localCatalog.size(); i++)
642
        {
643
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
644 2663 sgarg
          logMetacat.info("v1: " + v.toString());
645 590 berkley
          publicId.add(new String((String)v.elementAt(3)));
646 595 berkley
          //System.out.println("adding " + (String)v.elementAt(3));
647 590 berkley
        }
648 1585 tao
      }//try
649
      catch (Exception e)
650
      {
651
        MetacatReplication.replErrorLog("Failed to update catalog for server "+
652
                                    server + " because " +e.getMessage());
653 2663 sgarg
        logMetacat.error("Failed to update catalog for server "+
654
                                    server + " because " +e.getMessage());
655 1585 tao
      }//catch
656 2286 tao
657 1585 tao
      for(int i=0; i<remoteCatalog.size(); i++)
658
      {
659
         // DConnection
660
        DBConnection dbConn = null;
661
        // DBConnection checkout serial number
662
        int serialNumber = -1;
663
        try
664 590 berkley
        {
665 1585 tao
            dbConn=DBConnectionPool.
666
                  getDBConnection("ReplicationHandler.updateCatalog");
667
            serialNumber=dbConn.getCheckOutSerialNumber();
668
            Vector v = (Vector)remoteCatalog.elementAt(i);
669
            //System.out.println("v2: " + v.toString());
670
            //System.out.println("i: " + i);
671
            //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
672
            //System.out.println("publicID: " + publicId.toString());
673 2663 sgarg
            logMetacat.info
674
                              ("v.elementAt(3): " + (String)v.elementAt(3));
675 1585 tao
           if(!publicId.contains(v.elementAt(3)))
676
           { //so we don't have this public id in our local table so we need to
677
             //add it.
678
             //System.out.println("in if");
679
             StringBuffer sql = new StringBuffer();
680
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
681
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
682
             sql.append("?,?)");
683
             //System.out.println("sql: " + sql.toString());
684
             pstmt = dbConn.prepareStatement(sql.toString());
685
             pstmt.setString(1, (String)v.elementAt(0));
686
             pstmt.setString(2, (String)v.elementAt(1));
687
             pstmt.setString(3, (String)v.elementAt(2));
688
             pstmt.setString(4, (String)v.elementAt(3));
689
             pstmt.setString(5, (String)v.elementAt(4));
690
             pstmt.execute();
691
             pstmt.close();
692
             MetacatReplication.replLog("Success fully to insert new publicid "+
693
                               (String)v.elementAt(3) + " from server"+server);
694 2663 sgarg
             logMetacat.info("Success fully to insert new publicid "+
695
                             (String)v.elementAt(3) + " from server" +server);
696 1585 tao
           }
697 590 berkley
        }
698 1585 tao
        catch(Exception e)
699
        {
700
           MetacatReplication.replErrorLog("Failed to update catalog for server "+
701
                                    server + " because " +e.getMessage());
702 2663 sgarg
           logMetacat.error("Failed to update catalog for server "+
703
                                    server + " because " +e.getMessage());
704 1585 tao
        }//catch
705
        finally
706
        {
707
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
708
        }//finall
709
      }//for remote catalog
710
    }//for server list
711 2663 sgarg
    logMetacat.info("End of updateCatalog");
712 590 berkley
  }
713 2286 tao
714 590 berkley
  /**
715 579 berkley
   * Method that returns true if docid has already been "deleted" from metacat.
716 582 berkley
   * This method really implements a truth table for deleted documents
717 590 berkley
   * The table is (a docid in one of the tables is represented by the X):
718 582 berkley
   * xml_docs      xml_revs      deleted?
719
   * ------------------------------------
720
   *   X             X             FALSE
721
   *   X             _             FALSE
722
   *   _             X             TRUE
723
   *   _             _             TRUE
724 579 berkley
   */
725 1585 tao
  private static boolean alreadyDeleted(String docid) throws Exception
726 579 berkley
  {
727 1217 tao
    DBConnection dbConn = null;
728
    int serialNumber = -1;
729
    PreparedStatement pstmt = null;
730 579 berkley
    try
731
    {
732 1217 tao
      dbConn=DBConnectionPool.
733
                  getDBConnection("ReplicationHandler.alreadyDeleted");
734
      serialNumber=dbConn.getCheckOutSerialNumber();
735 582 berkley
      boolean xml_docs = false;
736
      boolean xml_revs = false;
737 2286 tao
738 579 berkley
      StringBuffer sb = new StringBuffer();
739 582 berkley
      sb.append("select docid from xml_revisions where docid like '");
740 579 berkley
      sb.append(docid).append("'");
741 1217 tao
      pstmt = dbConn.prepareStatement(sb.toString());
742 579 berkley
      pstmt.execute();
743
      ResultSet rs = pstmt.getResultSet();
744
      boolean tablehasrows = rs.next();
745
      if(tablehasrows)
746
      {
747 582 berkley
        xml_revs = true;
748
      }
749 2286 tao
750 582 berkley
      sb = new StringBuffer();
751
      sb.append("select docid from xml_documents where docid like '");
752
      sb.append(docid).append("'");
753 667 berkley
      pstmt.close();
754 1217 tao
      pstmt = dbConn.prepareStatement(sb.toString());
755
      //increase usage count
756
      dbConn.increaseUsageCount(1);
757 582 berkley
      pstmt.execute();
758
      rs = pstmt.getResultSet();
759
      tablehasrows = rs.next();
760 667 berkley
      pstmt.close();
761 582 berkley
      if(tablehasrows)
762
      {
763
        xml_docs = true;
764
      }
765 2286 tao
766 582 berkley
      if(xml_docs && xml_revs)
767
      {
768
        return false;
769
      }
770
      else if(xml_docs && !xml_revs)
771
      {
772
        return false;
773
      }
774
      else if(!xml_docs && xml_revs)
775
      {
776 579 berkley
        return true;
777
      }
778 582 berkley
      else if(!xml_docs && !xml_revs)
779
      {
780
        return true;
781
      }
782 579 berkley
    }
783
    catch(Exception e)
784
    {
785 2663 sgarg
      logMetacat.error("error in ReplicationHandler.alreadyDeleted: " +
786
                          e.getMessage());
787 1585 tao
      throw e;
788 579 berkley
    }
789 667 berkley
    finally
790
    {
791 1217 tao
      try
792
      {
793
        pstmt.close();
794
      }//try
795
      catch (SQLException ee)
796
      {
797 2663 sgarg
        logMetacat.error("Error in replicationHandler.alreadyDeleted "+
798
                          "to close pstmt: "+ee.getMessage());
799 1585 tao
        throw ee;
800 1217 tao
      }//catch
801
      finally
802
      {
803
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
804
      }//finally
805
    }//finally
806 579 berkley
    return false;
807
  }
808 533 berkley
809 2286 tao
810 533 berkley
  /**
811 574 berkley
   * Method to initialize the message parser
812
   */
813
  public static XMLReader initParser(DefaultHandler dh)
814
          throws Exception
815
  {
816
    XMLReader parser = null;
817
818
    try {
819
      ContentHandler chandler = dh;
820
821
      // Get an instance of the parser
822
      MetaCatUtil util = new MetaCatUtil();
823
      String parserName = util.getOption("saxparser");
824
      parser = XMLReaderFactory.createXMLReader(parserName);
825
826
      // Turn off validation
827
      parser.setFeature("http://xml.org/sax/features/validation", false);
828 2286 tao
829 574 berkley
      parser.setContentHandler((ContentHandler)chandler);
830
      parser.setErrorHandler((ErrorHandler)chandler);
831
832
    } catch (Exception e) {
833
      throw e;
834
    }
835
836
    return parser;
837
  }
838 2286 tao
839 2555 tao
  /**
840
   * This method will combinate given time string(in short format) to
841
   * current date. If the given time (e.g 10:00 AM) passed the current time
842
   * (e.g 2:00 PM Aug 21, 2005), then the time will set to second day,
843
   * 10:00 AM Aug 22, 2005. If the given time (e.g 10:00 AM) haven't passed
844
   * the current time (e.g 8:00 AM Aug 21, 2005) The time will set to be
845
   * 10:00 AM Aug 21, 2005.
846
   * @param givenTime  the format should be "10:00 AM " or "2:00 PM"
847
   * @return
848
   * @throws Exception
849
   */
850
  public static Date combinateCurrentDateAndGivenTime(String givenTime) throws Exception
851
  {
852
     Date givenDate = parseTime(givenTime);
853
     Date newDate = null;
854
     Date now = new Date();
855
     String currentTimeString = getTimeString(now);
856
     Date currentTime = parseTime(currentTimeString);
857
     if ( currentTime.getTime() >= givenDate.getTime())
858
     {
859 2663 sgarg
        logMetacat.info("Today already pass the given time, we should set it as tomorrow");
860 2555 tao
        String dateAndTime = getDateString(now) + " " + givenTime;
861
        Date combinationDate = parseDateTime(dateAndTime);
862
        // new date should plus 24 hours to make is the second day
863
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
864
     }
865
     else
866
     {
867 2663 sgarg
         logMetacat.info("Today haven't pass the given time, we should it as today");
868 2555 tao
         String dateAndTime = getDateString(now) + " " + givenTime;
869
         newDate = parseDateTime(dateAndTime);
870
     }
871 2663 sgarg
     logMetacat.warn("final setting time is "+ newDate.toString());
872 2555 tao
     return newDate;
873
  }
874 2286 tao
875 2555 tao
  /*
876
   * parse a given string to Time in short format.
877
   * For example, given time is 10:00 AM, the date will be return as
878
   * Jan 1 1970, 10:00 AM
879
   */
880
  private static Date parseTime(String timeString) throws Exception
881
  {
882
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
883
    Date time = format.parse(timeString);
884 2663 sgarg
    logMetacat.info("Date string is after parse a time string "
885
                              +time.toString());
886 2555 tao
    return time;
887
888
  }
889
890
  /*
891
   * Pasre a given string to date and time. Date format is long and time
892
   * format is short.
893
   */
894
  private static Date parseDateTime(String timeString) throws Exception
895
  {
896
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
897
    Date time = format.parse(timeString);
898 2663 sgarg
    logMetacat.info("Date string is after parse a time string "+
899
                             time.toString());
900 2555 tao
    return time;
901
  }
902
903
  /*
904
   * Get a date string from a Date object. The date format will be long
905
   */
906
  private static String getDateString(Date now)
907
  {
908
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
909
     String s = df.format(now);
910 2663 sgarg
     logMetacat.info("Today is " + s);
911 2555 tao
     return s;
912
  }
913
914
  /*
915
   * Get a time string from a Date object, the time format will be short
916
   */
917
  private static String getTimeString(Date now)
918
  {
919
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
920
     String s = df.format(now);
921 2663 sgarg
     logMetacat.info("Time is " + s);
922 2555 tao
     return s;
923
  }
924 2608 tao
925
926
  /*
927
   * This method will go through the docid list both in xml_Documents table
928
   * and in xml_revisions table
929
   * @author tao
930
   */
931
   private void handleDocList(Vector docList, String tableName)
932
   {
933
       boolean dataFile=false;
934
       for(int j=0; j<docList.size(); j++)
935
       {
936
         //initial dataFile is false
937
         dataFile=false;
938
         //w is information for one document, information contain
939
         //docid, rev, server or datafile.
940
         Vector w = new Vector((Vector)(docList.elementAt(j)));
941
         //Check if the vector w contain "datafile"
942
         //If it has, this document is data file
943
         if (w.contains((String)util.getOption("datafileflag")))
944
         {
945
           dataFile=true;
946
         }
947
         //System.out.println("w: " + w.toString());
948
         // Get docid
949
         String docid = (String)w.elementAt(0);
950 2663 sgarg
         logMetacat.info("docid: " + docid);
951 2608 tao
         // Get revision number
952
         int rev = Integer.parseInt((String)w.elementAt(1));
953 2663 sgarg
         logMetacat.info("rev: " + rev);
954 2608 tao
         // Get remote server name (it is may not be doc homeserver because
955
         // the new hub feature
956
         String remoteServer = (String)w.elementAt(2);
957
         remoteServer = remoteServer.trim();
958
959
         try
960
         {
961
             if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
962
             {
963
                 handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
964
             }
965
             else if (tableName.equals(DocumentImpl.REVISIONTABLE))
966
             {
967
                 handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
968
             }
969
             else
970
             {
971
                 continue;
972
             }
973
974
         }
975
         catch (Exception e)
976
         {
977 2663 sgarg
             logMetacat.error("error to handle update doc in "+
978
                     tableName +" in time replication"+e.getMessage());
979 2608 tao
             continue;
980
         }
981
982
       }//for update docs
983
984
   }
985
986
   /*
987
    * This method will handle doc in xml_documents table.
988
    */
989
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile)
990
                                        throws Exception
991
   {
992
       // compare the update rev and local rev to see what need happen
993
       int localrev = -1;
994
       String action = null;
995
       boolean flag = false;
996
       try
997
       {
998 2641 tao
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
999 2608 tao
       }
1000
       catch (SQLException e)
1001
       {
1002 2663 sgarg
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1003
                                " be found because " + e.getMessage());
1004 2608 tao
         MetacatReplication.replErrorLog("Docid "+ docid + " could not be "+
1005
                 "written because error happend to find it's local revision");
1006
         throw new Exception (e.getMessage());
1007
       }
1008 2663 sgarg
       logMetacat.info("Local rev for docid "+ docid + " is "+
1009
                               localrev);
1010 2608 tao
1011
       //check the revs for an update because this document is in the
1012
       //local DB, it might be out of date.
1013
       if (localrev == -1)
1014
       {
1015
         //insert this document as new because it is not in the local DB
1016
         action = "INSERT";
1017
         flag = true;
1018
       }
1019
       else
1020
       {
1021
         if(localrev == rev)
1022
         {
1023
           // Local meatacat has the same rev to remote host, don't need
1024
           // update and flag set false
1025
           flag = false;
1026
         }
1027
         else if(localrev < rev)
1028
         {
1029
           //this document needs to be updated so send an read request
1030
           action = "UPDATE";
1031
           flag = true;
1032
         }
1033
       }
1034 2641 tao
1035
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1036 2608 tao
       // this is non-data file
1037
       if(flag && !dataFile)
1038
       {
1039
         try
1040
         {
1041 2641 tao
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1042 2608 tao
         }
1043
         catch(Exception e)
1044
         {
1045
           // skip this document
1046
           throw e;
1047
         }
1048
       }//if for non-data file
1049
1050
        // this is for data file
1051
       if(flag && dataFile)
1052
       {
1053
         try
1054
         {
1055 2641 tao
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1056 2608 tao
         }
1057
         catch(Exception e)
1058
         {
1059
           // skip this datafile
1060
           throw e;
1061
         }
1062
1063
       }//for datafile
1064
   }
1065
1066
   /*
1067
    * This method will handle doc in xml_documents table.
1068
    */
1069
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile)
1070
                                        throws Exception
1071
   {
1072
       // compare the update rev and local rev to see what need happen
1073 2663 sgarg
       logMetacat.info("In handle repliation revsion table");
1074
       logMetacat.info("the docid is "+ docid);
1075
       logMetacat.info("The rev is "+rev);
1076 2608 tao
       Vector localrev = null;
1077
       String action = "INSERT";
1078
       boolean flag = false;
1079
       try
1080
       {
1081
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1082
       }
1083
       catch (SQLException e)
1084
       {
1085 2663 sgarg
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1086
                                " be found because " + e.getMessage());
1087 2608 tao
         MetacatReplication.replErrorLog("Docid "+ docid + " could not be "+
1088
                 "written because error happend to find it's local revision");
1089
         throw new Exception (e.getMessage());
1090
       }
1091 2663 sgarg
       logMetacat.info("rev list in xml_revision table for docid "+ docid + " is "+
1092
                               localrev.toString());
1093 2608 tao
1094
       // if the rev is not in the xml_revision, we need insert it
1095
       if (!localrev.contains(new Integer(rev)))
1096
       {
1097
           flag = true;
1098
       }
1099 2641 tao
1100
       String accNumber = docid + MetaCatUtil.getOption("accNumSeparator") + rev;
1101 2608 tao
       // this is non-data file
1102
       if(flag && !dataFile)
1103
       {
1104
         try
1105
         {
1106 2641 tao
1107
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1108 2608 tao
         }
1109
         catch(Exception e)
1110
         {
1111
           // skip this document
1112
           throw e;
1113
         }
1114
       }//if for non-data file
1115
1116
        // this is for data file
1117
       if(flag && dataFile)
1118
       {
1119
         try
1120
         {
1121 2641 tao
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1122 2608 tao
         }
1123
         catch(Exception e)
1124
         {
1125
           // skip this datafile
1126
           throw e;
1127
         }
1128
1129
       }//for datafile
1130
   }
1131
1132 522 berkley
}