Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2014-02-25 16:14:20 -0800 (Tue, 25 Feb 2014) $'
10
 * '$Revision: 8647 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.sql.Timestamp;
39
import java.text.DateFormat;
40
import java.text.ParseException;
41
import java.util.Date;
42
import java.util.Hashtable;
43
import java.util.TimerTask;
44
import java.util.Vector;
45

    
46

    
47
import org.apache.log4j.Logger;
48
import org.dataone.service.types.v1.SystemMetadata;
49
import org.dataone.service.util.DateTimeMarshaller;
50
import org.dataone.service.util.TypeMarshaller;
51
import org.xml.sax.ContentHandler;
52
import org.xml.sax.ErrorHandler;
53
import org.xml.sax.InputSource;
54
import org.xml.sax.SAXException;
55
import org.xml.sax.XMLReader;
56
import org.xml.sax.helpers.DefaultHandler;
57
import org.xml.sax.helpers.XMLReaderFactory;
58

    
59
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
60
import edu.ucsb.nceas.metacat.DBUtil;
61
import edu.ucsb.nceas.metacat.DocumentImpl;
62
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
63
import edu.ucsb.nceas.metacat.EventLog;
64
import edu.ucsb.nceas.metacat.IdentifierManager;
65
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
66
import edu.ucsb.nceas.metacat.SchemaLocationResolver;
67
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
68
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
69
import edu.ucsb.nceas.metacat.database.DBConnection;
70
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
71
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
72
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
73
import edu.ucsb.nceas.metacat.properties.PropertyService;
74
import edu.ucsb.nceas.metacat.shared.HandlerException;
75
import edu.ucsb.nceas.metacat.util.DocumentUtil;
76
import edu.ucsb.nceas.metacat.util.MetacatUtil;
77
import edu.ucsb.nceas.metacat.util.ReplicationUtil;
78
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
79
import edu.ucsb.nceas.utilities.access.DocInfoHandler;
80
import edu.ucsb.nceas.utilities.access.XMLAccessDAO;
81

    
82

    
83

    
84
/**
85
 * This class handles deltaT replication checking.  Whenever this TimerTask
86
 * is fired it checks each server in xml_replication for updates and updates
87
 * the local db as needed.
88
 */
89
public class ReplicationHandler extends TimerTask
90
{
91
  int serverCheckCode = 1;
92
  ReplicationServerList serverList = null;
93
  //PrintWriter out;
94
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
95
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
96
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
97
  
98
  private static int DOCINSERTNUMBER = 1;
99
  private static int DOCERRORNUMBER  = 1;
100
  private static int REVINSERTNUMBER = 1;
101
  private static int REVERRORNUMBER  = 1;
102
  
103
  private static int _xmlDocQueryCount = 0;
104
  private static int _xmlRevQueryCount = 0;
105
  private static long _xmlDocQueryTime = 0;
106
  private static long _xmlRevQueryTime = 0;
107
  
108
  
109
  public ReplicationHandler()
110
  {
111
    //this.out = o;
112
    serverList = new ReplicationServerList();
113
  }
114

    
115
  public ReplicationHandler(int serverCheckCode)
116
  {
117
    //this.out = o;
118
    this.serverCheckCode = serverCheckCode;
119
    serverList = new ReplicationServerList();
120
  }
121

    
122
  /**
123
   * Method that implements TimerTask.run().  It runs whenever the timer is
124
   * fired.
125
   */
126
  public void run()
127
  {
128
    //find out the last_checked time of each server in the server list and
129
    //send a query to each server to see if there are any documents in
130
    //xml_documents with an update_date > last_checked
131
	  
132
      //if serverList is null, metacat don't need to replication
133
      if (serverList==null||serverList.isEmpty())
134
      {
135
        return;
136
      }
137
      updateCatalog();
138
      update();
139
      //conn.close();
140
  }
141

    
142
  /**
143
   * Method that uses revision tagging for replication instead of update_date.
144
   */
145
  private void update()
146
  {
147
	  
148
	  _xmlDocQueryCount = 0;
149
	  _xmlRevQueryCount = 0;
150
	  _xmlDocQueryTime = 0;
151
	  _xmlRevQueryTime = 0;
152
    /*
153
     Pseudo-algorithm
154
     - request a doc list from each server in xml_replication
155
     - check the rev number of each of those documents agains the
156
       documents in the local database
157
     - pull any documents that have a lesser rev number on the local server
158
       from the remote server
159
     - delete any documents that still exist in the local xml_documents but
160
       are in the deletedDocuments tag of the remote host response.
161
     - update last_checked to keep track of the last time it was checked.
162
       (this info is theoretically not needed using this system but probably
163
       should be kept anyway)
164
    */
165

    
166
    ReplicationServer replServer = null; // Variable to store the
167
                                        // ReplicationServer got from
168
                                        // Server list
169
    String server = null; // Variable to store server name
170
//    String update;
171
    Vector<InputStream> responses = new Vector<InputStream>();
172
    URL u;
173
    long replicationStartTime = System.currentTimeMillis();
174
    long timeToGetServerList = 0;
175
    
176
    //Check for every server in server list to get updated list and put
177
    // them in to response
178
    long startTimeToGetServers = System.currentTimeMillis();
179
    for (int i=0; i<serverList.size(); i++)
180
    {
181
        // Get ReplicationServer object from server list
182
        replServer = serverList.serverAt(i);
183
        // Get server name from ReplicationServer object
184
        server = replServer.getServerName().trim();
185
        InputStream result = null;
186
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
187
        // Send command to that server to get updated docid information
188
        try
189
        {
190
          u = new URL("https://" + server + "?server="
191
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
192
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
193
          result = ReplicationService.getURLStream(u);
194
        }
195
        catch (Exception e)
196
        {
197
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
198
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
199
                       "for server " + server + " because "+e.getMessage());
200
          continue;
201
        }
202

    
203
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
204
        //check if result have error or not, if has skip it.
205
        // TODO: check for error in stream
206
        //if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) {
207
        if (result == null) {
208
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
209
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
210
                       "for server " + server + " because "+result);
211
          continue;
212
        }
213
        //Add result to vector
214
        responses.add(result);
215
    }
216
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
217

    
218
    //make sure that there is updated file list
219
    //If response is null, metacat don't need do anything
220
    if (responses==null || responses.isEmpty())
221
    {
222
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
223
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
224
                           "every server and failed to replicate");
225
        return;
226
    }
