Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2011-10-20 14:03:19 -0700 (Thu, 20 Oct 2011) $'
10
 * '$Revision: 6542 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.text.DateFormat;
39
import java.text.ParseException;
40
import java.util.Date;
41
import java.util.Hashtable;
42
import java.util.TimerTask;
43
import java.util.Vector;
44

    
45
import org.apache.log4j.Logger;
46
import org.dataone.service.types.v1.SystemMetadata;
47
import org.dataone.service.util.TypeMarshaller;
48
import org.xml.sax.ContentHandler;
49
import org.xml.sax.ErrorHandler;
50
import org.xml.sax.InputSource;
51
import org.xml.sax.SAXException;
52
import org.xml.sax.XMLReader;
53
import org.xml.sax.helpers.DefaultHandler;
54
import org.xml.sax.helpers.XMLReaderFactory;
55

    
56
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
57
import edu.ucsb.nceas.metacat.DBUtil;
58
import edu.ucsb.nceas.metacat.DocInfoHandler;
59
import edu.ucsb.nceas.metacat.DocumentImpl;
60
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
61
import edu.ucsb.nceas.metacat.EventLog;
62
import edu.ucsb.nceas.metacat.IdentifierManager;
63
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
64
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
65
import edu.ucsb.nceas.metacat.accesscontrol.XMLAccessDAO;
66
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
67
import edu.ucsb.nceas.metacat.database.DBConnection;
68
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
69
import edu.ucsb.nceas.metacat.database.DatabaseService;
70
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
71
import edu.ucsb.nceas.metacat.properties.PropertyService;
72
import edu.ucsb.nceas.metacat.shared.HandlerException;
73
import edu.ucsb.nceas.metacat.util.MetacatUtil;
74
import edu.ucsb.nceas.metacat.util.ReplicationUtil;
75
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
76

    
77

    
78

    
79
/**
80
 * This class handles deltaT replication checking.  Whenever this TimerTask
81
 * is fired it checks each server in xml_replication for updates and updates
82
 * the local db as needed.
83
 */
84
public class ReplicationHandler extends TimerTask
85
{
86
  int serverCheckCode = 1;
87
  ReplicationServerList serverList = null;
88
  //PrintWriter out;
89
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
90
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
91
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
92
  private static Logger logD1 = Logger.getLogger("DataOneLogger");
93
  
94
  private static int DOCINSERTNUMBER = 1;
95
  private static int DOCERRORNUMBER  = 1;
96
  private static int REVINSERTNUMBER = 1;
97
  private static int REVERRORNUMBER  = 1;
98
  
99
  private static int _xmlDocQueryCount = 0;
100
  private static int _xmlRevQueryCount = 0;
101
  private static long _xmlDocQueryTime = 0;
102
  private static long _xmlRevQueryTime = 0;
103
  
104
  
105
  public ReplicationHandler()
106
  {
107
    //this.out = o;
108
    serverList = new ReplicationServerList();
109
  }
110

    
111
  public ReplicationHandler(int serverCheckCode)
112
  {
113
    //this.out = o;
114
    this.serverCheckCode = serverCheckCode;
115
    serverList = new ReplicationServerList();
116
  }
117

    
118
  /**
119
   * Method that implements TimerTask.run().  It runs whenever the timer is
120
   * fired.
121
   */
122
  public void run()
123
  {
124
    //find out the last_checked time of each server in the server list and
125
    //send a query to each server to see if there are any documents in
126
    //xml_documents with an update_date > last_checked
127
	  
128
      //if serverList is null, metacat don't need to replication
129
      if (serverList==null||serverList.isEmpty())
130
      {
131
        return;
132
      }
133
      updateCatalog();
134
      update();
135
      //conn.close();
136
  }
137

    
138
  /**
139
   * Method that uses revision tagging for replication instead of update_date.
140
   */
141
  private void update()
142
  {
143
	  
144
	  _xmlDocQueryCount = 0;
145
	  _xmlRevQueryCount = 0;
146
	  _xmlDocQueryTime = 0;
147
	  _xmlRevQueryTime = 0;
148
    /*
149
     Pseudo-algorithm
150
     - request a doc list from each server in xml_replication
151
     - check the rev number of each of those documents agains the
152
       documents in the local database
153
     - pull any documents that have a lesser rev number on the local server
154
       from the remote server
155
     - delete any documents that still exist in the local xml_documents but
156
       are in the deletedDocuments tag of the remote host response.
157
     - update last_checked to keep track of the last time it was checked.
158
       (this info is theoretically not needed using this system but probably
159
       should be kept anyway)
160
    */
161

    
162
    ReplicationServer replServer = null; // Variable to store the
163
                                        // ReplicationServer got from
164
                                        // Server list
165
    String server = null; // Variable to store server name
166
//    String update;
167
    Vector<String> responses = new Vector<String>();
168
    URL u;
169
    long replicationStartTime = System.currentTimeMillis();
170
    long timeToGetServerList = 0;
171
    
172
    //Check for every server in server list to get updated list and put
173
    // them in to response
174
    long startTimeToGetServers = System.currentTimeMillis();
175
    for (int i=0; i<serverList.size(); i++)
176
    {
177
        // Get ReplicationServer object from server list
178
        replServer = serverList.serverAt(i);
179
        // Get server name from ReplicationServer object
180
        server = replServer.getServerName().trim();
181
        String result = null;
182
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
183
        // Send command to that server to get updated docid information
184
        try
185
        {
186
          u = new URL("https://" + server + "?server="
187
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
188
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
189
          result = ReplicationService.getURLContent(u);
190
        }
191
        catch (Exception e)
192
        {
193
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
194
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
195
                       "for server " + server + " because "+e.getMessage());
196
          continue;
197
        }
198

    
199
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
200
        //check if result have error or not, if has skip it.
201
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
202
        {
203
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
204
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
205
                       "for server " + server + " because "+result);
206
          continue;
207
        }
208
        //Add result to vector
209
        responses.add(result);
210
    }
211
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
212

    
213
    //make sure that there is updated file list
214
    //If response is null, metacat don't need do anything
215
    if (responses==null || responses.isEmpty())
216
    {
217
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
218
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
219
                           "every server and failed to replicate");
220
        return;
221
    }
