Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2012-08-17 12:42:28 -0700 (Fri, 17 Aug 2012) $'
10
 * '$Revision: 7356 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.sql.Timestamp;
39
import java.text.DateFormat;
40
import java.text.ParseException;
41
import java.util.Date;
42
import java.util.Hashtable;
43
import java.util.TimerTask;
44
import java.util.Vector;
45

    
46

    
47
import org.apache.log4j.Logger;
48
import org.dataone.service.types.v1.SystemMetadata;
49
import org.dataone.service.util.DateTimeMarshaller;
50
import org.dataone.service.util.TypeMarshaller;
51
import org.xml.sax.ContentHandler;
52
import org.xml.sax.ErrorHandler;
53
import org.xml.sax.InputSource;
54
import org.xml.sax.SAXException;
55
import org.xml.sax.XMLReader;
56
import org.xml.sax.helpers.DefaultHandler;
57
import org.xml.sax.helpers.XMLReaderFactory;
58

    
59
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
60
import edu.ucsb.nceas.metacat.DBUtil;
61
import edu.ucsb.nceas.metacat.DocInfoHandler;
62
import edu.ucsb.nceas.metacat.DocumentImpl;
63
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
64
import edu.ucsb.nceas.metacat.EventLog;
65
import edu.ucsb.nceas.metacat.IdentifierManager;
66
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
67
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
68
import edu.ucsb.nceas.metacat.accesscontrol.XMLAccessDAO;
69
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
70
import edu.ucsb.nceas.metacat.database.DBConnection;
71
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
72
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
73
import edu.ucsb.nceas.metacat.properties.PropertyService;
74
import edu.ucsb.nceas.metacat.shared.HandlerException;
75
import edu.ucsb.nceas.metacat.util.MetacatUtil;
76
import edu.ucsb.nceas.metacat.util.ReplicationUtil;
77
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
78

    
79

    
80

    
81
/**
82
 * This class handles deltaT replication checking.  Whenever this TimerTask
83
 * is fired it checks each server in xml_replication for updates and updates
84
 * the local db as needed.
85
 */
86
public class ReplicationHandler extends TimerTask
87
{
88
  int serverCheckCode = 1;
89
  ReplicationServerList serverList = null;
90
  //PrintWriter out;
91
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
92
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
93
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
94
  
95
  private static int DOCINSERTNUMBER = 1;
96
  private static int DOCERRORNUMBER  = 1;
97
  private static int REVINSERTNUMBER = 1;
98
  private static int REVERRORNUMBER  = 1;
99
  
100
  private static int _xmlDocQueryCount = 0;
101
  private static int _xmlRevQueryCount = 0;
102
  private static long _xmlDocQueryTime = 0;
103
  private static long _xmlRevQueryTime = 0;
104
  
105
  
106
  public ReplicationHandler()
107
  {
108
    //this.out = o;
109
    serverList = new ReplicationServerList();
110
  }
111

    
112
  public ReplicationHandler(int serverCheckCode)
113
  {
114
    //this.out = o;
115
    this.serverCheckCode = serverCheckCode;
116
    serverList = new ReplicationServerList();
117
  }
118

    
119
  /**
120
   * Method that implements TimerTask.run().  It runs whenever the timer is
121
   * fired.
122
   */
123
  public void run()
124
  {
125
    //find out the last_checked time of each server in the server list and
126
    //send a query to each server to see if there are any documents in
127
    //xml_documents with an update_date > last_checked
128
	  
129
      //if serverList is null, metacat don't need to replication
130
      if (serverList==null||serverList.isEmpty())
131
      {
132
        return;
133
      }
134
      updateCatalog();
135
      update();
136
      //conn.close();
137
  }
138

    
139
  /**
140
   * Method that uses revision tagging for replication instead of update_date.
141
   */
142
  private void update()
143
  {
144
	  
145
	  _xmlDocQueryCount = 0;
146
	  _xmlRevQueryCount = 0;
147
	  _xmlDocQueryTime = 0;
148
	  _xmlRevQueryTime = 0;
149
    /*
150
     Pseudo-algorithm
151
     - request a doc list from each server in xml_replication
152
     - check the rev number of each of those documents agains the
153
       documents in the local database
154
     - pull any documents that have a lesser rev number on the local server
155
       from the remote server
156
     - delete any documents that still exist in the local xml_documents but
157
       are in the deletedDocuments tag of the remote host response.
158
     - update last_checked to keep track of the last time it was checked.
159
       (this info is theoretically not needed using this system but probably
160
       should be kept anyway)
161
    */
162

    
163
    ReplicationServer replServer = null; // Variable to store the
164
                                        // ReplicationServer got from
165
                                        // Server list
166
    String server = null; // Variable to store server name
167
//    String update;
168
    Vector<InputStream> responses = new Vector<InputStream>();
169
    URL u;
170
    long replicationStartTime = System.currentTimeMillis();
171
    long timeToGetServerList = 0;
172
    
173
    //Check for every server in server list to get updated list and put
174
    // them in to response
175
    long startTimeToGetServers = System.currentTimeMillis();
176
    for (int i=0; i<serverList.size(); i++)
177
    {
178
        // Get ReplicationServer object from server list
179
        replServer = serverList.serverAt(i);
180
        // Get server name from ReplicationServer object
181
        server = replServer.getServerName().trim();
182
        InputStream result = null;
183
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
184
        // Send command to that server to get updated docid information
185
        try
186
        {
187
          u = new URL("https://" + server + "?server="
188
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
189
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
190
          result = ReplicationService.getURLStream(u);
191
        }
192
        catch (Exception e)
193
        {
194
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
195
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
196
                       "for server " + server + " because "+e.getMessage());
197
          continue;
198
        }
199

    
200
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
201
        //check if result have error or not, if has skip it.
202
        // TODO: check for error in stream
203
        //if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) {
204
        if (result == null) {
205
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
206
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
207
                       "for server " + server + " because "+result);
208
          continue;
209
        }
210
        //Add result to vector
211
        responses.add(result);
212
    }
213
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
214

    
215
    //make sure that there is updated file list
216
    //If response is null, metacat don't need do anything
217
    if (responses==null || responses.isEmpty())
218
    {
219
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
220
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
221
                           "every server and failed to replicate");
222
        return;
223
    }
