Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2008-10-13 11:34:13 -0700 (Mon, 13 Oct 2008) $'
10
 * '$Revision: 4450 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat;
28

    
29
import edu.ucsb.nceas.metacat.service.DatabaseService;
30
import edu.ucsb.nceas.metacat.service.PropertyService;
31
import edu.ucsb.nceas.metacat.util.MetaCatUtil;
32
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
33

    
34
import java.sql.*;
35
import java.util.*;
36
import java.util.Date;
37
import java.lang.Thread;
38
import java.io.*;
39
import java.net.*;
40
import java.text.*;
41

    
42
import org.apache.log4j.Logger;
43
import org.xml.sax.AttributeList;
44
import org.xml.sax.ContentHandler;
45
import org.xml.sax.DTDHandler;
46
import org.xml.sax.EntityResolver;
47
import org.xml.sax.ErrorHandler;
48
import org.xml.sax.InputSource;
49
import org.xml.sax.XMLReader;
50
import org.xml.sax.SAXException;
51
import org.xml.sax.SAXParseException;
52
import org.xml.sax.helpers.XMLReaderFactory;
53
import org.xml.sax.helpers.DefaultHandler;
54

    
55

    
56

    
57
/**
58
 * This class handles deltaT replication checking.  Whenever this TimerTask
59
 * is fired it checks each server in xml_replication for updates and updates
60
 * the local db as needed.
61
 */