227

    
228

    
229
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
230
    //               "document information: "+ responses.toString());
231
    
232
    long totalServerListParseTime = 0;
233
    // go through response vector(it contains updated vector and delete vector
234
    for(int i=0; i<responses.size(); i++)
235
    {
236
    	long startServerListParseTime = System.currentTimeMillis();
237
    	XMLReader parser;
238
    	ReplMessageHandler message = new ReplMessageHandler();
239
    	try
240
        {
241
          parser = initParser(message);
242
        }
243
        catch (Exception e)
244
        {
245
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
246
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
247
                                " initParser for message and " +e.getMessage());
248
           // stop replication
249
           return;
250
        }
251
    	
252
        try
253
        {
254
          parser.parse(new InputSource(responses.elementAt(i)));
255
        }
256
        catch(Exception e)
257
        {
258
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
259
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
260
                                   "because "+ e.getMessage());
261
          continue;
262
        }
263
        //v is the list of updated documents
264
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
265
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
266
        //d is the list of deleted documents
267
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
268
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
269
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
270
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
271
        // go though every element in updated document vector
272
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
273
        //handle deleted docs
274
        for(int k=0; k<deleteList.size(); k++)
275
        { //delete the deleted documents;
276
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
277
          String docId = (String)w.elementAt(0);
278
          try
279
          {
280
            handleDeleteSingleDocument(docId, server);
281
          }
282
          catch (Exception ee)
283
          {
284
            continue;
285
          }
286
        }//for delete docs
287
        
288
        // handle replicate doc in xml_revision
289
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
290
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
291
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
292
        DOCINSERTNUMBER = 1;
293
        DOCERRORNUMBER  = 1;
294
        REVINSERTNUMBER = 1;
295
        REVERRORNUMBER  = 1;
296
        
297
        // handle system metadata
298
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
299
        for(int k = 0; k < systemMetadataList.size(); k++) { 
300
        	Vector<String> w = systemMetadataList.elementAt(k);
301
        	String guid = (String) w.elementAt(0);
302
        	String remoteserver = (String) w.elementAt(1);
303
        	try {
304
        		handleSystemMetadata(remoteserver, guid);
305
        	}
306
        	catch (Exception ee) {
307
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
308
        		continue;
309
        	}
310
        }
311
        
312
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
313
    }//for response
314

    
315
    //updated last_checked
316
    for (int i=0;i<serverList.size(); i++)
317
    {
318
       // Get ReplicationServer object from server list
319
       replServer = serverList.serverAt(i);
320
       try
321
       {
322
         updateLastCheckTimeForSingleServer(replServer);
323
       }
324
       catch(Exception e)
325
       {
326
         continue;
327
       }
328
    }//for
329
    
330
    long replicationEndTime = System.currentTimeMillis();
331
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
332
    		(replicationEndTime - replicationStartTime));
333
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
334
    		timeToGetServerList);
335
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
336
    		totalServerListParseTime);
337
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
338
    		_xmlDocQueryCount);
339
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
340
    		_xmlDocQueryTime + " ms");
341
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
342
    		_xmlRevQueryCount);
343
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
344
    		_xmlRevQueryTime + " ms");;
345

    
346
  }//update
347

    
348
  /* Handle replicate single xml document*/
349
  private void handleSingleXMLDocument(String remoteserver, String actions,
350
                                       String accNumber, String tableName)
351
               throws HandlerException
