Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: rnahf $'
9
 *     '$Date: 2015-03-24 13:14:09 -0700 (Tue, 24 Mar 2015) $'
10
 * '$Revision: 9156 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.sql.Timestamp;
39
import java.text.DateFormat;
40
import java.text.ParseException;
41
import java.util.Date;
42
import java.util.Hashtable;
43
import java.util.TimerTask;
44
import java.util.Vector;
45

    
46
import org.apache.commons.io.IOUtils;
47
import org.apache.log4j.Logger;
48
import org.dataone.service.types.v2.SystemMetadata;
49
import org.dataone.service.util.DateTimeMarshaller;
50
import org.dataone.service.util.TypeMarshaller;
51
import org.xml.sax.ContentHandler;
52
import org.xml.sax.ErrorHandler;
53
import org.xml.sax.InputSource;
54
import org.xml.sax.SAXException;
55
import org.xml.sax.XMLReader;
56
import org.xml.sax.helpers.DefaultHandler;
57
import org.xml.sax.helpers.XMLReaderFactory;
58

    
59
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
60
import edu.ucsb.nceas.metacat.DBUtil;
61
import edu.ucsb.nceas.metacat.DocumentImpl;
62
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
63
import edu.ucsb.nceas.metacat.EventLog;
64
import edu.ucsb.nceas.metacat.IdentifierManager;
65
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
66
import edu.ucsb.nceas.metacat.SchemaLocationResolver;
67
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
68
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
69
import edu.ucsb.nceas.metacat.database.DBConnection;
70
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
71
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
72
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
73
import edu.ucsb.nceas.metacat.properties.PropertyService;
74
import edu.ucsb.nceas.metacat.shared.HandlerException;
75
import edu.ucsb.nceas.metacat.util.DocumentUtil;
76
import edu.ucsb.nceas.metacat.util.MetacatUtil;
77
import edu.ucsb.nceas.metacat.util.ReplicationUtil;
78
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
79
import edu.ucsb.nceas.utilities.access.DocInfoHandler;
80
import edu.ucsb.nceas.utilities.access.XMLAccessDAO;
81

    
82

    
83

    
84
/**
85
 * This class handles deltaT replication checking.  Whenever this TimerTask
86
 * is fired it checks each server in xml_replication for updates and updates
87
 * the local db as needed.
88
 */
89
public class ReplicationHandler extends TimerTask
90
{
91
  int serverCheckCode = 1;
92
  ReplicationServerList serverList = null;
93
  //PrintWriter out;
94
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
95
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
96
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
97
  
98
  private static int DOCINSERTNUMBER = 1;
99
  private static int DOCERRORNUMBER  = 1;
100
  private static int REVINSERTNUMBER = 1;
101
  private static int REVERRORNUMBER  = 1;
102
  
103
  private static int _xmlDocQueryCount = 0;
104
  private static int _xmlRevQueryCount = 0;
105
  private static long _xmlDocQueryTime = 0;
106
  private static long _xmlRevQueryTime = 0;
107
  
108
  
109
  public ReplicationHandler()
110
  {
111
    //this.out = o;
112
    serverList = new ReplicationServerList();
113
  }
114

    
115
  public ReplicationHandler(int serverCheckCode)
116
  {
117
    //this.out = o;
118
    this.serverCheckCode = serverCheckCode;
119
    serverList = new ReplicationServerList();
120
  }
121

    
122
  /**
123
   * Method that implements TimerTask.run().  It runs whenever the timer is
124
   * fired.
125
   */
126
  public void run()
127
  {
128
    //find out the last_checked time of each server in the server list and
129
    //send a query to each server to see if there are any documents in
130
    //xml_documents with an update_date > last_checked
131
	  
132
      //if serverList is null, metacat don't need to replication
133
      if (serverList==null||serverList.isEmpty())
134
      {
135
        return;
136
      }
137
      updateCatalog();
138
      update();
139
      //conn.close();
140
  }
141

    
142
  /**
143
   * Method that uses revision tagging for replication instead of update_date.
144
   */
145
  private void update()
146
  {
147
	  
148
	  _xmlDocQueryCount = 0;
149
	  _xmlRevQueryCount = 0;
150
	  _xmlDocQueryTime = 0;
151
	  _xmlRevQueryTime = 0;
152
    /*
153
     Pseudo-algorithm
154
     - request a doc list from each server in xml_replication
155
     - check the rev number of each of those documents agains the
156
       documents in the local database
157
     - pull any documents that have a lesser rev number on the local server
158
       from the remote server
159
     - delete any documents that still exist in the local xml_documents but
160
       are in the deletedDocuments tag of the remote host response.
161
     - update last_checked to keep track of the last time it was checked.
162
       (this info is theoretically not needed using this system but probably
163
       should be kept anyway)
164
    */
165

    
166
    ReplicationServer replServer = null; // Variable to store the
167
                                        // ReplicationServer got from
168
                                        // Server list
169
    String server = null; // Variable to store server name
170
//    String update;
171
    Vector<InputStream> responses = new Vector<InputStream>();
172
    URL u;
173
    long replicationStartTime = System.currentTimeMillis();
174
    long timeToGetServerList = 0;
175
    
176
    //Check for every server in server list to get updated list and put
177
    // them in to response
178
    long startTimeToGetServers = System.currentTimeMillis();
179
    for (int i=0; i<serverList.size(); i++)
180
    {
181
        // Get ReplicationServer object from server list
182
        replServer = serverList.serverAt(i);
183
        // Get server name from ReplicationServer object
184
        server = replServer.getServerName().trim();
185
        InputStream result = null;
186
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
187
        // Send command to that server to get updated docid information
188
        try
189
        {
190
          u = new URL("https://" + server + "?server="
191
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
192
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
193
          result = ReplicationService.getURLStream(u);
194
        }
195
        catch (Exception e)
196
        {
197
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
198
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
199
                       "for server " + server + " because "+e.getMessage());
200
          continue;
201
        }
202

    
203
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
204
        //check if result have error or not, if has skip it.
205
        // TODO: check for error in stream
206
        //if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) {
207
        if (result == null) {
208
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
209
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
210
                       "for server " + server + " because "+result);
211
          continue;
212
        }
213
        //Add result to vector
214
        responses.add(result);
215
    }
216
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
217

    
218
    //make sure that there is updated file list
219
    //If response is null, metacat don't need do anything
220
    if (responses==null || responses.isEmpty())
221
    {
222
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
223
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
224
                           "every server and failed to replicate");
225
        return;
226
    }
