Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2011-09-16 15:17:00 -0700 (Fri, 16 Sep 2011) $'
10
 * '$Revision: 6448 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.text.DateFormat;
39
import java.text.ParseException;
40
import java.util.Date;
41
import java.util.Hashtable;
42
import java.util.TimerTask;
43
import java.util.Vector;
44

    
45
import org.apache.log4j.Logger;
46
import org.dataone.service.types.v1.SystemMetadata;
47
import org.dataone.service.util.TypeMarshaller;
48
import org.xml.sax.ContentHandler;
49
import org.xml.sax.ErrorHandler;
50
import org.xml.sax.InputSource;
51
import org.xml.sax.SAXException;
52
import org.xml.sax.XMLReader;
53
import org.xml.sax.helpers.DefaultHandler;
54
import org.xml.sax.helpers.XMLReaderFactory;
55

    
56
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
57
import edu.ucsb.nceas.metacat.DBUtil;
58
import edu.ucsb.nceas.metacat.DocInfoHandler;
59
import edu.ucsb.nceas.metacat.DocumentImpl;
60
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
61
import edu.ucsb.nceas.metacat.EventLog;
62
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
63
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
64
import edu.ucsb.nceas.metacat.accesscontrol.XMLAccessDAO;
65
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
66
import edu.ucsb.nceas.metacat.database.DBConnection;
67
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
68
import edu.ucsb.nceas.metacat.database.DatabaseService;
69
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
70
import edu.ucsb.nceas.metacat.properties.PropertyService;
71
import edu.ucsb.nceas.metacat.shared.HandlerException;
72
import edu.ucsb.nceas.metacat.util.MetacatUtil;
73
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
74

    
75

    
76

    
77
/**
78
 * This class handles deltaT replication checking.  Whenever this TimerTask
79
 * is fired it checks each server in xml_replication for updates and updates
80
 * the local db as needed.
81
 */
82
public class ReplicationHandler extends TimerTask
83
{
84
  int serverCheckCode = 1;
85
  ReplicationServerList serverList = null;
86
  //PrintWriter out;
87
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
88
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
89
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
90
  private static Logger logD1 = Logger.getLogger("DataOneLogger");
91
  
92
  private static int DOCINSERTNUMBER = 1;
93
  private static int DOCERRORNUMBER  = 1;
94
  private static int REVINSERTNUMBER = 1;
95
  private static int REVERRORNUMBER  = 1;
96
  
97
  private static int _xmlDocQueryCount = 0;
98
  private static int _xmlRevQueryCount = 0;
99
  private static long _xmlDocQueryTime = 0;
100
  private static long _xmlRevQueryTime = 0;
101
  
102
  
103
  public ReplicationHandler()
104
  {
105
    //this.out = o;
106
    serverList = new ReplicationServerList();
107
  }
108

    
109
  public ReplicationHandler(int serverCheckCode)
110
  {
111
    //this.out = o;
112
    this.serverCheckCode = serverCheckCode;
113
    serverList = new ReplicationServerList();
114
  }
115

    
116
  /**
117
   * Method that implements TimerTask.run().  It runs whenever the timer is
118
   * fired.
119
   */
120
  public void run()
121
  {
122
    //find out the last_checked time of each server in the server list and
123
    //send a query to each server to see if there are any documents in
124
    //xml_documents with an update_date > last_checked
125
	  
126
      //if serverList is null, metacat don't need to replication
127
      if (serverList==null||serverList.isEmpty())
128
      {
129
        return;
130
      }
131
      updateCatalog();
132
      update();
133
      //conn.close();
134
  }
135

    
136
  /**
137
   * Method that uses revision tagging for replication instead of update_date.
138
   */
139
  private void update()
140
  {
141
	  
142
	  _xmlDocQueryCount = 0;
143
	  _xmlRevQueryCount = 0;
144
	  _xmlDocQueryTime = 0;
145
	  _xmlRevQueryTime = 0;
146
    /*
147
     Pseudo-algorithm
148
     - request a doc list from each server in xml_replication
149
     - check the rev number of each of those documents agains the
150
       documents in the local database
151
     - pull any documents that have a lesser rev number on the local server
152
       from the remote server
153
     - delete any documents that still exist in the local xml_documents but
154
       are in the deletedDocuments tag of the remote host response.
155
     - update last_checked to keep track of the last time it was checked.
156
       (this info is theoretically not needed using this system but probably
157
       should be kept anyway)
158
    */
159

    
160
    ReplicationServer replServer = null; // Variable to store the
161
                                        // ReplicationServer got from
162
                                        // Server list
163
    String server = null; // Variable to store server name
164
//    String update;
165
    Vector<String> responses = new Vector<String>();
166
    URL u;
167
    long replicationStartTime = System.currentTimeMillis();
168
    long timeToGetServerList = 0;
169
    
170
    //Check for every server in server list to get updated list and put
171
    // them in to response
172
    long startTimeToGetServers = System.currentTimeMillis();
173
    for (int i=0; i<serverList.size(); i++)
174
    {
175
        // Get ReplicationServer object from server list
176
        replServer = serverList.serverAt(i);
177
        // Get server name from ReplicationServer object
178
        server = replServer.getServerName().trim();
179
        String result = null;
180
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
181
        // Send command to that server to get updated docid information
182
        try
183
        {
184
          u = new URL("https://" + server + "?server="
185
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
186
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
187
          result = ReplicationService.getURLContent(u);
188
        }
189
        catch (Exception e)
190
        {
191
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
192
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
193
                       "for server " + server + " because "+e.getMessage());
194
          continue;
195
        }
196

    
197
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
198
        //check if result have error or not, if has skip it.
199
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
200
        {
201
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
202
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
203
                       "for server " + server + " because "+result);
204
          continue;
205
        }
206
        //Add result to vector
207
        responses.add(result);
208
    }
209
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
210

    
211
    //make sure that there is updated file list
212
    //If response is null, metacat don't need do anything
213
    if (responses==null || responses.isEmpty())
214
    {
215
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
216
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
217
                           "every server and failed to replicate");
218
        return;
219
    }