352
  {
353
    DBConnection dbConn = null;
354
    int serialNumber = -1;
355
    try
356
    {
357
      // Get DBConnection from pool
358
      dbConn=DBConnectionPool.
359
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
360
      serialNumber=dbConn.getCheckOutSerialNumber();
361
      //if the document needs to be updated or inserted, this is executed
362
      String readDocURLString = "https://" + remoteserver + "?server="+
363
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
364
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
365
      URL u = new URL(readDocURLString);
366

    
367
      // Get docid content
368
      String newxmldoc = ReplicationService.getURLContent(u);
369
      // If couldn't get skip it
370
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
371
      {
372
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
373
      }
374
      //logReplication.info("xml documnet:");
375
      //logReplication.info(newxmldoc);
376

    
377
      // Try get the docid info from remote server
378
      DocInfoHandler dih = new DocInfoHandler();
379
      XMLReader docinfoParser = initParser(dih);
380
      String docInfoURLStr = "https://" + remoteserver +
381
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
382
                       "&action=getdocumentinfo&docid="+accNumber;
383
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
384
      URL docinfoUrl = new URL(docInfoURLStr);
385
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
386
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
387
      
388
      // strip out the system metadata portion
389
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
390
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);
391
      
392
   	  // process system metadata if we have it
393
      if (systemMetadataXML != null) {
394
    	  SystemMetadata sysMeta = 
395
    		  TypeMarshaller.unmarshalTypeFromStream(
396
    				  SystemMetadata.class, 
397
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
398
    	  // need the guid-to-docid mapping
399
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
400
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
401
    	  }
402
      	  // save the system metadata
403
    	  logReplication.debug("Saving SystemMetadata to shared map: " + sysMeta.getIdentifier().getValue());
404
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
405
      	  // submit for indexing
406
          MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
407
      }
408
   	  
409
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
410
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
411
      // Get home server of the docid
412
      String docHomeServer = docinfoHash.get("home_server");
413
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
414
     
415
      // dates
416
      String createdDateString = docinfoHash.get("date_created");
417
      String updatedDateString = docinfoHash.get("date_updated");
418
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
419
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
420
      
421
      //docid should include rev number too
422
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
423
                                              (String)docinfoHash.get("rev");*/
424
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
425
      String docType = docinfoHash.get("doctype");
426
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
427

    
428
      String parserBase = null;
429
      // this for eml2 and we need user eml2 parser
430
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
431
      {
432
         parserBase = DocumentImpl.EML200;
433
      }
434
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
435
      {
436
        parserBase = DocumentImpl.EML200;
437
      }
438
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
439
      {
440
        parserBase = DocumentImpl.EML210;
441
      }
442
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
443
      {
444
        parserBase = DocumentImpl.EML210;
445
      }
446
      // Write the document into local host
447
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false, false);
448
      String newDocid = wrapper.writeReplication(dbConn,
449
                              newxmldoc,
450
                              docinfoHash.get("public_access"),
451
                              null,  /* the dtd text */
452
                              actions,
453
                              accNumber,
454
                              null, //docinfoHash.get("user_owner"),                              
455
                              null, /* null for groups[] */
456
                              docHomeServer,
457
                              remoteserver, tableName, true,// true is for time replication 
458
                              createdDate,
459
                              updatedDate);
460
      
461
      //set the user information
462
      String user = (String) docinfoHash.get("user_owner");
463
      String updated = (String) docinfoHash.get("user_updated");
464
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
465
      
466
      //process extra access rules 
467
      try {
468
      	// check if we had a guid -> docid mapping
469
      	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
470
      	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
471
      	IdentifierManager.getInstance().getGUID(docid, rev);
472
      	// no need to create the mapping if we have it
473
      } catch (McdbDocNotFoundException mcdbe) {
474
      	// create mapping if we don't
475
      	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
476
      }
477
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
478
      if (xmlAccessDAOList != null) {
479
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
480
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
481
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
482
      			acfsf.insertPermissions(xmlAccessDAO);
483
      		}
484
          }
485
      }
486
      
487
      
488
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
489
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
490
      {
491
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
492
                                     " into "+tableName + " from " +
493
                                         remoteserver);
494
        DOCINSERTNUMBER++;
495
      }
496
      else
497
      {
498
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
499
                  " into "+tableName + " from " +
500
                      remoteserver);
501
          REVINSERTNUMBER++;
502
      }
503
      String ip = getIpFromURL(u);
504
      EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
505
      
506

    
507
    }//try
508
    catch(Exception e)
509
    {
510
        
511
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
512
        {
513
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
514
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
515
                                       " into "+tableName + " from " +
516
                                           remoteserver + " because "+e.getMessage());
517
          DOCERRORNUMBER++;
518
        }
519
        else
520
        {
521
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
522
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
523
                    " into "+tableName + " from " +
524
                        remoteserver +" because "+e.getMessage());
525
            REVERRORNUMBER++;
526
        }
527
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
528
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
529
                                      " into db because " + e.getMessage(), e);
530
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
531
    		  + "writing Replication: " +e.getMessage());
532
    }
533
    finally
534
    {
535
       //return DBConnection
536
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
537
    }//finally
538
    logMetacat.info("replication.create localId:" + accNumber);
539
  }
540

    
541

    
542

    
543
  /* Handle replicate single xml document*/
544
  private void handleSingleDataFile(String remoteserver, String actions,
545
                                    String accNumber, String tableName)
546
               throws HandlerException