227

    
228

    
229
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
230
    //               "document information: "+ responses.toString());
231
    
232
    long totalServerListParseTime = 0;
233
    // go through response vector(it contains updated vector and delete vector
234
    for(int i=0; i<responses.size(); i++)
235
    {
236
    	long startServerListParseTime = System.currentTimeMillis();
237
    	XMLReader parser;
238
    	ReplMessageHandler message = new ReplMessageHandler();
239
    	try
240
        {
241
          parser = initParser(message);
242
        }
243
        catch (Exception e)
244
        {
245
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
246
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
247
                                " initParser for message and " +e.getMessage());
248
           // stop replication
249
           return;
250
        }
251
    	
252
        try
253
        {
254
          parser.parse(new InputSource(responses.elementAt(i)));
255
        }
256
        catch(Exception e)
257
        {
258
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
259
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
260
                                   "because "+ e.getMessage());
261
          continue;
262
        }
263
        finally 
264
        {
265
            IOUtils.closeQuietly(responses.elementAt(i));
266
        }
267
        //v is the list of updated documents
268
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
269
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
270
        //d is the list of deleted documents
271
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
272
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
273
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
274
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
275
        // go though every element in updated document vector
276
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
277
        //handle deleted docs
278
        for(int k=0; k<deleteList.size(); k++)
279
        { //delete the deleted documents;
280
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
281
          String docId = (String)w.elementAt(0);
282
          try
283
          {
284
            handleDeleteSingleDocument(docId, server);
285
          }
286
          catch (Exception ee)
287
          {
288
            continue;
289
          }
290
        }//for delete docs
291
        
292
        // handle replicate doc in xml_revision
293
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
294
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
295
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
296
        DOCINSERTNUMBER = 1;
297
        DOCERRORNUMBER  = 1;
298
        REVINSERTNUMBER = 1;
299
        REVERRORNUMBER  = 1;
300
        
301
        // handle system metadata
302
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
303
        for(int k = 0; k < systemMetadataList.size(); k++) { 
304
        	Vector<String> w = systemMetadataList.elementAt(k);
305
        	String guid = (String) w.elementAt(0);
306
        	String remoteserver = (String) w.elementAt(1);
307
        	try {
308
        		handleSystemMetadata(remoteserver, guid);
309
        	}
310
        	catch (Exception ee) {
311
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
312
        		continue;
313
        	}
314
        }
315
        
316
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
317
    }//for response
318

    
319
    //updated last_checked
320
    for (int i=0;i<serverList.size(); i++)
321
    {
322
       // Get ReplicationServer object from server list
323
       replServer = serverList.serverAt(i);
324
       try
325
       {
326
         updateLastCheckTimeForSingleServer(replServer);
327
       }
328
       catch(Exception e)
329
       {
330
         continue;
331
       }
332
    }//for
333
    
334
    long replicationEndTime = System.currentTimeMillis();
335
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
336
    		(replicationEndTime - replicationStartTime));
337
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
338
    		timeToGetServerList);
339
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
340
    		totalServerListParseTime);
341
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
342
    		_xmlDocQueryCount);
343
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
344
    		_xmlDocQueryTime + " ms");
345
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
346
    		_xmlRevQueryCount);
347
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
348
    		_xmlRevQueryTime + " ms");;
349

    
350
  }//update
351

    
352
  /* Handle replicate single xml document*/
353
  private void handleSingleXMLDocument(String remoteserver, String actions,
354
                                       String accNumber, String tableName)
355
               throws HandlerException
