Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2012-08-06 11:08:27 -0700 (Mon, 06 Aug 2012) $'
10
 * '$Revision: 7348 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.sql.Timestamp;
39
import java.text.DateFormat;
40
import java.text.ParseException;
41
import java.util.Calendar;
42
import java.util.Date;
43
import java.util.Hashtable;
44
import java.util.TimerTask;
45
import java.util.Vector;
46

    
47
import javax.xml.bind.DatatypeConverter;
48

    
49
import org.apache.log4j.Logger;
50
import org.dataone.service.types.v1.SystemMetadata;
51
import org.dataone.service.util.DateTimeMarshaller;
52
import org.dataone.service.util.TypeMarshaller;
53
import org.xml.sax.ContentHandler;
54
import org.xml.sax.ErrorHandler;
55
import org.xml.sax.InputSource;
56
import org.xml.sax.SAXException;
57
import org.xml.sax.XMLReader;
58
import org.xml.sax.helpers.DefaultHandler;
59
import org.xml.sax.helpers.XMLReaderFactory;
60

    
61
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
62
import edu.ucsb.nceas.metacat.DBUtil;
63
import edu.ucsb.nceas.metacat.DocInfoHandler;
64
import edu.ucsb.nceas.metacat.DocumentImpl;
65
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
66
import edu.ucsb.nceas.metacat.EventLog;
67
import edu.ucsb.nceas.metacat.IdentifierManager;
68
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
69
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
70
import edu.ucsb.nceas.metacat.accesscontrol.XMLAccessDAO;
71
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
72
import edu.ucsb.nceas.metacat.database.DBConnection;
73
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
74
import edu.ucsb.nceas.metacat.database.DatabaseService;
75
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
76
import edu.ucsb.nceas.metacat.properties.PropertyService;
77
import edu.ucsb.nceas.metacat.shared.HandlerException;
78
import edu.ucsb.nceas.metacat.util.MetacatUtil;
79
import edu.ucsb.nceas.metacat.util.ReplicationUtil;
80
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
81

    
82

    
83

    
84
/**
85
 * This class handles deltaT replication checking.  Whenever this TimerTask
86
 * is fired it checks each server in xml_replication for updates and updates
87
 * the local db as needed.
88
 */
89
public class ReplicationHandler extends TimerTask
90
{
91
  int serverCheckCode = 1;
92
  ReplicationServerList serverList = null;
93
  //PrintWriter out;
94
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
95
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
96
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
97
  
98
  private static int DOCINSERTNUMBER = 1;
99
  private static int DOCERRORNUMBER  = 1;
100
  private static int REVINSERTNUMBER = 1;
101
  private static int REVERRORNUMBER  = 1;
102
  
103
  private static int _xmlDocQueryCount = 0;
104
  private static int _xmlRevQueryCount = 0;
105
  private static long _xmlDocQueryTime = 0;
106
  private static long _xmlRevQueryTime = 0;
107
  
108
  
109
  public ReplicationHandler()
110
  {
111
    //this.out = o;
112
    serverList = new ReplicationServerList();
113
  }
114

    
115
  public ReplicationHandler(int serverCheckCode)
116
  {
117
    //this.out = o;
118
    this.serverCheckCode = serverCheckCode;
119
    serverList = new ReplicationServerList();
120
  }
121

    
122
  /**
123
   * Method that implements TimerTask.run().  It runs whenever the timer is
124
   * fired.
125
   */
126
  public void run()
127
  {
128
    //find out the last_checked time of each server in the server list and
129
    //send a query to each server to see if there are any documents in
130
    //xml_documents with an update_date > last_checked
131
	  
132
      //if serverList is null, metacat don't need to replication
133
      if (serverList==null||serverList.isEmpty())
134
      {
135
        return;
136
      }
137
      updateCatalog();
138
      update();
139
      //conn.close();
140
  }
141

    
142
  /**
143
   * Method that uses revision tagging for replication instead of update_date.
144
   */
145
  private void update()
146
  {
147
	  
148
	  _xmlDocQueryCount = 0;
149
	  _xmlRevQueryCount = 0;
150
	  _xmlDocQueryTime = 0;
151
	  _xmlRevQueryTime = 0;
152
    /*
153
     Pseudo-algorithm
154
     - request a doc list from each server in xml_replication
155
     - check the rev number of each of those documents agains the
156
       documents in the local database
157
     - pull any documents that have a lesser rev number on the local server
158
       from the remote server
159
     - delete any documents that still exist in the local xml_documents but
160
       are in the deletedDocuments tag of the remote host response.
161
     - update last_checked to keep track of the last time it was checked.
162
       (this info is theoretically not needed using this system but probably
163
       should be kept anyway)
164
    */
165

    
166
    ReplicationServer replServer = null; // Variable to store the
167
                                        // ReplicationServer got from
168
                                        // Server list
169
    String server = null; // Variable to store server name
170
//    String update;
171
    Vector<String> responses = new Vector<String>();
172
    URL u;
173
    long replicationStartTime = System.currentTimeMillis();
174
    long timeToGetServerList = 0;
175
    
176
    //Check for every server in server list to get updated list and put
177
    // them in to response
178
    long startTimeToGetServers = System.currentTimeMillis();
179
    for (int i=0; i<serverList.size(); i++)
180
    {
181
        // Get ReplicationServer object from server list
182
        replServer = serverList.serverAt(i);
183
        // Get server name from ReplicationServer object
184
        server = replServer.getServerName().trim();
185
        String result = null;
186
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
187
        // Send command to that server to get updated docid information
188
        try
189
        {
190
          u = new URL("https://" + server + "?server="
191
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
192
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
193
          result = ReplicationService.getURLContent(u);
194
        }
195
        catch (Exception e)
196
        {
197
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
198
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
199
                       "for server " + server + " because "+e.getMessage());
200
          continue;
201
        }
202

    
203
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
204
        //check if result have error or not, if has skip it.
205
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
206
        {
207
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
208
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
209
                       "for server " + server + " because "+result);
210
          continue;
211
        }
212
        //Add result to vector
213
        responses.add(result);
214
    }
215
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
216

    
217
    //make sure that there is updated file list
218
    //If response is null, metacat don't need do anything
219
    if (responses==null || responses.isEmpty())
220
    {
221
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
222
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
223
                           "every server and failed to replicate");
224
        return;
225
    }