62
public class ReplicationHandler extends TimerTask
63
{
64
  int serverCheckCode = 1;
65
  ReplicationServerList serverList = null;
66
  //PrintWriter out;
67
//  private static final AbstractDatabase dbAdapter = MetaCatUtil.dbAdapter;
68
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
69
  private static int DOCINSERTNUMBER = 1;
70
  private static int DOCERRORNUMBER  = 1;
71
  private static int REVINSERTNUMBER = 1;
72
  private static int REVERRORNUMBER  = 1;
73
  public ReplicationHandler()
74
  {
75
    //this.out = o;
76
    serverList = new ReplicationServerList();
77
  }
78

    
79
  public ReplicationHandler(int serverCheckCode)
80
  {
81
    //this.out = o;
82
    this.serverCheckCode = serverCheckCode;
83
    serverList = new ReplicationServerList();
84
  }
85

    
86
  /**
87
   * Method that implements TimerTask.run().  It runs whenever the timer is
88
   * fired.
89
   */
90
  public void run()
91
  {
92
    //find out the last_checked time of each server in the server list and
93
    //send a query to each server to see if there are any documents in
94
    //xml_documents with an update_date > last_checked
95
    try
96
    {
97
      //if serverList is null, metacat don't need to replication
98
      if (serverList==null||serverList.isEmpty())
99
      {
100
        return;
101
      }
102
      updateCatalog();
103
      update();
104
      //conn.close();
105
    }//try
106
    catch (Exception e)
107
    {
108
      logMetacat.error("Error in replicationHandler.run():"
109
                                                    +e.getMessage());
110
      e.printStackTrace();
111
    }//catch
112
  }
113

    
114
  /**
115
   * Method that uses revision taging for replication instead of update_date.
116
   */
117
  private void update()
118
  {
119
    /*
120
     Pseudo-algorithm
121
     - request a doc list from each server in xml_replication
122
     - check the rev number of each of those documents agains the
123
       documents in the local database
124
     - pull any documents that have a lesser rev number on the local server
125
       from the remote server
126
     - delete any documents that still exist in the local xml_documents but
127
       are in the deletedDocuments tag of the remote host response.
128
     - update last_checked to keep track of the last time it was checked.
129
       (this info is theoretically not needed using this system but probably
130
       should be kept anyway)
131
    */
132

    
133
    ReplicationServer replServer = null; // Variable to store the
134
                                        // ReplicationServer got from
135
                                        // Server list
136
    String server = null; // Variable to store server name
137
    String update;
138
    Vector responses = new Vector();
139
    URL u;
140

    
141

    
142
    
143
    //Check for every server in server list to get updated list and put
144
    // them in to response
145
    for (int i=0; i<serverList.size(); i++)
146
    {
147
        // Get ReplicationServer object from server list
148
        replServer = serverList.serverAt(i);
149
        // Get server name from ReplicationServer object
150
        server = replServer.getServerName().trim();
151
        String result = null;
152
        MetacatReplication.replLog("full update started to: " + server);
153
        // Send command to that server to get updated docid information
154
        try
155
        {
156
          u = new URL("https://" + server + "?server="
157
          +MetaCatUtil.getLocalReplicationServerName()+"&action=update");
158
          logMetacat.info("Sending infomation " +u.toString());
159
          result = MetacatReplication.getURLContent(u);
160
        }
161
        catch (Exception e)
162
        {
163
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
164
                          "for server " + server + " because "+e.getMessage());
165
          logMetacat.error( "Failed to get updated doc list "+
166
                       "for server " + server + " because "+e.getMessage());
167
          continue;
168
        }
169

    
170
        logMetacat.info("docid: "+server+" "+result);
171
        //check if result have error or not, if has skip it.
172
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
173
        {
174
          MetacatReplication.replErrorLog("Failed to get updated doc list "+
175
                          "for server " + server + " because "+result);
176
          logMetacat.info( "Failed to get updated doc list "+
177
                       "for server " + server + " because "+result);
178
          continue;
179
        }
180
        //Add result to vector
181
        responses.add(result);
182
    }
183

    
184
    //make sure that there is updated file list
185
    //If response is null, metacat don't need do anything
186
    if (responses==null || responses.isEmpty())
187
    {
188
        MetacatReplication.replErrorLog("No updated doc list for "+
189
                           "every server and failed to replicate");
190
        logMetacat.info( "No updated doc list for "+
191
                           "every server and failed to replicate");
192
        return;
193
    }
194

    
195

    
196
    logMetacat.info("Responses from remote metacat about updated "+
197
                   "document information: "+ responses.toString());
198
    // go through response vector(it contains updated vector and delete vector
199
    for(int i=0; i<responses.size(); i++)
200
    {
201
    	XMLReader parser;
202
    	ReplMessageHandler message = new ReplMessageHandler();
203
    	try
204
        {
205
          parser = initParser(message);
206
        }
207
        catch (Exception e)
208
        {
209
          MetacatReplication.replErrorLog("Failed to replicate becaue couldn't "+
210
                                     " initParser for message and" +e.getMessage());
211
          logMetacat.error("Failed to replicate becaue couldn't " +
212
                                " initParser for message and " +e.getMessage());
213
           // stop replication
214
           return;
215
        }
216
    	
217
        try
218
        {
219
          parser.parse(new InputSource(
220
                     new StringReader(
221
                     (String)(responses.elementAt(i)))));
222
        }
223
        catch(Exception e)
224
        {
225
          MetacatReplication.replErrorLog("Couldn't parse one responses "+
226
                           "because "+ e.getMessage());
227
          logMetacat.error("Couldn't parse one responses "+
228
                                   "because "+ e.getMessage());
229
          continue;
230
        }
231
        //v is the list of updated documents
232
        Vector updateList = new Vector(message.getUpdatesVect());
233
        MetacatReplication.replLog("The document list size is "+updateList.size()+ " from "+message.getServerName());
234
        //System.out.println("v: " + v.toString());
235
        //d is the list of deleted documents
236
        Vector deleteList = new Vector(message.getDeletesVect());
237
        //System.out.println("d: " + d.toString());
238
        logMetacat.info("Update vector size: "+ updateList.size()+" from "+message.getServerName());
239
        logMetacat.info("Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
240
        MetacatReplication.replLog("The delete document list size is "+deleteList.size()+" from "+message.getServerName());
241
        // go though every element in updated document vector
242
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
243
        //handle deleted docs
244
        for(int k=0; k<deleteList.size(); k++)
245
        { //delete the deleted documents;
246
          Vector w = new Vector((Vector)deleteList.elementAt(k));
247
          String docId = (String)w.elementAt(0);
248
          try
249
          {
250
            handleDeleteSingleDocument(docId, server);
251
          }
252
          catch (Exception ee)
253
          {
254
            continue;
255
          }
256
        }//for delete docs
257
        
258
        // handle replicate doc in xml_revision
259
        Vector revisionList = new Vector(message.getRevisionsVect());
260
        MetacatReplication.replLog("The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
261
        logMetacat.info("The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
262
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
263
        DOCINSERTNUMBER = 1;
264
        DOCERRORNUMBER  = 1;
265
        REVINSERTNUMBER = 1;
266
        REVERRORNUMBER  = 1;
267
    }//for response
268

    
269
    //updated last_checked
270
    for (int i=0;i<serverList.size(); i++)
271
    {
272
       // Get ReplicationServer object from server list
273
       replServer = serverList.serverAt(i);
274
       try
275
       {
276
         updateLastCheckTimeForSingleServer(replServer);
277
       }
278
       catch(Exception e)
279
       {
280
         continue;
281
       }
282
    }//for
283

    
284
  }//update
285

    
286
  /* Handle replicate single xml document*/
287
  private void handleSingleXMLDocument(String remoteserver, String actions,
288
                                       String accNumber, String tableName)
289
               throws Exception
290
  {
291
    DBConnection dbConn = null;
292
    int serialNumber = -1;
293
    try
294
    {
295
      // Get DBConnection from pool
296
      dbConn=DBConnectionPool.
297
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
298
      serialNumber=dbConn.getCheckOutSerialNumber();
299
      //if the document needs to be updated or inserted, this is executed
300
      String readDocURLString = "https://" + remoteserver + "?server="+
301
              MetaCatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
302
      readDocURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDocURLString);
303
      URL u = new URL(readDocURLString);
304

    
305
      // Get docid content
306
      String newxmldoc = MetacatReplication.getURLContent(u);
307
      // If couldn't get skip it
308
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
309
      {
310
         throw new Exception(newxmldoc);
311
      }
312
      //logMetacat.info("xml documnet:");
313
      //logMetacat.info(newxmldoc);
314

    
315
      // Try get the docid info from remote server
316
      DocInfoHandler dih = new DocInfoHandler();
317
      XMLReader docinfoParser = initParser(dih);
318
      String docInfoURLStr = "https://" + remoteserver +
319
                       "?server="+MetaCatUtil.getLocalReplicationServerName()+
320
                       "&action=getdocumentinfo&docid="+accNumber;
321
      docInfoURLStr = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
322
      URL docinfoUrl = new URL(docInfoURLStr);
323
      logMetacat.info("Sending message: " +
324
                                                  docinfoUrl.toString());
325
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
326
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
327
      Hashtable docinfoHash = dih.getDocInfo();
328
      // Get home server of the docid
329
      String docHomeServer = (String)docinfoHash.get("home_server");
330
      logMetacat.info("doc home server in repl: "+docHomeServer);
331
      String createdDate = (String)docinfoHash.get("date_created");
332
      String updatedDate = (String)docinfoHash.get("date_updated");
333
      //docid should include rev number too
334
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
335
                                              (String)docinfoHash.get("rev");*/
336
      logMetacat.info("docid in repl: "+accNumber);
337
      String docType = (String)docinfoHash.get("doctype");
338
      logMetacat.info("doctype in repl: "+docType);
339

    
340
      String parserBase = null;
341
      // this for eml2 and we need user eml2 parser
342
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
343
      {
344
         parserBase = DocumentImpl.EML200;
345
      }
346
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
347
      {
348
        parserBase = DocumentImpl.EML200;
349
      }
350
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
351
      {
352
        parserBase = DocumentImpl.EML210;
353
      }
354
      // Write the document into local host
355
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false);
356
      String newDocid = wrapper.writeReplication(dbConn,
357
                              new StringReader(newxmldoc),
358
                              (String)docinfoHash.get("public_access"),
359
                              null,  /* the dtd text */
360
                              actions,
361
                              accNumber,
362
                              (String)docinfoHash.get("user_owner"),
363
                              null, /* null for groups[] */
364
                              docHomeServer,
365
                              remoteserver, tableName, true,// true is for time replication 
366
                              createdDate,
367
                              updatedDate);
368
      
369
      //process extra access rules
370
      Vector accessControlList = (Vector) docinfoHash.get("accessControl");
371
      if (accessControlList != null) {
372
    	  for (int i = 0; i < accessControlList.size(); i++) {
373
        	  AccessControlForSingleFile acfsf = (AccessControlForSingleFile) accessControlList.get(i);
374
        	  if (!acfsf.accessControlExists()) {
375
        		  acfsf.insertPermissions();
376
        	  }
377
          }
378
      }
379
      
380
      logMetacat.info("Successfully replicated doc " + accNumber);
381
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
382
      {
383
        MetacatReplication.replLog("" +DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
384
                                     " into "+tableName + " from " +
385
                                         remoteserver);
386
        DOCINSERTNUMBER++;
387
      }
388
      else
389
      {
390
          MetacatReplication.replLog("" +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
391
                  " into "+tableName + " from " +
392
                      remoteserver);
393
          REVINSERTNUMBER++;
394
      }
395
      String ip = getIpFromURL(u);
396
      EventLog.getInstance().log(ip, MetacatReplication.REPLICATIONUSER, accNumber, actions);
397
      
398

    
399
    }//try
400
    catch(Exception e)
401
    {
402
        
403
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
404
        {
405
          MetacatReplication.replErrorLog("" +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
406
                                       " into "+tableName + " from " +
407
                                           remoteserver + " because "+e.getMessage());
408
          DOCERRORNUMBER++;
409
        }
410
        else
411
        {
412
            MetacatReplication.replErrorLog("" +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
413
                    " into "+tableName + " from " +
414
                        remoteserver +" because "+e.getMessage());
415
            REVERRORNUMBER++;
416
        }
417
       
418
      logMetacat.error("Failed to write doc " + accNumber +
419
                                      " into db because " +e.getMessage());
420
      throw e;
421
    }
422
    finally
423
    {
424
       //return DBConnection
425
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
426
    }//finally
427
  }
428

    
429

    
430

    
431
  /* Handle replicate single xml document*/
432
  private void handleSingleDataFile(String remoteserver, String actions,
433
                                    String accNumber, String tableName)
434
               throws Exception
435
  {
436
    logMetacat.info("Try to replicate data file: "+accNumber);
437
    DBConnection dbConn = null;
438
    int serialNumber = -1;
439
    try
440
    {
441
      // Get DBConnection from pool
442
      dbConn=DBConnectionPool.
443
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
444
      serialNumber=dbConn.getCheckOutSerialNumber();
445
      // Try get docid info from remote server
446
      DocInfoHandler dih = new DocInfoHandler();
447
      XMLReader docinfoParser = initParser(dih);
448
      String docInfoURLString = "https://" + remoteserver +
449
                  "?server="+MetaCatUtil.getLocalReplicationServerName()+
450
                  "&action=getdocumentinfo&docid="+accNumber;
451
      docInfoURLString = MetaCatUtil.replaceWhiteSpaceForURL(docInfoURLString);
452
      URL docinfoUrl = new URL(docInfoURLString);
453

    
454
      String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
455
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
456
      Hashtable docinfoHash = dih.getDocInfo();
457
      // Get doicd owner
458
      String user = (String)docinfoHash.get("user_owner");
459
      // Get docid name (such as acl or dataset)
460
      String docName = (String)docinfoHash.get("docname");
461
      // Get doc type (eml public id)
462
      String docType = (String)docinfoHash.get("doctype");
463
      // Get docid home sever. it might be different to remoteserver
464
      // becuause of hub feature
465
      String docHomeServer = (String)docinfoHash.get("home_server");
466
      String createdDate = (String)docinfoHash.get("date_created");
467
      String updatedDate = (String)docinfoHash.get("date_updated");
468
      //docid should include rev number too
469
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
470
                                              (String)docinfoHash.get("rev");*/
471

    
472

    
473
      String datafilePath = PropertyService.getProperty("application.datafilepath");
474
      // Get data file content
475
      String readDataURLString = "https://" + remoteserver + "?server="+
476
                                        MetaCatUtil.getLocalReplicationServerName()+
477
                                            "&action=readdata&docid="+accNumber;
478
      readDataURLString = MetaCatUtil.replaceWhiteSpaceForURL(readDataURLString);
479
      URL u = new URL(readDataURLString);
480
      InputStream input = u.openStream();
481
      //register data file into xml_documents table and wite data file
482
      //into file system
483
      if ( input != null)
484
      {
485
        DocumentImpl.writeDataFileInReplication(input,
486
                                                datafilePath,
487
                                                docName,docType,
488
                                                accNumber, user,
489
                                                docHomeServer,
490
                                                remoteserver,
491
                                                tableName,
492
                                                true, //true means timed replication
493
                                                createdDate,
494
                                                updatedDate);
495
                                         
496
        //process extra access rules
497
        Vector accessControlList = (Vector) docinfoHash.get("accessControl");
498
        if (accessControlList != null) {
499
      	  for (int i = 0; i < accessControlList.size(); i++) {
500
          	  AccessControlForSingleFile acfsf = (AccessControlForSingleFile) accessControlList.get(i);
501
          	  if (!acfsf.accessControlExists()) {
502
          		acfsf.insertPermissions();
503
          	  }
504
            }
505
        }
506
        
507
        logMetacat.info("Successfully to write datafile " + accNumber);
508
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
509
                                    remoteserver);*/
510
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
511
        {
512
          MetacatReplication.replLog("" +DOCINSERTNUMBER + " Wrote data file" + accNumber +
513
                                       " into "+tableName + " from " +
514
                                           remoteserver);
515
          DOCINSERTNUMBER++;
516
        }
517
        else
518
        {
519
            MetacatReplication.replLog("" +REVINSERTNUMBER + " Wrote data file" + accNumber +
520
                    " into "+tableName + " from " +
521
                        remoteserver);
522
            REVINSERTNUMBER++;
523
        }
524
        String ip = getIpFromURL(u);
525
        EventLog.getInstance().log(ip, MetacatReplication.REPLICATIONUSER, accNumber, actions);
526
        
527
      }//if
528
      else
529
      {
530
         logMetacat.info("Couldn't open the data file: " + accNumber);
531
         throw new Exception("Couldn't open the data file: " + accNumber);
532
      }//else
533

    
534
    }//try
535
    catch(Exception e)
536
    {
537
      /*MetacatReplication.replErrorLog("Failed to try wrote datafile " + accNumber +
538
                                      " because " +e.getMessage());*/
539
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
540
      {
541
        MetacatReplication.replErrorLog("" +DOCERRORNUMBER + " Failed to write data file " + accNumber +
542
                                     " into "+tableName + " from " +
543
                                         remoteserver + " because "+e.getMessage());
544
        DOCERRORNUMBER++;
545
      }
546
      else
547
      {
548
          MetacatReplication.replErrorLog("" +REVERRORNUMBER + " Failed to write data file" + accNumber +
549
                  " into "+tableName + " from " +
550
                      remoteserver +" because "+e.getMessage());
551
          REVERRORNUMBER++;
552
      }
553
      logMetacat.error("Failed to try wrote datafile " + accNumber +
554
                                      " because " +e.getMessage());
555
      throw e;
556
    }
557
    finally
558
    {
559
       //return DBConnection
560
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
561
    }//finally
562
  }