356
  {
357
    DBConnection dbConn = null;
358
    int serialNumber = -1;
359
    try
360
    {
361
      // Get DBConnection from pool
362
      dbConn=DBConnectionPool.
363
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
364
      serialNumber=dbConn.getCheckOutSerialNumber();
365
      //if the document needs to be updated or inserted, this is executed
366
      String readDocURLString = "https://" + remoteserver + "?server="+
367
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
368
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
369
      URL u = new URL(readDocURLString);
370

    
371
      // Get docid content
372
      byte[] xmlBytes = ReplicationService.getURLBytes(u);
373
      String newxmldoc = new String(xmlBytes, "UTF-8");
374
      // If couldn't get skip it
375
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
376
      {
377
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
378
      }
379
      //logReplication.info("xml documnet:");
380
      //logReplication.info(newxmldoc);
381

    
382
      // Try get the docid info from remote server
383
      DocInfoHandler dih = new DocInfoHandler();
384
      XMLReader docinfoParser = initParser(dih);
385
      String docInfoURLStr = "https://" + remoteserver +
386
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
387
                       "&action=getdocumentinfo&docid="+accNumber;
388
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
389
      URL docinfoUrl = new URL(docInfoURLStr);
390
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
391
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
392
      
393
      // strip out the system metadata portion
394
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
395
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);
396
   	  SystemMetadata sysMeta = null;
397
   	  // process system metadata if we have it
398
      if (systemMetadataXML != null) {
399
    	  sysMeta = 
400
    		  TypeMarshaller.unmarshalTypeFromStream(
401
    				  SystemMetadata.class, 
402
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
403
    	  // need the guid-to-docid mapping
404
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
405
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
406
    	  }
407
      	  // save the system metadata
408
    	  logReplication.debug("Saving SystemMetadata to shared map: " + sysMeta.getIdentifier().getValue());
409
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
410
      	  
411
      }
412
   	  
413
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
414
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
415
      // Get home server of the docid
416
      String docHomeServer = docinfoHash.get("home_server");
417
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
418
     
419
      // dates
420
      String createdDateString = docinfoHash.get("date_created");
421
      String updatedDateString = docinfoHash.get("date_updated");
422
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
423
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
424
      
425
      //docid should include rev number too
426
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
427
                                              (String)docinfoHash.get("rev");*/
428
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
429
      String docType = docinfoHash.get("doctype");
430
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
431

    
432
      String parserBase = null;
433
      // this for eml2 and we need user eml2 parser
434
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
435
      {
436
         parserBase = DocumentImpl.EML200;
437
      }
438
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
439
      {
440
        parserBase = DocumentImpl.EML200;
441
      }
442
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
443
      {
444
        parserBase = DocumentImpl.EML210;
445
      }
446
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
447
      {
448
        parserBase = DocumentImpl.EML210;
449
      }
450
      // Write the document into local host
451
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false, false);
452
      String newDocid = wrapper.writeReplication(dbConn,
453
                              newxmldoc, xmlBytes,
454
                              docinfoHash.get("public_access"),
455
                              null,  /* the dtd text */
456
                              actions,
457
                              accNumber,
458
                              null, //docinfoHash.get("user_owner"),                              
459
                              null, /* null for groups[] */
460
                              docHomeServer,
461
                              remoteserver, tableName, true,// true is for time replication 
462
                              createdDate,
463
                              updatedDate);
464
      
465
      if(sysMeta != null) {
466
			// submit for indexing. When the doc writing process fails, the index process will fail as well. But this failure
467
			// will not interrupt the process.
468
			try {
469
				MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
470
			} catch (Exception ee) {
471
				logReplication.warn("ReplicationService.handleForceReplicateRequest - couldn't index the doc since "+ee.getMessage());
472
			}
473
          
474
		}
475
      
476
      //set the user information
477
      String user = (String) docinfoHash.get("user_owner");
478
      String updated = (String) docinfoHash.get("user_updated");
479
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
480
      
481
      //process extra access rules 
482
      try {
483
      	// check if we had a guid -> docid mapping
484
      	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
485
      	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
486
      	IdentifierManager.getInstance().getGUID(docid, rev);
487
      	// no need to create the mapping if we have it
488
      } catch (McdbDocNotFoundException mcdbe) {
489
      	// create mapping if we don't
490
      	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
491
      }
492
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
493
      if (xmlAccessDAOList != null) {
494
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
495
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
496
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
497
      			acfsf.insertPermissions(xmlAccessDAO);
498
      		}
499
          }
500
      }
501
      
502
      
503
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
504
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
505
      {
506
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
507
                                     " into "+tableName + " from " +
508
                                         remoteserver);
509
        DOCINSERTNUMBER++;
510
      }
511
      else
512
      {
513
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
514
                  " into "+tableName + " from " +
515
                      remoteserver);
516
          REVINSERTNUMBER++;
517
      }
518
      String ip = getIpFromURL(u);
519
      EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
520
      
521

    
522
    }//try
523
    catch(Exception e)
524
    {
525
        
526
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
527
        {
528
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
529
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
530
                                       " into "+tableName + " from " +
531
                                           remoteserver + " because "+e.getMessage());
532
          DOCERRORNUMBER++;
533
        }
534
        else
535
        {
536
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
537
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
538
                    " into "+tableName + " from " +
539
                        remoteserver +" because "+e.getMessage());
540
            REVERRORNUMBER++;
541
        }
542
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
543
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
544
                                      " into db because " + e.getMessage(), e);
545
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
546
    		  + "writing Replication: " +e.getMessage());
547
    }
548
    finally
549
    {
550
       //return DBConnection
551
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
552
    }//finally
553
    logMetacat.info("replication.create localId:" + accNumber);
554
  }
555

    
556

    
557

    
558
  /* Handle replicate single xml document*/
559
  private void handleSingleDataFile(String remoteserver, String actions,
560
                                    String accNumber, String tableName)
561
               throws HandlerException