547
  {
548
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
549
    DBConnection dbConn = null;
550
    int serialNumber = -1;
551
    try
552
    {
553
      // Get DBConnection from pool
554
      dbConn=DBConnectionPool.
555
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
556
      serialNumber=dbConn.getCheckOutSerialNumber();
557
      // Try get docid info from remote server
558
      DocInfoHandler dih = new DocInfoHandler();
559
      XMLReader docinfoParser = initParser(dih);
560
      String docInfoURLString = "https://" + remoteserver +
561
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
562
                  "&action=getdocumentinfo&docid="+accNumber;
563
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
564
      URL docinfoUrl = new URL(docInfoURLString);
565

    
566
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
567
      
568
      // strip out the system metadata portion
569
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
570
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);  
571
   	  
572
   	  // process system metadata
573
      if (systemMetadataXML != null) {
574
    	  SystemMetadata sysMeta = 
575
    		TypeMarshaller.unmarshalTypeFromStream(
576
    				  SystemMetadata.class, 
577
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
578
    	  // need the guid-to-docid mapping
579
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
580
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
581
    	  }
582
    	  // save the system metadata
583
    	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
584
    	  // submit for indexing
585
          MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
586

    
587
      }
588
   	  
589
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
590
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
591
      
592
      // Get docid name (such as acl or dataset)
593
      String docName = docinfoHash.get("docname");
594
      // Get doc type (eml public id)
595
      String docType = docinfoHash.get("doctype");
596
      // Get docid home sever. it might be different to remoteserver
597
      // because of hub feature
598
      String docHomeServer = docinfoHash.get("home_server");
599
      String createdDateString = docinfoHash.get("date_created");
600
      String updatedDateString = docinfoHash.get("date_updated");
601
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
602
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
603
      //docid should include rev number too
604
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
605
                                              (String)docinfoHash.get("rev");*/
606

    
607
      String datafilePath = PropertyService.getProperty("application.datafilepath");
608
      // Get data file content
609
      String readDataURLString = "https://" + remoteserver + "?server="+
610
                                        MetacatUtil.getLocalReplicationServerName()+
611
                                            "&action=readdata&docid="+accNumber;
612
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
613
      URL u = new URL(readDataURLString);
614
      InputStream input = ReplicationService.getURLStream(u);
615
      //register data file into xml_documents table and wite data file
616
      //into file system
617
      if ( input != null)
618
      {
619
        DocumentImpl.writeDataFileInReplication(input,
620
                                                datafilePath,
621
                                                docName,docType,
622
                                                accNumber,
623
                                                null,
624
                                                docHomeServer,
625
                                                remoteserver,
626
                                                tableName,
627
                                                true, //true means timed replication
628
                                                createdDate,
629
                                                updatedDate);
630
                                         
631
        //set the user information
632
        String user = (String) docinfoHash.get("user_owner");
633
		String updated = (String) docinfoHash.get("user_updated");
634
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
635
        
636
        //process extra access rules
637
        try {
638
        	// check if we had a guid -> docid mapping
639
        	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
640
        	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
641
        	IdentifierManager.getInstance().getGUID(docid, rev);
642
        	// no need to create the mapping if we have it
643
        } catch (McdbDocNotFoundException mcdbe) {
644
        	// create mapping if we don't
645
        	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
646
        }
647
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
648
        if (xmlAccessDAOList != null) {
649
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
650
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
651
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
652
        			acfsf.insertPermissions(xmlAccessDAO);
653
        		}
654
            }
655
        }
656
        
657
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
658
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
659
                                    remote server);*/
660
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
661
        {
662
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
663
                                       " into "+tableName + " from " +
664
                                           remoteserver);
665
          DOCINSERTNUMBER++;
666
        }
667
        else
668
        {
669
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
670
                    " into "+tableName + " from " +
671
                        remoteserver);
672
            REVINSERTNUMBER++;
673
        }
674
        String ip = getIpFromURL(u);
675
        EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
676
        
677
      }//if
678
      else
679
      {
680
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
681
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
682
      }//else
683

    
684
    }//try
685
    catch(Exception e)
686
    {
687
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
688
                                      " because " +e.getMessage());*/
689
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
690
      {
691
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
692
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
693
                                     " into " + tableName + " from " +
694
                                         remoteserver + " because " + e.getMessage());
695
        DOCERRORNUMBER++;
696
      }
697
      else
698
      {
699
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
700
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
701
                  " into " + tableName + " from " +
702
                      remoteserver +" because "+ e.getMessage());
703
          REVERRORNUMBER++;
704
      }
705
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
706
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
707
                                      " because " + e.getMessage());
708
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
709
    		  + "writing Replication: " + e.getMessage());
710
    }
711
    finally
712
    {
713
       //return DBConnection
714
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
715
    }//finally
716
    logMetacat.info("replication.create localId:" + accNumber);
717
  }
718

    
719

    
720

    
721
  /* Handle delete single document*/
722
  private void handleDeleteSingleDocument(String docId, String notifyServer)
723
               throws HandlerException
724
  {
725
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
726
    DBConnection dbConn = null;
727
    int serialNumber = -1;
728
    try
729
    {
730
      // Get DBConnection from pool
731
      dbConn=DBConnectionPool.
732
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
733
      serialNumber=dbConn.getCheckOutSerialNumber();
734
      if(!alreadyDeleted(docId))
735
      {
736

    
737
         //because delete method docid should have rev number
738
         //so we just add one for it. This rev number is no sence.
739
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
740
         DocumentImpl.delete(accnum, null, null, notifyServer, false);
741
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
742
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
743
         URL u = new URL("https://"+notifyServer);
744
         String ip = getIpFromURL(u);
745
         EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, docId, "delete");
746
      }
747

    
748
    }//try
749
    catch(McdbDocNotFoundException e)
750
    {
751
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
752
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
753
                                 " in db because because " + e.getMessage());
754
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
755
    		  + "when handling document: " + e.getMessage());
756
    }
757
    catch(InsufficientKarmaException e)
758
    {
759
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
760
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
761
                                 " in db because because " + e.getMessage());
762
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
763
    		  + "when handling document: " + e.getMessage());
764
    }
765
    catch(SQLException e)
766
    {
767
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
768
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
769
                                 " in db because because " + e.getMessage());
770
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
771
    		  + "when handling document: " + e.getMessage());
772
    }
773
    catch(Exception e)
774
    {
775
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
776
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
777
                                 " in db because because " + e.getMessage());
778
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
779
    		  + "when handling document: " + e.getMessage());
780
    }
781
    finally
782
    {
783
       //return DBConnection
784
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
785
    }//finally
786
    logMetacat.info("replication.handleDeleteSingleDocument localId:" + docId);
787
  }
788

    
789
  /* Handle updateLastCheckTimForSingleServer*/
790
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
791
                                                  throws HandlerException