222

    
223

    
224
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
225
    //               "document information: "+ responses.toString());
226
    
227
    long totalServerListParseTime = 0;
228
    // go through response vector(it contains updated vector and delete vector
229
    for(int i=0; i<responses.size(); i++)
230
    {
231
    	long startServerListParseTime = System.currentTimeMillis();
232
    	XMLReader parser;
233
    	ReplMessageHandler message = new ReplMessageHandler();
234
    	try
235
        {
236
          parser = initParser(message);
237
        }
238
        catch (Exception e)
239
        {
240
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
241
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
242
                                " initParser for message and " +e.getMessage());
243
           // stop replication
244
           return;
245
        }
246
    	
247
        try
248
        {
249
          parser.parse(new InputSource(
250
                     new StringReader(
251
                     (String)(responses.elementAt(i)))));
252
        }
253
        catch(Exception e)
254
        {
255
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
256
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
257
                                   "because "+ e.getMessage());
258
          continue;
259
        }
260
        //v is the list of updated documents
261
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
262
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
263
        //d is the list of deleted documents
264
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
265
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
266
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
267
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
268
        // go though every element in updated document vector
269
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
270
        //handle deleted docs
271
        for(int k=0; k<deleteList.size(); k++)
272
        { //delete the deleted documents;
273
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
274
          String docId = (String)w.elementAt(0);
275
          try
276
          {
277
            handleDeleteSingleDocument(docId, server);
278
          }
279
          catch (Exception ee)
280
          {
281
            continue;
282
          }
283
        }//for delete docs
284
        
285
        // handle replicate doc in xml_revision
286
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
287
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
288
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
289
        DOCINSERTNUMBER = 1;
290
        DOCERRORNUMBER  = 1;
291
        REVINSERTNUMBER = 1;
292
        REVERRORNUMBER  = 1;
293
        
294
        // handle system metadata
295
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
296
        for(int k = 0; k < systemMetadataList.size(); k++) { 
297
        	Vector<String> w = systemMetadataList.elementAt(k);
298
        	String guid = (String) w.elementAt(0);
299
        	String remoteserver = (String) w.elementAt(1);
300
        	try {
301
        		handleSystemMetadata(remoteserver, guid);
302
        	}
303
        	catch (Exception ee) {
304
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
305
        		continue;
306
        	}
307
        }
308
        
309
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
310
    }//for response
311

    
312
    //updated last_checked
313
    for (int i=0;i<serverList.size(); i++)
314
    {
315
       // Get ReplicationServer object from server list
316
       replServer = serverList.serverAt(i);
317
       try
318
       {
319
         updateLastCheckTimeForSingleServer(replServer);
320
       }
321
       catch(Exception e)
322
       {
323
         continue;
324
       }
325
    }//for
326
    
327
    long replicationEndTime = System.currentTimeMillis();
328
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
329
    		(replicationEndTime - replicationStartTime));
330
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
331
    		timeToGetServerList);
332
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
333
    		totalServerListParseTime);
334
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
335
    		_xmlDocQueryCount);
336
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
337
    		_xmlDocQueryTime + " ms");
338
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
339
    		_xmlRevQueryCount);
340
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
341
    		_xmlRevQueryTime + " ms");;
342

    
343
  }//update
344

    
345
  /* Handle replicate single xml document*/
346
  private void handleSingleXMLDocument(String remoteserver, String actions,
347
                                       String accNumber, String tableName)
348
               throws HandlerException