220

    
221

    
222
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
223
    //               "document information: "+ responses.toString());
224
    
225
    long totalServerListParseTime = 0;
226
    // go through response vector(it contains updated vector and delete vector
227
    for(int i=0; i<responses.size(); i++)
228
    {
229
    	long startServerListParseTime = System.currentTimeMillis();
230
    	XMLReader parser;
231
    	ReplMessageHandler message = new ReplMessageHandler();
232
    	try
233
        {
234
          parser = initParser(message);
235
        }
236
        catch (Exception e)
237
        {
238
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
239
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
240
                                " initParser for message and " +e.getMessage());
241
           // stop replication
242
           return;
243
        }
244
    	
245
        try
246
        {
247
          parser.parse(new InputSource(
248
                     new StringReader(
249
                     (String)(responses.elementAt(i)))));
250
        }
251
        catch(Exception e)
252
        {
253
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
254
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
255
                                   "because "+ e.getMessage());
256
          continue;
257
        }
258
        //v is the list of updated documents
259
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
260
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
261
        //d is the list of deleted documents
262
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
263
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
264
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
265
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
266
        // go though every element in updated document vector
267
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
268
        //handle deleted docs
269
        for(int k=0; k<deleteList.size(); k++)
270
        { //delete the deleted documents;
271
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
272
          String docId = (String)w.elementAt(0);
273
          try
274
          {
275
            handleDeleteSingleDocument(docId, server);
276
          }
277
          catch (Exception ee)
278
          {
279
            continue;
280
          }
281
        }//for delete docs
282
        
283
        // handle replicate doc in xml_revision
284
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
285
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
286
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
287
        DOCINSERTNUMBER = 1;
288
        DOCERRORNUMBER  = 1;
289
        REVINSERTNUMBER = 1;
290
        REVERRORNUMBER  = 1;
291
        
292
        // handle system metadata
293
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
294
        for(int k = 0; k < systemMetadataList.size(); k++) { 
295
        	Vector<String> w = systemMetadataList.elementAt(k);
296
        	String guid = (String) w.elementAt(0);
297
        	String remoteserver = (String) w.elementAt(1);
298
        	try {
299
        		handleSystemMetadata(remoteserver, guid);
300
        	}
301
        	catch (Exception ee) {
302
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
303
        		continue;
304
        	}
305
        }
306
        
307
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
308
    }//for response
309

    
310
    //updated last_checked
311
    for (int i=0;i<serverList.size(); i++)
312
    {
313
       // Get ReplicationServer object from server list
314
       replServer = serverList.serverAt(i);
315
       try
316
       {
317
         updateLastCheckTimeForSingleServer(replServer);
318
       }
319
       catch(Exception e)
320
       {
321
         continue;
322
       }
323
    }//for
324
    
325
    long replicationEndTime = System.currentTimeMillis();
326
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
327
    		(replicationEndTime - replicationStartTime));
328
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
329
    		timeToGetServerList);
330
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
331
    		totalServerListParseTime);
332
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
333
    		_xmlDocQueryCount);
334
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
335
    		_xmlDocQueryTime + " ms");
336
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
337
    		_xmlRevQueryCount);
338
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
339
    		_xmlRevQueryTime + " ms");;
340

    
341
  }//update
342

    
343
  /* Handle replicate single xml document*/
344
  private void handleSingleXMLDocument(String remoteserver, String actions,
345
                                       String accNumber, String tableName)
346
               throws HandlerException