562
  {
563
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
564
    DBConnection dbConn = null;
565
    int serialNumber = -1;
566
    InputStream input = null;
567
    try
568
    {
569
      // Get DBConnection from pool
570
      dbConn=DBConnectionPool.
571
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
572
      serialNumber=dbConn.getCheckOutSerialNumber();
573
      // Try get docid info from remote server
574
      DocInfoHandler dih = new DocInfoHandler();
575
      XMLReader docinfoParser = initParser(dih);
576
      String docInfoURLString = "https://" + remoteserver +
577
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
578
                  "&action=getdocumentinfo&docid="+accNumber;
579
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
580
      URL docinfoUrl = new URL(docInfoURLString);
581

    
582
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
583
      
584
      // strip out the system metadata portion
585
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
586
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);  
587
   	  
588
   	  // process system metadata
589
      if (systemMetadataXML != null) {
590
    	  SystemMetadata sysMeta = 
591
    		TypeMarshaller.unmarshalTypeFromStream(
592
    				  SystemMetadata.class, 
593
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
594
    	  // need the guid-to-docid mapping
595
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
596
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
597
    	  }
598
    	  // save the system metadata
599
    	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
600
    	  // submit for indexing
601
          MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
602

    
603
      }
604
   	  
605
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
606
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
607
      
608
      // Get docid name (such as acl or dataset)
609
      String docName = docinfoHash.get("docname");
610
      // Get doc type (eml public id)
611
      String docType = docinfoHash.get("doctype");
612
      // Get docid home sever. it might be different to remoteserver
613
      // because of hub feature
614
      String docHomeServer = docinfoHash.get("home_server");
615
      String createdDateString = docinfoHash.get("date_created");
616
      String updatedDateString = docinfoHash.get("date_updated");
617
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
618
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
619
      //docid should include rev number too
620
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
621
                                              (String)docinfoHash.get("rev");*/
622

    
623
      String datafilePath = PropertyService.getProperty("application.datafilepath");
624
      // Get data file content
625
      String readDataURLString = "https://" + remoteserver + "?server="+
626
                                        MetacatUtil.getLocalReplicationServerName()+
627
                                            "&action=readdata&docid="+accNumber;
628
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
629
      URL u = new URL(readDataURLString);
630
      input = ReplicationService.getURLStream(u);
631
      //register data file into xml_documents table and wite data file
632
      //into file system
633
      if ( input != null)
634
      {
635
        DocumentImpl.writeDataFileInReplication(input,
636
                                                datafilePath,
637
                                                docName,docType,
638
                                                accNumber,
639
                                                null,
640
                                                docHomeServer,
641
                                                remoteserver,
642
                                                tableName,
643
                                                true, //true means timed replication
644
                                                createdDate,
645
                                                updatedDate);
646
                                         
647
        //set the user information
648
        String user = (String) docinfoHash.get("user_owner");
649
		String updated = (String) docinfoHash.get("user_updated");
650
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
651
        
652
        //process extra access rules
653
        try {
654
        	// check if we had a guid -> docid mapping
655
        	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
656
        	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
657
        	IdentifierManager.getInstance().getGUID(docid, rev);
658
        	// no need to create the mapping if we have it
659
        } catch (McdbDocNotFoundException mcdbe) {
660
        	// create mapping if we don't
661
        	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
662
        }
663
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
664
        if (xmlAccessDAOList != null) {
665
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
666
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
667
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
668
        			acfsf.insertPermissions(xmlAccessDAO);
669
        		}
670
            }
671
        }
672
        
673
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
674
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
675
                                    remote server);*/
676
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
677
        {
678
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
679
                                       " into "+tableName + " from " +
680
                                           remoteserver);
681
          DOCINSERTNUMBER++;
682
        }
683
        else
684
        {
685
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
686
                    " into "+tableName + " from " +
687
                        remoteserver);
688
            REVINSERTNUMBER++;
689
        }
690
        String ip = getIpFromURL(u);
691
        EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
692
        
693
      }//if
694
      else
695
      {
696
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
697
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
698
      }//else
699

    
700
    }//try
701
    catch(Exception e)
702
    {
703
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
704
                                      " because " +e.getMessage());*/
705
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
706
      {
707
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
708
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
709
                                     " into " + tableName + " from " +
710
                                         remoteserver + " because " + e.getMessage());
711
        DOCERRORNUMBER++;
712
      }
713
      else
714
      {
715
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
716
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
717
                  " into " + tableName + " from " +
718
                      remoteserver +" because "+ e.getMessage());
719
          REVERRORNUMBER++;
720
      }
721
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
722
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
723
                                      " because " + e.getMessage());
724
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
725
    		  + "writing Replication: " + e.getMessage());
726
    }
727
    finally
728
    {
729
       IOUtils.closeQuietly(input);
730
       //return DBConnection
731
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
732

    
733
     
734
    }//finally
735
    logMetacat.info("replication.create localId:" + accNumber);
736
  }
737

    
738

    
739

    
740
  /* Handle delete single document*/
741
  private void handleDeleteSingleDocument(String docId, String notifyServer)
742
               throws HandlerException
743
  {
744
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
745
    DBConnection dbConn = null;
746
    int serialNumber = -1;
747
    try
748
    {
749
      // Get DBConnection from pool
750
      dbConn=DBConnectionPool.
751
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
752
      serialNumber=dbConn.getCheckOutSerialNumber();
753
      if(!alreadyDeleted(docId))
754
      {
755

    
756
         //because delete method docid should have rev number
757
         //so we just add one for it. This rev number is no sence.
758
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
759
         DocumentImpl.delete(accnum, null, null, notifyServer, false);
760
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
761
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
762
         URL u = new URL("https://"+notifyServer);
763
         String ip = getIpFromURL(u);
764
         EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, docId, "delete");
765
      }
766

    
767
    }//try
768
    catch(McdbDocNotFoundException e)
769
    {
770
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
771
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
772
                                 " in db because because " + e.getMessage());
773
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
774
    		  + "when handling document: " + e.getMessage());
775
    }
776
    catch(InsufficientKarmaException e)
777
    {
778
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
779
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
780
                                 " in db because because " + e.getMessage());
781
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
782
    		  + "when handling document: " + e.getMessage());
783
    }
784
    catch(SQLException e)
785
    {
786
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
787
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
788
                                 " in db because because " + e.getMessage());
789
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
790
    		  + "when handling document: " + e.getMessage());
791
    }
792
    catch(Exception e)
793
    {
794
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
795
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
796
                                 " in db because because " + e.getMessage());
797
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
798
    		  + "when handling document: " + e.getMessage());
799
    }
800
    finally
801
    {
802
       //return DBConnection
803
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
804
    }//finally
805
    logMetacat.info("replication.handleDeleteSingleDocument localId:" + docId);
806
  }
807

    
808
  /* Handle updateLastCheckTimForSingleServer*/
809
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
810
                                                  throws HandlerException
