Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: tao $'
9
 *     '$Date: 2014-11-10 14:22:52 -0800 (Mon, 10 Nov 2014) $'
10
 * '$Revision: 8959 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.sql.Timestamp;
39
import java.text.DateFormat;
40
import java.text.ParseException;
41
import java.util.Date;
42
import java.util.Hashtable;
43
import java.util.TimerTask;
44
import java.util.Vector;
45

    
46
import org.apache.log4j.Logger;
47
import org.dataone.service.types.v2.SystemMetadata;
48
import org.dataone.service.util.DateTimeMarshaller;
49
import org.dataone.service.util.TypeMarshaller;
50
import org.xml.sax.ContentHandler;
51
import org.xml.sax.ErrorHandler;
52
import org.xml.sax.InputSource;
53
import org.xml.sax.SAXException;
54
import org.xml.sax.XMLReader;
55
import org.xml.sax.helpers.DefaultHandler;
56
import org.xml.sax.helpers.XMLReaderFactory;
57

    
58
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
59
import edu.ucsb.nceas.metacat.DBUtil;
60
import edu.ucsb.nceas.metacat.DocumentImpl;
61
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
62
import edu.ucsb.nceas.metacat.EventLog;
63
import edu.ucsb.nceas.metacat.IdentifierManager;
64
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
65
import edu.ucsb.nceas.metacat.SchemaLocationResolver;
66
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
67
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
68
import edu.ucsb.nceas.metacat.database.DBConnection;
69
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
70
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
71
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
72
import edu.ucsb.nceas.metacat.properties.PropertyService;
73
import edu.ucsb.nceas.metacat.shared.HandlerException;
74
import edu.ucsb.nceas.metacat.util.DocumentUtil;
75
import edu.ucsb.nceas.metacat.util.MetacatUtil;
76
import edu.ucsb.nceas.metacat.util.ReplicationUtil;
77
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
78
import edu.ucsb.nceas.utilities.access.DocInfoHandler;
79
import edu.ucsb.nceas.utilities.access.XMLAccessDAO;
80

    
81

    
82

    
83
/**
84
 * This class handles deltaT replication checking.  Whenever this TimerTask
85
 * is fired it checks each server in xml_replication for updates and updates
86
 * the local db as needed.
87
 */
88
public class ReplicationHandler extends TimerTask
89
{
90
  int serverCheckCode = 1;
91
  ReplicationServerList serverList = null;
92
  //PrintWriter out;
93
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
94
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
95
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
96
  
97
  private static int DOCINSERTNUMBER = 1;
98
  private static int DOCERRORNUMBER  = 1;
99
  private static int REVINSERTNUMBER = 1;
100
  private static int REVERRORNUMBER  = 1;
101
  
102
  private static int _xmlDocQueryCount = 0;
103
  private static int _xmlRevQueryCount = 0;
104
  private static long _xmlDocQueryTime = 0;
105
  private static long _xmlRevQueryTime = 0;
106
  
107
  
108
  public ReplicationHandler()
109
  {
110
    //this.out = o;
111
    serverList = new ReplicationServerList();
112
  }
113

    
114
  public ReplicationHandler(int serverCheckCode)
115
  {
116
    //this.out = o;
117
    this.serverCheckCode = serverCheckCode;
118
    serverList = new ReplicationServerList();
119
  }
120

    
121
  /**
122
   * Method that implements TimerTask.run().  It runs whenever the timer is
123
   * fired.
124
   */
125
  public void run()
126
  {
127
    //find out the last_checked time of each server in the server list and
128
    //send a query to each server to see if there are any documents in
129
    //xml_documents with an update_date > last_checked
130
	  
131
      //if serverList is null, metacat don't need to replication
132
      if (serverList==null||serverList.isEmpty())
133
      {
134
        return;
135
      }
136
      updateCatalog();
137
      update();
138
      //conn.close();
139
  }
140

    
141
  /**
142
   * Method that uses revision tagging for replication instead of update_date.
143
   */
144
  private void update()
145
  {
146
	  
147
	  _xmlDocQueryCount = 0;
148
	  _xmlRevQueryCount = 0;
149
	  _xmlDocQueryTime = 0;
150
	  _xmlRevQueryTime = 0;
151
    /*
152
     Pseudo-algorithm
153
     - request a doc list from each server in xml_replication
154
     - check the rev number of each of those documents agains the
155
       documents in the local database
156
     - pull any documents that have a lesser rev number on the local server
157
       from the remote server
158
     - delete any documents that still exist in the local xml_documents but
159
       are in the deletedDocuments tag of the remote host response.
160
     - update last_checked to keep track of the last time it was checked.
161
       (this info is theoretically not needed using this system but probably
162
       should be kept anyway)
163
    */
164

    
165
    ReplicationServer replServer = null; // Variable to store the
166
                                        // ReplicationServer got from
167
                                        // Server list
168
    String server = null; // Variable to store server name
169
//    String update;
170
    Vector<InputStream> responses = new Vector<InputStream>();
171
    URL u;
172
    long replicationStartTime = System.currentTimeMillis();
173
    long timeToGetServerList = 0;
174
    
175
    //Check for every server in server list to get updated list and put
176
    // them in to response
177
    long startTimeToGetServers = System.currentTimeMillis();
178
    for (int i=0; i<serverList.size(); i++)
179
    {
180
        // Get ReplicationServer object from server list
181
        replServer = serverList.serverAt(i);
182
        // Get server name from ReplicationServer object
183
        server = replServer.getServerName().trim();
184
        InputStream result = null;
185
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
186
        // Send command to that server to get updated docid information
187
        try
188
        {
189
          u = new URL("https://" + server + "?server="
190
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
191
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
192
          result = ReplicationService.getURLStream(u);
193
        }
194
        catch (Exception e)
195
        {
196
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
197
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
198
                       "for server " + server + " because "+e.getMessage());
199
          continue;
200
        }
201

    
202
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
203
        //check if result have error or not, if has skip it.
204
        // TODO: check for error in stream
205
        //if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) {
206
        if (result == null) {
207
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
208
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
209
                       "for server " + server + " because "+result);
210
          continue;
211
        }
212
        //Add result to vector
213
        responses.add(result);
214
    }
215
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
216

    
217
    //make sure that there is updated file list
218
    //If response is null, metacat don't need do anything
219
    if (responses==null || responses.isEmpty())
220
    {
221
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
222
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
223
                           "every server and failed to replicate");
224
        return;
225
    }