226

    
227

    
228
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
229
    //               "document information: "+ responses.toString());
230
    
231
    long totalServerListParseTime = 0;
232
    // go through response vector(it contains updated vector and delete vector
233
    for(int i=0; i<responses.size(); i++)
234
    {
235
    	long startServerListParseTime = System.currentTimeMillis();
236
    	XMLReader parser;
237
    	ReplMessageHandler message = new ReplMessageHandler();
238
    	try
239
        {
240
          parser = initParser(message);
241
        }
242
        catch (Exception e)
243
        {
244
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
245
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
246
                                " initParser for message and " +e.getMessage());
247
           // stop replication
248
           return;
249
        }
250
    	
251
        try
252
        {
253
          parser.parse(new InputSource(
254
                     new StringReader(
255
                     (String)(responses.elementAt(i)))));
256
        }
257
        catch(Exception e)
258
        {
259
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
260
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
261
                                   "because "+ e.getMessage());
262
          continue;
263
        }
264
        //v is the list of updated documents
265
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
266
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
267
        //d is the list of deleted documents
268
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
269
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
270
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
271
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
272
        // go though every element in updated document vector
273
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
274
        //handle deleted docs
275
        for(int k=0; k<deleteList.size(); k++)
276
        { //delete the deleted documents;
277
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
278
          String docId = (String)w.elementAt(0);
279
          try
280
          {
281
            handleDeleteSingleDocument(docId, server);
282
          }
283
          catch (Exception ee)
284
          {
285
            continue;
286
          }
287
        }//for delete docs
288
        
289
        // handle replicate doc in xml_revision
290
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
291
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
292
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
293
        DOCINSERTNUMBER = 1;
294
        DOCERRORNUMBER  = 1;
295
        REVINSERTNUMBER = 1;
296
        REVERRORNUMBER  = 1;
297
        
298
        // handle system metadata
299
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
300
        for(int k = 0; k < systemMetadataList.size(); k++) { 
301
        	Vector<String> w = systemMetadataList.elementAt(k);
302
        	String guid = (String) w.elementAt(0);
303
        	String remoteserver = (String) w.elementAt(1);
304
        	try {
305
        		handleSystemMetadata(remoteserver, guid);
306
        	}
307
        	catch (Exception ee) {
308
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
309
        		continue;
310
        	}
311
        }
312
        
313
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
314
    }//for response
315

    
316
    //updated last_checked
317
    for (int i=0;i<serverList.size(); i++)
318
    {
319
       // Get ReplicationServer object from server list
320
       replServer = serverList.serverAt(i);
321
       try
322
       {
323
         updateLastCheckTimeForSingleServer(replServer);
324
       }
325
       catch(Exception e)
326
       {
327
         continue;
328
       }
329
    }//for
330
    
331
    long replicationEndTime = System.currentTimeMillis();
332
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
333
    		(replicationEndTime - replicationStartTime));
334
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
335
    		timeToGetServerList);
336
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
337
    		totalServerListParseTime);
338
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
339
    		_xmlDocQueryCount);
340
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
341
    		_xmlDocQueryTime + " ms");
342
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
343
    		_xmlRevQueryCount);
344
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
345
    		_xmlRevQueryTime + " ms");;
346

    
347
  }//update
348

    
349
  /* Handle replicate single xml document*/
350
  private void handleSingleXMLDocument(String remoteserver, String actions,
351
                                       String accNumber, String tableName)
352
               throws HandlerException