811
  {
812
    String server = repServer.getServerName();
813
    DBConnection dbConn = null;
814
    int serialNumber = -1;
815
    PreparedStatement pstmt = null;
816
    try
817
    {
818
      // Get DBConnection from pool
819
      dbConn=DBConnectionPool.
820
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
821
      serialNumber=dbConn.getCheckOutSerialNumber();
822

    
823
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
824
      // Get time from remote server
825
      URL dateurl = new URL("https://" + server + "?server="+
826
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
827
      String datexml = ReplicationService.getURLContent(dateurl);
828
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
829
      if (datexml != null && !datexml.equals("")) {
830
    	  
831
    	  // parse the ISO datetime
832
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
833
         Date updated = DateTimeMarshaller.deserializeDateToUTC(datestr);
834
         
835
         StringBuffer sql = new StringBuffer();
836
         sql.append("update xml_replication set last_checked = ? ");
837
         sql.append(" where server like ? ");
838
         pstmt = dbConn.prepareStatement(sql.toString());
839
         pstmt.setTimestamp(1, new Timestamp(updated.getTime()));
840
         pstmt.setString(2, server);
841
         
842
         pstmt.executeUpdate();
843
         dbConn.commit();
844
         pstmt.close();
845
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
846
                                      + server);
847
      }//if
848
      else
849
      {
850

    
851
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
852
                                  server + " in db because couldn't get time "
853
                                  );
854
         throw new Exception("Couldn't get time for server "+ server);
855
      }
856

    
857
    }//try
858
    catch(Exception e)
859
    {
860
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
861
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
862
                                server + " in db because because " + e.getMessage());
863
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
864
    		  + "Error updating last checked time: " + e.getMessage());
865
    }
866
    finally
867
    {
868
       //return DBConnection
869
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
870
    }//finally
871
  }
872
  
873
  	/**
874
	 * Handle replicate system metadata
875
	 * 
876
	 * @param remoteserver
877
	 * @param guid
878
	 * @throws HandlerException
879
	 */
880
	private void handleSystemMetadata(String remoteserver, String guid) 
881
		throws HandlerException {
882
		try {
883

    
884
			// Try get the system metadata from remote server
885
			String sysMetaURLStr = "https://" + remoteserver + "?server="
886
					+ MetacatUtil.getLocalReplicationServerName()
887
					+ "&action=getsystemmetadata&guid=" + guid;
888
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
889
			URL sysMetaUrl = new URL(sysMetaURLStr);
890
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
891
							+ sysMetaUrl.toString());
892
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
893

    
894
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
895

    
896
			// process system metadata
897
			if (systemMetadataXML != null) {
898
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
899
								new ByteArrayInputStream(systemMetadataXML
900
										.getBytes("UTF-8")));
901
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
902
				// submit for indexing
903
                MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
904
			}
905

    
906
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
907
							+ guid);
908

    
909
			String ip = getIpFromURL(sysMetaUrl);
910
			EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
911

    
912
		} catch (Exception e) {
913
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
914
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
915
			logReplication
916
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
917
							+ guid + " into db because " + e.getMessage());
918
			throw new HandlerException(
919
					"ReplicationHandler.handleSystemMetadata - generic exception "
920
							+ "writing Replication: " + e.getMessage());
921
		}
922

    
923
	}
924

    
925
  /**
926
   * updates xml_catalog with entries from other servers.
927
   */
928
  private void updateCatalog()