792
  {
793
    String server = repServer.getServerName();
794
    DBConnection dbConn = null;
795
    int serialNumber = -1;
796
    PreparedStatement pstmt = null;
797
    try
798
    {
799
      // Get DBConnection from pool
800
      dbConn=DBConnectionPool.
801
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
802
      serialNumber=dbConn.getCheckOutSerialNumber();
803

    
804
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
805
      // Get time from remote server
806
      URL dateurl = new URL("https://" + server + "?server="+
807
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
808
      String datexml = ReplicationService.getURLContent(dateurl);
809
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
810
      if (datexml != null && !datexml.equals("")) {
811
    	  
812
    	  // parse the ISO datetime
813
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
814
         Date updated = DateTimeMarshaller.deserializeDateToUTC(datestr);
815
         
816
         StringBuffer sql = new StringBuffer();
817
         sql.append("update xml_replication set last_checked = ? ");
818
         sql.append(" where server like ? ");
819
         pstmt = dbConn.prepareStatement(sql.toString());
820
         pstmt.setTimestamp(1, new Timestamp(updated.getTime()));
821
         pstmt.setString(2, server);
822
         
823
         pstmt.executeUpdate();
824
         dbConn.commit();
825
         pstmt.close();
826
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
827
                                      + server);
828
      }//if
829
      else
830
      {
831

    
832
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
833
                                  server + " in db because couldn't get time "
834
                                  );
835
         throw new Exception("Couldn't get time for server "+ server);
836
      }
837

    
838
    }//try
839
    catch(Exception e)
840
    {
841
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
842
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
843
                                server + " in db because because " + e.getMessage());
844
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
845
    		  + "Error updating last checked time: " + e.getMessage());
846
    }
847
    finally
848
    {
849
       //return DBConnection
850
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
851
    }//finally
852
  }
853
  
854
  	/**
855
	 * Handle replicate system metadata
856
	 * 
857
	 * @param remoteserver
858
	 * @param guid
859
	 * @throws HandlerException
860
	 */
861
	private void handleSystemMetadata(String remoteserver, String guid) 
862
		throws HandlerException {
863
		try {
864

    
865
			// Try get the system metadata from remote server
866
			String sysMetaURLStr = "https://" + remoteserver + "?server="
867
					+ MetacatUtil.getLocalReplicationServerName()
868
					+ "&action=getsystemmetadata&guid=" + guid;
869
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
870
			URL sysMetaUrl = new URL(sysMetaURLStr);
871
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
872
							+ sysMetaUrl.toString());
873
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
874

    
875
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
876

    
877
			// process system metadata
878
			if (systemMetadataXML != null) {
879
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
880
								new ByteArrayInputStream(systemMetadataXML
881
										.getBytes("UTF-8")));
882
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
883
				// submit for indexing
884
                MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
885
			}
886

    
887
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
888
							+ guid);
889

    
890
			String ip = getIpFromURL(sysMetaUrl);
891
			EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
892

    
893
		} catch (Exception e) {
894
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
895
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
896
			logReplication
897
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
898
							+ guid + " into db because " + e.getMessage());
899
			throw new HandlerException(
900
					"ReplicationHandler.handleSystemMetadata - generic exception "
901
							+ "writing Replication: " + e.getMessage());
902
		}
903

    
904
	}
905

    
906
  /**
907
   * updates xml_catalog with entries from other servers.
908
   */
909
  private void updateCatalog()