353
  {
354
    DBConnection dbConn = null;
355
    int serialNumber = -1;
356
    try
357
    {
358
      // Get DBConnection from pool
359
      dbConn=DBConnectionPool.
360
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
361
      serialNumber=dbConn.getCheckOutSerialNumber();
362
      //if the document needs to be updated or inserted, this is executed
363
      String readDocURLString = "https://" + remoteserver + "?server="+
364
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
365
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
366
      URL u = new URL(readDocURLString);
367

    
368
      // Get docid content
369
      String newxmldoc = ReplicationService.getURLContent(u);
370
      // If couldn't get skip it
371
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
372
      {
373
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
374
      }
375
      //logReplication.info("xml documnet:");
376
      //logReplication.info(newxmldoc);
377

    
378
      // Try get the docid info from remote server
379
      DocInfoHandler dih = new DocInfoHandler();
380
      XMLReader docinfoParser = initParser(dih);
381
      String docInfoURLStr = "https://" + remoteserver +
382
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
383
                       "&action=getdocumentinfo&docid="+accNumber;
384
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
385
      URL docinfoUrl = new URL(docInfoURLStr);
386
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
387
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
388
      
389
      // strip out the system metadata portion
390
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
391
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);
392
      
393
   	  // process system metadata if we have it
394
      if (systemMetadataXML != null) {
395
    	  SystemMetadata sysMeta = 
396
    		  TypeMarshaller.unmarshalTypeFromStream(
397
    				  SystemMetadata.class, 
398
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
399
    	  // need the guid-to-docid mapping
400
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
401
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
402
    	  }
403
      	  // save the system metadata
404
    	  logReplication.debug("Saving SystemMetadata to shared map: " + sysMeta.getIdentifier().getValue());
405
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
406
      }
407
   	  
408
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
409
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
410
      // Get home server of the docid
411
      String docHomeServer = docinfoHash.get("home_server");
412
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
413
     
414
      // dates
415
      String createdDateString = docinfoHash.get("date_created");
416
      String updatedDateString = docinfoHash.get("date_updated");
417
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
418
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
419
      
420
      //docid should include rev number too
421
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
422
                                              (String)docinfoHash.get("rev");*/
423
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
424
      String docType = docinfoHash.get("doctype");
425
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
426

    
427
      String parserBase = null;
428
      // this for eml2 and we need user eml2 parser
429
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
430
      {
431
         parserBase = DocumentImpl.EML200;
432
      }
433
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
434
      {
435
        parserBase = DocumentImpl.EML200;
436
      }
437
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
438
      {
439
        parserBase = DocumentImpl.EML210;
440
      }
441
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
442
      {
443
        parserBase = DocumentImpl.EML210;
444
      }
445
      // Write the document into local host
446
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false, false);
447
      String newDocid = wrapper.writeReplication(dbConn,
448
                              newxmldoc,
449
                              docinfoHash.get("public_access"),
450
                              null,  /* the dtd text */
451
                              actions,
452
                              accNumber,
453
                              null, //docinfoHash.get("user_owner"),                              
454
                              null, /* null for groups[] */
455
                              docHomeServer,
456
                              remoteserver, tableName, true,// true is for time replication 
457
                              createdDate,
458
                              updatedDate);
459
      
460
      //set the user information
461
      String user = (String) docinfoHash.get("user_owner");
462
      String updated = (String) docinfoHash.get("user_updated");
463
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
464
      
465
      //process extra access rules 
466
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
467
      if (xmlAccessDAOList != null) {
468
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
469
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
470
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
471
      			acfsf.insertPermissions(xmlAccessDAO);
472
      		}
473
          }
474
      }
475
      
476
      
477
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
478
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
479
      {
480
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
481
                                     " into "+tableName + " from " +
482
                                         remoteserver);
483
        DOCINSERTNUMBER++;
484
      }
485
      else
486
      {
487
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
488
                  " into "+tableName + " from " +
489
                      remoteserver);
490
          REVINSERTNUMBER++;
491
      }
492
      String ip = getIpFromURL(u);
493
      EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
494
      
495

    
496
    }//try
497
    catch(Exception e)
498
    {
499
        
500
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
501
        {
502
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
503
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
504
                                       " into "+tableName + " from " +
505
                                           remoteserver + " because "+e.getMessage());
506
          DOCERRORNUMBER++;
507
        }
508
        else
509
        {
510
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
511
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
512
                    " into "+tableName + " from " +
513
                        remoteserver +" because "+e.getMessage());
514
            REVERRORNUMBER++;
515
        }
516
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
517
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
518
                                      " into db because " + e.getMessage(), e);
519
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
520
    		  + "writing Replication: " +e.getMessage());
521
    }
522
    finally
523
    {
524
       //return DBConnection
525
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
526
    }//finally
527
    logMetacat.info("replication.create localId:" + accNumber);
528
  }
529

    
530

    
531

    
532
  /* Handle replicate single xml document*/
533
  private void handleSingleDataFile(String remoteserver, String actions,
534
                                    String accNumber, String tableName)
535
               throws HandlerException