347
  {
348
    DBConnection dbConn = null;
349
    int serialNumber = -1;
350
    try
351
    {
352
      // Get DBConnection from pool
353
      dbConn=DBConnectionPool.
354
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
355
      serialNumber=dbConn.getCheckOutSerialNumber();
356
      //if the document needs to be updated or inserted, this is executed
357
      String readDocURLString = "https://" + remoteserver + "?server="+
358
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
359
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
360
      URL u = new URL(readDocURLString);
361

    
362
      // Get docid content
363
      String newxmldoc = ReplicationService.getURLContent(u);
364
      // If couldn't get skip it
365
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
366
      {
367
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
368
      }
369
      //logReplication.info("xml documnet:");
370
      //logReplication.info(newxmldoc);
371

    
372
      // Try get the docid info from remote server
373
      DocInfoHandler dih = new DocInfoHandler();
374
      XMLReader docinfoParser = initParser(dih);
375
      String docInfoURLStr = "https://" + remoteserver +
376
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
377
                       "&action=getdocumentinfo&docid="+accNumber;
378
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
379
      URL docinfoUrl = new URL(docInfoURLStr);
380
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
381
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
382
      
383
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
384
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
385
      // Get home server of the docid
386
      String docHomeServer = docinfoHash.get("home_server");
387
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
388
      String createdDate = docinfoHash.get("date_created");
389
      String updatedDate = docinfoHash.get("date_updated");
390
      //docid should include rev number too
391
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
392
                                              (String)docinfoHash.get("rev");*/
393
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
394
      String docType = docinfoHash.get("doctype");
395
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
396

    
397
      String parserBase = null;
398
      // this for eml2 and we need user eml2 parser
399
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
400
      {
401
         parserBase = DocumentImpl.EML200;
402
      }
403
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
404
      {
405
        parserBase = DocumentImpl.EML200;
406
      }
407
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
408
      {
409
        parserBase = DocumentImpl.EML210;
410
      }
411
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
412
      {
413
        parserBase = DocumentImpl.EML210;
414
      }
415
      // Write the document into local host
416
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false);
417
      String newDocid = wrapper.writeReplication(dbConn,
418
                              newxmldoc,
419
                              docinfoHash.get("public_access"),
420
                              null,  /* the dtd text */
421
                              actions,
422
                              accNumber,
423
                              null, //docinfoHash.get("user_owner"),                              
424
                              null, /* null for groups[] */
425
                              docHomeServer,
426
                              remoteserver, tableName, true,// true is for time replication 
427
                              createdDate,
428
                              updatedDate);
429
      
430
      //set the user information
431
      String user = (String) docinfoHash.get("user_owner");
432
      String updated = (String) docinfoHash.get("user_updated");
433
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
434
      
435
      //process extra access rules 
436
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
437
      if (xmlAccessDAOList != null) {
438
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
439
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
440
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
441
      			acfsf.insertPermissions(xmlAccessDAO);
442
      		}
443
          }
444
      }
445
      
446
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
447
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
448
      {
449
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
450
                                     " into "+tableName + " from " +
451
                                         remoteserver);
452
        DOCINSERTNUMBER++;
453
      }
454
      else
455
      {
456
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
457
                  " into "+tableName + " from " +
458
                      remoteserver);
459
          REVINSERTNUMBER++;
460
      }
461
      String ip = getIpFromURL(u);
462
      EventLog.getInstance().log(ip, ReplicationService.REPLICATIONUSER, accNumber, actions);
463
      
464

    
465
    }//try
466
    catch(Exception e)
467
    {
468
        
469
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
470
        {
471
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
472
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
473
                                       " into "+tableName + " from " +
474
                                           remoteserver + " because "+e.getMessage());
475
          DOCERRORNUMBER++;
476
        }
477
        else
478
        {
479
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
480
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
481
                    " into "+tableName + " from " +
482
                        remoteserver +" because "+e.getMessage());
483
            REVERRORNUMBER++;
484
        }
485
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
486
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
487
                                      " into db because " +e.getMessage());
488
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
489
    		  + "writing Replication: " +e.getMessage());
490
    }
491
    finally
492
    {
493
       //return DBConnection
494
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
495
    }//finally
496
    logD1.info("replication.create localId:" + accNumber);
497
  }
498

    
499

    
500

    
501
  /* Handle replicate single xml document*/
502
  private void handleSingleDataFile(String remoteserver, String actions,
503
                                    String accNumber, String tableName)
504
               throws HandlerException
505
  {
506
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
507
    DBConnection dbConn = null;
508
    int serialNumber = -1;
509
    try
510
    {
511
      // Get DBConnection from pool
512
      dbConn=DBConnectionPool.
513
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
514
      serialNumber=dbConn.getCheckOutSerialNumber();
515
      // Try get docid info from remote server
516
      DocInfoHandler dih = new DocInfoHandler();
517
      XMLReader docinfoParser = initParser(dih);
518
      String docInfoURLString = "https://" + remoteserver +
519
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
520
                  "&action=getdocumentinfo&docid="+accNumber;
521
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
522
      URL docinfoUrl = new URL(docInfoURLString);
523

    
524
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
525
      
526
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
527
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
528
      
529
      // Get docid name (such as acl or dataset)
530
      String docName = docinfoHash.get("docname");
531
      // Get doc type (eml public id)
532
      String docType = docinfoHash.get("doctype");
533
      // Get docid home sever. it might be different to remoteserver
534
      // because of hub feature
535
      String docHomeServer = docinfoHash.get("home_server");
536
      String createdDate = docinfoHash.get("date_created");
537
      String updatedDate = docinfoHash.get("date_updated");
538
      //docid should include rev number too
539
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
540
                                              (String)docinfoHash.get("rev");*/
541

    
542
      String datafilePath = PropertyService.getProperty("application.datafilepath");
543
      // Get data file content
544
      String readDataURLString = "https://" + remoteserver + "?server="+
545
                                        MetacatUtil.getLocalReplicationServerName()+
546
                                            "&action=readdata&docid="+accNumber;
547
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
548
      URL u = new URL(readDataURLString);
549
      InputStream input = u.openStream();
550
      //register data file into xml_documents table and wite data file
551
      //into file system
552
      if ( input != null)
553
      {
554
        DocumentImpl.writeDataFileInReplication(input,
555
                                                datafilePath,
556
                                                docName,docType,
557
                                                accNumber,
558
                                                null,
559
                                                docHomeServer,
560
                                                remoteserver,
561
                                                tableName,
562
                                                true, //true means timed replication
563
                                                createdDate,
564
                                                updatedDate);
565
                                         
566
        //set the user information
567
        String user = (String) docinfoHash.get("user_owner");
568
		String updated = (String) docinfoHash.get("user_updated");
569
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
570
        
571
        //process extra access rules
572
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
573
        if (xmlAccessDAOList != null) {
574
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
575
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
576
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
577
        			acfsf.insertPermissions(xmlAccessDAO);
578
        		}
579
            }
580
        }
581
        
582
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
583
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
584
                                    remote server);*/