224

    
225

    
226
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
227
    //               "document information: "+ responses.toString());
228
    
229
    long totalServerListParseTime = 0;
230
    // go through response vector(it contains updated vector and delete vector
231
    for(int i=0; i<responses.size(); i++)
232
    {
233
    	long startServerListParseTime = System.currentTimeMillis();
234
    	XMLReader parser;
235
    	ReplMessageHandler message = new ReplMessageHandler();
236
    	try
237
        {
238
          parser = initParser(message);
239
        }
240
        catch (Exception e)
241
        {
242
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
243
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
244
                                " initParser for message and " +e.getMessage());
245
           // stop replication
246
           return;
247
        }
248
    	
249
        try
250
        {
251
          parser.parse(new InputSource(responses.elementAt(i)));
252
        }
253
        catch(Exception e)
254
        {
255
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
256
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
257
                                   "because "+ e.getMessage());
258
          continue;
259
        }
260
        //v is the list of updated documents
261
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
262
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
263
        //d is the list of deleted documents
264
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
265
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
266
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
267
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
268
        // go though every element in updated document vector
269
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
270
        //handle deleted docs
271
        for(int k=0; k<deleteList.size(); k++)
272
        { //delete the deleted documents;
273
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
274
          String docId = (String)w.elementAt(0);
275
          try
276
          {
277
            handleDeleteSingleDocument(docId, server);
278
          }
279
          catch (Exception ee)
280
          {
281
            continue;
282
          }
283
        }//for delete docs
284
        
285
        // handle replicate doc in xml_revision
286
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
287
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
288
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
289
        DOCINSERTNUMBER = 1;
290
        DOCERRORNUMBER  = 1;
291
        REVINSERTNUMBER = 1;
292
        REVERRORNUMBER  = 1;
293
        
294
        // handle system metadata
295
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
296
        for(int k = 0; k < systemMetadataList.size(); k++) { 
297
        	Vector<String> w = systemMetadataList.elementAt(k);
298
        	String guid = (String) w.elementAt(0);
299
        	String remoteserver = (String) w.elementAt(1);
300
        	try {
301
        		handleSystemMetadata(remoteserver, guid);
302
        	}
303
        	catch (Exception ee) {
304
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
305
        		continue;
306
        	}
307
        }
308
        
309
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
310
    }//for response
311

    
312
    //updated last_checked
313
    for (int i=0;i<serverList.size(); i++)
314
    {
315
       // Get ReplicationServer object from server list
316
       replServer = serverList.serverAt(i);
317
       try
318
       {
319
         updateLastCheckTimeForSingleServer(replServer);
320
       }
321
       catch(Exception e)
322
       {
323
         continue;
324
       }
325
    }//for
326
    
327
    long replicationEndTime = System.currentTimeMillis();
328
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
329
    		(replicationEndTime - replicationStartTime));
330
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
331
    		timeToGetServerList);
332
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
333
    		totalServerListParseTime);
334
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
335
    		_xmlDocQueryCount);
336
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
337
    		_xmlDocQueryTime + " ms");
338
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
339
    		_xmlRevQueryCount);
340
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
341
    		_xmlRevQueryTime + " ms");;
342

    
343
  }//update
344

    
345
  /* Handle replicate single xml document*/
346
  private void handleSingleXMLDocument(String remoteserver, String actions,
347
                                       String accNumber, String tableName)
348
               throws HandlerException