226

    
227

    
228
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
229
    //               "document information: "+ responses.toString());
230
    
231
    long totalServerListParseTime = 0;
232
    // go through response vector(it contains updated vector and delete vector
233
    for(int i=0; i<responses.size(); i++)
234
    {
235
    	long startServerListParseTime = System.currentTimeMillis();
236
    	XMLReader parser;
237
    	ReplMessageHandler message = new ReplMessageHandler();
238
    	try
239
        {
240
          parser = initParser(message);
241
        }
242
        catch (Exception e)
243
        {
244
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
245
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
246
                                " initParser for message and " +e.getMessage());
247
           // stop replication
248
           return;
249
        }
250
    	
251
        try
252
        {
253
          parser.parse(new InputSource(responses.elementAt(i)));
254
        }
255
        catch(Exception e)
256
        {
257
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
258
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
259
                                   "because "+ e.getMessage());
260
          continue;
261
        }
262
        //v is the list of updated documents
263
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
264
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
265
        //d is the list of deleted documents
266
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
267
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
268
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
269
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
270
        // go though every element in updated document vector
271
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
272
        //handle deleted docs
273
        for(int k=0; k<deleteList.size(); k++)
274
        { //delete the deleted documents;
275
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
276
          String docId = (String)w.elementAt(0);
277
          try
278
          {
279
            handleDeleteSingleDocument(docId, server);
280
          }
281
          catch (Exception ee)
282
          {
283
            continue;
284
          }
285
        }//for delete docs
286
        
287
        // handle replicate doc in xml_revision
288
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
289
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
290
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
291
        DOCINSERTNUMBER = 1;
292
        DOCERRORNUMBER  = 1;
293
        REVINSERTNUMBER = 1;
294
        REVERRORNUMBER  = 1;
295
        
296
        // handle system metadata
297
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
298
        for(int k = 0; k < systemMetadataList.size(); k++) { 
299
        	Vector<String> w = systemMetadataList.elementAt(k);
300
        	String guid = (String) w.elementAt(0);
301
        	String remoteserver = (String) w.elementAt(1);
302
        	try {
303
        		handleSystemMetadata(remoteserver, guid);
304
        	}
305
        	catch (Exception ee) {
306
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
307
        		continue;
308
        	}
309
        }
310
        
311
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
312
    }//for response
313

    
314
    //updated last_checked
315
    for (int i=0;i<serverList.size(); i++)
316
    {
317
       // Get ReplicationServer object from server list
318
       replServer = serverList.serverAt(i);
319
       try
320
       {
321
         updateLastCheckTimeForSingleServer(replServer);
322
       }
323
       catch(Exception e)
324
       {
325
         continue;
326
       }
327
    }//for
328
    
329
    long replicationEndTime = System.currentTimeMillis();
330
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
331
    		(replicationEndTime - replicationStartTime));
332
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
333
    		timeToGetServerList);
334
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
335
    		totalServerListParseTime);
336
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
337
    		_xmlDocQueryCount);
338
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
339
    		_xmlDocQueryTime + " ms");
340
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
341
    		_xmlRevQueryCount);
342
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
343
    		_xmlRevQueryTime + " ms");;
344

    
345
  }//update
346

    
347
  /* Handle replicate single xml document*/
348
  private void handleSingleXMLDocument(String remoteserver, String actions,
349
                                       String accNumber, String tableName)
350
               throws HandlerException