349
  {
350
    DBConnection dbConn = null;
351
    int serialNumber = -1;
352
    try
353
    {
354
      // Get DBConnection from pool
355
      dbConn=DBConnectionPool.
356
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
357
      serialNumber=dbConn.getCheckOutSerialNumber();
358
      //if the document needs to be updated or inserted, this is executed
359
      String readDocURLString = "https://" + remoteserver + "?server="+
360
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
361
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
362
      URL u = new URL(readDocURLString);
363

    
364
      // Get docid content
365
      String newxmldoc = ReplicationService.getURLContent(u);
366
      // If couldn't get skip it
367
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
368
      {
369
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
370
      }
371
      //logReplication.info("xml documnet:");
372
      //logReplication.info(newxmldoc);
373

    
374
      // Try get the docid info from remote server
375
      DocInfoHandler dih = new DocInfoHandler();
376
      XMLReader docinfoParser = initParser(dih);
377
      String docInfoURLStr = "https://" + remoteserver +
378
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
379
                       "&action=getdocumentinfo&docid="+accNumber;
380
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
381
      URL docinfoUrl = new URL(docInfoURLStr);
382
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
383
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
384
      
385
      // strip out the system metadata portion
386
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
387
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);
388
      
389
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
390
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
391
      // Get home server of the docid
392
      String docHomeServer = docinfoHash.get("home_server");
393
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
394
      String createdDate = docinfoHash.get("date_created");
395
      String updatedDate = docinfoHash.get("date_updated");
396
      //docid should include rev number too
397
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
398
                                              (String)docinfoHash.get("rev");*/
399
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
400
      String docType = docinfoHash.get("doctype");
401
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
402

    
403
      String parserBase = null;
404
      // this for eml2 and we need user eml2 parser
405
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
406
      {
407
         parserBase = DocumentImpl.EML200;
408
      }
409
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
410
      {
411
        parserBase = DocumentImpl.EML200;
412
      }
413
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
414
      {
415
        parserBase = DocumentImpl.EML210;
416
      }
417
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
418
      {
419
        parserBase = DocumentImpl.EML210;
420
      }
421
      // Write the document into local host
422
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false);
423
      String newDocid = wrapper.writeReplication(dbConn,
424
                              newxmldoc,
425
                              docinfoHash.get("public_access"),
426
                              null,  /* the dtd text */
427
                              actions,
428
                              accNumber,
429
                              null, //docinfoHash.get("user_owner"),                              
430
                              null, /* null for groups[] */
431
                              docHomeServer,
432
                              remoteserver, tableName, true,// true is for time replication 
433
                              createdDate,
434
                              updatedDate);
435
      
436
      //set the user information
437
      String user = (String) docinfoHash.get("user_owner");
438
      String updated = (String) docinfoHash.get("user_updated");
439
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
440
      
441
      //process extra access rules 
442
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
443
      if (xmlAccessDAOList != null) {
444
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
445
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
446
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
447
      			acfsf.insertPermissions(xmlAccessDAO);
448
      		}
449
          }
450
      }
451
      
452
      // process system metadata
453
      if (systemMetadataXML != null) {
454
    	  SystemMetadata sysMeta = 
455
    		  TypeMarshaller.unmarshalTypeFromStream(
456
    				  SystemMetadata.class, 
457
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
458
    	  // need the guid-to-docid mapping
459
      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
460
      }
461
      
462
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
463
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
464
      {
465
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
466
                                     " into "+tableName + " from " +
467
                                         remoteserver);
468
        DOCINSERTNUMBER++;
469
      }
470
      else
471
      {
472
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
473
                  " into "+tableName + " from " +
474
                      remoteserver);
475
          REVINSERTNUMBER++;
476
      }
477
      String ip = getIpFromURL(u);
478
      EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
479
      
480

    
481
    }//try
482
    catch(Exception e)
483
    {
484
        
485
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
486
        {
487
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
488
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
489
                                       " into "+tableName + " from " +
490
                                           remoteserver + " because "+e.getMessage());
491
          DOCERRORNUMBER++;
492
        }
493
        else
494
        {
495
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
496
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
497
                    " into "+tableName + " from " +
498
                        remoteserver +" because "+e.getMessage());
499
            REVERRORNUMBER++;
500
        }
501
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
502
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
503
                                      " into db because " +e.getMessage());
504
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
505
    		  + "writing Replication: " +e.getMessage());
506
    }
507
    finally
508
    {
509
       //return DBConnection
510
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
511
    }//finally
512
    logD1.info("replication.create localId:" + accNumber);
513
  }
514

    
515

    
516

    
517
  /* Handle replicate single xml document*/
518
  private void handleSingleDataFile(String remoteserver, String actions,
519
                                    String accNumber, String tableName)
520
               throws HandlerException
521
  {
522
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
523
    DBConnection dbConn = null;
524
    int serialNumber = -1;
525
    try
526
    {
527
      // Get DBConnection from pool
528
      dbConn=DBConnectionPool.
529
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
530
      serialNumber=dbConn.getCheckOutSerialNumber();
531
      // Try get docid info from remote server
532
      DocInfoHandler dih = new DocInfoHandler();
533
      XMLReader docinfoParser = initParser(dih);
534
      String docInfoURLString = "https://" + remoteserver +
535
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
536
                  "&action=getdocumentinfo&docid="+accNumber;
537
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
538
      URL docinfoUrl = new URL(docInfoURLString);
539

    
540
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
541
      
542
      // strip out the system metadata portion
543
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
544
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);  
545
   	  
546
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
547
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
548
      
549
      // Get docid name (such as acl or dataset)
550
      String docName = docinfoHash.get("docname");
551
      // Get doc type (eml public id)
552
      String docType = docinfoHash.get("doctype");
553
      // Get docid home sever. it might be different to remoteserver
554
      // because of hub feature
555
      String docHomeServer = docinfoHash.get("home_server");
556
      String createdDate = docinfoHash.get("date_created");
557
      String updatedDate = docinfoHash.get("date_updated");
558
      //docid should include rev number too
559
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
560
                                              (String)docinfoHash.get("rev");*/
561

    
562
      String datafilePath = PropertyService.getProperty("application.datafilepath");
563
      // Get data file content
564
      String readDataURLString = "https://" + remoteserver + "?server="+
565
                                        MetacatUtil.getLocalReplicationServerName()+
566
                                            "&action=readdata&docid="+accNumber;
567
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
568
      URL u = new URL(readDataURLString);
569
      InputStream input = u.openStream();
570
      //register data file into xml_documents table and wite data file
571
      //into file system
572
      if ( input != null)
573
      {
574
        DocumentImpl.writeDataFileInReplication(input,
575
                                                datafilePath,
576
                                                docName,docType,
577
                                                accNumber,
578
                                                null,
579
                                                docHomeServer,
580
                                                remoteserver,
581
                                                tableName,
582
                                                true, //true means timed replication
583
                                                createdDate,
584
                                                updatedDate);
585
                                         
586
        //set the user information
587
        String user = (String) docinfoHash.get("user_owner");
588
		String updated = (String) docinfoHash.get("user_updated");
589
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
590
        
591
        //process extra access rules
592
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
593
        if (xmlAccessDAOList != null) {
594
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
595
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
596
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
597
        			acfsf.insertPermissions(xmlAccessDAO);
598
        		}
599
            }
600
        }