349
  {
350
    DBConnection dbConn = null;
351
    int serialNumber = -1;
352
    try
353
    {
354
      // Get DBConnection from pool
355
      dbConn=DBConnectionPool.
356
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
357
      serialNumber=dbConn.getCheckOutSerialNumber();
358
      //if the document needs to be updated or inserted, this is executed
359
      String readDocURLString = "https://" + remoteserver + "?server="+
360
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
361
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
362
      URL u = new URL(readDocURLString);
363

    
364
      // Get docid content
365
      String newxmldoc = ReplicationService.getURLContent(u);
366
      // If couldn't get skip it
367
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
368
      {
369
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
370
      }
371
      //logReplication.info("xml documnet:");
372
      //logReplication.info(newxmldoc);
373

    
374
      // Try get the docid info from remote server
375
      DocInfoHandler dih = new DocInfoHandler();
376
      XMLReader docinfoParser = initParser(dih);
377
      String docInfoURLStr = "https://" + remoteserver +
378
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
379
                       "&action=getdocumentinfo&docid="+accNumber;
380
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
381
      URL docinfoUrl = new URL(docInfoURLStr);
382
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
383
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
384
      
385
      // strip out the system metadata portion
386
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
387
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);
388
      
389
   	  // process system metadata if we have it
390
      if (systemMetadataXML != null) {
391
    	  SystemMetadata sysMeta = 
392
    		  TypeMarshaller.unmarshalTypeFromStream(
393
    				  SystemMetadata.class, 
394
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
395
    	  // need the guid-to-docid mapping
396
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
397
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
398
    	  }
399
      	  // save the system metadata
400
    	  logReplication.debug("Saving SystemMetadata to shared map: " + sysMeta.getIdentifier().getValue());
401
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
402
      }
403
   	  
404
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
405
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
406
      // Get home server of the docid
407
      String docHomeServer = docinfoHash.get("home_server");
408
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
409
     
410
      // dates
411
      String createdDateString = docinfoHash.get("date_created");
412
      String updatedDateString = docinfoHash.get("date_updated");
413
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
414
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
415
      
416
      //docid should include rev number too
417
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
418
                                              (String)docinfoHash.get("rev");*/
419
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
420
      String docType = docinfoHash.get("doctype");
421
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
422

    
423
      String parserBase = null;
424
      // this for eml2 and we need user eml2 parser
425
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
426
      {
427
         parserBase = DocumentImpl.EML200;
428
      }
429
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
430
      {
431
        parserBase = DocumentImpl.EML200;
432
      }
433
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
434
      {
435
        parserBase = DocumentImpl.EML210;
436
      }
437
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
438
      {
439
        parserBase = DocumentImpl.EML210;
440
      }
441
      // Write the document into local host
442
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false, false);
443
      String newDocid = wrapper.writeReplication(dbConn,
444
                              newxmldoc,
445
                              docinfoHash.get("public_access"),
446
                              null,  /* the dtd text */
447
                              actions,
448
                              accNumber,
449
                              null, //docinfoHash.get("user_owner"),                              
450
                              null, /* null for groups[] */
451
                              docHomeServer,
452
                              remoteserver, tableName, true,// true is for time replication 
453
                              createdDate,
454
                              updatedDate);
455
      
456
      //set the user information
457
      String user = (String) docinfoHash.get("user_owner");
458
      String updated = (String) docinfoHash.get("user_updated");
459
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
460
      
461
      //process extra access rules 
462
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
463
      if (xmlAccessDAOList != null) {
464
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
465
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
466
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
467
      			acfsf.insertPermissions(xmlAccessDAO);
468
      		}
469
          }
470
      }
471
      
472
      
473
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
474
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
475
      {
476
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
477
                                     " into "+tableName + " from " +
478
                                         remoteserver);
479
        DOCINSERTNUMBER++;
480
      }
481
      else
482
      {
483
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
484
                  " into "+tableName + " from " +
485
                      remoteserver);
486
          REVINSERTNUMBER++;
487
      }
488
      String ip = getIpFromURL(u);
489
      EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
490
      
491

    
492
    }//try
493
    catch(Exception e)
494
    {
495
        
496
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
497
        {
498
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
499
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
500
                                       " into "+tableName + " from " +
501
                                           remoteserver + " because "+e.getMessage());
502
          DOCERRORNUMBER++;
503
        }
504
        else
505
        {
506
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
507
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
508
                    " into "+tableName + " from " +
509
                        remoteserver +" because "+e.getMessage());
510
            REVERRORNUMBER++;
511
        }
512
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
513
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
514
                                      " into db because " + e.getMessage(), e);
515
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
516
    		  + "writing Replication: " +e.getMessage());
517
    }
518
    finally
519
    {
520
       //return DBConnection
521
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
522
    }//finally
523
    logMetacat.info("replication.create localId:" + accNumber);
524
  }
525

    
526

    
527

    
528
  /* Handle replicate single xml document*/
529
  private void handleSingleDataFile(String remoteserver, String actions,
530
                                    String accNumber, String tableName)
531
               throws HandlerException