585
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
586
        {
587
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
588
                                       " into "+tableName + " from " +
589
                                           remoteserver);
590
          DOCINSERTNUMBER++;
591
        }
592
        else
593
        {
594
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
595
                    " into "+tableName + " from " +
596
                        remoteserver);
597
            REVINSERTNUMBER++;
598
        }
599
        String ip = getIpFromURL(u);
600
        EventLog.getInstance().log(ip, ReplicationService.REPLICATIONUSER, accNumber, actions);
601
        
602
      }//if
603
      else
604
      {
605
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
606
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
607
      }//else
608

    
609
    }//try
610
    catch(Exception e)
611
    {
612
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
613
                                      " because " +e.getMessage());*/
614
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
615
      {
616
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
617
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
618
                                     " into " + tableName + " from " +
619
                                         remoteserver + " because " + e.getMessage());
620
        DOCERRORNUMBER++;
621
      }
622
      else
623
      {
624
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
625
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
626
                  " into " + tableName + " from " +
627
                      remoteserver +" because "+ e.getMessage());
628
          REVERRORNUMBER++;
629
      }
630
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
631
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
632
                                      " because " + e.getMessage());
633
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
634
    		  + "writing Replication: " + e.getMessage());
635
    }
636
    finally
637
    {
638
       //return DBConnection
639
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
640
    }//finally
641
    logD1.info("replication.create localId:" + accNumber);
642
  }
643

    
644

    
645

    
646
  /* Handle delete single document*/
647
  private void handleDeleteSingleDocument(String docId, String notifyServer)
648
               throws HandlerException
649
  {
650
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
651
    DBConnection dbConn = null;
652
    int serialNumber = -1;
653
    try
654
    {
655
      // Get DBConnection from pool
656
      dbConn=DBConnectionPool.
657
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
658
      serialNumber=dbConn.getCheckOutSerialNumber();
659
      if(!alreadyDeleted(docId))
660
      {
661

    
662
         //because delete method docid should have rev number
663
         //so we just add one for it. This rev number is no sence.
664
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
665
         DocumentImpl.delete(accnum, null, null, notifyServer);
666
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
667
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
668
         URL u = new URL("https://"+notifyServer);
669
         String ip = getIpFromURL(u);
670
         EventLog.getInstance().log(ip, ReplicationService.REPLICATIONUSER, docId, "delete");
671
      }
672

    
673
    }//try
674
    catch(McdbDocNotFoundException e)
675
    {
676
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
677
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
678
                                 " in db because because " + e.getMessage());
679
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
680
    		  + "when handling document: " + e.getMessage());
681
    }
682
    catch(InsufficientKarmaException e)
683
    {
684
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
685
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
686
                                 " in db because because " + e.getMessage());
687
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
688
    		  + "when handling document: " + e.getMessage());
689
    }
690
    catch(SQLException e)
691
    {
692
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
693
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
694
                                 " in db because because " + e.getMessage());
695
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
696
    		  + "when handling document: " + e.getMessage());
697
    }
698
    catch(Exception e)
699
    {
700
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
701
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
702
                                 " in db because because " + e.getMessage());
703
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
704
    		  + "when handling document: " + e.getMessage());
705
    }
706
    finally
707
    {
708
       //return DBConnection
709
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
710
    }//finally
711
    logD1.info("replication.handleDeleteSingleDocument localId:" + docId);
712
  }
713

    
714
  /* Handle updateLastCheckTimForSingleServer*/
715
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
716
                                                  throws HandlerException
717
  {
718
    String server = repServer.getServerName();
719
    DBConnection dbConn = null;
720
    int serialNumber = -1;
721
    PreparedStatement pstmt = null;
722
    try
723
    {
724
      // Get DBConnection from pool
725
      dbConn=DBConnectionPool.
726
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
727
      serialNumber=dbConn.getCheckOutSerialNumber();
728

    
729
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
730
      // Get time from remote server
731
      URL dateurl = new URL("https://" + server + "?server="+
732
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
733
      String datexml = ReplicationService.getURLContent(dateurl);
734
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
735
      if (datexml!=null && !datexml.equals(""))
736
      {
737
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
738
         StringBuffer sql = new StringBuffer();
739
         /*sql.append("update xml_replication set last_checked = to_date('");
740
         sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
741
         sql.append("server like '").append(server).append("'");*/
742
         sql.append("update xml_replication set last_checked = ");
743
         sql.append(DatabaseService.getInstance().getDBAdapter().toDate(datestr, "MM/DD/YY HH24:MI:SS"));
744
         sql.append(" where server like '").append(server).append("'");
745
         pstmt = dbConn.prepareStatement(sql.toString());
746

    
747
         pstmt.executeUpdate();
748
         dbConn.commit();
749
         pstmt.close();
750
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
751
                                      + server);
752
      }//if
753
      else
754
      {
755

    
756
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
757
                                  server + " in db because couldn't get time "
758
                                  );
759
         throw new Exception("Couldn't get time for server "+ server);
760
      }
761

    
762
    }//try