536
  {
537
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
538
    DBConnection dbConn = null;
539
    int serialNumber = -1;
540
    try
541
    {
542
      // Get DBConnection from pool
543
      dbConn=DBConnectionPool.
544
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
545
      serialNumber=dbConn.getCheckOutSerialNumber();
546
      // Try get docid info from remote server
547
      DocInfoHandler dih = new DocInfoHandler();
548
      XMLReader docinfoParser = initParser(dih);
549
      String docInfoURLString = "https://" + remoteserver +
550
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
551
                  "&action=getdocumentinfo&docid="+accNumber;
552
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
553
      URL docinfoUrl = new URL(docInfoURLString);
554

    
555
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
556
      
557
      // strip out the system metadata portion
558
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
559
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);  
560
   	  
561
   	  // process system metadata
562
      if (systemMetadataXML != null) {
563
    	  SystemMetadata sysMeta = 
564
    		TypeMarshaller.unmarshalTypeFromStream(
565
    				  SystemMetadata.class, 
566
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
567
    	  // need the guid-to-docid mapping
568
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
569
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
570
    	  }
571
    	  // save the system metadata
572
    	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
573

    
574
      }
575
   	  
576
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
577
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
578
      
579
      // Get docid name (such as acl or dataset)
580
      String docName = docinfoHash.get("docname");
581
      // Get doc type (eml public id)
582
      String docType = docinfoHash.get("doctype");
583
      // Get docid home sever. it might be different to remoteserver
584
      // because of hub feature
585
      String docHomeServer = docinfoHash.get("home_server");
586
      String createdDateString = docinfoHash.get("date_created");
587
      String updatedDateString = docinfoHash.get("date_updated");
588
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
589
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
590
      //docid should include rev number too
591
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
592
                                              (String)docinfoHash.get("rev");*/
593

    
594
      String datafilePath = PropertyService.getProperty("application.datafilepath");
595
      // Get data file content
596
      String readDataURLString = "https://" + remoteserver + "?server="+
597
                                        MetacatUtil.getLocalReplicationServerName()+
598
                                            "&action=readdata&docid="+accNumber;
599
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
600
      URL u = new URL(readDataURLString);
601
      InputStream input = ReplicationService.getURLStream(u);
602
      //register data file into xml_documents table and wite data file
603
      //into file system
604
      if ( input != null)
605
      {
606
        DocumentImpl.writeDataFileInReplication(input,
607
                                                datafilePath,
608
                                                docName,docType,
609
                                                accNumber,
610
                                                null,
611
                                                docHomeServer,
612
                                                remoteserver,
613
                                                tableName,
614
                                                true, //true means timed replication
615
                                                createdDate,
616
                                                updatedDate);
617
                                         
618
        //set the user information
619
        String user = (String) docinfoHash.get("user_owner");
620
		String updated = (String) docinfoHash.get("user_updated");
621
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
622
        
623
        //process extra access rules
624
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
625
        if (xmlAccessDAOList != null) {
626
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
627
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
628
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
629
        			acfsf.insertPermissions(xmlAccessDAO);
630
        		}
631
            }
632
        }
633
        
634
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
635
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
636
                                    remote server);*/
637
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
638
        {
639
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
640
                                       " into "+tableName + " from " +
641
                                           remoteserver);
642
          DOCINSERTNUMBER++;
643
        }
644
        else
645
        {
646
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
647
                    " into "+tableName + " from " +
648
                        remoteserver);
649
            REVINSERTNUMBER++;
650
        }
651
        String ip = getIpFromURL(u);
652
        EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
653
        
654
      }//if
655
      else
656
      {
657
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
658
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
659
      }//else
660

    
661
    }//try
662
    catch(Exception e)
663
    {
664
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
665
                                      " because " +e.getMessage());*/
666
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
667
      {
668
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
669
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
670
                                     " into " + tableName + " from " +
671
                                         remoteserver + " because " + e.getMessage());
672
        DOCERRORNUMBER++;
673
      }
674
      else
675
      {
676
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
677
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
678
                  " into " + tableName + " from " +
679
                      remoteserver +" because "+ e.getMessage());
680
          REVERRORNUMBER++;
681
      }
682
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
683
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
684
                                      " because " + e.getMessage());
685
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
686
    		  + "writing Replication: " + e.getMessage());
687
    }
688
    finally
689
    {
690
       //return DBConnection
691
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
692
    }//finally
693
    logMetacat.info("replication.create localId:" + accNumber);
694
  }
695

    
696

    
697

    
698
  /* Handle delete single document*/
699
  private void handleDeleteSingleDocument(String docId, String notifyServer)
700
               throws HandlerException
701
  {
702
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
703
    DBConnection dbConn = null;
704
    int serialNumber = -1;
705
    try
706
    {
707
      // Get DBConnection from pool
708
      dbConn=DBConnectionPool.
709
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
710
      serialNumber=dbConn.getCheckOutSerialNumber();
711
      if(!alreadyDeleted(docId))
712
      {
713

    
714
         //because delete method docid should have rev number
715
         //so we just add one for it. This rev number is no sence.
716
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
717
         DocumentImpl.delete(accnum, null, null, notifyServer, false);
718
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
719
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
720
         URL u = new URL("https://"+notifyServer);
721
         String ip = getIpFromURL(u);
722
         EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, docId, "delete");
723
      }
724

    
725
    }//try
726
    catch(McdbDocNotFoundException e)
727
    {
728
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
729
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
730
                                 " in db because because " + e.getMessage());
731
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
732
    		  + "when handling document: " + e.getMessage());
733
    }
734
    catch(InsufficientKarmaException e)
735
    {
736
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
737
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
738
                                 " in db because because " + e.getMessage());
739
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
740
    		  + "when handling document: " + e.getMessage());
741
    }
742
    catch(SQLException e)
743
    {
744
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
745
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
746
                                 " in db because because " + e.getMessage());
747
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
748
    		  + "when handling document: " + e.getMessage());
749
    }
750
    catch(Exception e)
751
    {
752
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
753
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
754
                                 " in db because because " + e.getMessage());
755
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
756
    		  + "when handling document: " + e.getMessage());
757
    }
758
    finally
759
    {
760
       //return DBConnection
761
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
762
    }//finally
763
    logMetacat.info("replication.handleDeleteSingleDocument localId:" + docId);
764
  }
765

    
766
  /* Handle updateLastCheckTimForSingleServer*/
767
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
768
                                                  throws HandlerException