532
  {
533
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
534
    DBConnection dbConn = null;
535
    int serialNumber = -1;
536
    try
537
    {
538
      // Get DBConnection from pool
539
      dbConn=DBConnectionPool.
540
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
541
      serialNumber=dbConn.getCheckOutSerialNumber();
542
      // Try get docid info from remote server
543
      DocInfoHandler dih = new DocInfoHandler();
544
      XMLReader docinfoParser = initParser(dih);
545
      String docInfoURLString = "https://" + remoteserver +
546
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
547
                  "&action=getdocumentinfo&docid="+accNumber;
548
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
549
      URL docinfoUrl = new URL(docInfoURLString);
550

    
551
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
552
      
553
      // strip out the system metadata portion
554
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
555
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);  
556
   	  
557
   	  // process system metadata
558
      if (systemMetadataXML != null) {
559
    	  SystemMetadata sysMeta = 
560
    		TypeMarshaller.unmarshalTypeFromStream(
561
    				  SystemMetadata.class, 
562
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
563
    	  // need the guid-to-docid mapping
564
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
565
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
566
    	  }
567
    	  // save the system metadata
568
    	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
569

    
570
      }
571
   	  
572
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
573
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
574
      
575
      // Get docid name (such as acl or dataset)
576
      String docName = docinfoHash.get("docname");
577
      // Get doc type (eml public id)
578
      String docType = docinfoHash.get("doctype");
579
      // Get docid home sever. it might be different to remoteserver
580
      // because of hub feature
581
      String docHomeServer = docinfoHash.get("home_server");
582
      String createdDateString = docinfoHash.get("date_created");
583
      String updatedDateString = docinfoHash.get("date_updated");
584
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
585
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
586
      //docid should include rev number too
587
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
588
                                              (String)docinfoHash.get("rev");*/
589

    
590
      String datafilePath = PropertyService.getProperty("application.datafilepath");
591
      // Get data file content
592
      String readDataURLString = "https://" + remoteserver + "?server="+
593
                                        MetacatUtil.getLocalReplicationServerName()+
594
                                            "&action=readdata&docid="+accNumber;
595
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
596
      URL u = new URL(readDataURLString);
597
      InputStream input = ReplicationService.getURLStream(u);
598
      //register data file into xml_documents table and wite data file
599
      //into file system
600
      if ( input != null)
601
      {
602
        DocumentImpl.writeDataFileInReplication(input,
603
                                                datafilePath,
604
                                                docName,docType,
605
                                                accNumber,
606
                                                null,
607
                                                docHomeServer,
608
                                                remoteserver,
609
                                                tableName,
610
                                                true, //true means timed replication
611
                                                createdDate,
612
                                                updatedDate);
613
                                         
614
        //set the user information
615
        String user = (String) docinfoHash.get("user_owner");
616
		String updated = (String) docinfoHash.get("user_updated");
617
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
618
        
619
        //process extra access rules
620
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
621
        if (xmlAccessDAOList != null) {
622
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
623
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
624
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
625
        			acfsf.insertPermissions(xmlAccessDAO);
626
        		}
627
            }
628
        }
629
        
630
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
631
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
632
                                    remote server);*/
633
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
634
        {
635
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
636
                                       " into "+tableName + " from " +
637
                                           remoteserver);
638
          DOCINSERTNUMBER++;
639
        }
640
        else
641
        {
642
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
643
                    " into "+tableName + " from " +
644
                        remoteserver);
645
            REVINSERTNUMBER++;
646
        }
647
        String ip = getIpFromURL(u);
648
        EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
649
        
650
      }//if
651
      else
652
      {
653
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
654
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
655
      }//else
656

    
657
    }//try
658
    catch(Exception e)
659
    {
660
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
661
                                      " because " +e.getMessage());*/
662
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
663
      {
664
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
665
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
666
                                     " into " + tableName + " from " +
667
                                         remoteserver + " because " + e.getMessage());
668
        DOCERRORNUMBER++;
669
      }
670
      else
671
      {
672
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
673
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
674
                  " into " + tableName + " from " +
675
                      remoteserver +" because "+ e.getMessage());
676
          REVERRORNUMBER++;
677
      }
678
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
679
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
680
                                      " because " + e.getMessage());
681
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
682
    		  + "writing Replication: " + e.getMessage());
683
    }
684
    finally
685
    {
686
       //return DBConnection
687
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
688
    }//finally
689
    logMetacat.info("replication.create localId:" + accNumber);
690
  }
691

    
692

    
693

    
694
  /* Handle delete single document*/
695
  private void handleDeleteSingleDocument(String docId, String notifyServer)
696
               throws HandlerException
697
  {
698
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
699
    DBConnection dbConn = null;
700
    int serialNumber = -1;
701
    try
702
    {
703
      // Get DBConnection from pool
704
      dbConn=DBConnectionPool.
705
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
706
      serialNumber=dbConn.getCheckOutSerialNumber();
707
      if(!alreadyDeleted(docId))
708
      {
709

    
710
         //because delete method docid should have rev number
711
         //so we just add one for it. This rev number is no sence.
712
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
713
         DocumentImpl.delete(accnum, null, null, notifyServer, false);
714
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
715
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
716
         URL u = new URL("https://"+notifyServer);
717
         String ip = getIpFromURL(u);
718
         EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, docId, "delete");
719
      }
720

    
721
    }//try
722
    catch(McdbDocNotFoundException e)
723
    {
724
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
725
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
726
                                 " in db because because " + e.getMessage());
727
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
728
    		  + "when handling document: " + e.getMessage());
729
    }
730
    catch(InsufficientKarmaException e)
731
    {
732
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
733
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
734
                                 " in db because because " + e.getMessage());
735
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
736
    		  + "when handling document: " + e.getMessage());
737
    }
738
    catch(SQLException e)
739
    {
740
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
741
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
742
                                 " in db because because " + e.getMessage());
743
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
744
    		  + "when handling document: " + e.getMessage());
745
    }
746
    catch(Exception e)
747
    {
748
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
749
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
750
                                 " in db because because " + e.getMessage());
751
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
752
    		  + "when handling document: " + e.getMessage());
753
    }
754
    finally
755
    {
756
       //return DBConnection
757
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
758
    }//finally
759
    logMetacat.info("replication.handleDeleteSingleDocument localId:" + docId);
760
  }
761

    
762
  /* Handle updateLastCheckTimForSingleServer*/
763
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
764
                                                  throws HandlerException