763
    catch(Exception e)
764
    {
765
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
766
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
767
                                server + " in db because because " + e.getMessage());
768
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
769
    		  + "Error updating last checked time: " + e.getMessage());
770
    }
771
    finally
772
    {
773
       //return DBConnection
774
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
775
    }//finally
776
  }
777
  
778
  	/**
779
	 * Handle replicate system metadata
780
	 * 
781
	 * @param remoteserver
782
	 * @param guid
783
	 * @throws HandlerException
784
	 */
785
	private void handleSystemMetadata(String remoteserver, String guid) 
786
		throws HandlerException {
787
		try {
788

    
789
			// Try get the system metadata from remote server
790
			String sysMetaURLStr = "https://" + remoteserver + "?server="
791
					+ MetacatUtil.getLocalReplicationServerName()
792
					+ "&action=getsystemmetadata&guid=" + guid;
793
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
794
			URL sysMetaUrl = new URL(sysMetaURLStr);
795
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
796
							+ sysMetaUrl.toString());
797
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
798

    
799
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
800

    
801
			// process system metadata
802
			if (systemMetadataXML != null) {
803
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
804
								new ByteArrayInputStream(systemMetadataXML
805
										.getBytes("UTF-8")));
806
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
807
			}
808

    
809
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
810
							+ guid);
811

    
812
			String ip = getIpFromURL(sysMetaUrl);
813
			EventLog.getInstance().log(ip, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
814

    
815
		} catch (Exception e) {
816
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
817
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
818
			logReplication
819
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
820
							+ guid + " into db because " + e.getMessage());
821
			throw new HandlerException(
822
					"ReplicationHandler.handleSystemMetadata - generic exception "
823
							+ "writing Replication: " + e.getMessage());
824
		}
825

    
826
	}
827

    
828
  /**
829
   * updates xml_catalog with entries from other servers.
830
   */
831
  private void updateCatalog()
832
  {
833
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
834
    // ReplicationServer object in server list
835
    ReplicationServer replServer = null;
836
    PreparedStatement pstmt = null;
837
    String server = null;
838

    
839

    
840
    // Go through each ReplicationServer object in sererlist
841
    for (int j=0; j<serverList.size(); j++)
842
    {
843
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
844
      Vector<String> publicId = new Vector<String>();
845
      try
846
      {
847
        // Get ReplicationServer object from server list
848
        replServer = serverList.serverAt(j);
849
        // Get server name from the ReplicationServer object
850
        server = replServer.getServerName();
851
        // Try to get catalog
852
        URL u = new URL("https://" + server + "?server="+
853
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
854
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
855
        String catxml = ReplicationService.getURLContent(u);
856

    
857
        // Make sure there are not error, no empty string
858
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
859
        {
860
          throw new Exception("Couldn't get catalog list form server " +server);
861
        }
862
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
863
        CatalogMessageHandler cmh = new CatalogMessageHandler();
864
        XMLReader catparser = initParser(cmh);
865
        catparser.parse(new InputSource(new StringReader(catxml)));
866
        //parse the returned catalog xml and put it into a vector
867
        remoteCatalog = cmh.getCatalogVect();
868

    
869
        // Make sure remoteCatalog is not empty
870
        if (remoteCatalog.isEmpty())
871
        {
872
          throw new Exception("Couldn't get catalog list form server " +server);
873
        }
874

    
875
        String localcatxml = ReplicationService.getCatalogXML();
876

    
877
        // Make sure local catalog is no empty
878
        if (localcatxml==null||localcatxml.equals(""))
879
        {
880
          throw new Exception("Couldn't get catalog list form server " +server);
881
        }
882

    
883
        cmh = new CatalogMessageHandler();
884
        catparser = initParser(cmh);
885
        catparser.parse(new InputSource(new StringReader(localcatxml)));
886
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
887

    
888
        //now we have the catalog from the remote server and this local server
889
        //we now need to compare the two and merge the differences.
890
        //the comparison is base on the public_id fields which is the 4th
891
        //entry in each row vector.
892
        publicId = new Vector<String>();
893
        for(int i=0; i<localCatalog.size(); i++)
894
        {
895
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
896
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
897
          publicId.add(new String((String)v.elementAt(3)));
898
        }
899
      }//try
900
      catch (Exception e)
901
      {
902
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
903
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
904
                                    server + " because " +e.getMessage());
905
      }//catch
906

    
907
      for(int i=0; i<remoteCatalog.size(); i++)
908
      {
909
         // DConnection
910
        DBConnection dbConn = null;
911
        // DBConnection checkout serial number
912
        int serialNumber = -1;
913
        try
914
        {
915
            dbConn=DBConnectionPool.
916
                  getDBConnection("ReplicationHandler.updateCatalog");
917
            serialNumber=dbConn.getCheckOutSerialNumber();
918
            Vector<String> v = remoteCatalog.elementAt(i);
919
            //logMetacat.debug("v2: " + v.toString());
920
            //logMetacat.debug("i: " + i);
921
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
922
            //logMetacat.debug("publicID: " + publicId.toString());
923
            logReplication.info
924
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
925
           if(!publicId.contains(v.elementAt(3)))
926
           { //so we don't have this public id in our local table so we need to
927
             //add it.
928
             //logMetacat.debug("in if");
929
             StringBuffer sql = new StringBuffer();
930
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
931
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
932
             sql.append("?,?)");
933
             //logMetacat.debug("sql: " + sql.toString());
934
             pstmt = dbConn.prepareStatement(sql.toString());
935
             pstmt.setString(1, (String)v.elementAt(0));
936
             pstmt.setString(2, (String)v.elementAt(1));
937
             pstmt.setString(3, (String)v.elementAt(2));
938
             pstmt.setString(4, (String)v.elementAt(3));
939
             pstmt.setString(5, (String)v.elementAt(4));
940
             pstmt.execute();
941
             pstmt.close();
942
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
943
                               (String)v.elementAt(3) + " from server"+server);
944
           }
945
        }
946
        catch(Exception e)
947
        {
948
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
949
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
950
                                    server + " because " +e.getMessage());
951
        }//catch
952
        finally
953
        {
954
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
955
        }//finally
956
      }//for remote catalog