563

    
564

    
565

    
566
  /* Handle delete single document*/
567
  private void handleDeleteSingleDocument(String docId, String notifyServer)
568
               throws Exception
569
  {
570
    logMetacat.info("Try delete doc: "+docId);
571
    DBConnection dbConn = null;
572
    int serialNumber = -1;
573
    try
574
    {
575
      // Get DBConnection from pool
576
      dbConn=DBConnectionPool.
577
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
578
      serialNumber=dbConn.getCheckOutSerialNumber();
579
      if(!alreadyDeleted(docId))
580
      {
581

    
582
         //because delete method docid should have rev number
583
         //so we just add one for it. This rev number is no sence.
584
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
585
         //System.out.println("accnum: "+accnum);
586
         DocumentImpl.delete(accnum, null, null, notifyServer);
587
         logMetacat.info("Successfully deleted doc " + docId);
588
         MetacatReplication.replLog("Doc " + docId + " deleted");
589
         URL u = new URL("https://"+notifyServer);
590
         String ip = getIpFromURL(u);
591
         EventLog.getInstance().log(ip, MetacatReplication.REPLICATIONUSER, docId, "delete");
592
      }
593

    
594
    }//try
595
    catch(Exception e)
596
    {
597
      MetacatReplication.replErrorLog("Failed to delete doc " + docId +
598
                                      " in db because " +e.getMessage());
599
      logMetacat.error("Failed to delete doc " + docId +
600
                                 " in db because because " + e.getMessage());
601
      throw e;
602
    }
603
    finally
604
    {
605
       //return DBConnection
606
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
607
    }//finally
608
  }