769
  {
770
    String server = repServer.getServerName();
771
    DBConnection dbConn = null;
772
    int serialNumber = -1;
773
    PreparedStatement pstmt = null;
774
    try
775
    {
776
      // Get DBConnection from pool
777
      dbConn=DBConnectionPool.
778
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
779
      serialNumber=dbConn.getCheckOutSerialNumber();
780

    
781
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
782
      // Get time from remote server
783
      URL dateurl = new URL("https://" + server + "?server="+
784
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
785
      String datexml = ReplicationService.getURLContent(dateurl);
786
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
787
      if (datexml != null && !datexml.equals("")) {
788
    	  
789
    	  // parse the ISO datetime
790
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
791
         Date updated = DateTimeMarshaller.deserializeDateToUTC(datestr);
792
         
793
         StringBuffer sql = new StringBuffer();
794
         sql.append("update xml_replication set last_checked = ? ");
795
         sql.append(" where server like ? ");
796
         pstmt = dbConn.prepareStatement(sql.toString());
797
         pstmt.setTimestamp(1, new Timestamp(updated.getTime()));
798
         pstmt.setString(2, server);
799
         
800
         pstmt.executeUpdate();
801
         dbConn.commit();
802
         pstmt.close();
803
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
804
                                      + server);
805
      }//if
806
      else
807
      {
808

    
809
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
810
                                  server + " in db because couldn't get time "
811
                                  );
812
         throw new Exception("Couldn't get time for server "+ server);
813
      }
814

    
815
    }//try
816
    catch(Exception e)
817
    {
818
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
819
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
820
                                server + " in db because because " + e.getMessage());
821
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
822
    		  + "Error updating last checked time: " + e.getMessage());
823
    }
824
    finally
825
    {
826
       //return DBConnection
827
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
828
    }//finally
829
  }
830
  
831
  	/**
832
	 * Handle replicate system metadata
833
	 * 
834
	 * @param remoteserver
835
	 * @param guid
836
	 * @throws HandlerException
837
	 */
838
	private void handleSystemMetadata(String remoteserver, String guid) 
839
		throws HandlerException {
840
		try {
841

    
842
			// Try get the system metadata from remote server
843
			String sysMetaURLStr = "https://" + remoteserver + "?server="
844
					+ MetacatUtil.getLocalReplicationServerName()
845
					+ "&action=getsystemmetadata&guid=" + guid;
846
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
847
			URL sysMetaUrl = new URL(sysMetaURLStr);
848
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
849
							+ sysMetaUrl.toString());
850
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
851

    
852
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
853

    
854
			// process system metadata
855
			if (systemMetadataXML != null) {
856
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
857
								new ByteArrayInputStream(systemMetadataXML
858
										.getBytes("UTF-8")));
859
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
860
			}
861

    
862
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
863
							+ guid);
864

    
865
			String ip = getIpFromURL(sysMetaUrl);
866
			EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
867

    
868
		} catch (Exception e) {
869
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
870
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
871
			logReplication
872
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
873
							+ guid + " into db because " + e.getMessage());
874
			throw new HandlerException(
875
					"ReplicationHandler.handleSystemMetadata - generic exception "
876
							+ "writing Replication: " + e.getMessage());
877
		}
878

    
879
	}
880

    
881
  /**
882
   * updates xml_catalog with entries from other servers.
883
   */
884
  private void updateCatalog()
885
  {
886
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
887
    // ReplicationServer object in server list
888
    ReplicationServer replServer = null;
889
    PreparedStatement pstmt = null;
890
    String server = null;
891

    
892

    
893
    // Go through each ReplicationServer object in sererlist
894
    for (int j=0; j<serverList.size(); j++)
895
    {
896
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
897
      Vector<String> publicId = new Vector<String>();
898
      try
899
      {
900
        // Get ReplicationServer object from server list
901
        replServer = serverList.serverAt(j);
902
        // Get server name from the ReplicationServer object
903
        server = replServer.getServerName();
904
        // Try to get catalog
905
        URL u = new URL("https://" + server + "?server="+
906
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
907
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
908
        String catxml = ReplicationService.getURLContent(u);
909

    
910
        // Make sure there are not error, no empty string
911
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
912
        {
913
          throw new Exception("Couldn't get catalog list form server " +server);
914
        }
915
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
916
        CatalogMessageHandler cmh = new CatalogMessageHandler();
917
        XMLReader catparser = initParser(cmh);
918
        catparser.parse(new InputSource(new StringReader(catxml)));
919
        //parse the returned catalog xml and put it into a vector
920
        remoteCatalog = cmh.getCatalogVect();
921

    
922
        // Make sure remoteCatalog is not empty
923
        if (remoteCatalog.isEmpty())
924
        {
925
          throw new Exception("Couldn't get catalog list form server " +server);
926
        }
927

    
928
        String localcatxml = ReplicationService.getCatalogXML();
929

    
930
        // Make sure local catalog is no empty
931
        if (localcatxml==null||localcatxml.equals(""))
932
        {
933
          throw new Exception("Couldn't get catalog list form server " +server);
934
        }
935

    
936
        cmh = new CatalogMessageHandler();
937
        catparser = initParser(cmh);
938
        catparser.parse(new InputSource(new StringReader(localcatxml)));
939
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
940

    
941
        //now we have the catalog from the remote server and this local server
942
        //we now need to compare the two and merge the differences.
943
        //the comparison is base on the public_id fields which is the 4th
944
        //entry in each row vector.
945
        publicId = new Vector<String>();
946
        for(int i=0; i<localCatalog.size(); i++)
947
        {
948
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
949
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
950
          publicId.add(new String((String)v.elementAt(3)));
951
        }
952
      }//try
953
      catch (Exception e)
954
      {
955
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
956
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
957
                                    server + " because " +e.getMessage());
958
      }//catch
959

    
960
      for(int i=0; i<remoteCatalog.size(); i++)
961
      {
962
         // DConnection
963
        DBConnection dbConn = null;
964
        // DBConnection checkout serial number
965
        int serialNumber = -1;
966
        try
967
        {
968
            dbConn=DBConnectionPool.
969
                  getDBConnection("ReplicationHandler.updateCatalog");
970
            serialNumber=dbConn.getCheckOutSerialNumber();
971
            Vector<String> v = remoteCatalog.elementAt(i);
972
            //logMetacat.debug("v2: " + v.toString());
973
            //logMetacat.debug("i: " + i);
974
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
975
            //logMetacat.debug("publicID: " + publicId.toString());
976
            logReplication.info
977
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
978
           if(!publicId.contains(v.elementAt(3)))
979
           { //so we don't have this public id in our local table so we need to
980
             //add it.
981
             //logMetacat.debug("in if");
982
             StringBuffer sql = new StringBuffer();
983
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
984
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
985
             sql.append("?,?)");
986
             //logMetacat.debug("sql: " + sql.toString());
987
             pstmt = dbConn.prepareStatement(sql.toString());
988
             pstmt.setString(1, (String)v.elementAt(0));
989
             pstmt.setString(2, (String)v.elementAt(1));
990
             pstmt.setString(3, (String)v.elementAt(2));
991
             pstmt.setString(4, (String)v.elementAt(3));
992
             pstmt.setString(5, (String)v.elementAt(4));
993
             pstmt.execute();
994
             pstmt.close();
995
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
996
                               (String)v.elementAt(3) + " from server"+server);
997
           }
998
        }
999
        catch(Exception e)
1000
        {
1001
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1002
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
1003
                                    server + " because " +e.getMessage());
1004
        }//catch
1005
        finally
1006
        {
1007
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1008
        }//finally
1009
      }//for remote catalog