910
  {
911
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
912
    // ReplicationServer object in server list
913
    ReplicationServer replServer = null;
914
    PreparedStatement pstmt = null;
915
    String server = null;
916

    
917

    
918
    // Go through each ReplicationServer object in sererlist
919
    for (int j=0; j<serverList.size(); j++)
920
    {
921
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
922
      Vector<String> publicId = new Vector<String>();
923
      try
924
      {
925
        // Get ReplicationServer object from server list
926
        replServer = serverList.serverAt(j);
927
        // Get server name from the ReplicationServer object
928
        server = replServer.getServerName();
929
        // Try to get catalog
930
        URL u = new URL("https://" + server + "?server="+
931
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
932
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
933
        String catxml = ReplicationService.getURLContent(u);
934

    
935
        // Make sure there are not error, no empty string
936
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
937
        {
938
          throw new Exception("Couldn't get catalog list form server " +server);
939
        }
940
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
941
        CatalogMessageHandler cmh = new CatalogMessageHandler();
942
        XMLReader catparser = initParser(cmh);
943
        catparser.parse(new InputSource(new StringReader(catxml)));
944
        //parse the returned catalog xml and put it into a vector
945
        remoteCatalog = cmh.getCatalogVect();
946

    
947
        // Make sure remoteCatalog is not empty
948
        if (remoteCatalog.isEmpty())
949
        {
950
          throw new Exception("Couldn't get catalog list form server " +server);
951
        }
952

    
953
        String localcatxml = ReplicationService.getCatalogXML();
954

    
955
        // Make sure local catalog is no empty
956
        if (localcatxml==null||localcatxml.equals(""))
957
        {
958
          throw new Exception("Couldn't get catalog list form server " +server);
959
        }
960

    
961
        cmh = new CatalogMessageHandler();
962
        catparser = initParser(cmh);
963
        catparser.parse(new InputSource(new StringReader(localcatxml)));
964
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
965

    
966
        //now we have the catalog from the remote server and this local server
967
        //we now need to compare the two and merge the differences.
968
        //the comparison is base on the public_id fields which is the 4th
969
        //entry in each row vector.
970
        publicId = new Vector<String>();
971
        for(int i=0; i<localCatalog.size(); i++)
972
        {
973
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
974
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
975
          publicId.add(new String((String)v.elementAt(3)));
976
        }
977
      }//try
978
      catch (Exception e)
979
      {
980
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
981
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
982
                                    server + " because " +e.getMessage());
983
      }//catch
984

    
985
      for(int i=0; i<remoteCatalog.size(); i++)
986
      {
987
         // DConnection
988
        DBConnection dbConn = null;
989
        // DBConnection checkout serial number
990
        int serialNumber = -1;
991
        try
992
        {
993
            dbConn=DBConnectionPool.
994
                  getDBConnection("ReplicationHandler.updateCatalog");
995
            serialNumber=dbConn.getCheckOutSerialNumber();
996
            Vector<String> v = remoteCatalog.elementAt(i);
997
            //logMetacat.debug("v2: " + v.toString());
998
            //logMetacat.debug("i: " + i);
999
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
1000
            //logMetacat.debug("publicID: " + publicId.toString());
1001
            logReplication.info
1002
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
1003
           if(!publicId.contains(v.elementAt(3)))
1004
           { //so we don't have this public id in our local table so we need to
1005
             //add it.
1006
        	   
1007
        	   // check where it is pointing first, before adding
1008
        	   String entryType = (String)v.elementAt(0);
1009
        	   if (entryType.equals(DocumentImpl.SCHEMA)) {
1010
	        	   String nameSpace = (String)v.elementAt(3);
1011
	        	   String schemaLocation = (String)v.elementAt(4);
1012
	        	   SchemaLocationResolver slr = new SchemaLocationResolver(nameSpace, schemaLocation);
1013
	        	   try {
1014
	        		   slr.resolveNameSpace();
1015
	        	   } catch (Exception e) {
1016
	        		   String msg = "Could not save remote schema to xml catalog. " + "nameSpace: " + nameSpace + " location: " + schemaLocation;
1017
	        		   logMetacat.error(msg, e);
1018
	        		   logReplication.error(msg, e);
1019
	        	   }
1020
	        	   // skip whatever else we were going to do
1021
	        	   continue;
1022
        	   }
1023
        	   
1024
             //logMetacat.debug("in if");
1025
             StringBuffer sql = new StringBuffer();
1026
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
1027
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
1028
             sql.append("?,?)");
1029
             //logMetacat.debug("sql: " + sql.toString());
1030
             pstmt = dbConn.prepareStatement(sql.toString());
1031
             pstmt.setString(1, (String)v.elementAt(0));
1032
             pstmt.setString(2, (String)v.elementAt(1));
1033
             pstmt.setString(3, (String)v.elementAt(2));
1034
             pstmt.setString(4, (String)v.elementAt(3));
1035
             pstmt.setString(5, (String)v.elementAt(4));
1036
             pstmt.execute();
1037
             pstmt.close();
1038
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
1039
                               (String)v.elementAt(3) + " from server"+server);
1040
           }
1041
        }
1042
        catch(Exception e)
1043
        {
1044
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1045
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
1046
                                    server + " because " +e.getMessage());
1047
        }//catch
1048
        finally
1049
        {
1050
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1051
        }//finally
1052
      }//for remote catalog
1053
    }//for server list
1054
    logReplication.info("End of updateCatalog");
1055
  }
1056

    
1057
  /**
1058
   * Method that returns true if docid has already been "deleted" from metacat.
1059
   * This method really implements a truth table for deleted documents
1060
   * The table is (a docid in one of the tables is represented by the X):
1061
   * xml_docs      xml_revs      deleted?
1062
   * ------------------------------------
1063
   *   X             X             FALSE
1064
   *   X             _             FALSE
1065
   *   _             X             TRUE
1066
   *   _             _             TRUE
1067
   */
1068
  private static boolean alreadyDeleted(String docid) throws HandlerException
1069
  {
1070
    DBConnection dbConn = null;
1071
    int serialNumber = -1;
1072
    PreparedStatement pstmt = null;
1073
    try
1074
    {
1075
      dbConn=DBConnectionPool.
1076
                  getDBConnection("ReplicationHandler.alreadyDeleted");
1077
      serialNumber=dbConn.getCheckOutSerialNumber();
1078
      boolean xml_docs = false;
1079
      boolean xml_revs = false;
1080

    
1081
      StringBuffer sb = new StringBuffer();
1082
      sb.append("select docid from xml_revisions where docid like ? ");
1083
      pstmt = dbConn.prepareStatement(sb.toString());
1084
      pstmt.setString(1, docid);
1085
      pstmt.execute();
1086
      ResultSet rs = pstmt.getResultSet();
1087
      boolean tablehasrows = rs.next();
1088
      if(tablehasrows)
1089
      {
1090
        xml_revs = true;
1091
      }
1092

    
1093
      sb = new StringBuffer();
1094
      sb.append("select docid from xml_documents where docid like '");
1095
      sb.append(docid).append("'");
1096
      pstmt.close();
1097
      pstmt = dbConn.prepareStatement(sb.toString());
1098
      //increase usage count
1099
      dbConn.increaseUsageCount(1);
1100
      pstmt.execute();
1101
      rs = pstmt.getResultSet();
1102
      tablehasrows = rs.next();
1103
      pstmt.close();
1104
      if(tablehasrows)
1105
      {
1106
        xml_docs = true;
1107
      }
1108

    
1109
      if(xml_docs && xml_revs)
1110
      {
1111
        return false;
1112
      }
1113
      else if(xml_docs && !xml_revs)
1114
      {
1115
        return false;
1116
      }
1117
      else if(!xml_docs && xml_revs)
1118
      {
1119
        return true;
1120
      }
1121
      else if(!xml_docs && !xml_revs)
1122
      {
1123
        return true;
1124
      }
1125
    }
1126
    catch(Exception e)
1127
    {
1128
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1129
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1130
                          e.getMessage());
1131
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1132
    		  + e.getMessage());
1133
    }
1134
    finally