609

    
610
  /* Handle updateLastCheckTimForSingleServer*/
611
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
612
                                                  throws Exception
613
  {
614
    String server = repServer.getServerName();
615
    DBConnection dbConn = null;
616
    int serialNumber = -1;
617
    PreparedStatement pstmt = null;
618
    try
619
    {
620
      // Get DBConnection from pool
621
      dbConn=DBConnectionPool.
622
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
623
      serialNumber=dbConn.getCheckOutSerialNumber();
624

    
625
      logMetacat.info("Try to update last_check for server: "+server);
626
      // Get time from remote server
627
      URL dateurl = new URL("https://" + server + "?server="+
628
      MetaCatUtil.getLocalReplicationServerName()+"&action=gettime");
629
      String datexml = MetacatReplication.getURLContent(dateurl);
630
      logMetacat.info("datexml: "+datexml);
631
      if (datexml!=null && !datexml.equals(""))
632
      {
633
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
634
         StringBuffer sql = new StringBuffer();
635
         /*sql.append("update xml_replication set last_checked = to_date('");
636
         sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
637
         sql.append("server like '").append(server).append("'");*/
638
         sql.append("update xml_replication set last_checked = ");
639
         sql.append(DatabaseService.getDBAdapter().toDate(datestr, "MM/DD/YY HH24:MI:SS"));
640
         sql.append(" where server like '").append(server).append("'");
641
         pstmt = dbConn.prepareStatement(sql.toString());
642

    
643
         pstmt.executeUpdate();
644
         dbConn.commit();
645
         pstmt.close();
646
         logMetacat.info("last_checked updated to "+datestr+" on "
647
                                      + server);
648
      }//if
649
      else
650
      {
651

    
652
         logMetacat.info("Failed to update last_checked for server "  +
653
                                  server + " in db because couldn't get time "
654
                                  );
655
         throw new Exception("Couldn't get time for server "+ server);
656
      }
657

    
658
    }//try
659
    catch(Exception e)
660
    {
661

    
662
      logMetacat.error("Failed to update last_checked for server " +
663
                                server + " in db because because " +
664
                                e.getMessage());
665
      throw e;
666
    }
667
    finally
668
    {
669
       //return DBConnection
670
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
671
    }//finally
672
  }