957
    }//for server list
958
    logReplication.info("End of updateCatalog");
959
  }
960

    
961
  /**
962
   * Method that returns true if docid has already been "deleted" from metacat.
963
   * This method really implements a truth table for deleted documents
964
   * The table is (a docid in one of the tables is represented by the X):
965
   * xml_docs      xml_revs      deleted?
966
   * ------------------------------------
967
   *   X             X             FALSE
968
   *   X             _             FALSE
969
   *   _             X             TRUE
970
   *   _             _             TRUE
971
   */
972
  private static boolean alreadyDeleted(String docid) throws HandlerException
973
  {
974
    DBConnection dbConn = null;
975
    int serialNumber = -1;
976
    PreparedStatement pstmt = null;
977
    try
978
    {
979
      dbConn=DBConnectionPool.
980
                  getDBConnection("ReplicationHandler.alreadyDeleted");
981
      serialNumber=dbConn.getCheckOutSerialNumber();
982
      boolean xml_docs = false;
983
      boolean xml_revs = false;
984

    
985
      StringBuffer sb = new StringBuffer();
986
      sb.append("select docid from xml_revisions where docid like '");
987
      sb.append(docid).append("'");
988
      pstmt = dbConn.prepareStatement(sb.toString());
989
      pstmt.execute();
990
      ResultSet rs = pstmt.getResultSet();
991
      boolean tablehasrows = rs.next();
992
      if(tablehasrows)
993
      {
994
        xml_revs = true;
995
      }
996

    
997
      sb = new StringBuffer();
998
      sb.append("select docid from xml_documents where docid like '");
999
      sb.append(docid).append("'");
1000
      pstmt.close();
1001
      pstmt = dbConn.prepareStatement(sb.toString());
1002
      //increase usage count
1003
      dbConn.increaseUsageCount(1);
1004
      pstmt.execute();
1005
      rs = pstmt.getResultSet();
1006
      tablehasrows = rs.next();
1007
      pstmt.close();
1008
      if(tablehasrows)
1009
      {
1010
        xml_docs = true;
1011
      }
1012

    
1013
      if(xml_docs && xml_revs)
1014
      {
1015
        return false;
1016
      }
1017
      else if(xml_docs && !xml_revs)
1018
      {
1019
        return false;
1020
      }
1021
      else if(!xml_docs && xml_revs)
1022
      {
1023
        return true;
1024
      }
1025
      else if(!xml_docs && !xml_revs)
1026
      {
1027
        return true;
1028
      }
1029
    }
1030
    catch(Exception e)
1031
    {
1032
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1033
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1034
                          e.getMessage());
1035
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1036
    		  + e.getMessage());
1037
    }
1038
    finally
1039
    {
1040
      try
1041
      {
1042
        pstmt.close();
1043
      }//try
1044
      catch (SQLException ee)
1045
      {
1046
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1047
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1048
                          "to close pstmt: "+ee.getMessage());
1049
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1050
      		  + ee.getMessage());
1051
      }//catch
1052
      finally
1053
      {
1054
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1055
      }//finally
1056
    }//finally
1057
    return false;
1058
  }
1059

    
1060

    
1061
  /**
1062
   * Method to initialize the message parser
1063
   */
1064
  public static XMLReader initParser(DefaultHandler dh)
1065
          throws HandlerException