1135
    {
1136
      try
1137
      {
1138
        pstmt.close();
1139
      }//try
1140
      catch (SQLException ee)
1141
      {
1142
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1143
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1144
                          "to close pstmt: "+ee.getMessage());
1145
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1146
      		  + ee.getMessage());
1147
      }//catch
1148
      finally
1149
      {
1150
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1151
      }//finally
1152
    }//finally
1153
    return false;
1154
  }
1155

    
1156

    
1157
  /**
1158
   * Method to initialize the message parser
1159
   */
1160
  public static XMLReader initParser(DefaultHandler dh)
1161
          throws HandlerException
1162
  {
1163
    XMLReader parser = null;
1164

    
1165
    try {
1166
      ContentHandler chandler = dh;
1167

    
1168
      // Get an instance of the parser
1169
      String parserName = PropertyService.getProperty("xml.saxparser");
1170
      parser = XMLReaderFactory.createXMLReader(parserName);
1171

    
1172
      // Turn off validation
1173
      parser.setFeature("http://xml.org/sax/features/validation", false);
1174

    
1175
      parser.setContentHandler((ContentHandler)chandler);
1176
      parser.setErrorHandler((ErrorHandler)chandler);
1177

    
1178
    } catch (SAXException se) {
1179
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1180
    		  + " initializing parser: " + se.getMessage());
1181
    } catch (PropertyNotFoundException pnfe) {
1182
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1183
      		  + " getting parser name: " + pnfe.getMessage());
1184
    } 
1185

    
1186
    return parser;
1187
  }
1188

    
1189
  /**
1190
	 * This method will combine given time string(in short format) to current
1191
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1192
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1193
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1194
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1195
	 * 
1196
	 * @param givenTime
1197
	 *            the format should be "10:00 AM " or "2:00 PM"
1198
	 * @return
1199
	 * @throws Exception
1200
	 */
1201
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1202
  {
1203
	  try {
1204
     Date givenDate = parseTime(givenTime);
1205
     Date newDate = null;
1206
     Date now = new Date();
1207
     String currentTimeString = getTimeString(now);
1208
     Date currentTime = parseTime(currentTimeString); 
1209
     if ( currentTime.getTime() >= givenDate.getTime())
1210
     {
1211
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1212
        String dateAndTime = getDateString(now) + " " + givenTime;
1213
        Date combinationDate = parseDateTime(dateAndTime);
1214
        // new date should plus 24 hours to make is the second day
1215
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1216
     }
1217
     else
1218
     {
1219
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1220
         String dateAndTime = getDateString(now) + " " + givenTime;
1221
         newDate = parseDateTime(dateAndTime);
1222
     }
1223
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1224
     return newDate;
1225
	  } catch (ParseException pe) {
1226
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1227
				  + "parsing error: "  + pe.getMessage());
1228
	  }
1229
  }
1230

    
1231
  /*
1232
	 * parse a given string to Time in short format. For example, given time is
1233
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1234
	 */
1235
  private static Date parseTime(String timeString) throws ParseException
1236
  {
1237
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1238
    Date time = format.parse(timeString); 
1239
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1240
                              +time.toString());
1241
    return time;
1242

    
1243
  }
1244
  
1245
  /*
1246
   * Parse a given string to date and time. Date format is long and time
1247
   * format is short.
1248
   */
1249
  private static Date parseDateTime(String timeString) throws ParseException
1250
  {
1251
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1252
    Date time = format.parse(timeString);
1253
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1254
                             time.toString());
1255
    return time;
1256
  }
1257
  
1258
  /*
1259
   * Get a date string from a Date object. The date format will be long
1260
   */
1261
  private static String getDateString(Date now)
1262
  {
1263
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1264
     String s = df.format(now);
1265
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1266
     return s;
1267
  }
1268
  
1269
  /*
1270
   * Get a time string from a Date object, the time format will be short
1271
   */
1272
  private static String getTimeString(Date now)
1273
  {
1274
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1275
     String s = df.format(now);
1276
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1277
     return s;
1278
  }
1279
  
1280
  
1281
  /*
1282
	 * This method will go through the docid list both in xml_Documents table
1283
	 * and in xml_revisions table @author tao
1284
	 */
1285
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1286
		boolean dataFile = false;
1287
		for (int j = 0; j < docList.size(); j++) {
1288
			// initial dataFile is false
1289
			dataFile = false;
1290
			// w is information for one document, information contain
1291
			// docid, rev, server or datafile.
1292
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1293
			// Check if the vector w contain "datafile"
1294
			// If it has, this document is data file
1295
			try {
1296
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1297
					dataFile = true;
1298
				}
1299
			} catch (PropertyNotFoundException pnfe) {
1300
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1301
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1302
						+ "Leaving as false: " + pnfe.getMessage());
1303
			}
1304
			// logMetacat.debug("w: " + w.toString());
1305
			// Get docid
1306
			String docid = (String) w.elementAt(0);
1307
			logReplication.info("docid: " + docid);
1308
			// Get revision number
1309
			int rev = Integer.parseInt((String) w.elementAt(1));
1310
			logReplication.info("rev: " + rev);
1311
			// Get remote server name (it is may not be doc home server because
1312
			// the new hub feature
1313
			String remoteServer = (String) w.elementAt(2);
1314
			remoteServer = remoteServer.trim();
1315

    
1316
			try {
1317
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1318
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1319
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1320
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1321
				} else {
1322
					continue;
1323
				}
1324

    
1325
			} catch (Exception e) {
1326
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1327
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1328
						+ " in time replication" + e.getMessage(), e);
1329
				continue;
1330
			}
1331
			
1332
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1333
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1334
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1335
	        }