351
  {
352
    DBConnection dbConn = null;
353
    int serialNumber = -1;
354
    try
355
    {
356
      // Get DBConnection from pool
357
      dbConn=DBConnectionPool.
358
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
359
      serialNumber=dbConn.getCheckOutSerialNumber();
360
      //if the document needs to be updated or inserted, this is executed
361
      String readDocURLString = "https://" + remoteserver + "?server="+
362
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
363
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
364
      URL u = new URL(readDocURLString);
365

    
366
      // Get docid content
367
      byte[] xmlBytes = ReplicationService.getURLBytes(u);
368
      String newxmldoc = new String(xmlBytes, "UTF-8");
369
      // If couldn't get skip it
370
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
371
      {
372
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
373
      }
374
      //logReplication.info("xml documnet:");
375
      //logReplication.info(newxmldoc);
376

    
377
      // Try get the docid info from remote server
378
      DocInfoHandler dih = new DocInfoHandler();
379
      XMLReader docinfoParser = initParser(dih);
380
      String docInfoURLStr = "https://" + remoteserver +
381
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
382
                       "&action=getdocumentinfo&docid="+accNumber;
383
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
384
      URL docinfoUrl = new URL(docInfoURLStr);
385
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
386
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
387
      
388
      // strip out the system metadata portion
389
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
390
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);
391
   	  SystemMetadata sysMeta = null;
392
   	  // process system metadata if we have it
393
      if (systemMetadataXML != null) {
394
    	  sysMeta = 
395
    		  TypeMarshaller.unmarshalTypeFromStream(
396
    				  SystemMetadata.class, 
397
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
398
    	  // need the guid-to-docid mapping
399
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
400
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
401
    	  }
402
      	  // save the system metadata
403
    	  logReplication.debug("Saving SystemMetadata to shared map: " + sysMeta.getIdentifier().getValue());
404
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
405
      	  
406
      }
407
   	  
408
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
409
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
410
      // Get home server of the docid
411
      String docHomeServer = docinfoHash.get("home_server");
412
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
413
     
414
      // dates
415
      String createdDateString = docinfoHash.get("date_created");
416
      String updatedDateString = docinfoHash.get("date_updated");
417
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
418
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
419
      
420
      //docid should include rev number too
421
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
422
                                              (String)docinfoHash.get("rev");*/
423
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
424
      String docType = docinfoHash.get("doctype");
425
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
426

    
427
      String parserBase = null;
428
      // this for eml2 and we need user eml2 parser
429
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
430
      {
431
         parserBase = DocumentImpl.EML200;
432
      }
433
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
434
      {
435
        parserBase = DocumentImpl.EML200;
436
      }
437
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
438
      {
439
        parserBase = DocumentImpl.EML210;
440
      }
441
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
442
      {
443
        parserBase = DocumentImpl.EML210;
444
      }
445
      // Write the document into local host
446
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false, false);
447
      String newDocid = wrapper.writeReplication(dbConn,
448
                              newxmldoc, xmlBytes,
449
                              docinfoHash.get("public_access"),
450
                              null,  /* the dtd text */
451
                              actions,
452
                              accNumber,
453
                              null, //docinfoHash.get("user_owner"),                              
454
                              null, /* null for groups[] */
455
                              docHomeServer,
456
                              remoteserver, tableName, true,// true is for time replication 
457
                              createdDate,
458
                              updatedDate);
459
      
460
      if(sysMeta != null) {
461
			// submit for indexing. When the doc writing process fails, the index process will fail as well. But this failure
462
			// will not interrupt the process.
463
			try {
464
				MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
465
			} catch (Exception ee) {
466
				logReplication.warn("ReplicationService.handleForceReplicateRequest - couldn't index the doc since "+ee.getMessage());
467
			}
468
          
469
		}
470
      
471
      //set the user information
472
      String user = (String) docinfoHash.get("user_owner");
473
      String updated = (String) docinfoHash.get("user_updated");
474
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
475
      
476
      //process extra access rules 
477
      try {
478
      	// check if we had a guid -> docid mapping
479
      	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
480
      	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
481
      	IdentifierManager.getInstance().getGUID(docid, rev);
482
      	// no need to create the mapping if we have it
483
      } catch (McdbDocNotFoundException mcdbe) {
484
      	// create mapping if we don't
485
      	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
486
      }
487
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
488
      if (xmlAccessDAOList != null) {
489
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
490
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
491
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
492
      			acfsf.insertPermissions(xmlAccessDAO);
493
      		}
494
          }
495
      }
496
      
497
      
498
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
499
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
500
      {
501
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
502
                                     " into "+tableName + " from " +
503
                                         remoteserver);
504
        DOCINSERTNUMBER++;
505
      }
506
      else
507
      {
508
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
509
                  " into "+tableName + " from " +
510
                      remoteserver);
511
          REVINSERTNUMBER++;
512
      }
513
      String ip = getIpFromURL(u);
514
      EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
515
      
516

    
517
    }//try
518
    catch(Exception e)
519
    {
520
        
521
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
522
        {
523
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
524
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
525
                                       " into "+tableName + " from " +
526
                                           remoteserver + " because "+e.getMessage());
527
          DOCERRORNUMBER++;
528
        }
529
        else
530
        {
531
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
532
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
533
                    " into "+tableName + " from " +
534
                        remoteserver +" because "+e.getMessage());
535
            REVERRORNUMBER++;
536
        }
537
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
538
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
539
                                      " into db because " + e.getMessage(), e);
540
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
541
    		  + "writing Replication: " +e.getMessage());
542
    }
543
    finally
544
    {
545
       //return DBConnection
546
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
547
    }//finally
548
    logMetacat.info("replication.create localId:" + accNumber);
549
  }
550

    
551

    
552

    
553
  /* Handle replicate single xml document*/
554
  private void handleSingleDataFile(String remoteserver, String actions,
555
                                    String accNumber, String tableName)
556
               throws HandlerException