601
        
602
        // process system metadata
603
        if (systemMetadataXML != null) {
604
      	  SystemMetadata sysMeta = 
605
      		TypeMarshaller.unmarshalTypeFromStream(
606
      				  SystemMetadata.class, 
607
      				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
608
      	  // need the guid-to-docid mapping
609
      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
610
        }
611
        
612
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
613
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
614
                                    remote server);*/
615
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
616
        {
617
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
618
                                       " into "+tableName + " from " +
619
                                           remoteserver);
620
          DOCINSERTNUMBER++;
621
        }
622
        else
623
        {
624
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
625
                    " into "+tableName + " from " +
626
                        remoteserver);
627
            REVINSERTNUMBER++;
628
        }
629
        String ip = getIpFromURL(u);
630
        EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
631
        
632
      }//if
633
      else
634
      {
635
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
636
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
637
      }//else
638

    
639
    }//try
640
    catch(Exception e)
641
    {
642
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
643
                                      " because " +e.getMessage());*/
644
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
645
      {
646
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
647
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
648
                                     " into " + tableName + " from " +
649
                                         remoteserver + " because " + e.getMessage());
650
        DOCERRORNUMBER++;
651
      }
652
      else
653
      {
654
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
655
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
656
                  " into " + tableName + " from " +
657
                      remoteserver +" because "+ e.getMessage());
658
          REVERRORNUMBER++;
659
      }
660
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
661
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
662
                                      " because " + e.getMessage());
663
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
664
    		  + "writing Replication: " + e.getMessage());
665
    }
666
    finally
667
    {
668
       //return DBConnection
669
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
670
    }//finally
671
    logD1.info("replication.create localId:" + accNumber);
672
  }
673

    
674

    
675

    
676
  /* Handle delete single document*/
677
  private void handleDeleteSingleDocument(String docId, String notifyServer)
678
               throws HandlerException
679
  {
680
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
681
    DBConnection dbConn = null;
682
    int serialNumber = -1;
683
    try
684
    {
685
      // Get DBConnection from pool
686
      dbConn=DBConnectionPool.
687
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
688
      serialNumber=dbConn.getCheckOutSerialNumber();
689
      if(!alreadyDeleted(docId))
690
      {
691

    
692
         //because delete method docid should have rev number
693
         //so we just add one for it. This rev number is no sence.
694
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
695
         DocumentImpl.delete(accnum, null, null, notifyServer);
696
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
697
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
698
         URL u = new URL("https://"+notifyServer);
699
         String ip = getIpFromURL(u);
700
         EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, docId, "delete");
701
      }
702

    
703
    }//try
704
    catch(McdbDocNotFoundException e)
705
    {
706
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
707
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
708
                                 " in db because because " + e.getMessage());
709
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
710
    		  + "when handling document: " + e.getMessage());
711
    }
712
    catch(InsufficientKarmaException e)
713
    {
714
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
715
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
716
                                 " in db because because " + e.getMessage());
717
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
718
    		  + "when handling document: " + e.getMessage());
719
    }
720
    catch(SQLException e)
721
    {
722
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
723
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
724
                                 " in db because because " + e.getMessage());
725
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
726
    		  + "when handling document: " + e.getMessage());
727
    }
728
    catch(Exception e)
729
    {
730
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
731
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
732
                                 " in db because because " + e.getMessage());
733
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
734
    		  + "when handling document: " + e.getMessage());
735
    }
736
    finally
737
    {
738
       //return DBConnection
739
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
740
    }//finally
741
    logD1.info("replication.handleDeleteSingleDocument localId:" + docId);
742
  }
743

    
744
  /* Handle updateLastCheckTimForSingleServer*/
745
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
746
                                                  throws HandlerException