1336
	        
1337
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1338
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1339
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1340
	        }
1341

    
1342
		}// for update docs
1343

    
1344
	}
1345
   
1346
   /*
1347
	 * This method will handle doc in xml_documents table.
1348
	 */
1349
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1350
                                        throws HandlerException
1351
   {
1352
       // compare the update rev and local rev to see what need happen
1353
       int localrev = -1;
1354
       String action = null;
1355
       boolean flag = false;
1356
       try
1357
       {
1358
    	 long docQueryStartTime = System.currentTimeMillis();
1359
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1360
         long docQueryEndTime = System.currentTimeMillis();
1361
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1362
         _xmlDocQueryCount++;
1363
       }
1364
       catch (SQLException e)
1365
       {
1366
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1367
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1368
                                " be found because " + e.getMessage());
1369
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1370
                 "written because error happend to find it's local revision");
1371
         DOCERRORNUMBER++;
1372
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1373
                 " be found: " + e.getMessage());
1374
       }
1375
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1376
                               localrev);
1377

    
1378
       //check the revs for an update because this document is in the
1379
       //local DB, it might be out of date.
1380
       if (localrev == -1)
1381
       {
1382
          // check if the revision is in the revision table
1383
    	   Vector<Integer> localRevVector = null;
1384
    	 try {
1385
        	 long revQueryStartTime = System.currentTimeMillis();
1386
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1387
             long revQueryEndTime = System.currentTimeMillis();
1388
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1389
             _xmlRevQueryCount++;
1390
    	 } catch (SQLException sqle) {
1391
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1392
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1393
    	 }
1394
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1395
         {
1396
             // this version was deleted, so don't need replicate
1397
             flag = false;
1398
         }
1399
         else
1400
         {
1401
           //insert this document as new because it is not in the local DB
1402
           action = "INSERT";
1403
           flag = true;
1404
         }
1405
       }
1406
       else
1407
       {
1408
         if(localrev == rev)
1409
         {
1410
           // Local meatacat has the same rev to remote host, don't need
1411
           // update and flag set false
1412
           flag = false;
1413
         }
1414
         else if(localrev < rev)
1415
         {
1416
           //this document needs to be updated so send an read request
1417
           action = "UPDATE";
1418
           flag = true;
1419
         }
1420
       }
1421
       
1422
       String accNumber = null;
1423
       try {
1424
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1425
       } catch (PropertyNotFoundException pnfe) {
1426
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1427
    			   + "account number separator : " + pnfe.getMessage());
1428
       }
1429
       // this is non-data file
1430
       if(flag && !dataFile)
1431
       {
1432
         try
1433
         {
1434
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1435
         }
1436
         catch(HandlerException he)
1437
         {
1438
           // skip this document
1439
           throw he;
1440
         }
1441
       }//if for non-data file
1442

    
1443
        // this is for data file
1444
       if(flag && dataFile)
1445
       {
1446
         try
1447
         {
1448
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1449
         }
1450
         catch(HandlerException he)
1451
         {
1452
           // skip this data file
1453
           throw he;
1454
         }
1455

    
1456
       }//for data file
1457
   }
1458
   
1459
   /*
1460
    * This method will handle doc in xml_documents table.
1461
    */
1462
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1463
                                        throws HandlerException
1464
   {
1465
       // compare the update rev and local rev to see what need happen
1466
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1467
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1468
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1469
       Vector<Integer> localrev = null;
1470
       String action = "INSERT";
1471
       boolean flag = false;
1472
       try
1473
       {
1474
      	 long revQueryStartTime = System.currentTimeMillis();
1475
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1476
         long revQueryEndTime = System.currentTimeMillis();
1477
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1478
         _xmlRevQueryCount++;
1479
       }
1480
       catch (SQLException sqle)
1481
       {
1482
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1483
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1484
                                " be found because " + sqle.getMessage());
1485
         REVERRORNUMBER++;
1486
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1487
        		 + sqle.getMessage());
1488
       }
1489
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1490
                               localrev.toString());
1491
       
1492
       // if the rev is not in the xml_revision, we need insert it
1493
       if (!localrev.contains(new Integer(rev)))
1494
       {
1495
           flag = true;    
1496
       }
1497
     
1498
       String accNumber = null;
1499
       try {
1500
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1501
       } catch (PropertyNotFoundException pnfe) {
1502
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1503
    			   + "account number separator : " + pnfe.getMessage());
1504
       }
1505
       // this is non-data file
1506
       if(flag && !dataFile)
1507
       {
1508
         try
1509
         {
1510
           
1511
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1512
         }
1513
         catch(HandlerException he)
1514
         {
1515
           // skip this document
1516
           throw he;
1517
         }
1518
       }//if for non-data file
1519

    
1520
        // this is for data file
1521
       if(flag && dataFile)
1522
       {
1523
         try
1524
         {
1525
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1526
         }
1527
         catch(HandlerException he)
1528
         {
1529
           // skip this data file
1530
           throw he;
1531
         }
1532

    
1533
       }//for data file
1534
   }
1535
   
1536
   /*
1537
    * Return a ip address for given url
1538
    */
1539
   private String getIpFromURL(URL url)
1540
   {
1541
	   String ip = null;
1542
	   try
1543
	   {
1544
	      InetAddress address = InetAddress.getByName(url.getHost());
1545
	      ip = address.getHostAddress();
1546
	   }
1547
	   catch(UnknownHostException e)
1548
	   {
1549
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1550
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1551
                   +e.getMessage());
1552
	   }
1553

    
1554
	   return ip;
1555
   }
1556
  
1557
}
1558

    
(3-3/7)