1066
  {
1067
    XMLReader parser = null;
1068

    
1069
    try {
1070
      ContentHandler chandler = dh;
1071

    
1072
      // Get an instance of the parser
1073
      String parserName = PropertyService.getProperty("xml.saxparser");
1074
      parser = XMLReaderFactory.createXMLReader(parserName);
1075

    
1076
      // Turn off validation
1077
      parser.setFeature("http://xml.org/sax/features/validation", false);
1078

    
1079
      parser.setContentHandler((ContentHandler)chandler);
1080
      parser.setErrorHandler((ErrorHandler)chandler);
1081

    
1082
    } catch (SAXException se) {
1083
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1084
    		  + " initializing parser: " + se.getMessage());
1085
    } catch (PropertyNotFoundException pnfe) {
1086
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1087
      		  + " getting parser name: " + pnfe.getMessage());
1088
    } 
1089

    
1090
    return parser;
1091
  }
1092

    
1093
  /**
1094
	 * This method will combine given time string(in short format) to current
1095
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1096
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1097
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1098
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1099
	 * 
1100
	 * @param givenTime
1101
	 *            the format should be "10:00 AM " or "2:00 PM"
1102
	 * @return
1103
	 * @throws Exception
1104
	 */
1105
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1106
  {
1107
	  try {
1108
     Date givenDate = parseTime(givenTime);
1109
     Date newDate = null;
1110
     Date now = new Date();
1111
     String currentTimeString = getTimeString(now);
1112
     Date currentTime = parseTime(currentTimeString); 
1113
     if ( currentTime.getTime() >= givenDate.getTime())
1114
     {
1115
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1116
        String dateAndTime = getDateString(now) + " " + givenTime;
1117
        Date combinationDate = parseDateTime(dateAndTime);
1118
        // new date should plus 24 hours to make is the second day
1119
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1120
     }
1121
     else
1122
     {
1123
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1124
         String dateAndTime = getDateString(now) + " " + givenTime;
1125
         newDate = parseDateTime(dateAndTime);
1126
     }
1127
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1128
     return newDate;
1129
	  } catch (ParseException pe) {
1130
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1131
				  + "parsing error: "  + pe.getMessage());
1132
	  }
1133
  }
1134

    
1135
  /*
1136
	 * parse a given string to Time in short format. For example, given time is
1137
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1138
	 */
1139
  private static Date parseTime(String timeString) throws ParseException
1140
  {
1141
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1142
    Date time = format.parse(timeString); 
1143
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1144
                              +time.toString());
1145
    return time;
1146

    
1147
  }
1148
  
1149
  /*
1150
   * Parse a given string to date and time. Date format is long and time
1151
   * format is short.
1152
   */
1153
  private static Date parseDateTime(String timeString) throws ParseException
1154
  {
1155
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1156
    Date time = format.parse(timeString);
1157
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1158
                             time.toString());
1159
    return time;
1160
  }
1161
  
1162
  /*
1163
   * Get a date string from a Date object. The date format will be long
1164
   */
1165
  private static String getDateString(Date now)
1166
  {
1167
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1168
     String s = df.format(now);
1169
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1170
     return s;
1171
  }
1172
  
1173
  /*
1174
   * Get a time string from a Date object, the time format will be short
1175
   */
1176
  private static String getTimeString(Date now)
1177
  {
1178
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1179
     String s = df.format(now);
1180
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1181
     return s;
1182
  }
1183
  
1184
  
1185
  /*
1186
	 * This method will go through the docid list both in xml_Documents table
1187
	 * and in xml_revisions table @author tao
1188
	 */
1189
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1190
		boolean dataFile = false;
1191
		for (int j = 0; j < docList.size(); j++) {
1192
			// initial dataFile is false
1193
			dataFile = false;
1194
			// w is information for one document, information contain
1195
			// docid, rev, server or datafile.
1196
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1197
			// Check if the vector w contain "datafile"
1198
			// If it has, this document is data file
1199
			try {
1200
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1201
					dataFile = true;
1202
				}
1203
			} catch (PropertyNotFoundException pnfe) {
1204
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1205
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1206
						+ "Leaving as false: " + pnfe.getMessage());
1207
			}
1208
			// logMetacat.debug("w: " + w.toString());
1209
			// Get docid
1210
			String docid = (String) w.elementAt(0);
1211
			logReplication.info("docid: " + docid);
1212
			// Get revision number
1213
			int rev = Integer.parseInt((String) w.elementAt(1));
1214
			logReplication.info("rev: " + rev);
1215
			// Get remote server name (it is may not be doc home server because
1216
			// the new hub feature
1217
			String remoteServer = (String) w.elementAt(2);
1218
			remoteServer = remoteServer.trim();
1219

    
1220
			try {
1221
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1222
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1223
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1224
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1225
				} else {
1226
					continue;
1227
				}
1228

    
1229
			} catch (Exception e) {
1230
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1231
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1232
						+ " in time replication" + e.getMessage());
1233
				continue;
1234
			}
1235
			
1236
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1237
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1238
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1239
	        }
1240
	        
1241
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1242
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1243
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1244
	        }
1245

    
1246
		}// for update docs
1247

    
1248
	}
1249
   
1250
   /*
1251
	 * This method will handle doc in xml_documents table.
1252
	 */
1253
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1254
                                        throws HandlerException