747
  {
748
    String server = repServer.getServerName();
749
    DBConnection dbConn = null;
750
    int serialNumber = -1;
751
    PreparedStatement pstmt = null;
752
    try
753
    {
754
      // Get DBConnection from pool
755
      dbConn=DBConnectionPool.
756
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
757
      serialNumber=dbConn.getCheckOutSerialNumber();
758

    
759
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
760
      // Get time from remote server
761
      URL dateurl = new URL("https://" + server + "?server="+
762
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
763
      String datexml = ReplicationService.getURLContent(dateurl);
764
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
765
      if (datexml!=null && !datexml.equals(""))
766
      {
767
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
768
         StringBuffer sql = new StringBuffer();
769
         /*sql.append("update xml_replication set last_checked = to_date('");
770
         sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
771
         sql.append("server like '").append(server).append("'");*/
772
         sql.append("update xml_replication set last_checked = ");
773
         sql.append(DatabaseService.getInstance().getDBAdapter().toDate(datestr, "MM/DD/YY HH24:MI:SS"));
774
         sql.append(" where server like '").append(server).append("'");
775
         pstmt = dbConn.prepareStatement(sql.toString());
776

    
777
         pstmt.executeUpdate();
778
         dbConn.commit();
779
         pstmt.close();
780
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
781
                                      + server);
782
      }//if
783
      else
784
      {
785

    
786
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
787
                                  server + " in db because couldn't get time "
788
                                  );
789
         throw new Exception("Couldn't get time for server "+ server);
790
      }
791

    
792
    }//try
793
    catch(Exception e)
794
    {
795
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
796
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
797
                                server + " in db because because " + e.getMessage());
798
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
799
    		  + "Error updating last checked time: " + e.getMessage());
800
    }
801
    finally
802
    {
803
       //return DBConnection
804
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
805
    }//finally
806
  }
807
  
808
  	/**
809
	 * Handle replicate system metadata
810
	 * 
811
	 * @param remoteserver
812
	 * @param guid
813
	 * @throws HandlerException
814
	 */
815
	private void handleSystemMetadata(String remoteserver, String guid) 
816
		throws HandlerException {
817
		try {
818

    
819
			// Try get the system metadata from remote server
820
			String sysMetaURLStr = "https://" + remoteserver + "?server="
821
					+ MetacatUtil.getLocalReplicationServerName()
822
					+ "&action=getsystemmetadata&guid=" + guid;
823
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
824
			URL sysMetaUrl = new URL(sysMetaURLStr);
825
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
826
							+ sysMetaUrl.toString());
827
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
828

    
829
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
830

    
831
			// process system metadata
832
			if (systemMetadataXML != null) {
833
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
834
								new ByteArrayInputStream(systemMetadataXML
835
										.getBytes("UTF-8")));
836
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
837
			}
838

    
839
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
840
							+ guid);
841

    
842
			String ip = getIpFromURL(sysMetaUrl);
843
			EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
844

    
845
		} catch (Exception e) {
846
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
847
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
848
			logReplication
849
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
850
							+ guid + " into db because " + e.getMessage());
851
			throw new HandlerException(
852
					"ReplicationHandler.handleSystemMetadata - generic exception "
853
							+ "writing Replication: " + e.getMessage());
854
		}
855

    
856
	}
857

    
858
  /**
859
   * updates xml_catalog with entries from other servers.
860
   */
861
  private void updateCatalog()
862
  {
863
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
864
    // ReplicationServer object in server list
865
    ReplicationServer replServer = null;
866
    PreparedStatement pstmt = null;
867
    String server = null;
868

    
869

    
870
    // Go through each ReplicationServer object in sererlist
871
    for (int j=0; j<serverList.size(); j++)
872
    {
873
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
874
      Vector<String> publicId = new Vector<String>();
875
      try
876
      {
877
        // Get ReplicationServer object from server list
878
        replServer = serverList.serverAt(j);
879
        // Get server name from the ReplicationServer object
880
        server = replServer.getServerName();
881
        // Try to get catalog
882
        URL u = new URL("https://" + server + "?server="+
883
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
884
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
885
        String catxml = ReplicationService.getURLContent(u);
886

    
887
        // Make sure there are not error, no empty string
888
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
889
        {
890
          throw new Exception("Couldn't get catalog list form server " +server);
891
        }
892
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
893
        CatalogMessageHandler cmh = new CatalogMessageHandler();
894
        XMLReader catparser = initParser(cmh);
895
        catparser.parse(new InputSource(new StringReader(catxml)));
896
        //parse the returned catalog xml and put it into a vector
897
        remoteCatalog = cmh.getCatalogVect();
898

    
899
        // Make sure remoteCatalog is not empty
900
        if (remoteCatalog.isEmpty())
901
        {
902
          throw new Exception("Couldn't get catalog list form server " +server);
903
        }
904

    
905
        String localcatxml = ReplicationService.getCatalogXML();
906

    
907
        // Make sure local catalog is no empty
908
        if (localcatxml==null||localcatxml.equals(""))
909
        {
910
          throw new Exception("Couldn't get catalog list form server " +server);
911
        }
912

    
913
        cmh = new CatalogMessageHandler();
914
        catparser = initParser(cmh);
915
        catparser.parse(new InputSource(new StringReader(localcatxml)));
916
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
917

    
918
        //now we have the catalog from the remote server and this local server
919
        //we now need to compare the two and merge the differences.
920
        //the comparison is base on the public_id fields which is the 4th
921
        //entry in each row vector.
922
        publicId = new Vector<String>();
923
        for(int i=0; i<localCatalog.size(); i++)
924
        {
925
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
926
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
927
          publicId.add(new String((String)v.elementAt(3)));
928
        }
929
      }//try
930
      catch (Exception e)
931
      {
932
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
933
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
934
                                    server + " because " +e.getMessage());
935
      }//catch
936

    
937
      for(int i=0; i<remoteCatalog.size(); i++)
938
      {
939
         // DConnection
940
        DBConnection dbConn = null;
941
        // DBConnection checkout serial number
942
        int serialNumber = -1;
943
        try
944
        {
945
            dbConn=DBConnectionPool.
946
                  getDBConnection("ReplicationHandler.updateCatalog");
947
            serialNumber=dbConn.getCheckOutSerialNumber();
948
            Vector<String> v = remoteCatalog.elementAt(i);
949
            //logMetacat.debug("v2: " + v.toString());
950
            //logMetacat.debug("i: " + i);
951
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
952
            //logMetacat.debug("publicID: " + publicId.toString());
953
            logReplication.info
954
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
955
           if(!publicId.contains(v.elementAt(3)))
956
           { //so we don't have this public id in our local table so we need to
957
             //add it.
958
             //logMetacat.debug("in if");
959
             StringBuffer sql = new StringBuffer();
960
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
961
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
962
             sql.append("?,?)");
963
             //logMetacat.debug("sql: " + sql.toString());
964
             pstmt = dbConn.prepareStatement(sql.toString());
965
             pstmt.setString(1, (String)v.elementAt(0));
966
             pstmt.setString(2, (String)v.elementAt(1));
967
             pstmt.setString(3, (String)v.elementAt(2));
968
             pstmt.setString(4, (String)v.elementAt(3));
969
             pstmt.setString(5, (String)v.elementAt(4));
970
             pstmt.execute();
971
             pstmt.close();
972
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
973
                               (String)v.elementAt(3) + " from server"+server);
974
           }
975
        }
976
        catch(Exception e)
977
        {
978
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
979
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
980
                                    server + " because " +e.getMessage());
981
        }//catch
982
        finally
983
        {
984
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
985
        }//finally
986
      }//for remote catalog