1010
    }//for server list
1011
    logReplication.info("End of updateCatalog");
1012
  }
1013

    
1014
  /**
1015
   * Method that returns true if docid has already been "deleted" from metacat.
1016
   * This method really implements a truth table for deleted documents
1017
   * The table is (a docid in one of the tables is represented by the X):
1018
   * xml_docs      xml_revs      deleted?
1019
   * ------------------------------------
1020
   *   X             X             FALSE
1021
   *   X             _             FALSE
1022
   *   _             X             TRUE
1023
   *   _             _             TRUE
1024
   */
1025
  private static boolean alreadyDeleted(String docid) throws HandlerException
1026
  {
1027
    DBConnection dbConn = null;
1028
    int serialNumber = -1;
1029
    PreparedStatement pstmt = null;
1030
    try
1031
    {
1032
      dbConn=DBConnectionPool.
1033
                  getDBConnection("ReplicationHandler.alreadyDeleted");
1034
      serialNumber=dbConn.getCheckOutSerialNumber();
1035
      boolean xml_docs = false;
1036
      boolean xml_revs = false;
1037

    
1038
      StringBuffer sb = new StringBuffer();
1039
      sb.append("select docid from xml_revisions where docid like ? ");
1040
      pstmt = dbConn.prepareStatement(sb.toString());
1041
      pstmt.setString(1, docid);
1042
      pstmt.execute();
1043
      ResultSet rs = pstmt.getResultSet();
1044
      boolean tablehasrows = rs.next();
1045
      if(tablehasrows)
1046
      {
1047
        xml_revs = true;
1048
      }
1049

    
1050
      sb = new StringBuffer();
1051
      sb.append("select docid from xml_documents where docid like '");
1052
      sb.append(docid).append("'");
1053
      pstmt.close();
1054
      pstmt = dbConn.prepareStatement(sb.toString());
1055
      //increase usage count
1056
      dbConn.increaseUsageCount(1);
1057
      pstmt.execute();
1058
      rs = pstmt.getResultSet();
1059
      tablehasrows = rs.next();
1060
      pstmt.close();
1061
      if(tablehasrows)
1062
      {
1063
        xml_docs = true;
1064
      }
1065

    
1066
      if(xml_docs && xml_revs)
1067
      {
1068
        return false;
1069
      }
1070
      else if(xml_docs && !xml_revs)
1071
      {
1072
        return false;
1073
      }
1074
      else if(!xml_docs && xml_revs)
1075
      {
1076
        return true;
1077
      }
1078
      else if(!xml_docs && !xml_revs)
1079
      {
1080
        return true;
1081
      }
1082
    }
1083
    catch(Exception e)
1084
    {
1085
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1086
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1087
                          e.getMessage());
1088
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1089
    		  + e.getMessage());
1090
    }
1091
    finally
1092
    {
1093
      try
1094
      {
1095
        pstmt.close();
1096
      }//try
1097
      catch (SQLException ee)
1098
      {
1099
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1100
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1101
                          "to close pstmt: "+ee.getMessage());
1102
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1103
      		  + ee.getMessage());
1104
      }//catch
1105
      finally
1106
      {
1107
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1108
      }//finally
1109
    }//finally
1110
    return false;
1111
  }
1112

    
1113

    
1114
  /**
1115
   * Method to initialize the message parser
1116
   */