673

    
674

    
675

    
676
  /**
677
   * updates xml_catalog with entries from other servers.
678
   */
679
  private void updateCatalog()
680
  {
681
    logMetacat.info("Start of updateCatalog");
682
    // ReplicationServer object in server list
683
    ReplicationServer replServer = null;
684
    PreparedStatement pstmt = null;
685
    String server = null;
686

    
687

    
688
    // Go through each ReplicationServer object in sererlist
689
    for (int j=0; j<serverList.size(); j++)
690
    {
691
      Vector remoteCatalog = new Vector();
692
      Vector publicId = new Vector();
693
      try
694
      {
695
        // Get ReplicationServer object from server list
696
        replServer = serverList.serverAt(j);
697
        // Get server name from the ReplicationServer object
698
        server = replServer.getServerName();
699
        // Try to get catalog
700
        URL u = new URL("https://" + server + "?server="+
701
        MetaCatUtil.getLocalReplicationServerName()+"&action=getcatalog");
702
        logMetacat.info("sending message " + u.toString());
703
        String catxml = MetacatReplication.getURLContent(u);
704

    
705
        // Make sure there are not error, no empty string
706
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
707
        {
708
          throw new Exception("Couldn't get catalog list form server " +server);
709
        }
710
        logMetacat.info("catxml: " + catxml);
711
        CatalogMessageHandler cmh = new CatalogMessageHandler();
712
        XMLReader catparser = initParser(cmh);
713
        catparser.parse(new InputSource(new StringReader(catxml)));
714
        //parse the returned catalog xml and put it into a vector
715
        remoteCatalog = cmh.getCatalogVect();
716

    
717
        // Makse sure remoteCatalog is not empty
718
        if (remoteCatalog.isEmpty())
719
        {
720
          throw new Exception("Couldn't get catalog list form server " +server);
721
        }
722

    
723
        String localcatxml = MetacatReplication.getCatalogXML();
724

    
725
        // Make sure local catalog is no empty
726
        if (localcatxml==null||localcatxml.equals(""))
727
        {
728
          throw new Exception("Couldn't get catalog list form server " +server);
729
        }
730

    
731
        cmh = new CatalogMessageHandler();
732
        catparser = initParser(cmh);
733
        catparser.parse(new InputSource(new StringReader(localcatxml)));
734
        Vector localCatalog = cmh.getCatalogVect();
735

    
736
        //now we have the catalog from the remote server and this local server
737
        //we now need to compare the two and merge the differences.
738
        //the comparison is base on the public_id fields which is the 4th
739
        //entry in each row vector.
740
        publicId = new Vector();
741
        for(int i=0; i<localCatalog.size(); i++)
742
        {
743
          Vector v = new Vector((Vector)localCatalog.elementAt(i));
744
          logMetacat.info("v1: " + v.toString());
745
          publicId.add(new String((String)v.elementAt(3)));
746
          //System.out.println("adding " + (String)v.elementAt(3));
747
        }
748
      }//try
749
      catch (Exception e)
750
      {
751
        MetacatReplication.replErrorLog("Failed to update catalog for server "+
752
                                    server + " because " +e.getMessage());
753
        logMetacat.error("Failed to update catalog for server "+
754
                                    server + " because " +e.getMessage());
755
      }//catch
756

    
757
      for(int i=0; i<remoteCatalog.size(); i++)
758
      {
759
         // DConnection
760
        DBConnection dbConn = null;
761
        // DBConnection checkout serial number
762
        int serialNumber = -1;
763
        try
764
        {
765
            dbConn=DBConnectionPool.
766
                  getDBConnection("ReplicationHandler.updateCatalog");
767
            serialNumber=dbConn.getCheckOutSerialNumber();
768
            Vector v = (Vector)remoteCatalog.elementAt(i);
769
            //System.out.println("v2: " + v.toString());
770
            //System.out.println("i: " + i);
771
            //System.out.println("remoteCatalog.size(): " + remoteCatalog.size());
772
            //System.out.println("publicID: " + publicId.toString());
773
            logMetacat.info
774
                              ("v.elementAt(3): " + (String)v.elementAt(3));
775
           if(!publicId.contains(v.elementAt(3)))
776
           { //so we don't have this public id in our local table so we need to
777
             //add it.
778
             //System.out.println("in if");
779
             StringBuffer sql = new StringBuffer();
780
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
781
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
782
             sql.append("?,?)");
783
             //System.out.println("sql: " + sql.toString());
784
             pstmt = dbConn.prepareStatement(sql.toString());
785
             pstmt.setString(1, (String)v.elementAt(0));
786
             pstmt.setString(2, (String)v.elementAt(1));
787
             pstmt.setString(3, (String)v.elementAt(2));
788
             pstmt.setString(4, (String)v.elementAt(3));
789
             pstmt.setString(5, (String)v.elementAt(4));
790
             pstmt.execute();
791
             pstmt.close();
792
             MetacatReplication.replLog("Success fully to insert new publicid "+
793
                               (String)v.elementAt(3) + " from server"+server);
794
             logMetacat.info("Success fully to insert new publicid "+
795
                             (String)v.elementAt(3) + " from server" +server);
796
           }
797
        }
798
        catch(Exception e)
799
        {
800
           MetacatReplication.replErrorLog("Failed to update catalog for server "+
801
                                    server + " because " +e.getMessage());
802
           logMetacat.error("Failed to update catalog for server "+
803
                                    server + " because " +e.getMessage());
804
        }//catch
805
        finally
806
        {
807
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
808
        }//finall
809
      }//for remote catalog
810
    }//for server list
811
    logMetacat.info("End of updateCatalog");
812
  }