1255
   {
1256
       // compare the update rev and local rev to see what need happen
1257
       int localrev = -1;
1258
       String action = null;
1259
       boolean flag = false;
1260
       try
1261
       {
1262
    	 long docQueryStartTime = System.currentTimeMillis();
1263
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1264
         long docQueryEndTime = System.currentTimeMillis();
1265
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1266
         _xmlDocQueryCount++;
1267
       }
1268
       catch (SQLException e)
1269
       {
1270
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1271
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1272
                                " be found because " + e.getMessage());
1273
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1274
                 "written because error happend to find it's local revision");
1275
         DOCERRORNUMBER++;
1276
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1277
                 " be found: " + e.getMessage());
1278
       }
1279
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1280
                               localrev);
1281

    
1282
       //check the revs for an update because this document is in the
1283
       //local DB, it might be out of date.
1284
       if (localrev == -1)
1285
       {
1286
          // check if the revision is in the revision table
1287
    	   Vector<Integer> localRevVector = null;
1288
    	 try {
1289
        	 long revQueryStartTime = System.currentTimeMillis();
1290
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1291
             long revQueryEndTime = System.currentTimeMillis();
1292
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1293
             _xmlRevQueryCount++;
1294
    	 } catch (SQLException sqle) {
1295
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1296
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1297
    	 }
1298
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1299
         {
1300
             // this version was deleted, so don't need replicate
1301
             flag = false;
1302
         }
1303
         else
1304
         {
1305
           //insert this document as new because it is not in the local DB
1306
           action = "INSERT";
1307
           flag = true;
1308
         }
1309
       }
1310
       else
1311
       {
1312
         if(localrev == rev)
1313
         {
1314
           // Local meatacat has the same rev to remote host, don't need
1315
           // update and flag set false
1316
           flag = false;
1317
         }
1318
         else if(localrev < rev)
1319
         {
1320
           //this document needs to be updated so send an read request
1321
           action = "UPDATE";
1322
           flag = true;
1323
         }
1324
       }
1325
       
1326
       String accNumber = null;
1327
       try {
1328
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1329
       } catch (PropertyNotFoundException pnfe) {
1330
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1331
    			   + "account number separator : " + pnfe.getMessage());
1332
       }
1333
       // this is non-data file
1334
       if(flag && !dataFile)
1335
       {
1336
         try
1337
         {
1338
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1339
         }
1340
         catch(HandlerException he)
1341
         {
1342
           // skip this document
1343
           throw he;
1344
         }
1345
       }//if for non-data file
1346

    
1347
        // this is for data file
1348
       if(flag && dataFile)
1349
       {
1350
         try
1351
         {
1352
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1353
         }
1354
         catch(HandlerException he)
1355
         {
1356
           // skip this data file
1357
           throw he;
1358
         }
1359

    
1360
       }//for data file
1361
   }
1362
   
1363
   /*
1364
    * This method will handle doc in xml_documents table.
1365
    */
1366
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1367
                                        throws HandlerException
1368
   {
1369
       // compare the update rev and local rev to see what need happen
1370
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1371
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1372
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1373
       Vector<Integer> localrev = null;
1374
       String action = "INSERT";
1375
       boolean flag = false;
1376
       try
1377
       {
1378
      	 long revQueryStartTime = System.currentTimeMillis();
1379
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1380
         long revQueryEndTime = System.currentTimeMillis();
1381
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1382
         _xmlRevQueryCount++;
1383
       }
1384
       catch (SQLException sqle)
1385
       {
1386
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1387
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1388
                                " be found because " + sqle.getMessage());
1389
         REVERRORNUMBER++;
1390
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1391
        		 + sqle.getMessage());
1392
       }
1393
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1394
                               localrev.toString());
1395
       
1396
       // if the rev is not in the xml_revision, we need insert it
1397
       if (!localrev.contains(new Integer(rev)))
1398
       {
1399
           flag = true;    
1400
       }
1401
     
1402
       String accNumber = null;
1403
       try {
1404
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1405
       } catch (PropertyNotFoundException pnfe) {
1406
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1407
    			   + "account number separator : " + pnfe.getMessage());
1408
       }
1409
       // this is non-data file
1410
       if(flag && !dataFile)
1411
       {
1412
         try
1413
         {
1414
           
1415
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1416
         }
1417
         catch(HandlerException he)
1418
         {
1419
           // skip this document
1420
           throw he;
1421
         }
1422
       }//if for non-data file
1423

    
1424
        // this is for data file
1425
       if(flag && dataFile)
1426
       {
1427
         try
1428
         {
1429
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1430
         }
1431
         catch(HandlerException he)
1432
         {
1433
           // skip this data file
1434
           throw he;
1435
         }
1436

    
1437
       }//for data file
1438
   }
1439
   
1440
   /*
1441
    * Return a ip address for given url
1442
    */
1443
   private String getIpFromURL(URL url)
1444
   {
1445
	   String ip = null;
1446
	   try
1447
	   {
1448
	      InetAddress address = InetAddress.getByName(url.getHost());
1449
	      ip = address.getHostAddress();
1450
	   }
1451
	   catch(UnknownHostException e)
1452
	   {
1453
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1454
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1455
                   +e.getMessage());
1456
	   }
1457

    
1458
	   return ip;
1459
   }
1460
  
1461
}
1462

    
(4-4/8)