557
  {
558
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
559
    DBConnection dbConn = null;
560
    int serialNumber = -1;
561
    try
562
    {
563
      // Get DBConnection from pool
564
      dbConn=DBConnectionPool.
565
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
566
      serialNumber=dbConn.getCheckOutSerialNumber();
567
      // Try get docid info from remote server
568
      DocInfoHandler dih = new DocInfoHandler();
569
      XMLReader docinfoParser = initParser(dih);
570
      String docInfoURLString = "https://" + remoteserver +
571
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
572
                  "&action=getdocumentinfo&docid="+accNumber;
573
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
574
      URL docinfoUrl = new URL(docInfoURLString);
575

    
576
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
577
      
578
      // strip out the system metadata portion
579
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
580
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);  
581
   	  
582
   	  // process system metadata
583
      if (systemMetadataXML != null) {
584
    	  SystemMetadata sysMeta = 
585
    		TypeMarshaller.unmarshalTypeFromStream(
586
    				  SystemMetadata.class, 
587
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
588
    	  // need the guid-to-docid mapping
589
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
590
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
591
    	  }
592
    	  // save the system metadata
593
    	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
594
    	  // submit for indexing
595
          MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
596

    
597
      }
598
   	  
599
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
600
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
601
      
602
      // Get docid name (such as acl or dataset)
603
      String docName = docinfoHash.get("docname");
604
      // Get doc type (eml public id)
605
      String docType = docinfoHash.get("doctype");
606
      // Get docid home sever. it might be different to remoteserver
607
      // because of hub feature
608
      String docHomeServer = docinfoHash.get("home_server");
609
      String createdDateString = docinfoHash.get("date_created");
610
      String updatedDateString = docinfoHash.get("date_updated");
611
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
612
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
613
      //docid should include rev number too
614
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
615
                                              (String)docinfoHash.get("rev");*/
616

    
617
      String datafilePath = PropertyService.getProperty("application.datafilepath");
618
      // Get data file content
619
      String readDataURLString = "https://" + remoteserver + "?server="+
620
                                        MetacatUtil.getLocalReplicationServerName()+
621
                                            "&action=readdata&docid="+accNumber;
622
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
623
      URL u = new URL(readDataURLString);
624
      InputStream input = ReplicationService.getURLStream(u);
625
      //register data file into xml_documents table and wite data file
626
      //into file system
627
      if ( input != null)
628
      {
629
        DocumentImpl.writeDataFileInReplication(input,
630
                                                datafilePath,
631
                                                docName,docType,
632
                                                accNumber,
633
                                                null,
634
                                                docHomeServer,
635
                                                remoteserver,
636
                                                tableName,
637
                                                true, //true means timed replication
638
                                                createdDate,
639
                                                updatedDate);
640
                                         
641
        //set the user information
642
        String user = (String) docinfoHash.get("user_owner");
643
		String updated = (String) docinfoHash.get("user_updated");
644
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
645
        
646
        //process extra access rules
647
        try {
648
        	// check if we had a guid -> docid mapping
649
        	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
650
        	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
651
        	IdentifierManager.getInstance().getGUID(docid, rev);
652
        	// no need to create the mapping if we have it
653
        } catch (McdbDocNotFoundException mcdbe) {
654
        	// create mapping if we don't
655
        	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
656
        }
657
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
658
        if (xmlAccessDAOList != null) {
659
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
660
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
661
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
662
        			acfsf.insertPermissions(xmlAccessDAO);
663
        		}
664
            }
665
        }
666
        
667
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
668
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
669
                                    remote server);*/
670
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
671
        {
672
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
673
                                       " into "+tableName + " from " +
674
                                           remoteserver);
675
          DOCINSERTNUMBER++;
676
        }
677
        else
678
        {
679
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
680
                    " into "+tableName + " from " +
681
                        remoteserver);
682
            REVINSERTNUMBER++;
683
        }
684
        String ip = getIpFromURL(u);
685
        EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
686
        
687
      }//if
688
      else
689
      {
690
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
691
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
692
      }//else
693

    
694
    }//try
695
    catch(Exception e)
696
    {
697
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
698
                                      " because " +e.getMessage());*/
699
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
700
      {
701
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
702
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
703
                                     " into " + tableName + " from " +
704
                                         remoteserver + " because " + e.getMessage());
705
        DOCERRORNUMBER++;
706
      }
707
      else
708
      {
709
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
710
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
711
                  " into " + tableName + " from " +
712
                      remoteserver +" because "+ e.getMessage());
713
          REVERRORNUMBER++;
714
      }
715
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
716
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
717
                                      " because " + e.getMessage());
718
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
719
    		  + "writing Replication: " + e.getMessage());
720
    }
721
    finally
722
    {
723
       //return DBConnection
724
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
725
    }//finally
726
    logMetacat.info("replication.create localId:" + accNumber);
727
  }
728

    
729

    
730

    
731
  /* Handle delete single document*/
732
  private void handleDeleteSingleDocument(String docId, String notifyServer)
733
               throws HandlerException
734
  {
735
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
736
    DBConnection dbConn = null;
737
    int serialNumber = -1;
738
    try
739
    {
740
      // Get DBConnection from pool
741
      dbConn=DBConnectionPool.
742
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
743
      serialNumber=dbConn.getCheckOutSerialNumber();
744
      if(!alreadyDeleted(docId))
745
      {
746

    
747
         //because delete method docid should have rev number
748
         //so we just add one for it. This rev number is no sence.
749
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
750
         DocumentImpl.delete(accnum, null, null, notifyServer, false);
751
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
752
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
753
         URL u = new URL("https://"+notifyServer);
754
         String ip = getIpFromURL(u);
755
         EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, docId, "delete");
756
      }
757

    
758
    }//try
759
    catch(McdbDocNotFoundException e)
760
    {
761
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
762
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
763
                                 " in db because because " + e.getMessage());
764
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
765
    		  + "when handling document: " + e.getMessage());
766
    }
767
    catch(InsufficientKarmaException e)
768
    {
769
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
770
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
771
                                 " in db because because " + e.getMessage());
772
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
773
    		  + "when handling document: " + e.getMessage());
774
    }
775
    catch(SQLException e)
776
    {
777
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
778
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
779
                                 " in db because because " + e.getMessage());
780
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
781
    		  + "when handling document: " + e.getMessage());
782
    }
783
    catch(Exception e)
784
    {
785
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
786
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
787
                                 " in db because because " + e.getMessage());
788
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
789
    		  + "when handling document: " + e.getMessage());
790
    }
791
    finally
792
    {
793
       //return DBConnection
794
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
795
    }//finally
796
    logMetacat.info("replication.handleDeleteSingleDocument localId:" + docId);
797
  }
798

    
799
  /* Handle updateLastCheckTimForSingleServer*/
800
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
801
                                                  throws HandlerException