813

    
814
  /**
815
   * Method that returns true if docid has already been "deleted" from metacat.
816
   * This method really implements a truth table for deleted documents
817
   * The table is (a docid in one of the tables is represented by the X):
818
   * xml_docs      xml_revs      deleted?
819
   * ------------------------------------
820
   *   X             X             FALSE
821
   *   X             _             FALSE
822
   *   _             X             TRUE
823
   *   _             _             TRUE
824
   */
825
  private static boolean alreadyDeleted(String docid) throws Exception
826
  {
827
    DBConnection dbConn = null;
828
    int serialNumber = -1;
829
    PreparedStatement pstmt = null;
830
    try
831
    {
832
      dbConn=DBConnectionPool.
833
                  getDBConnection("ReplicationHandler.alreadyDeleted");
834
      serialNumber=dbConn.getCheckOutSerialNumber();
835
      boolean xml_docs = false;
836
      boolean xml_revs = false;
837

    
838
      StringBuffer sb = new StringBuffer();
839
      sb.append("select docid from xml_revisions where docid like '");
840
      sb.append(docid).append("'");
841
      pstmt = dbConn.prepareStatement(sb.toString());
842
      pstmt.execute();
843
      ResultSet rs = pstmt.getResultSet();
844
      boolean tablehasrows = rs.next();
845
      if(tablehasrows)
846
      {
847
        xml_revs = true;
848
      }
849

    
850
      sb = new StringBuffer();
851
      sb.append("select docid from xml_documents where docid like '");
852
      sb.append(docid).append("'");
853
      pstmt.close();
854
      pstmt = dbConn.prepareStatement(sb.toString());
855
      //increase usage count
856
      dbConn.increaseUsageCount(1);
857
      pstmt.execute();
858
      rs = pstmt.getResultSet();
859
      tablehasrows = rs.next();
860
      pstmt.close();
861
      if(tablehasrows)
862
      {
863
        xml_docs = true;
864
      }
865

    
866
      if(xml_docs && xml_revs)
867
      {
868
        return false;
869
      }
870
      else if(xml_docs && !xml_revs)
871
      {
872
        return false;
873
      }
874
      else if(!xml_docs && xml_revs)
875
      {
876
        return true;
877
      }
878
      else if(!xml_docs && !xml_revs)
879
      {
880
        return true;
881
      }
882
    }
883
    catch(Exception e)
884
    {
885
      logMetacat.error("error in ReplicationHandler.alreadyDeleted: " +
886
                          e.getMessage());
887
      throw e;
888
    }
889
    finally
890
    {
891
      try
892
      {
893
        pstmt.close();
894
      }//try
895
      catch (SQLException ee)
896
      {
897
        logMetacat.error("Error in replicationHandler.alreadyDeleted "+
898
                          "to close pstmt: "+ee.getMessage());
899
        throw ee;
900
      }//catch
901
      finally
902
      {
903
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
904
      }//finally
905
    }//finally
906
    return false;
907
  }