929
  {
930
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
931
    // ReplicationServer object in server list
932
    ReplicationServer replServer = null;
933
    PreparedStatement pstmt = null;
934
    String server = null;
935

    
936

    
937
    // Go through each ReplicationServer object in sererlist
938
    for (int j=0; j<serverList.size(); j++)
939
    {
940
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
941
      Vector<String> publicId = new Vector<String>();
942
      try
943
      {
944
        // Get ReplicationServer object from server list
945
        replServer = serverList.serverAt(j);
946
        // Get server name from the ReplicationServer object
947
        server = replServer.getServerName();
948
        // Try to get catalog
949
        URL u = new URL("https://" + server + "?server="+
950
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
951
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
952
        String catxml = ReplicationService.getURLContent(u);
953

    
954
        // Make sure there are not error, no empty string
955
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
956
        {
957
          throw new Exception("Couldn't get catalog list form server " +server);
958
        }
959
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
960
        CatalogMessageHandler cmh = new CatalogMessageHandler();
961
        XMLReader catparser = initParser(cmh);
962
        catparser.parse(new InputSource(new StringReader(catxml)));
963
        //parse the returned catalog xml and put it into a vector
964
        remoteCatalog = cmh.getCatalogVect();
965

    
966
        // Make sure remoteCatalog is not empty
967
        if (remoteCatalog.isEmpty())
968
        {
969
          throw new Exception("Couldn't get catalog list form server " +server);
970
        }
971

    
972
        String localcatxml = ReplicationService.getCatalogXML();
973

    
974
        // Make sure local catalog is no empty
975
        if (localcatxml==null||localcatxml.equals(""))
976
        {
977
          throw new Exception("Couldn't get catalog list form server " +server);
978
        }
979

    
980
        cmh = new CatalogMessageHandler();
981
        catparser = initParser(cmh);
982
        catparser.parse(new InputSource(new StringReader(localcatxml)));
983
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
984

    
985
        //now we have the catalog from the remote server and this local server
986
        //we now need to compare the two and merge the differences.
987
        //the comparison is base on the public_id fields which is the 4th
988
        //entry in each row vector.
989
        publicId = new Vector<String>();
990
        for(int i=0; i<localCatalog.size(); i++)
991
        {
992
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
993
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
994
          publicId.add(new String((String)v.elementAt(3)));
995
        }
996
      }//try
997
      catch (Exception e)
998
      {
999
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1000
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
1001
                                    server + " because " +e.getMessage());
1002
      }//catch
1003

    
1004
      for(int i=0; i<remoteCatalog.size(); i++)
1005
      {
1006
         // DConnection
1007
        DBConnection dbConn = null;
1008
        // DBConnection checkout serial number
1009
        int serialNumber = -1;
1010
        try
1011
        {
1012
            dbConn=DBConnectionPool.
1013
                  getDBConnection("ReplicationHandler.updateCatalog");
1014
            serialNumber=dbConn.getCheckOutSerialNumber();
1015
            Vector<String> v = remoteCatalog.elementAt(i);
1016
            //logMetacat.debug("v2: " + v.toString());
1017
            //logMetacat.debug("i: " + i);
1018
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
1019
            //logMetacat.debug("publicID: " + publicId.toString());
1020
            logReplication.info
1021
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
1022
           if(!publicId.contains(v.elementAt(3)))
1023
           { //so we don't have this public id in our local table so we need to
1024
             //add it.
1025
        	   
1026
        	   // check where it is pointing first, before adding
1027
        	   String entryType = (String)v.elementAt(0);
1028
        	   if (entryType.equals(DocumentImpl.SCHEMA)) {
1029
	        	   String nameSpace = (String)v.elementAt(3);
1030
	        	   String schemaLocation = (String)v.elementAt(4);
1031
	        	   SchemaLocationResolver slr = new SchemaLocationResolver(nameSpace, schemaLocation);
1032
	        	   try {
1033
	        		   slr.resolveNameSpace();
1034
	        	   } catch (Exception e) {
1035
	        		   String msg = "Could not save remote schema to xml catalog. " + "nameSpace: " + nameSpace + " location: " + schemaLocation;
1036
	        		   logMetacat.error(msg, e);
1037
	        		   logReplication.error(msg, e);
1038
	        	   }
1039
	        	   // skip whatever else we were going to do
1040
	        	   continue;
1041
        	   }
1042
        	   
1043
             //logMetacat.debug("in if");
1044
             StringBuffer sql = new StringBuffer();
1045
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
1046
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
1047
             sql.append("?,?)");
1048
             //logMetacat.debug("sql: " + sql.toString());
1049
             pstmt = dbConn.prepareStatement(sql.toString());
1050
             pstmt.setString(1, (String)v.elementAt(0));
1051
             pstmt.setString(2, (String)v.elementAt(1));
1052
             pstmt.setString(3, (String)v.elementAt(2));
1053
             pstmt.setString(4, (String)v.elementAt(3));
1054
             pstmt.setString(5, (String)v.elementAt(4));
1055
             pstmt.execute();
1056
             pstmt.close();
1057
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
1058
                               (String)v.elementAt(3) + " from server"+server);
1059
           }
1060
        }
1061
        catch(Exception e)
1062
        {
1063
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1064
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
1065
                                    server + " because " +e.getMessage());
1066
        }//catch
1067
        finally
1068
        {
1069
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1070
        }//finally
1071
      }//for remote catalog
1072
    }//for server list
1073
    logReplication.info("End of updateCatalog");
1074
  }
1075

    
1076
  /**
1077
   * Method that returns true if docid has already been "deleted" from metacat.
1078
   * This method really implements a truth table for deleted documents
1079
   * The table is (a docid in one of the tables is represented by the X):
1080
   * xml_docs      xml_revs      deleted?
1081
   * ------------------------------------
1082
   *   X             X             FALSE
1083
   *   X             _             FALSE
1084
   *   _             X             TRUE
1085
   *   _             _             TRUE
1086
   */
1087
  private static boolean alreadyDeleted(String docid) throws HandlerException
1088
  {
1089
    DBConnection dbConn = null;
1090
    int serialNumber = -1;
1091
    PreparedStatement pstmt = null;
1092
    try
1093
    {
1094
      dbConn=DBConnectionPool.
1095
                  getDBConnection("ReplicationHandler.alreadyDeleted");
1096
      serialNumber=dbConn.getCheckOutSerialNumber();
1097
      boolean xml_docs = false;
1098
      boolean xml_revs = false;
1099

    
1100
      StringBuffer sb = new StringBuffer();
1101
      sb.append("select docid from xml_revisions where docid like ? ");
1102
      pstmt = dbConn.prepareStatement(sb.toString());
1103
      pstmt.setString(1, docid);
1104
      pstmt.execute();
1105
      ResultSet rs = pstmt.getResultSet();
1106
      boolean tablehasrows = rs.next();
1107
      if(tablehasrows)
1108
      {
1109
        xml_revs = true;
1110
      }
1111

    
1112
      sb = new StringBuffer();
1113
      sb.append("select docid from xml_documents where docid like '");
1114
      sb.append(docid).append("'");
1115
      pstmt.close();
1116
      pstmt = dbConn.prepareStatement(sb.toString());
1117
      //increase usage count
1118
      dbConn.increaseUsageCount(1);
1119
      pstmt.execute();
1120
      rs = pstmt.getResultSet();
1121
      tablehasrows = rs.next();
1122
      pstmt.close();
1123
      if(tablehasrows)
1124
      {
1125
        xml_docs = true;
1126
      }
1127

    
1128
      if(xml_docs && xml_revs)
1129
      {
1130
        return false;
1131
      }
1132
      else if(xml_docs && !xml_revs)
1133
      {
1134
        return false;
1135
      }
1136
      else if(!xml_docs && xml_revs)
1137
      {
1138
        return true;
1139
      }
1140
      else if(!xml_docs && !xml_revs)
1141
      {
1142
        return true;
1143
      }
1144
    }
1145
    catch(Exception e)
1146
    {
1147
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1148
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1149
                          e.getMessage());
1150
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1151
    		  + e.getMessage());