765
  {
766
    String server = repServer.getServerName();
767
    DBConnection dbConn = null;
768
    int serialNumber = -1;
769
    PreparedStatement pstmt = null;
770
    try
771
    {
772
      // Get DBConnection from pool
773
      dbConn=DBConnectionPool.
774
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
775
      serialNumber=dbConn.getCheckOutSerialNumber();
776

    
777
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
778
      // Get time from remote server
779
      URL dateurl = new URL("https://" + server + "?server="+
780
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
781
      String datexml = ReplicationService.getURLContent(dateurl);
782
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
783
      if (datexml != null && !datexml.equals("")) {
784
    	  
785
    	  // parse the ISO datetime
786
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
787
         Date updated = DateTimeMarshaller.deserializeDateToUTC(datestr);
788
         
789
         StringBuffer sql = new StringBuffer();
790
         sql.append("update xml_replication set last_checked = ? ");
791
         sql.append(" where server like ? ");
792
         pstmt = dbConn.prepareStatement(sql.toString());
793
         pstmt.setTimestamp(1, new Timestamp(updated.getTime()));
794
         pstmt.setString(2, server);
795
         
796
         pstmt.executeUpdate();
797
         dbConn.commit();
798
         pstmt.close();
799
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
800
                                      + server);
801
      }//if
802
      else
803
      {
804

    
805
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
806
                                  server + " in db because couldn't get time "
807
                                  );
808
         throw new Exception("Couldn't get time for server "+ server);
809
      }
810

    
811
    }//try
812
    catch(Exception e)
813
    {
814
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
815
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
816
                                server + " in db because because " + e.getMessage());
817
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
818
    		  + "Error updating last checked time: " + e.getMessage());
819
    }
820
    finally
821
    {
822
       //return DBConnection
823
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
824
    }//finally
825
  }
826
  
827
  	/**
828
	 * Handle replicate system metadata
829
	 * 
830
	 * @param remoteserver
831
	 * @param guid
832
	 * @throws HandlerException
833
	 */
834
	private void handleSystemMetadata(String remoteserver, String guid) 
835
		throws HandlerException {
836
		try {
837

    
838
			// Try get the system metadata from remote server
839
			String sysMetaURLStr = "https://" + remoteserver + "?server="
840
					+ MetacatUtil.getLocalReplicationServerName()
841
					+ "&action=getsystemmetadata&guid=" + guid;
842
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
843
			URL sysMetaUrl = new URL(sysMetaURLStr);
844
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
845
							+ sysMetaUrl.toString());
846
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
847

    
848
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
849

    
850
			// process system metadata
851
			if (systemMetadataXML != null) {
852
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
853
								new ByteArrayInputStream(systemMetadataXML
854
										.getBytes("UTF-8")));
855
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
856
			}
857

    
858
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
859
							+ guid);
860

    
861
			String ip = getIpFromURL(sysMetaUrl);
862
			EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
863

    
864
		} catch (Exception e) {
865
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
866
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
867
			logReplication
868
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
869
							+ guid + " into db because " + e.getMessage());
870
			throw new HandlerException(
871
					"ReplicationHandler.handleSystemMetadata - generic exception "
872
							+ "writing Replication: " + e.getMessage());
873
		}
874

    
875
	}
876

    
877
  /**
878
   * updates xml_catalog with entries from other servers.
879
   */
880
  private void updateCatalog()
881
  {
882
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
883
    // ReplicationServer object in server list
884
    ReplicationServer replServer = null;
885
    PreparedStatement pstmt = null;
886
    String server = null;
887

    
888

    
889
    // Go through each ReplicationServer object in sererlist
890
    for (int j=0; j<serverList.size(); j++)
891
    {
892
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
893
      Vector<String> publicId = new Vector<String>();
894
      try
895
      {
896
        // Get ReplicationServer object from server list
897
        replServer = serverList.serverAt(j);
898
        // Get server name from the ReplicationServer object
899
        server = replServer.getServerName();
900
        // Try to get catalog
901
        URL u = new URL("https://" + server + "?server="+
902
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
903
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
904
        String catxml = ReplicationService.getURLContent(u);
905

    
906
        // Make sure there are not error, no empty string
907
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
908
        {
909
          throw new Exception("Couldn't get catalog list form server " +server);
910
        }
911
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
912
        CatalogMessageHandler cmh = new CatalogMessageHandler();
913
        XMLReader catparser = initParser(cmh);
914
        catparser.parse(new InputSource(new StringReader(catxml)));
915
        //parse the returned catalog xml and put it into a vector
916
        remoteCatalog = cmh.getCatalogVect();
917

    
918
        // Make sure remoteCatalog is not empty
919
        if (remoteCatalog.isEmpty())
920
        {
921
          throw new Exception("Couldn't get catalog list form server " +server);
922
        }
923

    
924
        String localcatxml = ReplicationService.getCatalogXML();
925

    
926
        // Make sure local catalog is no empty
927
        if (localcatxml==null||localcatxml.equals(""))
928
        {
929
          throw new Exception("Couldn't get catalog list form server " +server);
930
        }
931

    
932
        cmh = new CatalogMessageHandler();
933
        catparser = initParser(cmh);
934
        catparser.parse(new InputSource(new StringReader(localcatxml)));
935
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
936

    
937
        //now we have the catalog from the remote server and this local server
938
        //we now need to compare the two and merge the differences.
939
        //the comparison is base on the public_id fields which is the 4th
940
        //entry in each row vector.
941
        publicId = new Vector<String>();
942
        for(int i=0; i<localCatalog.size(); i++)
943
        {
944
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
945
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
946
          publicId.add(new String((String)v.elementAt(3)));
947
        }
948
      }//try
949
      catch (Exception e)
950
      {
951
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
952
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
953
                                    server + " because " +e.getMessage());
954
      }//catch
955

    
956
      for(int i=0; i<remoteCatalog.size(); i++)
957
      {
958
         // DConnection
959
        DBConnection dbConn = null;
960
        // DBConnection checkout serial number
961
        int serialNumber = -1;
962
        try
963
        {
964
            dbConn=DBConnectionPool.
965
                  getDBConnection("ReplicationHandler.updateCatalog");
966
            serialNumber=dbConn.getCheckOutSerialNumber();
967
            Vector<String> v = remoteCatalog.elementAt(i);
968
            //logMetacat.debug("v2: " + v.toString());
969
            //logMetacat.debug("i: " + i);
970
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
971
            //logMetacat.debug("publicID: " + publicId.toString());
972
            logReplication.info
973
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
974
           if(!publicId.contains(v.elementAt(3)))
975
           { //so we don't have this public id in our local table so we need to
976
             //add it.
977
             //logMetacat.debug("in if");
978
             StringBuffer sql = new StringBuffer();
979
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
980
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
981
             sql.append("?,?)");
982
             //logMetacat.debug("sql: " + sql.toString());
983
             pstmt = dbConn.prepareStatement(sql.toString());
984
             pstmt.setString(1, (String)v.elementAt(0));
985
             pstmt.setString(2, (String)v.elementAt(1));
986
             pstmt.setString(3, (String)v.elementAt(2));
987
             pstmt.setString(4, (String)v.elementAt(3));
988
             pstmt.setString(5, (String)v.elementAt(4));
989
             pstmt.execute();
990
             pstmt.close();
991
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
992
                               (String)v.elementAt(3) + " from server"+server);
993
           }
994
        }
995
        catch(Exception e)
996
        {
997
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
998
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
999
                                    server + " because " +e.getMessage());
1000
        }//catch
1001
        finally
1002
        {
1003
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1004
        }//finally
1005
      }//for remote catalog