1117
  public static XMLReader initParser(DefaultHandler dh)
1118
          throws HandlerException
1119
  {
1120
    XMLReader parser = null;
1121

    
1122
    try {
1123
      ContentHandler chandler = dh;
1124

    
1125
      // Get an instance of the parser
1126
      String parserName = PropertyService.getProperty("xml.saxparser");
1127
      parser = XMLReaderFactory.createXMLReader(parserName);
1128

    
1129
      // Turn off validation
1130
      parser.setFeature("http://xml.org/sax/features/validation", false);
1131

    
1132
      parser.setContentHandler((ContentHandler)chandler);
1133
      parser.setErrorHandler((ErrorHandler)chandler);
1134

    
1135
    } catch (SAXException se) {
1136
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1137
    		  + " initializing parser: " + se.getMessage());
1138
    } catch (PropertyNotFoundException pnfe) {
1139
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1140
      		  + " getting parser name: " + pnfe.getMessage());
1141
    } 
1142

    
1143
    return parser;
1144
  }
1145

    
1146
  /**
1147
	 * This method will combine given time string(in short format) to current
1148
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1149
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1150
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1151
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1152
	 * 
1153
	 * @param givenTime
1154
	 *            the format should be "10:00 AM " or "2:00 PM"
1155
	 * @return
1156
	 * @throws Exception
1157
	 */
1158
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1159
  {
1160
	  try {
1161
     Date givenDate = parseTime(givenTime);
1162
     Date newDate = null;
1163
     Date now = new Date();
1164
     String currentTimeString = getTimeString(now);
1165
     Date currentTime = parseTime(currentTimeString); 
1166
     if ( currentTime.getTime() >= givenDate.getTime())
1167
     {
1168
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1169
        String dateAndTime = getDateString(now) + " " + givenTime;
1170
        Date combinationDate = parseDateTime(dateAndTime);
1171
        // new date should plus 24 hours to make is the second day
1172
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1173
     }
1174
     else
1175
     {
1176
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1177
         String dateAndTime = getDateString(now) + " " + givenTime;
1178
         newDate = parseDateTime(dateAndTime);
1179
     }
1180
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1181
     return newDate;
1182
	  } catch (ParseException pe) {
1183
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1184
				  + "parsing error: "  + pe.getMessage());
1185
	  }
1186
  }
1187

    
1188
  /*
1189
	 * parse a given string to Time in short format. For example, given time is
1190
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1191
	 */
1192
  private static Date parseTime(String timeString) throws ParseException
1193
  {
1194
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1195
    Date time = format.parse(timeString); 
1196
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1197
                              +time.toString());
1198
    return time;
1199

    
1200
  }
1201
  
1202
  /*
1203
   * Parse a given string to date and time. Date format is long and time
1204
   * format is short.
1205
   */
1206
  private static Date parseDateTime(String timeString) throws ParseException
1207
  {
1208
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1209
    Date time = format.parse(timeString);
1210
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1211
                             time.toString());
1212
    return time;
1213
  }
1214
  
1215
  /*
1216
   * Get a date string from a Date object. The date format will be long
1217
   */
1218
  private static String getDateString(Date now)
1219
  {
1220
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1221
     String s = df.format(now);
1222
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1223
     return s;
1224
  }
1225
  
1226
  /*
1227
   * Get a time string from a Date object, the time format will be short
1228
   */
1229
  private static String getTimeString(Date now)
1230
  {
1231
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1232
     String s = df.format(now);
1233
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1234
     return s;
1235
  }
1236
  
1237
  
1238
  /*
1239
	 * This method will go through the docid list both in xml_Documents table
1240
	 * and in xml_revisions table @author tao
1241
	 */
1242
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1243
		boolean dataFile = false;
1244
		for (int j = 0; j < docList.size(); j++) {
1245
			// initial dataFile is false
1246
			dataFile = false;
1247
			// w is information for one document, information contain
1248
			// docid, rev, server or datafile.
1249
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1250
			// Check if the vector w contain "datafile"
1251
			// If it has, this document is data file
1252
			try {
1253
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1254
					dataFile = true;
1255
				}
1256
			} catch (PropertyNotFoundException pnfe) {
1257
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1258
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1259
						+ "Leaving as false: " + pnfe.getMessage());
1260
			}
1261
			// logMetacat.debug("w: " + w.toString());
1262
			// Get docid
1263
			String docid = (String) w.elementAt(0);
1264
			logReplication.info("docid: " + docid);
1265
			// Get revision number
1266
			int rev = Integer.parseInt((String) w.elementAt(1));
1267
			logReplication.info("rev: " + rev);
1268
			// Get remote server name (it is may not be doc home server because
1269
			// the new hub feature
1270
			String remoteServer = (String) w.elementAt(2);
1271
			remoteServer = remoteServer.trim();
1272

    
1273
			try {
1274
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1275
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1276
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1277
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1278
				} else {
1279
					continue;
1280
				}
1281

    
1282
			} catch (Exception e) {
1283
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1284
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1285
						+ " in time replication" + e.getMessage(), e);
1286
				continue;
1287
			}
1288
			
1289
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1290
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1291
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1292
	        }
1293
	        
1294
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1295
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1296
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1297
	        }
1298

    
1299
		}// for update docs
1300

    
1301
	}
1302
   
1303
   /*
1304
	 * This method will handle doc in xml_documents table.
1305
	 */