802
  {
803
    String server = repServer.getServerName();
804
    DBConnection dbConn = null;
805
    int serialNumber = -1;
806
    PreparedStatement pstmt = null;
807
    try
808
    {
809
      // Get DBConnection from pool
810
      dbConn=DBConnectionPool.
811
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
812
      serialNumber=dbConn.getCheckOutSerialNumber();
813

    
814
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
815
      // Get time from remote server
816
      URL dateurl = new URL("https://" + server + "?server="+
817
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
818
      String datexml = ReplicationService.getURLContent(dateurl);
819
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
820
      if (datexml != null && !datexml.equals("")) {
821
    	  
822
    	  // parse the ISO datetime
823
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
824
         Date updated = DateTimeMarshaller.deserializeDateToUTC(datestr);
825
         
826
         StringBuffer sql = new StringBuffer();
827
         sql.append("update xml_replication set last_checked = ? ");
828
         sql.append(" where server like ? ");
829
         pstmt = dbConn.prepareStatement(sql.toString());
830
         pstmt.setTimestamp(1, new Timestamp(updated.getTime()));
831
         pstmt.setString(2, server);
832
         
833
         pstmt.executeUpdate();
834
         dbConn.commit();
835
         pstmt.close();
836
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
837
                                      + server);
838
      }//if
839
      else
840
      {
841

    
842
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
843
                                  server + " in db because couldn't get time "
844
                                  );
845
         throw new Exception("Couldn't get time for server "+ server);
846
      }
847

    
848
    }//try
849
    catch(Exception e)
850
    {
851
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
852
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
853
                                server + " in db because because " + e.getMessage());
854
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
855
    		  + "Error updating last checked time: " + e.getMessage());
856
    }
857
    finally
858
    {
859
       //return DBConnection
860
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
861
    }//finally
862
  }
863
  
864
  	/**
865
	 * Handle replicate system metadata
866
	 * 
867
	 * @param remoteserver
868
	 * @param guid
869
	 * @throws HandlerException
870
	 */
871
	private void handleSystemMetadata(String remoteserver, String guid) 
872
		throws HandlerException {
873
		try {
874

    
875
			// Try get the system metadata from remote server
876
			String sysMetaURLStr = "https://" + remoteserver + "?server="
877
					+ MetacatUtil.getLocalReplicationServerName()
878
					+ "&action=getsystemmetadata&guid=" + guid;
879
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
880
			URL sysMetaUrl = new URL(sysMetaURLStr);
881
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
882
							+ sysMetaUrl.toString());
883
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
884

    
885
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
886

    
887
			// process system metadata
888
			if (systemMetadataXML != null) {
889
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
890
								new ByteArrayInputStream(systemMetadataXML
891
										.getBytes("UTF-8")));
892
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
893
				// submit for indexing
894
                MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
895
			}
896

    
897
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
898
							+ guid);
899

    
900
			String ip = getIpFromURL(sysMetaUrl);
901
			EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
902

    
903
		} catch (Exception e) {
904
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
905
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
906
			logReplication
907
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
908
							+ guid + " into db because " + e.getMessage());
909
			throw new HandlerException(
910
					"ReplicationHandler.handleSystemMetadata - generic exception "
911
							+ "writing Replication: " + e.getMessage());
912
		}
913

    
914
	}
915

    
916
  /**
917
   * updates xml_catalog with entries from other servers.
918
   */
919
  private void updateCatalog()