1006
    }//for server list
1007
    logReplication.info("End of updateCatalog");
1008
  }
1009

    
1010
  /**
1011
   * Method that returns true if docid has already been "deleted" from metacat.
1012
   * This method really implements a truth table for deleted documents
1013
   * The table is (a docid in one of the tables is represented by the X):
1014
   * xml_docs      xml_revs      deleted?
1015
   * ------------------------------------
1016
   *   X             X             FALSE
1017
   *   X             _             FALSE
1018
   *   _             X             TRUE
1019
   *   _             _             TRUE
1020
   */
1021
  private static boolean alreadyDeleted(String docid) throws HandlerException
1022
  {
1023
    DBConnection dbConn = null;
1024
    int serialNumber = -1;
1025
    PreparedStatement pstmt = null;
1026
    try
1027
    {
1028
      dbConn=DBConnectionPool.
1029
                  getDBConnection("ReplicationHandler.alreadyDeleted");
1030
      serialNumber=dbConn.getCheckOutSerialNumber();
1031
      boolean xml_docs = false;
1032
      boolean xml_revs = false;
1033

    
1034
      StringBuffer sb = new StringBuffer();
1035
      sb.append("select docid from xml_revisions where docid like ? ");
1036
      pstmt = dbConn.prepareStatement(sb.toString());
1037
      pstmt.setString(1, docid);
1038
      pstmt.execute();
1039
      ResultSet rs = pstmt.getResultSet();
1040
      boolean tablehasrows = rs.next();
1041
      if(tablehasrows)
1042
      {
1043
        xml_revs = true;
1044
      }
1045

    
1046
      sb = new StringBuffer();
1047
      sb.append("select docid from xml_documents where docid like '");
1048
      sb.append(docid).append("'");
1049
      pstmt.close();
1050
      pstmt = dbConn.prepareStatement(sb.toString());
1051
      //increase usage count
1052
      dbConn.increaseUsageCount(1);
1053
      pstmt.execute();
1054
      rs = pstmt.getResultSet();
1055
      tablehasrows = rs.next();
1056
      pstmt.close();
1057
      if(tablehasrows)
1058
      {
1059
        xml_docs = true;
1060
      }
1061

    
1062
      if(xml_docs && xml_revs)
1063
      {
1064
        return false;
1065
      }
1066
      else if(xml_docs && !xml_revs)
1067
      {
1068
        return false;
1069
      }
1070
      else if(!xml_docs && xml_revs)
1071
      {
1072
        return true;
1073
      }
1074
      else if(!xml_docs && !xml_revs)
1075
      {
1076
        return true;
1077
      }
1078
    }
1079
    catch(Exception e)
1080
    {
1081
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1082
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1083
                          e.getMessage());
1084
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1085
    		  + e.getMessage());
1086
    }
1087
    finally
1088
    {
1089
      try
1090
      {
1091
        pstmt.close();
1092
      }//try
1093
      catch (SQLException ee)
1094
      {
1095
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1096
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1097
                          "to close pstmt: "+ee.getMessage());
1098
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1099
      		  + ee.getMessage());
1100
      }//catch
1101
      finally
1102
      {
1103
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1104
      }//finally
1105
    }//finally
1106
    return false;
1107
  }
1108

    
1109

    
1110
  /**
1111
   * Method to initialize the message parser
1112
   */