1306
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1307
                                        throws HandlerException
1308
   {
1309
       // compare the update rev and local rev to see what need happen
1310
       int localrev = -1;
1311
       String action = null;
1312
       boolean flag = false;
1313
       try
1314
       {
1315
    	 long docQueryStartTime = System.currentTimeMillis();
1316
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1317
         long docQueryEndTime = System.currentTimeMillis();
1318
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1319
         _xmlDocQueryCount++;
1320
       }
1321
       catch (SQLException e)
1322
       {
1323
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1324
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1325
                                " be found because " + e.getMessage());
1326
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1327
                 "written because error happend to find it's local revision");
1328
         DOCERRORNUMBER++;
1329
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1330
                 " be found: " + e.getMessage());
1331
       }
1332
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1333
                               localrev);
1334

    
1335
       //check the revs for an update because this document is in the
1336
       //local DB, it might be out of date.
1337
       if (localrev == -1)
1338
       {
1339
          // check if the revision is in the revision table
1340
    	   Vector<Integer> localRevVector = null;
1341
    	 try {
1342
        	 long revQueryStartTime = System.currentTimeMillis();
1343
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1344
             long revQueryEndTime = System.currentTimeMillis();
1345
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1346
             _xmlRevQueryCount++;
1347
    	 } catch (SQLException sqle) {
1348
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1349
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1350
    	 }
1351
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1352
         {
1353
             // this version was deleted, so don't need replicate
1354
             flag = false;
1355
         }
1356
         else
1357
         {
1358
           //insert this document as new because it is not in the local DB
1359
           action = "INSERT";
1360
           flag = true;
1361
         }
1362
       }
1363
       else
1364
       {
1365
         if(localrev == rev)
1366
         {
1367
           // Local meatacat has the same rev to remote host, don't need
1368
           // update and flag set false
1369
           flag = false;
1370
         }
1371
         else if(localrev < rev)
1372
         {
1373
           //this document needs to be updated so send an read request
1374
           action = "UPDATE";
1375
           flag = true;
1376
         }
1377
       }
1378
       
1379
       String accNumber = null;
1380
       try {
1381
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1382
       } catch (PropertyNotFoundException pnfe) {
1383
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1384
    			   + "account number separator : " + pnfe.getMessage());
1385
       }
1386
       // this is non-data file
1387
       if(flag && !dataFile)
1388
       {
1389
         try
1390
         {
1391
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1392
         }
1393
         catch(HandlerException he)
1394
         {
1395
           // skip this document
1396
           throw he;
1397
         }
1398
       }//if for non-data file
1399

    
1400
        // this is for data file
1401
       if(flag && dataFile)
1402
       {
1403
         try
1404
         {
1405
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1406
         }
1407
         catch(HandlerException he)
1408
         {
1409
           // skip this data file
1410
           throw he;
1411
         }
1412

    
1413
       }//for data file
1414
   }
1415
   
1416
   /*
1417
    * This method will handle doc in xml_documents table.
1418
    */
1419
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1420
                                        throws HandlerException
1421
   {
1422
       // compare the update rev and local rev to see what need happen
1423
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1424
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1425
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1426
       Vector<Integer> localrev = null;
1427
       String action = "INSERT";
1428
       boolean flag = false;
1429
       try
1430
       {
1431
      	 long revQueryStartTime = System.currentTimeMillis();
1432
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1433
         long revQueryEndTime = System.currentTimeMillis();
1434
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1435
         _xmlRevQueryCount++;
1436
       }
1437
       catch (SQLException sqle)
1438
       {
1439
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1440
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1441
                                " be found because " + sqle.getMessage());
1442
         REVERRORNUMBER++;
1443
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1444
        		 + sqle.getMessage());
1445
       }
1446
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1447
                               localrev.toString());
1448
       
1449
       // if the rev is not in the xml_revision, we need insert it
1450
       if (!localrev.contains(new Integer(rev)))
1451
       {
1452
           flag = true;    
1453
       }
1454
     
1455
       String accNumber = null;
1456
       try {
1457
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1458
       } catch (PropertyNotFoundException pnfe) {
1459
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1460
    			   + "account number separator : " + pnfe.getMessage());
1461
       }
1462
       // this is non-data file
1463
       if(flag && !dataFile)
1464
       {
1465
         try
1466
         {
1467
           
1468
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1469
         }
1470
         catch(HandlerException he)
1471
         {
1472
           // skip this document
1473
           throw he;
1474
         }
1475
       }//if for non-data file
1476

    
1477
        // this is for data file
1478
       if(flag && dataFile)
1479
       {
1480
         try
1481
         {
1482
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1483
         }
1484
         catch(HandlerException he)
1485
         {
1486
           // skip this data file
1487
           throw he;
1488
         }
1489

    
1490
       }//for data file
1491
   }
1492
   
1493
   /*
1494
    * Return a ip address for given url
1495
    */
1496
   private String getIpFromURL(URL url)
1497
   {
1498
	   String ip = null;
1499
	   try
1500
	   {
1501
	      InetAddress address = InetAddress.getByName(url.getHost());
1502
	      ip = address.getHostAddress();
1503
	   }
1504
	   catch(UnknownHostException e)
1505
	   {
1506
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1507
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1508
                   +e.getMessage());
1509
	   }
1510

    
1511
	   return ip;
1512
   }
1513
  
1514
}
1515

    
(3-3/7)