920
  {
921
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
922
    // ReplicationServer object in server list
923
    ReplicationServer replServer = null;
924
    PreparedStatement pstmt = null;
925
    String server = null;
926

    
927

    
928
    // Go through each ReplicationServer object in sererlist
929
    for (int j=0; j<serverList.size(); j++)
930
    {
931
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
932
      Vector<String> publicId = new Vector<String>();
933
      try
934
      {
935
        // Get ReplicationServer object from server list
936
        replServer = serverList.serverAt(j);
937
        // Get server name from the ReplicationServer object
938
        server = replServer.getServerName();
939
        // Try to get catalog
940
        URL u = new URL("https://" + server + "?server="+
941
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
942
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
943
        String catxml = ReplicationService.getURLContent(u);
944

    
945
        // Make sure there are not error, no empty string
946
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
947
        {
948
          throw new Exception("Couldn't get catalog list form server " +server);
949
        }
950
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
951
        CatalogMessageHandler cmh = new CatalogMessageHandler();
952
        XMLReader catparser = initParser(cmh);
953
        catparser.parse(new InputSource(new StringReader(catxml)));
954
        //parse the returned catalog xml and put it into a vector
955
        remoteCatalog = cmh.getCatalogVect();
956

    
957
        // Make sure remoteCatalog is not empty
958
        if (remoteCatalog.isEmpty())
959
        {
960
          throw new Exception("Couldn't get catalog list form server " +server);
961
        }
962

    
963
        String localcatxml = ReplicationService.getCatalogXML();
964

    
965
        // Make sure local catalog is no empty
966
        if (localcatxml==null||localcatxml.equals(""))
967
        {
968
          throw new Exception("Couldn't get catalog list form server " +server);
969
        }
970

    
971
        cmh = new CatalogMessageHandler();
972
        catparser = initParser(cmh);
973
        catparser.parse(new InputSource(new StringReader(localcatxml)));
974
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
975

    
976
        //now we have the catalog from the remote server and this local server
977
        //we now need to compare the two and merge the differences.
978
        //the comparison is base on the public_id fields which is the 4th
979
        //entry in each row vector.
980
        publicId = new Vector<String>();
981
        for(int i=0; i<localCatalog.size(); i++)
982
        {
983
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
984
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
985
          publicId.add(new String((String)v.elementAt(3)));
986
        }
987
      }//try
988
      catch (Exception e)
989
      {
990
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
991
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
992
                                    server + " because " +e.getMessage());
993
      }//catch
994

    
995
      for(int i=0; i<remoteCatalog.size(); i++)
996
      {
997
         // DConnection
998
        DBConnection dbConn = null;
999
        // DBConnection checkout serial number
1000
        int serialNumber = -1;
1001
        try
1002
        {
1003
            dbConn=DBConnectionPool.
1004
                  getDBConnection("ReplicationHandler.updateCatalog");
1005
            serialNumber=dbConn.getCheckOutSerialNumber();
1006
            Vector<String> v = remoteCatalog.elementAt(i);
1007
            //logMetacat.debug("v2: " + v.toString());
1008
            //logMetacat.debug("i: " + i);
1009
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
1010
            //logMetacat.debug("publicID: " + publicId.toString());
1011
            logReplication.info
1012
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
1013
           if(!publicId.contains(v.elementAt(3)))
1014
           { //so we don't have this public id in our local table so we need to
1015
             //add it.
1016
        	   
1017
        	   // check where it is pointing first, before adding
1018
        	   String entryType = (String)v.elementAt(0);
1019
        	   if (entryType.equals(DocumentImpl.SCHEMA)) {
1020
	        	   String nameSpace = (String)v.elementAt(3);
1021
	        	   String schemaLocation = (String)v.elementAt(4);
1022
	        	   SchemaLocationResolver slr = new SchemaLocationResolver(nameSpace, schemaLocation);
1023
	        	   try {
1024
	        		   slr.resolveNameSpace();
1025
	        	   } catch (Exception e) {
1026
	        		   String msg = "Could not save remote schema to xml catalog. " + "nameSpace: " + nameSpace + " location: " + schemaLocation;
1027
	        		   logMetacat.error(msg, e);
1028
	        		   logReplication.error(msg, e);
1029
	        	   }
1030
	        	   // skip whatever else we were going to do
1031
	        	   continue;
1032
        	   }
1033
        	   
1034
             //logMetacat.debug("in if");
1035
             StringBuffer sql = new StringBuffer();
1036
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
1037
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
1038
             sql.append("?,?)");
1039
             //logMetacat.debug("sql: " + sql.toString());
1040
             pstmt = dbConn.prepareStatement(sql.toString());
1041
             pstmt.setString(1, (String)v.elementAt(0));
1042
             pstmt.setString(2, (String)v.elementAt(1));
1043
             pstmt.setString(3, (String)v.elementAt(2));
1044
             pstmt.setString(4, (String)v.elementAt(3));
1045
             pstmt.setString(5, (String)v.elementAt(4));
1046
             pstmt.execute();
1047
             pstmt.close();
1048
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
1049
                               (String)v.elementAt(3) + " from server"+server);
1050
           }
1051
        }
1052
        catch(Exception e)
1053
        {
1054
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1055
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
1056
                                    server + " because " +e.getMessage());
1057
        }//catch
1058
        finally
1059
        {
1060
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1061
        }//finally
1062
      }//for remote catalog
1063
    }//for server list
1064
    logReplication.info("End of updateCatalog");
1065
  }
1066

    
1067
  /**
1068
   * Method that returns true if docid has already been "deleted" from metacat.
1069
   * This method really implements a truth table for deleted documents
1070
   * The table is (a docid in one of the tables is represented by the X):
1071
   * xml_docs      xml_revs      deleted?
1072
   * ------------------------------------
1073
   *   X             X             FALSE
1074
   *   X             _             FALSE
1075
   *   _             X             TRUE
1076
   *   _             _             TRUE
1077
   */
1078
  private static boolean alreadyDeleted(String docid) throws HandlerException
1079
  {
1080
    DBConnection dbConn = null;
1081
    int serialNumber = -1;
1082
    PreparedStatement pstmt = null;
1083
    try
1084
    {
1085
      dbConn=DBConnectionPool.
1086
                  getDBConnection("ReplicationHandler.alreadyDeleted");
1087
      serialNumber=dbConn.getCheckOutSerialNumber();
1088
      boolean xml_docs = false;
1089
      boolean xml_revs = false;
1090

    
1091
      StringBuffer sb = new StringBuffer();
1092
      sb.append("select docid from xml_revisions where docid like ? ");
1093
      pstmt = dbConn.prepareStatement(sb.toString());
1094
      pstmt.setString(1, docid);
1095
      pstmt.execute();
1096
      ResultSet rs = pstmt.getResultSet();
1097
      boolean tablehasrows = rs.next();
1098
      if(tablehasrows)
1099
      {
1100
        xml_revs = true;
1101
      }
1102

    
1103
      sb = new StringBuffer();
1104
      sb.append("select docid from xml_documents where docid like '");
1105
      sb.append(docid).append("'");
1106
      pstmt.close();
1107
      pstmt = dbConn.prepareStatement(sb.toString());
1108
      //increase usage count
1109
      dbConn.increaseUsageCount(1);
1110
      pstmt.execute();
1111
      rs = pstmt.getResultSet();
1112
      tablehasrows = rs.next();
1113
      pstmt.close();
1114
      if(tablehasrows)
1115
      {
1116
        xml_docs = true;
1117
      }
1118

    
1119
      if(xml_docs && xml_revs)
1120
      {
1121
        return false;
1122
      }
1123
      else if(xml_docs && !xml_revs)
1124
      {
1125
        return false;
1126
      }
1127
      else if(!xml_docs && xml_revs)
1128
      {
1129
        return true;
1130
      }
1131
      else if(!xml_docs && !xml_revs)
1132
      {
1133
        return true;
1134
      }
1135
    }
1136
    catch(Exception e)
1137
    {
1138
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1139
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1140
                          e.getMessage());
1141
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1142
    		  + e.getMessage());