1113
  public static XMLReader initParser(DefaultHandler dh)
1114
          throws HandlerException
1115
  {
1116
    XMLReader parser = null;
1117

    
1118
    try {
1119
      ContentHandler chandler = dh;
1120

    
1121
      // Get an instance of the parser
1122
      String parserName = PropertyService.getProperty("xml.saxparser");
1123
      parser = XMLReaderFactory.createXMLReader(parserName);
1124

    
1125
      // Turn off validation
1126
      parser.setFeature("http://xml.org/sax/features/validation", false);
1127

    
1128
      parser.setContentHandler((ContentHandler)chandler);
1129
      parser.setErrorHandler((ErrorHandler)chandler);
1130

    
1131
    } catch (SAXException se) {
1132
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1133
    		  + " initializing parser: " + se.getMessage());
1134
    } catch (PropertyNotFoundException pnfe) {
1135
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1136
      		  + " getting parser name: " + pnfe.getMessage());
1137
    } 
1138

    
1139
    return parser;
1140
  }
1141

    
1142
  /**
1143
	 * This method will combine given time string(in short format) to current
1144
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1145
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1146
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1147
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1148
	 * 
1149
	 * @param givenTime
1150
	 *            the format should be "10:00 AM " or "2:00 PM"
1151
	 * @return
1152
	 * @throws Exception
1153
	 */
1154
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1155
  {
1156
	  try {
1157
     Date givenDate = parseTime(givenTime);
1158
     Date newDate = null;
1159
     Date now = new Date();
1160
     String currentTimeString = getTimeString(now);
1161
     Date currentTime = parseTime(currentTimeString); 
1162
     if ( currentTime.getTime() >= givenDate.getTime())
1163
     {
1164
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1165
        String dateAndTime = getDateString(now) + " " + givenTime;
1166
        Date combinationDate = parseDateTime(dateAndTime);
1167
        // new date should plus 24 hours to make is the second day
1168
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1169
     }
1170
     else
1171
     {
1172
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1173
         String dateAndTime = getDateString(now) + " " + givenTime;
1174
         newDate = parseDateTime(dateAndTime);
1175
     }
1176
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1177
     return newDate;
1178
	  } catch (ParseException pe) {
1179
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1180
				  + "parsing error: "  + pe.getMessage());
1181
	  }
1182
  }
1183

    
1184
  /*
1185
	 * parse a given string to Time in short format. For example, given time is
1186
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1187
	 */
1188
  private static Date parseTime(String timeString) throws ParseException
1189
  {
1190
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1191
    Date time = format.parse(timeString); 
1192
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1193
                              +time.toString());
1194
    return time;
1195

    
1196
  }
1197
  
1198
  /*
1199
   * Parse a given string to date and time. Date format is long and time
1200
   * format is short.
1201
   */
1202
  private static Date parseDateTime(String timeString) throws ParseException
1203
  {
1204
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1205
    Date time = format.parse(timeString);
1206
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1207
                             time.toString());
1208
    return time;
1209
  }
1210
  
1211
  /*
1212
   * Get a date string from a Date object. The date format will be long
1213
   */
1214
  private static String getDateString(Date now)
1215
  {
1216
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1217
     String s = df.format(now);
1218
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1219
     return s;
1220
  }
1221
  
1222
  /*
1223
   * Get a time string from a Date object, the time format will be short
1224
   */
1225
  private static String getTimeString(Date now)
1226
  {
1227
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1228
     String s = df.format(now);
1229
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1230
     return s;
1231
  }
1232
  
1233
  
1234
  /*
1235
	 * This method will go through the docid list both in xml_Documents table
1236
	 * and in xml_revisions table @author tao
1237
	 */
1238
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1239
		boolean dataFile = false;
1240
		for (int j = 0; j < docList.size(); j++) {
1241
			// initial dataFile is false
1242
			dataFile = false;
1243
			// w is information for one document, information contain
1244
			// docid, rev, server or datafile.
1245
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1246
			// Check if the vector w contain "datafile"
1247
			// If it has, this document is data file
1248
			try {
1249
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1250
					dataFile = true;
1251
				}
1252
			} catch (PropertyNotFoundException pnfe) {
1253
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1254
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1255
						+ "Leaving as false: " + pnfe.getMessage());
1256
			}
1257
			// logMetacat.debug("w: " + w.toString());
1258
			// Get docid
1259
			String docid = (String) w.elementAt(0);
1260
			logReplication.info("docid: " + docid);
1261
			// Get revision number
1262
			int rev = Integer.parseInt((String) w.elementAt(1));
1263
			logReplication.info("rev: " + rev);
1264
			// Get remote server name (it is may not be doc home server because
1265
			// the new hub feature
1266
			String remoteServer = (String) w.elementAt(2);
1267
			remoteServer = remoteServer.trim();
1268

    
1269
			try {
1270
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1271
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1272
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1273
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1274
				} else {
1275
					continue;
1276
				}
1277

    
1278
			} catch (Exception e) {
1279
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1280
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1281
						+ " in time replication" + e.getMessage(), e);
1282
				continue;
1283
			}
1284
			
1285
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1286
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1287
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1288
	        }
1289
	        
1290
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1291
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1292
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1293
	        }
1294

    
1295
		}// for update docs
1296

    
1297
	}
1298
   
1299
   /*
1300
	 * This method will handle doc in xml_documents table.
1301
	 */