987
    }//for server list
988
    logReplication.info("End of updateCatalog");
989
  }
990

    
991
  /**
992
   * Method that returns true if docid has already been "deleted" from metacat.
993
   * This method really implements a truth table for deleted documents
994
   * The table is (a docid in one of the tables is represented by the X):
995
   * xml_docs      xml_revs      deleted?
996
   * ------------------------------------
997
   *   X             X             FALSE
998
   *   X             _             FALSE
999
   *   _             X             TRUE
1000
   *   _             _             TRUE
1001
   */
1002
  private static boolean alreadyDeleted(String docid) throws HandlerException
1003
  {
1004
    DBConnection dbConn = null;
1005
    int serialNumber = -1;
1006
    PreparedStatement pstmt = null;
1007
    try
1008
    {
1009
      dbConn=DBConnectionPool.
1010
                  getDBConnection("ReplicationHandler.alreadyDeleted");
1011
      serialNumber=dbConn.getCheckOutSerialNumber();
1012
      boolean xml_docs = false;
1013
      boolean xml_revs = false;
1014

    
1015
      StringBuffer sb = new StringBuffer();
1016
      sb.append("select docid from xml_revisions where docid like '");
1017
      sb.append(docid).append("'");
1018
      pstmt = dbConn.prepareStatement(sb.toString());
1019
      pstmt.execute();
1020
      ResultSet rs = pstmt.getResultSet();
1021
      boolean tablehasrows = rs.next();
1022
      if(tablehasrows)
1023
      {
1024
        xml_revs = true;
1025
      }
1026

    
1027
      sb = new StringBuffer();
1028
      sb.append("select docid from xml_documents where docid like '");
1029
      sb.append(docid).append("'");
1030
      pstmt.close();
1031
      pstmt = dbConn.prepareStatement(sb.toString());
1032
      //increase usage count
1033
      dbConn.increaseUsageCount(1);
1034
      pstmt.execute();
1035
      rs = pstmt.getResultSet();
1036
      tablehasrows = rs.next();
1037
      pstmt.close();
1038
      if(tablehasrows)
1039
      {
1040
        xml_docs = true;
1041
      }
1042

    
1043
      if(xml_docs && xml_revs)
1044
      {
1045
        return false;
1046
      }
1047
      else if(xml_docs && !xml_revs)
1048
      {
1049
        return false;
1050
      }
1051
      else if(!xml_docs && xml_revs)
1052
      {
1053
        return true;
1054
      }
1055
      else if(!xml_docs && !xml_revs)
1056
      {
1057
        return true;
1058
      }
1059
    }
1060
    catch(Exception e)
1061
    {
1062
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1063
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1064
                          e.getMessage());
1065
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1066
    		  + e.getMessage());
1067
    }
1068
    finally
1069
    {
1070
      try
1071
      {
1072
        pstmt.close();
1073
      }//try
1074
      catch (SQLException ee)
1075
      {
1076
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1077
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1078
                          "to close pstmt: "+ee.getMessage());
1079
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1080
      		  + ee.getMessage());
1081
      }//catch
1082
      finally
1083
      {
1084
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1085
      }//finally
1086
    }//finally
1087
    return false;
1088
  }
1089

    
1090

    
1091
  /**
1092
   * Method to initialize the message parser
1093
   */
1094
  public static XMLReader initParser(DefaultHandler dh)
1095
          throws HandlerException