1143
    }
1144
    finally
1145
    {
1146
      try
1147
      {
1148
        pstmt.close();
1149
      }//try
1150
      catch (SQLException ee)
1151
      {
1152
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1153
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1154
                          "to close pstmt: "+ee.getMessage());
1155
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1156
      		  + ee.getMessage());
1157
      }//catch
1158
      finally
1159
      {
1160
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1161
      }//finally
1162
    }//finally
1163
    return false;
1164
  }
1165

    
1166

    
1167
  /**
1168
   * Method to initialize the message parser
1169
   */
1170
  public static XMLReader initParser(DefaultHandler dh)
1171
          throws HandlerException
1172
  {
1173
    XMLReader parser = null;
1174

    
1175
    try {
1176
      ContentHandler chandler = dh;
1177

    
1178
      // Get an instance of the parser
1179
      String parserName = PropertyService.getProperty("xml.saxparser");
1180
      parser = XMLReaderFactory.createXMLReader(parserName);
1181

    
1182
      // Turn off validation
1183
      parser.setFeature("http://xml.org/sax/features/validation", false);
1184

    
1185
      parser.setContentHandler((ContentHandler)chandler);
1186
      parser.setErrorHandler((ErrorHandler)chandler);
1187

    
1188
    } catch (SAXException se) {
1189
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1190
    		  + " initializing parser: " + se.getMessage());
1191
    } catch (PropertyNotFoundException pnfe) {
1192
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1193
      		  + " getting parser name: " + pnfe.getMessage());
1194
    } 
1195

    
1196
    return parser;
1197
  }
1198

    
1199
  /**
1200
	 * This method will combine given time string(in short format) to current
1201
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1202
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1203
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1204
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1205
	 * 
1206
	 * @param givenTime
1207
	 *            the format should be "10:00 AM " or "2:00 PM"
1208
	 * @return
1209
	 * @throws Exception
1210
	 */
1211
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1212
  {
1213
	  try {
1214
     Date givenDate = parseTime(givenTime);
1215
     Date newDate = null;
1216
     Date now = new Date();
1217
     String currentTimeString = getTimeString(now);
1218
     Date currentTime = parseTime(currentTimeString); 
1219
     if ( currentTime.getTime() >= givenDate.getTime())
1220
     {
1221
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1222
        String dateAndTime = getDateString(now) + " " + givenTime;
1223
        Date combinationDate = parseDateTime(dateAndTime);
1224
        // new date should plus 24 hours to make is the second day
1225
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1226
     }
1227
     else
1228
     {
1229
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1230
         String dateAndTime = getDateString(now) + " " + givenTime;
1231
         newDate = parseDateTime(dateAndTime);
1232
     }
1233
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1234
     return newDate;
1235
	  } catch (ParseException pe) {
1236
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1237
				  + "parsing error: "  + pe.getMessage());
1238
	  }
1239
  }
1240

    
1241
  /*
1242
	 * parse a given string to Time in short format. For example, given time is
1243
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1244
	 */
1245
  private static Date parseTime(String timeString) throws ParseException
1246
  {
1247
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1248
    Date time = format.parse(timeString); 
1249
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1250
                              +time.toString());
1251
    return time;
1252

    
1253
  }
1254
  
1255
  /*
1256
   * Parse a given string to date and time. Date format is long and time
1257
   * format is short.
1258
   */
1259
  private static Date parseDateTime(String timeString) throws ParseException
1260
  {
1261
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1262
    Date time = format.parse(timeString);
1263
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1264
                             time.toString());
1265
    return time;
1266
  }
1267
  
1268
  /*
1269
   * Get a date string from a Date object. The date format will be long
1270
   */
1271
  private static String getDateString(Date now)
1272
  {
1273
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1274
     String s = df.format(now);
1275
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1276
     return s;
1277
  }
1278
  
1279
  /*
1280
   * Get a time string from a Date object, the time format will be short
1281
   */
1282
  private static String getTimeString(Date now)
1283
  {
1284
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1285
     String s = df.format(now);
1286
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1287
     return s;
1288
  }
1289
  
1290
  
1291
  /*
1292
	 * This method will go through the docid list both in xml_Documents table
1293
	 * and in xml_revisions table @author tao
1294
	 */
1295
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1296
		boolean dataFile = false;
1297
		for (int j = 0; j < docList.size(); j++) {
1298
			// initial dataFile is false
1299
			dataFile = false;
1300
			// w is information for one document, information contain
1301
			// docid, rev, server or datafile.
1302
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1303
			// Check if the vector w contain "datafile"
1304
			// If it has, this document is data file
1305
			try {
1306
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1307
					dataFile = true;
1308
				}
1309
			} catch (PropertyNotFoundException pnfe) {
1310
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1311
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1312
						+ "Leaving as false: " + pnfe.getMessage());
1313
			}
1314
			// logMetacat.debug("w: " + w.toString());
1315
			// Get docid
1316
			String docid = (String) w.elementAt(0);
1317
			logReplication.info("docid: " + docid);
1318
			// Get revision number
1319
			int rev = Integer.parseInt((String) w.elementAt(1));
1320
			logReplication.info("rev: " + rev);
1321
			// Get remote server name (it is may not be doc home server because
1322
			// the new hub feature
1323
			String remoteServer = (String) w.elementAt(2);
1324
			remoteServer = remoteServer.trim();
1325

    
1326
			try {
1327
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1328
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1329
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1330
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1331
				} else {
1332
					continue;
1333
				}
1334

    
1335
			} catch (Exception e) {
1336
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1337
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1338
						+ " in time replication" + e.getMessage(), e);
1339
				continue;