1302
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1303
                                        throws HandlerException
1304
   {
1305
       // compare the update rev and local rev to see what need happen
1306
       int localrev = -1;
1307
       String action = null;
1308
       boolean flag = false;
1309
       try
1310
       {
1311
    	 long docQueryStartTime = System.currentTimeMillis();
1312
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1313
         long docQueryEndTime = System.currentTimeMillis();
1314
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1315
         _xmlDocQueryCount++;
1316
       }
1317
       catch (SQLException e)
1318
       {
1319
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1320
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1321
                                " be found because " + e.getMessage());
1322
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1323
                 "written because error happend to find it's local revision");
1324
         DOCERRORNUMBER++;
1325
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1326
                 " be found: " + e.getMessage());
1327
       }
1328
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1329
                               localrev);
1330

    
1331
       //check the revs for an update because this document is in the
1332
       //local DB, it might be out of date.
1333
       if (localrev == -1)
1334
       {
1335
          // check if the revision is in the revision table
1336
    	   Vector<Integer> localRevVector = null;
1337
    	 try {
1338
        	 long revQueryStartTime = System.currentTimeMillis();
1339
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1340
             long revQueryEndTime = System.currentTimeMillis();
1341
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1342
             _xmlRevQueryCount++;
1343
    	 } catch (SQLException sqle) {
1344
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1345
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1346
    	 }
1347
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1348
         {
1349
             // this version was deleted, so don't need replicate
1350
             flag = false;
1351
         }
1352
         else
1353
         {
1354
           //insert this document as new because it is not in the local DB
1355
           action = "INSERT";
1356
           flag = true;
1357
         }
1358
       }
1359
       else
1360
       {
1361
         if(localrev == rev)
1362
         {
1363
           // Local meatacat has the same rev to remote host, don't need
1364
           // update and flag set false
1365
           flag = false;
1366
         }
1367
         else if(localrev < rev)
1368
         {
1369
           //this document needs to be updated so send an read request
1370
           action = "UPDATE";
1371
           flag = true;
1372
         }
1373
       }
1374
       
1375
       String accNumber = null;
1376
       try {
1377
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1378
       } catch (PropertyNotFoundException pnfe) {
1379
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1380
    			   + "account number separator : " + pnfe.getMessage());
1381
       }
1382
       // this is non-data file
1383
       if(flag && !dataFile)
1384
       {
1385
         try
1386
         {
1387
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1388
         }
1389
         catch(HandlerException he)
1390
         {
1391
           // skip this document
1392
           throw he;
1393
         }
1394
       }//if for non-data file
1395

    
1396
        // this is for data file
1397
       if(flag && dataFile)
1398
       {
1399
         try
1400
         {
1401
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1402
         }
1403
         catch(HandlerException he)
1404
         {
1405
           // skip this data file
1406
           throw he;
1407
         }
1408

    
1409
       }//for data file
1410
   }
1411
   
1412
   /*
1413
    * This method will handle doc in xml_documents table.
1414
    */
1415
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1416
                                        throws HandlerException
1417
   {
1418
       // compare the update rev and local rev to see what need happen
1419
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1420
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1421
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1422
       Vector<Integer> localrev = null;
1423
       String action = "INSERT";
1424
       boolean flag = false;
1425
       try
1426
       {
1427
      	 long revQueryStartTime = System.currentTimeMillis();
1428
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1429
         long revQueryEndTime = System.currentTimeMillis();
1430
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1431
         _xmlRevQueryCount++;
1432
       }
1433
       catch (SQLException sqle)
1434
       {
1435
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1436
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1437
                                " be found because " + sqle.getMessage());
1438
         REVERRORNUMBER++;
1439
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1440
        		 + sqle.getMessage());
1441
       }
1442
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1443
                               localrev.toString());
1444
       
1445
       // if the rev is not in the xml_revision, we need insert it
1446
       if (!localrev.contains(new Integer(rev)))
1447
       {
1448
           flag = true;    
1449
       }
1450
     
1451
       String accNumber = null;
1452
       try {
1453
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1454
       } catch (PropertyNotFoundException pnfe) {
1455
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1456
    			   + "account number separator : " + pnfe.getMessage());
1457
       }
1458
       // this is non-data file
1459
       if(flag && !dataFile)
1460
       {
1461
         try
1462
         {
1463
           
1464
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1465
         }
1466
         catch(HandlerException he)
1467
         {
1468
           // skip this document
1469
           throw he;
1470
         }
1471
       }//if for non-data file
1472

    
1473
        // this is for data file
1474
       if(flag && dataFile)
1475
       {
1476
         try
1477
         {
1478
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1479
         }
1480
         catch(HandlerException he)
1481
         {
1482
           // skip this data file
1483
           throw he;
1484
         }
1485

    
1486
       }//for data file
1487
   }
1488
   
1489
   /*
1490
    * Return a ip address for given url
1491
    */
1492
   private String getIpFromURL(URL url)
1493
   {
1494
	   String ip = null;
1495
	   try
1496
	   {
1497
	      InetAddress address = InetAddress.getByName(url.getHost());
1498
	      ip = address.getHostAddress();
1499
	   }
1500
	   catch(UnknownHostException e)
1501
	   {
1502
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1503
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1504
                   +e.getMessage());
1505
	   }
1506

    
1507
	   return ip;
1508
   }
1509
  
1510
}
1511

    
(3-3/7)