1096
  {
1097
    XMLReader parser = null;
1098

    
1099
    try {
1100
      ContentHandler chandler = dh;
1101

    
1102
      // Get an instance of the parser
1103
      String parserName = PropertyService.getProperty("xml.saxparser");
1104
      parser = XMLReaderFactory.createXMLReader(parserName);
1105

    
1106
      // Turn off validation
1107
      parser.setFeature("http://xml.org/sax/features/validation", false);
1108

    
1109
      parser.setContentHandler((ContentHandler)chandler);
1110
      parser.setErrorHandler((ErrorHandler)chandler);
1111

    
1112
    } catch (SAXException se) {
1113
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1114
    		  + " initializing parser: " + se.getMessage());
1115
    } catch (PropertyNotFoundException pnfe) {
1116
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1117
      		  + " getting parser name: " + pnfe.getMessage());
1118
    } 
1119

    
1120
    return parser;
1121
  }
1122

    
1123
  /**
1124
	 * This method will combine given time string(in short format) to current
1125
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1126
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1127
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1128
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1129
	 * 
1130
	 * @param givenTime
1131
	 *            the format should be "10:00 AM " or "2:00 PM"
1132
	 * @return
1133
	 * @throws Exception
1134
	 */
1135
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1136
  {
1137
	  try {
1138
     Date givenDate = parseTime(givenTime);
1139
     Date newDate = null;
1140
     Date now = new Date();
1141
     String currentTimeString = getTimeString(now);
1142
     Date currentTime = parseTime(currentTimeString); 
1143
     if ( currentTime.getTime() >= givenDate.getTime())
1144
     {
1145
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1146
        String dateAndTime = getDateString(now) + " " + givenTime;
1147
        Date combinationDate = parseDateTime(dateAndTime);
1148
        // new date should plus 24 hours to make is the second day
1149
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1150
     }
1151
     else
1152
     {
1153
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1154
         String dateAndTime = getDateString(now) + " " + givenTime;
1155
         newDate = parseDateTime(dateAndTime);
1156
     }
1157
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1158
     return newDate;
1159
	  } catch (ParseException pe) {
1160
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1161
				  + "parsing error: "  + pe.getMessage());
1162
	  }
1163
  }
1164

    
1165
  /*
1166
	 * parse a given string to Time in short format. For example, given time is
1167
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1168
	 */
1169
  private static Date parseTime(String timeString) throws ParseException
1170
  {
1171
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1172
    Date time = format.parse(timeString); 
1173
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1174
                              +time.toString());
1175
    return time;
1176

    
1177
  }
1178
  
1179
  /*
1180
   * Parse a given string to date and time. Date format is long and time
1181
   * format is short.
1182
   */
1183
  private static Date parseDateTime(String timeString) throws ParseException
1184
  {
1185
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1186
    Date time = format.parse(timeString);
1187
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1188
                             time.toString());
1189
    return time;
1190
  }
1191
  
1192
  /*
1193
   * Get a date string from a Date object. The date format will be long
1194
   */
1195
  private static String getDateString(Date now)
1196
  {
1197
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1198
     String s = df.format(now);
1199
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1200
     return s;
1201
  }
1202
  
1203
  /*
1204
   * Get a time string from a Date object, the time format will be short
1205
   */
1206
  private static String getTimeString(Date now)
1207
  {
1208
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1209
     String s = df.format(now);
1210
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1211
     return s;
1212
  }
1213
  
1214
  
1215
  /*
1216
	 * This method will go through the docid list both in xml_Documents table
1217
	 * and in xml_revisions table @author tao
1218
	 */
1219
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1220
		boolean dataFile = false;
1221
		for (int j = 0; j < docList.size(); j++) {
1222
			// initial dataFile is false
1223
			dataFile = false;
1224
			// w is information for one document, information contain
1225
			// docid, rev, server or datafile.
1226
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1227
			// Check if the vector w contain "datafile"
1228
			// If it has, this document is data file
1229
			try {
1230
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1231
					dataFile = true;
1232
				}
1233
			} catch (PropertyNotFoundException pnfe) {
1234
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1235
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1236
						+ "Leaving as false: " + pnfe.getMessage());
1237
			}
1238
			// logMetacat.debug("w: " + w.toString());
1239
			// Get docid
1240
			String docid = (String) w.elementAt(0);
1241
			logReplication.info("docid: " + docid);
1242
			// Get revision number
1243
			int rev = Integer.parseInt((String) w.elementAt(1));
1244
			logReplication.info("rev: " + rev);
1245
			// Get remote server name (it is may not be doc home server because
1246
			// the new hub feature
1247
			String remoteServer = (String) w.elementAt(2);
1248
			remoteServer = remoteServer.trim();
1249

    
1250
			try {
1251
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1252
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1253
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1254
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1255
				} else {
1256
					continue;
1257
				}
1258

    
1259
			} catch (Exception e) {
1260
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1261
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1262
						+ " in time replication" + e.getMessage());
1263
				continue;
1264
			}
1265
			
1266
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1267
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1268
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1269
	        }
1270
	        
1271
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1272
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1273
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1274
	        }
1275

    
1276
		}// for update docs
1277

    
1278
	}
1279
   
1280
   /*
1281
	 * This method will handle doc in xml_documents table.
1282
	 */
1283
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1284
                                        throws HandlerException