1340
			}
1341
			
1342
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1343
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1344
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1345
	        }
1346
	        
1347
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1348
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1349
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1350
	        }
1351

    
1352
		}// for update docs
1353

    
1354
	}
1355
   
1356
   /*
1357
	 * This method will handle doc in xml_documents table.
1358
	 */
1359
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1360
                                        throws HandlerException
1361
   {
1362
       // compare the update rev and local rev to see what need happen
1363
       int localrev = -1;
1364
       String action = null;
1365
       boolean flag = false;
1366
       try
1367
       {
1368
    	 long docQueryStartTime = System.currentTimeMillis();
1369
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1370
         long docQueryEndTime = System.currentTimeMillis();
1371
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1372
         _xmlDocQueryCount++;
1373
       }
1374
       catch (SQLException e)
1375
       {
1376
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1377
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1378
                                " be found because " + e.getMessage());
1379
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1380
                 "written because error happend to find it's local revision");
1381
         DOCERRORNUMBER++;
1382
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1383
                 " be found: " + e.getMessage());
1384
       }
1385
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1386
                               localrev);
1387

    
1388
       //check the revs for an update because this document is in the
1389
       //local DB, it might be out of date.
1390
       if (localrev == -1)
1391
       {
1392
          // check if the revision is in the revision table
1393
    	   Vector<Integer> localRevVector = null;
1394
    	 try {
1395
        	 long revQueryStartTime = System.currentTimeMillis();
1396
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1397
             long revQueryEndTime = System.currentTimeMillis();
1398
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1399
             _xmlRevQueryCount++;
1400
    	 } catch (SQLException sqle) {
1401
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1402
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1403
    	 }
1404
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1405
         {
1406
             // this version was deleted, so don't need replicate
1407
             flag = false;
1408
         }
1409
         else
1410
         {
1411
           //insert this document as new because it is not in the local DB
1412
           action = "INSERT";
1413
           flag = true;
1414
         }
1415
       }
1416
       else
1417
       {
1418
         if(localrev == rev)
1419
         {
1420
           // Local meatacat has the same rev to remote host, don't need
1421
           // update and flag set false
1422
           flag = false;
1423
         }
1424
         else if(localrev < rev)
1425
         {
1426
           //this document needs to be updated so send an read request
1427
           action = "UPDATE";
1428
           flag = true;
1429
         }
1430
       }
1431
       
1432
       String accNumber = null;
1433
       try {
1434
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1435
       } catch (PropertyNotFoundException pnfe) {
1436
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1437
    			   + "account number separator : " + pnfe.getMessage());
1438
       }
1439
       // this is non-data file
1440
       if(flag && !dataFile)
1441
       {
1442
         try
1443
         {
1444
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1445
         }
1446
         catch(HandlerException he)
1447
         {
1448
           // skip this document
1449
           throw he;
1450
         }
1451
       }//if for non-data file
1452

    
1453
        // this is for data file
1454
       if(flag && dataFile)
1455
       {
1456
         try
1457
         {
1458
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1459
         }
1460
         catch(HandlerException he)
1461
         {
1462
           // skip this data file
1463
           throw he;
1464
         }
1465

    
1466
       }//for data file
1467
   }
1468
   
1469
   /*
1470
    * This method will handle doc in xml_documents table.
1471
    */
1472
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1473
                                        throws HandlerException
1474
   {
1475
       // compare the update rev and local rev to see what need happen
1476
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1477
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1478
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1479
       Vector<Integer> localrev = null;
1480
       String action = "INSERT";
1481
       boolean flag = false;
1482
       try
1483
       {
1484
      	 long revQueryStartTime = System.currentTimeMillis();
1485
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1486
         long revQueryEndTime = System.currentTimeMillis();
1487
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1488
         _xmlRevQueryCount++;
1489
       }
1490
       catch (SQLException sqle)
1491
       {
1492
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1493
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1494
                                " be found because " + sqle.getMessage());
1495
         REVERRORNUMBER++;
1496
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1497
        		 + sqle.getMessage());
1498
       }
1499
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1500
                               localrev.toString());
1501
       
1502
       // if the rev is not in the xml_revision, we need insert it
1503
       if (!localrev.contains(new Integer(rev)))
1504
       {
1505
           flag = true;    
1506
       }
1507
     
1508
       String accNumber = null;
1509
       try {
1510
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1511
       } catch (PropertyNotFoundException pnfe) {
1512
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1513
    			   + "account number separator : " + pnfe.getMessage());
1514
       }
1515
       // this is non-data file
1516
       if(flag && !dataFile)
1517
       {
1518
         try
1519
         {
1520
           
1521
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1522
         }
1523
         catch(HandlerException he)
1524
         {
1525
           // skip this document
1526
           throw he;
1527
         }
1528
       }//if for non-data file
1529

    
1530
        // this is for data file
1531
       if(flag && dataFile)
1532
       {
1533
         try
1534
         {
1535
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1536
         }
1537
         catch(HandlerException he)
1538
         {
1539
           // skip this data file
1540
           throw he;
1541
         }
1542

    
1543
       }//for data file
1544
   }
1545
   
1546
   /*
1547
    * Return a ip address for given url
1548
    */
1549
   private String getIpFromURL(URL url)
1550
   {
1551
	   String ip = null;
1552
	   try
1553
	   {
1554
	      InetAddress address = InetAddress.getByName(url.getHost());
1555
	      ip = address.getHostAddress();
1556
	   }
1557
	   catch(UnknownHostException e)
1558
	   {
1559
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1560
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1561
                   +e.getMessage());
1562
	   }
1563

    
1564
	   return ip;
1565
   }
1566
  
1567
}
1568

    
(3-3/7)