908

    
909

    
910
  /**
911
   * Method to initialize the message parser
912
   */
913
  public static XMLReader initParser(DefaultHandler dh)
914
          throws Exception
915
  {
916
    XMLReader parser = null;
917

    
918
    try {
919
      ContentHandler chandler = dh;
920

    
921
      // Get an instance of the parser
922
      String parserName = PropertyService.getProperty("xml.saxparser");
923
      parser = XMLReaderFactory.createXMLReader(parserName);
924

    
925
      // Turn off validation
926
      parser.setFeature("http://xml.org/sax/features/validation", false);
927

    
928
      parser.setContentHandler((ContentHandler)chandler);
929
      parser.setErrorHandler((ErrorHandler)chandler);
930

    
931
    } catch (Exception e) {
932
      throw e;
933
    }
934

    
935
    return parser;
936
  }
937

    
938
  /**
939
   * This method will combinate given time string(in short format) to
940
   * current date. If the given time (e.g 10:00 AM) passed the current time
941
   * (e.g 2:00 PM Aug 21, 2005), then the time will set to second day,
942
   * 10:00 AM Aug 22, 2005. If the given time (e.g 10:00 AM) haven't passed
943
   * the current time (e.g 8:00 AM Aug 21, 2005) The time will set to be
944
   * 10:00 AM Aug 21, 2005.
945
   * @param givenTime  the format should be "10:00 AM " or "2:00 PM"
946
   * @return
947
   * @throws Exception
948
   */
949
  public static Date combinateCurrentDateAndGivenTime(String givenTime) throws Exception
950
  {
951
     Date givenDate = parseTime(givenTime);
952
     Date newDate = null;
953
     Date now = new Date();
954
     String currentTimeString = getTimeString(now);
955
     Date currentTime = parseTime(currentTimeString); 
956
     if ( currentTime.getTime() >= givenDate.getTime())
957
     {
958
        logMetacat.info("Today already pass the given time, we should set it as tomorrow");
959
        String dateAndTime = getDateString(now) + " " + givenTime;
960
        Date combinationDate = parseDateTime(dateAndTime);
961
        // new date should plus 24 hours to make is the second day
962
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
963
     }
964
     else
965
     {
966
         logMetacat.info("Today haven't pass the given time, we should it as today");
967
         String dateAndTime = getDateString(now) + " " + givenTime;
968
         newDate = parseDateTime(dateAndTime);
969
     }
970
     logMetacat.warn("final setting time is "+ newDate.toString());
971
     return newDate;
972
  }
973

    
974
  /*
975
   * parse a given string to Time in short format.
976
   * For example, given time is 10:00 AM, the date will be return as
977
   * Jan 1 1970, 10:00 AM
978
   */
979
  private static Date parseTime(String timeString) throws Exception
980
  {
981
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
982
    Date time = format.parse(timeString); 
983
    logMetacat.info("Date string is after parse a time string "
984
                              +time.toString());
985
    return time;
986

    
987
  }
988
  
989
  /*
990
   * Pasre a given string to date and time. Date format is long and time
991
   * format is short.
992
   */
993
  private static Date parseDateTime(String timeString) throws Exception
994
  {
995
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
996
    Date time = format.parse(timeString);
997
    logMetacat.info("Date string is after parse a time string "+
998
                             time.toString());
999
    return time;
1000
  }
1001
  
1002
  /*
1003
   * Get a date string from a Date object. The date format will be long
1004
   */
1005
  private static String getDateString(Date now)
1006
  {
1007
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1008
     String s = df.format(now);
1009
     logMetacat.info("Today is " + s);
1010
     return s;
1011
  }
1012
  
1013
  /*
1014
   * Get a time string from a Date object, the time format will be short
1015
   */
1016
  private static String getTimeString(Date now)
1017
  {
1018
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1019
     String s = df.format(now);
1020
     logMetacat.info("Time is " + s);
1021
     return s;
1022
  }
1023
  
1024
  
1025
  /*
1026
	 * This method will go through the docid list both in xml_Documents table
1027
	 * and in xml_revisions table @author tao
1028
	 */
1029
	private void handleDocList(Vector docList, String tableName) {
1030
		boolean dataFile = false;
1031
		for (int j = 0; j < docList.size(); j++) {
1032
			// initial dataFile is false
1033
			dataFile = false;
1034
			// w is information for one document, information contain
1035
			// docid, rev, server or datafile.
1036
			Vector w = new Vector((Vector) (docList.elementAt(j)));
1037
			// Check if the vector w contain "datafile"
1038
			// If it has, this document is data file
1039
			try {
1040
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1041
					dataFile = true;
1042
				}
1043
			} catch (PropertyNotFoundException pnfe) {
1044
				logMetacat.error("Could not retrieve data file flag property.  "
1045
						+ "Leaving as false: " + pnfe.getMessage());
1046
			}
1047
			// System.out.println("w: " + w.toString());
1048
			// Get docid
1049
			String docid = (String) w.elementAt(0);
1050
			logMetacat.info("docid: " + docid);
1051
			// Get revision number
1052
			int rev = Integer.parseInt((String) w.elementAt(1));
1053
			logMetacat.info("rev: " + rev);
1054
			// Get remote server name (it is may not be doc homeserver because
1055
			// the new hub feature
1056
			String remoteServer = (String) w.elementAt(2);
1057
			remoteServer = remoteServer.trim();
1058

    
1059
			try {
1060
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1061
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1062
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1063
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1064
				} else {
1065
					continue;
1066
				}
1067

    
1068
			} catch (Exception e) {
1069
				logMetacat.error("error to handle update doc in " + tableName
1070
						+ " in time replication" + e.getMessage());
1071
				continue;
1072
			}