1285
   {
1286
       // compare the update rev and local rev to see what need happen
1287
       int localrev = -1;
1288
       String action = null;
1289
       boolean flag = false;
1290
       try
1291
       {
1292
    	 long docQueryStartTime = System.currentTimeMillis();
1293
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1294
         long docQueryEndTime = System.currentTimeMillis();
1295
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1296
         _xmlDocQueryCount++;
1297
       }
1298
       catch (SQLException e)
1299
       {
1300
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1301
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1302
                                " be found because " + e.getMessage());
1303
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1304
                 "written because error happend to find it's local revision");
1305
         DOCERRORNUMBER++;
1306
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1307
                 " be found: " + e.getMessage());
1308
       }
1309
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1310
                               localrev);
1311

    
1312
       //check the revs for an update because this document is in the
1313
       //local DB, it might be out of date.
1314
       if (localrev == -1)
1315
       {
1316
          // check if the revision is in the revision table
1317
    	   Vector<Integer> localRevVector = null;
1318
    	 try {
1319
        	 long revQueryStartTime = System.currentTimeMillis();
1320
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1321
             long revQueryEndTime = System.currentTimeMillis();
1322
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1323
             _xmlRevQueryCount++;
1324
    	 } catch (SQLException sqle) {
1325
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1326
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1327
    	 }
1328
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1329
         {
1330
             // this version was deleted, so don't need replicate
1331
             flag = false;
1332
         }
1333
         else
1334
         {
1335
           //insert this document as new because it is not in the local DB
1336
           action = "INSERT";
1337
           flag = true;
1338
         }
1339
       }
1340
       else
1341
       {
1342
         if(localrev == rev)
1343
         {
1344
           // Local meatacat has the same rev to remote host, don't need
1345
           // update and flag set false
1346
           flag = false;
1347
         }
1348
         else if(localrev < rev)
1349
         {
1350
           //this document needs to be updated so send an read request
1351
           action = "UPDATE";
1352
           flag = true;
1353
         }
1354
       }
1355
       
1356
       String accNumber = null;
1357
       try {
1358
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1359
       } catch (PropertyNotFoundException pnfe) {
1360
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1361
    			   + "account number separator : " + pnfe.getMessage());
1362
       }
1363
       // this is non-data file
1364
       if(flag && !dataFile)
1365
       {
1366
         try
1367
         {
1368
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1369
         }
1370
         catch(HandlerException he)
1371
         {
1372
           // skip this document
1373
           throw he;
1374
         }
1375
       }//if for non-data file
1376

    
1377
        // this is for data file
1378
       if(flag && dataFile)
1379
       {
1380
         try
1381
         {
1382
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1383
         }
1384
         catch(HandlerException he)
1385
         {
1386
           // skip this data file
1387
           throw he;
1388
         }
1389

    
1390
       }//for data file
1391
   }
1392
   
1393
   /*
1394
    * This method will handle doc in xml_documents table.
1395
    */
1396
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1397
                                        throws HandlerException
1398
   {
1399
       // compare the update rev and local rev to see what need happen
1400
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1401
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1402
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1403
       Vector<Integer> localrev = null;
1404
       String action = "INSERT";
1405
       boolean flag = false;
1406
       try
1407
       {
1408
      	 long revQueryStartTime = System.currentTimeMillis();
1409
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1410
         long revQueryEndTime = System.currentTimeMillis();
1411
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1412
         _xmlRevQueryCount++;
1413
       }
1414
       catch (SQLException sqle)
1415
       {
1416
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1417
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1418
                                " be found because " + sqle.getMessage());
1419
         REVERRORNUMBER++;
1420
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1421
        		 + sqle.getMessage());
1422
       }
1423
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1424
                               localrev.toString());
1425
       
1426
       // if the rev is not in the xml_revision, we need insert it
1427
       if (!localrev.contains(new Integer(rev)))
1428
       {
1429
           flag = true;    
1430
       }
1431
     
1432
       String accNumber = null;
1433
       try {
1434
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1435
       } catch (PropertyNotFoundException pnfe) {
1436
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1437
    			   + "account number separator : " + pnfe.getMessage());
1438
       }
1439
       // this is non-data file
1440
       if(flag && !dataFile)
1441
       {
1442
         try
1443
         {
1444
           
1445
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1446
         }
1447
         catch(HandlerException he)
1448
         {
1449
           // skip this document
1450
           throw he;
1451
         }
1452
       }//if for non-data file
1453

    
1454
        // this is for data file
1455
       if(flag && dataFile)
1456
       {
1457
         try
1458
         {
1459
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1460
         }
1461
         catch(HandlerException he)
1462
         {
1463
           // skip this data file
1464
           throw he;
1465
         }
1466

    
1467
       }//for data file
1468
   }
1469
   
1470
   /*
1471
    * Return a ip address for given url
1472
    */
1473
   private String getIpFromURL(URL url)
1474
   {
1475
	   String ip = null;
1476
	   try
1477
	   {
1478
	      InetAddress address = InetAddress.getByName(url.getHost());
1479
	      ip = address.getHostAddress();
1480
	   }
1481
	   catch(UnknownHostException e)
1482
	   {
1483
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1484
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1485
                   +e.getMessage());
1486
	   }
1487

    
1488
	   return ip;
1489
   }
1490
  
1491
}
1492

    
(4-4/8)