1152
    }
1153
    finally
1154
    {
1155
      try
1156
      {
1157
        pstmt.close();
1158
      }//try
1159
      catch (SQLException ee)
1160
      {
1161
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1162
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1163
                          "to close pstmt: "+ee.getMessage());
1164
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1165
      		  + ee.getMessage());
1166
      }//catch
1167
      finally
1168
      {
1169
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1170
      }//finally
1171
    }//finally
1172
    return false;
1173
  }
1174

    
1175

    
1176
  /**
1177
   * Method to initialize the message parser
1178
   */
1179
  public static XMLReader initParser(DefaultHandler dh)
1180
          throws HandlerException
1181
  {
1182
    XMLReader parser = null;
1183

    
1184
    try {
1185
      ContentHandler chandler = dh;
1186

    
1187
      // Get an instance of the parser
1188
      String parserName = PropertyService.getProperty("xml.saxparser");
1189
      parser = XMLReaderFactory.createXMLReader(parserName);
1190

    
1191
      // Turn off validation
1192
      parser.setFeature("http://xml.org/sax/features/validation", false);
1193

    
1194
      parser.setContentHandler((ContentHandler)chandler);
1195
      parser.setErrorHandler((ErrorHandler)chandler);
1196

    
1197
    } catch (SAXException se) {
1198
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1199
    		  + " initializing parser: " + se.getMessage());
1200
    } catch (PropertyNotFoundException pnfe) {
1201
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1202
      		  + " getting parser name: " + pnfe.getMessage());
1203
    } 
1204

    
1205
    return parser;
1206
  }
1207

    
1208
  /**
1209
	 * This method will combine given time string(in short format) to current
1210
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1211
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1212
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1213
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1214
	 * 
1215
	 * @param givenTime
1216
	 *            the format should be "10:00 AM " or "2:00 PM"
1217
	 * @return
1218
	 * @throws Exception
1219
	 */
1220
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1221
  {
1222
	  try {
1223
     Date givenDate = parseTime(givenTime);
1224
     Date newDate = null;
1225
     Date now = new Date();
1226
     String currentTimeString = getTimeString(now);
1227
     Date currentTime = parseTime(currentTimeString); 
1228
     if ( currentTime.getTime() >= givenDate.getTime())
1229
     {
1230
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1231
        String dateAndTime = getDateString(now) + " " + givenTime;
1232
        Date combinationDate = parseDateTime(dateAndTime);
1233
        // new date should plus 24 hours to make is the second day
1234
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1235
     }
1236
     else
1237
     {
1238
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1239
         String dateAndTime = getDateString(now) + " " + givenTime;
1240
         newDate = parseDateTime(dateAndTime);
1241
     }
1242
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1243
     return newDate;
1244
	  } catch (ParseException pe) {
1245
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1246
				  + "parsing error: "  + pe.getMessage());
1247
	  }
1248
  }
1249

    
1250
  /*
1251
	 * parse a given string to Time in short format. For example, given time is
1252
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1253
	 */
1254
  private static Date parseTime(String timeString) throws ParseException
1255
  {
1256
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1257
    Date time = format.parse(timeString); 
1258
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1259
                              +time.toString());
1260
    return time;
1261

    
1262
  }
1263
  
1264
  /*
1265
   * Parse a given string to date and time. Date format is long and time
1266
   * format is short.
1267
   */
1268
  private static Date parseDateTime(String timeString) throws ParseException
1269
  {
1270
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1271
    Date time = format.parse(timeString);
1272
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1273
                             time.toString());
1274
    return time;
1275
  }
1276
  
1277
  /*
1278
   * Get a date string from a Date object. The date format will be long
1279
   */
1280
  private static String getDateString(Date now)
1281
  {
1282
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1283
     String s = df.format(now);
1284
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1285
     return s;
1286
  }
1287
  
1288
  /*
1289
   * Get a time string from a Date object, the time format will be short
1290
   */
1291
  private static String getTimeString(Date now)
1292
  {
1293
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1294
     String s = df.format(now);
1295
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1296
     return s;
1297
  }
1298
  
1299
  
1300
  /*
1301
	 * This method will go through the docid list both in xml_Documents table
1302
	 * and in xml_revisions table @author tao
1303
	 */
1304
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1305
		boolean dataFile = false;
1306
		for (int j = 0; j < docList.size(); j++) {
1307
			// initial dataFile is false
1308
			dataFile = false;
1309
			// w is information for one document, information contain
1310
			// docid, rev, server or datafile.
1311
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1312
			// Check if the vector w contain "datafile"
1313
			// If it has, this document is data file
1314
			try {
1315
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1316
					dataFile = true;
1317
				}
1318
			} catch (PropertyNotFoundException pnfe) {
1319
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1320
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1321
						+ "Leaving as false: " + pnfe.getMessage());
1322
			}
1323
			// logMetacat.debug("w: " + w.toString());
1324
			// Get docid
1325
			String docid = (String) w.elementAt(0);
1326
			logReplication.info("docid: " + docid);
1327
			// Get revision number
1328
			int rev = Integer.parseInt((String) w.elementAt(1));
1329
			logReplication.info("rev: " + rev);
1330
			// Get remote server name (it is may not be doc home server because
1331
			// the new hub feature
1332
			String remoteServer = (String) w.elementAt(2);
1333
			remoteServer = remoteServer.trim();
1334

    
1335
			try {
1336
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1337
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1338
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1339
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1340
				} else {
1341
					continue;
1342
				}
1343

    
1344
			} catch (Exception e) {
1345
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1346
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1347
						+ " in time replication" + e.getMessage(), e);