1073

    
1074
		}// for update docs
1075

    
1076
	}
1077
   
1078
   /*
1079
	 * This method will handle doc in xml_documents table.
1080
	 */
1081
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1082
                                        throws Exception
1083
   {
1084
       // compare the update rev and local rev to see what need happen
1085
       int localrev = -1;
1086
       String action = null;
1087
       boolean flag = false;
1088
       try
1089
       {
1090
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1091
       }
1092
       catch (SQLException e)
1093
       {
1094
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1095
                                " be found because " + e.getMessage());
1096
         MetacatReplication.replErrorLog(""+DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1097
                 "written because error happend to find it's local revision");
1098
         DOCERRORNUMBER++;
1099
         throw new Exception (e.getMessage());
1100
       }
1101
       logMetacat.info("Local rev for docid "+ docid + " is "+
1102
                               localrev);
1103

    
1104
       //check the revs for an update because this document is in the
1105
       //local DB, it might be out of date.
1106
       if (localrev == -1)
1107
       {
1108
          // check if the revision is in the revision table
1109
         Vector localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1110
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1111
         {
1112
             // this version was deleted, so don't need replicate
1113
             flag = false;
1114
         }
1115
         else
1116
         {
1117
           //insert this document as new because it is not in the local DB
1118
           action = "INSERT";
1119
           flag = true;
1120
         }
1121
       }
1122
       else
1123
       {
1124
         if(localrev == rev)
1125
         {
1126
           // Local meatacat has the same rev to remote host, don't need
1127
           // update and flag set false
1128
           flag = false;
1129
         }
1130
         else if(localrev < rev)
1131
         {
1132
           //this document needs to be updated so send an read request
1133
           action = "UPDATE";
1134
           flag = true;
1135
         }
1136
       }
1137
       
1138
       String accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1139
       // this is non-data file
1140
       if(flag && !dataFile)
1141
       {
1142
         try
1143
         {
1144
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1145
         }
1146
         catch(Exception e)
1147
         {
1148
           // skip this document
1149
           throw e;
1150
         }
1151
       }//if for non-data file
1152

    
1153
        // this is for data file
1154
       if(flag && dataFile)
1155
       {
1156
         try
1157
         {
1158
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1159
         }
1160
         catch(Exception e)
1161
         {
1162
           // skip this datafile
1163
           throw e;
1164
         }
1165

    
1166
       }//for datafile
1167
   }
1168
   
1169
   /*
1170
    * This method will handle doc in xml_documents table.
1171
    */
1172
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1173
                                        throws Exception
1174
   {
1175
       // compare the update rev and local rev to see what need happen
1176
       logMetacat.info("In handle repliation revsion table");
1177
       logMetacat.info("the docid is "+ docid);
1178
       logMetacat.info("The rev is "+rev);
1179
       Vector localrev = null;
1180
       String action = "INSERT";
1181
       boolean flag = false;
1182
       try
1183
       {
1184
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1185
       }
1186
       catch (SQLException e)
1187
       {
1188
         logMetacat.error("Local rev for docid "+ docid + " could not "+
1189
                                " be found because " + e.getMessage());
1190
         MetacatReplication.replErrorLog(""+REVERRORNUMBER+" Docid "+ docid + " could not be "+
1191
                 "written because error happend to find it's local revision");
1192
         REVERRORNUMBER++;
1193
         throw new Exception (e.getMessage());
1194
       }
1195
       logMetacat.info("rev list in xml_revision table for docid "+ docid + " is "+
1196
                               localrev.toString());
1197
       
1198
       // if the rev is not in the xml_revision, we need insert it
1199
       if (!localrev.contains(new Integer(rev)))
1200
       {
1201
           flag = true;    
1202
       }
1203
     
1204
       String accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1205
       // this is non-data file
1206
       if(flag && !dataFile)
1207
       {
1208
         try
1209
         {
1210
           
1211
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1212
         }
1213
         catch(Exception e)
1214
         {
1215
           // skip this document
1216
           throw e;
1217
         }
1218
       }//if for non-data file
1219

    
1220
        // this is for data file
1221
       if(flag && dataFile)
1222
       {
1223
         try
1224
         {
1225
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1226
         }
1227
         catch(Exception e)
1228
         {
1229
           // skip this datafile
1230
           throw e;
1231
         }
1232

    
1233
       }//for datafile
1234
   }
1235
   
1236
   /*
1237
    * Return a ip address for given url
1238
    */
1239
   private String getIpFromURL(URL url)
1240
   {
1241
	   String ip = null;
1242
	   try
1243
	   {
1244
	      InetAddress address = InetAddress.getByName(url.getHost());
1245
	      ip = address.getHostAddress();
1246
	   }
1247
	   catch(UnknownHostException e)
1248
	   {
1249
		   logMetacat.error("Error in get ip address for host: "
1250
                   +e.getMessage());
1251
	   }
1252

    
1253
	   return ip;
1254
   }
1255
  
1256
}
1257

    
(61-61/68)