1348
				continue;
1349
			}
1350
			
1351
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1352
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1353
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1354
	        }
1355
	        
1356
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1357
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1358
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1359
	        }
1360

    
1361
		}// for update docs
1362

    
1363
	}
1364
   
1365
   /*
1366
	 * This method will handle doc in xml_documents table.
1367
	 */
1368
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1369
                                        throws HandlerException
1370
   {
1371
       // compare the update rev and local rev to see what need happen
1372
       int localrev = -1;
1373
       String action = null;
1374
       boolean flag = false;
1375
       try
1376
       {
1377
    	 long docQueryStartTime = System.currentTimeMillis();
1378
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1379
         long docQueryEndTime = System.currentTimeMillis();
1380
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1381
         _xmlDocQueryCount++;
1382
       }
1383
       catch (SQLException e)
1384
       {
1385
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1386
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1387
                                " be found because " + e.getMessage());
1388
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1389
                 "written because error happend to find it's local revision");
1390
         DOCERRORNUMBER++;
1391
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1392
                 " be found: " + e.getMessage());
1393
       }
1394
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1395
                               localrev);
1396

    
1397
       //check the revs for an update because this document is in the
1398
       //local DB, it might be out of date.
1399
       if (localrev == -1)
1400
       {
1401
          // check if the revision is in the revision table
1402
    	   Vector<Integer> localRevVector = null;
1403
    	 try {
1404
        	 long revQueryStartTime = System.currentTimeMillis();
1405
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1406
             long revQueryEndTime = System.currentTimeMillis();
1407
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1408
             _xmlRevQueryCount++;
1409
    	 } catch (SQLException sqle) {
1410
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1411
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1412
    	 }
1413
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1414
         {
1415
             // this version was deleted, so don't need replicate
1416
             flag = false;
1417
         }
1418
         else
1419
         {
1420
           //insert this document as new because it is not in the local DB
1421
           action = "INSERT";
1422
           flag = true;
1423
         }
1424
       }
1425
       else
1426
       {
1427
         if(localrev == rev)
1428
         {
1429
           // Local meatacat has the same rev to remote host, don't need
1430
           // update and flag set false
1431
           flag = false;
1432
         }
1433
         else if(localrev < rev)
1434
         {
1435
           //this document needs to be updated so send an read request
1436
           action = "UPDATE";
1437
           flag = true;
1438
         }
1439
       }
1440
       
1441
       String accNumber = null;
1442
       try {
1443
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1444
       } catch (PropertyNotFoundException pnfe) {
1445
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1446
    			   + "account number separator : " + pnfe.getMessage());
1447
       }
1448
       // this is non-data file
1449
       if(flag && !dataFile)
1450
       {
1451
         try
1452
         {
1453
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1454
         }
1455
         catch(HandlerException he)
1456
         {
1457
           // skip this document
1458
           throw he;
1459
         }
1460
       }//if for non-data file
1461

    
1462
        // this is for data file
1463
       if(flag && dataFile)
1464
       {
1465
         try
1466
         {
1467
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1468
         }
1469
         catch(HandlerException he)
1470
         {
1471
           // skip this data file
1472
           throw he;
1473
         }
1474

    
1475
       }//for data file
1476
   }
1477
   
1478
   /*
1479
    * This method will handle doc in xml_documents table.
1480
    */
1481
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1482
                                        throws HandlerException
1483
   {
1484
       // compare the update rev and local rev to see what need happen
1485
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1486
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1487
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1488
       Vector<Integer> localrev = null;
1489
       String action = "INSERT";
1490
       boolean flag = false;
1491
       try
1492
       {
1493
      	 long revQueryStartTime = System.currentTimeMillis();
1494
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1495
         long revQueryEndTime = System.currentTimeMillis();
1496
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1497
         _xmlRevQueryCount++;
1498
       }
1499
       catch (SQLException sqle)
1500
       {
1501
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1502
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1503
                                " be found because " + sqle.getMessage());
1504
         REVERRORNUMBER++;
1505
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1506
        		 + sqle.getMessage());
1507
       }
1508
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1509
                               localrev.toString());
1510
       
1511
       // if the rev is not in the xml_revision, we need insert it
1512
       if (!localrev.contains(new Integer(rev)))
1513
       {
1514
           flag = true;    
1515
       }
1516
     
1517
       String accNumber = null;
1518
       try {
1519
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1520
       } catch (PropertyNotFoundException pnfe) {
1521
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1522
    			   + "account number separator : " + pnfe.getMessage());
1523
       }
1524
       // this is non-data file
1525
       if(flag && !dataFile)
1526
       {
1527
         try
1528
         {
1529
           
1530
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1531
         }
1532
         catch(HandlerException he)
1533
         {
1534
           // skip this document
1535
           throw he;
1536
         }
1537
       }//if for non-data file
1538

    
1539
        // this is for data file
1540
       if(flag && dataFile)
1541
       {
1542
         try
1543
         {
1544
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1545
         }
1546
         catch(HandlerException he)
1547
         {
1548
           // skip this data file
1549
           throw he;
1550
         }
1551

    
1552
       }//for data file
1553
   }
1554
   
1555
   /*
1556
    * Return a ip address for given url
1557
    */
1558
   private String getIpFromURL(URL url)
1559
   {
1560
	   String ip = null;
1561
	   try
1562
	   {
1563
	      InetAddress address = InetAddress.getByName(url.getHost());
1564
	      ip = address.getHostAddress();
1565
	   }
1566
	   catch(UnknownHostException e)
1567
	   {
1568
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1569
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1570
                   +e.getMessage());
1571
	   }
1572

    
1573
	   return ip;
1574
   }
1575
  
1576
}
1577

    
(3-3/7)