Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2012-08-23 21:45:07 -0700 (Thu, 23 Aug 2012) $'
10
 * '$Revision: 7358 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.sql.Timestamp;
39
import java.text.DateFormat;
40
import java.text.ParseException;
41
import java.util.Date;
42
import java.util.Hashtable;
43
import java.util.TimerTask;
44
import java.util.Vector;
45

    
46

    
47
import org.apache.log4j.Logger;
48
import org.dataone.service.types.v1.SystemMetadata;
49
import org.dataone.service.util.DateTimeMarshaller;
50
import org.dataone.service.util.TypeMarshaller;
51
import org.xml.sax.ContentHandler;
52
import org.xml.sax.ErrorHandler;
53
import org.xml.sax.InputSource;
54
import org.xml.sax.SAXException;
55
import org.xml.sax.XMLReader;
56
import org.xml.sax.helpers.DefaultHandler;
57
import org.xml.sax.helpers.XMLReaderFactory;
58

    
59
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
60
import edu.ucsb.nceas.metacat.DBUtil;
61
import edu.ucsb.nceas.metacat.DocInfoHandler;
62
import edu.ucsb.nceas.metacat.DocumentImpl;
63
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
64
import edu.ucsb.nceas.metacat.EventLog;
65
import edu.ucsb.nceas.metacat.IdentifierManager;
66
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
67
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
68
import edu.ucsb.nceas.metacat.accesscontrol.XMLAccessDAO;
69
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
70
import edu.ucsb.nceas.metacat.database.DBConnection;
71
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
72
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
73
import edu.ucsb.nceas.metacat.properties.PropertyService;
74
import edu.ucsb.nceas.metacat.shared.HandlerException;
75
import edu.ucsb.nceas.metacat.util.DocumentUtil;
76
import edu.ucsb.nceas.metacat.util.MetacatUtil;
77
import edu.ucsb.nceas.metacat.util.ReplicationUtil;
78
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
79

    
80

    
81

    
82
/**
83
 * This class handles deltaT replication checking.  Whenever this TimerTask
84
 * is fired it checks each server in xml_replication for updates and updates
85
 * the local db as needed.
86
 */
87
public class ReplicationHandler extends TimerTask
88
{
89
  int serverCheckCode = 1;
90
  ReplicationServerList serverList = null;
91
  //PrintWriter out;
92
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
93
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
94
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
95
  
96
  private static int DOCINSERTNUMBER = 1;
97
  private static int DOCERRORNUMBER  = 1;
98
  private static int REVINSERTNUMBER = 1;
99
  private static int REVERRORNUMBER  = 1;
100
  
101
  private static int _xmlDocQueryCount = 0;
102
  private static int _xmlRevQueryCount = 0;
103
  private static long _xmlDocQueryTime = 0;
104
  private static long _xmlRevQueryTime = 0;
105
  
106
  
107
  public ReplicationHandler()
108
  {
109
    //this.out = o;
110
    serverList = new ReplicationServerList();
111
  }
112

    
113
  public ReplicationHandler(int serverCheckCode)
114
  {
115
    //this.out = o;
116
    this.serverCheckCode = serverCheckCode;
117
    serverList = new ReplicationServerList();
118
  }
119

    
120
  /**
121
   * Method that implements TimerTask.run().  It runs whenever the timer is
122
   * fired.
123
   */
124
  public void run()
125
  {
126
    //find out the last_checked time of each server in the server list and
127
    //send a query to each server to see if there are any documents in
128
    //xml_documents with an update_date > last_checked
129
	  
130
      //if serverList is null, metacat don't need to replication
131
      if (serverList==null||serverList.isEmpty())
132
      {
133
        return;
134
      }
135
      updateCatalog();
136
      update();
137
      //conn.close();
138
  }
139

    
140
  /**
141
   * Method that uses revision tagging for replication instead of update_date.
142
   */
143
  private void update()
144
  {
145
	  
146
	  _xmlDocQueryCount = 0;
147
	  _xmlRevQueryCount = 0;
148
	  _xmlDocQueryTime = 0;
149
	  _xmlRevQueryTime = 0;
150
    /*
151
     Pseudo-algorithm
152
     - request a doc list from each server in xml_replication
153
     - check the rev number of each of those documents agains the
154
       documents in the local database
155
     - pull any documents that have a lesser rev number on the local server
156
       from the remote server
157
     - delete any documents that still exist in the local xml_documents but
158
       are in the deletedDocuments tag of the remote host response.
159
     - update last_checked to keep track of the last time it was checked.
160
       (this info is theoretically not needed using this system but probably
161
       should be kept anyway)
162
    */
163

    
164
    ReplicationServer replServer = null; // Variable to store the
165
                                        // ReplicationServer got from
166
                                        // Server list
167
    String server = null; // Variable to store server name
168
//    String update;
169
    Vector<InputStream> responses = new Vector<InputStream>();
170
    URL u;
171
    long replicationStartTime = System.currentTimeMillis();
172
    long timeToGetServerList = 0;
173
    
174
    //Check for every server in server list to get updated list and put
175
    // them in to response
176
    long startTimeToGetServers = System.currentTimeMillis();
177
    for (int i=0; i<serverList.size(); i++)
178
    {
179
        // Get ReplicationServer object from server list
180
        replServer = serverList.serverAt(i);
181
        // Get server name from ReplicationServer object
182
        server = replServer.getServerName().trim();
183
        InputStream result = null;
184
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
185
        // Send command to that server to get updated docid information
186
        try
187
        {
188
          u = new URL("https://" + server + "?server="
189
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
190
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
191
          result = ReplicationService.getURLStream(u);
192
        }
193
        catch (Exception e)
194
        {
195
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
196
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
197
                       "for server " + server + " because "+e.getMessage());
198
          continue;
199
        }
200

    
201
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
202
        //check if result have error or not, if has skip it.
203
        // TODO: check for error in stream
204
        //if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) {
205
        if (result == null) {
206
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
207
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
208
                       "for server " + server + " because "+result);
209
          continue;
210
        }
211
        //Add result to vector
212
        responses.add(result);
213
    }
214
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
215

    
216
    //make sure that there is updated file list
217
    //If response is null, metacat don't need do anything
218
    if (responses==null || responses.isEmpty())
219
    {
220
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
221
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
222
                           "every server and failed to replicate");
223
        return;
224
    }
225

    
226

    
227
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
228
    //               "document information: "+ responses.toString());
229
    
230
    long totalServerListParseTime = 0;
231
    // go through response vector(it contains updated vector and delete vector
232
    for(int i=0; i<responses.size(); i++)
233
    {
234
    	long startServerListParseTime = System.currentTimeMillis();
235
    	XMLReader parser;
236
    	ReplMessageHandler message = new ReplMessageHandler();
237
    	try
238
        {
239
          parser = initParser(message);
240
        }
241
        catch (Exception e)
242
        {
243
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
244
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
245
                                " initParser for message and " +e.getMessage());
246
           // stop replication
247
           return;
248
        }
249
    	
250
        try
251
        {
252
          parser.parse(new InputSource(responses.elementAt(i)));
253
        }
254
        catch(Exception e)
255
        {
256
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
257
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
258
                                   "because "+ e.getMessage());
259
          continue;
260
        }
261
        //v is the list of updated documents
262
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
263
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
264
        //d is the list of deleted documents
265
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
266
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
267
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
268
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
269
        // go though every element in updated document vector
270
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
271
        //handle deleted docs
272
        for(int k=0; k<deleteList.size(); k++)
273
        { //delete the deleted documents;
274
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
275
          String docId = (String)w.elementAt(0);
276
          try
277
          {
278
            handleDeleteSingleDocument(docId, server);
279
          }
280
          catch (Exception ee)
281
          {
282
            continue;
283
          }
284
        }//for delete docs
285
        
286
        // handle replicate doc in xml_revision
287
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
288
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
289
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
290
        DOCINSERTNUMBER = 1;
291
        DOCERRORNUMBER  = 1;
292
        REVINSERTNUMBER = 1;
293
        REVERRORNUMBER  = 1;
294
        
295
        // handle system metadata
296
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
297
        for(int k = 0; k < systemMetadataList.size(); k++) { 
298
        	Vector<String> w = systemMetadataList.elementAt(k);
299
        	String guid = (String) w.elementAt(0);
300
        	String remoteserver = (String) w.elementAt(1);
301
        	try {
302
        		handleSystemMetadata(remoteserver, guid);
303
        	}
304
        	catch (Exception ee) {
305
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
306
        		continue;
307
        	}
308
        }
309
        
310
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
311
    }//for response
312

    
313
    //updated last_checked
314
    for (int i=0;i<serverList.size(); i++)
315
    {
316
       // Get ReplicationServer object from server list
317
       replServer = serverList.serverAt(i);
318
       try
319
       {
320
         updateLastCheckTimeForSingleServer(replServer);
321
       }
322
       catch(Exception e)
323
       {
324
         continue;
325
       }
326
    }//for
327
    
328
    long replicationEndTime = System.currentTimeMillis();
329
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
330
    		(replicationEndTime - replicationStartTime));
331
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
332
    		timeToGetServerList);
333
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
334
    		totalServerListParseTime);
335
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
336
    		_xmlDocQueryCount);
337
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
338
    		_xmlDocQueryTime + " ms");
339
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
340
    		_xmlRevQueryCount);
341
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
342
    		_xmlRevQueryTime + " ms");;
343

    
344
  }//update
345

    
346
  /* Handle replicate single xml document*/
347
  private void handleSingleXMLDocument(String remoteserver, String actions,
348
                                       String accNumber, String tableName)
349
               throws HandlerException
350
  {
351
    DBConnection dbConn = null;
352
    int serialNumber = -1;
353
    try
354
    {
355
      // Get DBConnection from pool
356
      dbConn=DBConnectionPool.
357
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
358
      serialNumber=dbConn.getCheckOutSerialNumber();
359
      //if the document needs to be updated or inserted, this is executed
360
      String readDocURLString = "https://" + remoteserver + "?server="+
361
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
362
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
363
      URL u = new URL(readDocURLString);
364

    
365
      // Get docid content
366
      String newxmldoc = ReplicationService.getURLContent(u);
367
      // If couldn't get skip it
368
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
369
      {
370
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
371
      }
372
      //logReplication.info("xml documnet:");
373
      //logReplication.info(newxmldoc);
374

    
375
      // Try get the docid info from remote server
376
      DocInfoHandler dih = new DocInfoHandler();
377
      XMLReader docinfoParser = initParser(dih);
378
      String docInfoURLStr = "https://" + remoteserver +
379
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
380
                       "&action=getdocumentinfo&docid="+accNumber;
381
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
382
      URL docinfoUrl = new URL(docInfoURLStr);
383
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
384
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
385
      
386
      // strip out the system metadata portion
387
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
388
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);
389
      
390
   	  // process system metadata if we have it
391
      if (systemMetadataXML != null) {
392
    	  SystemMetadata sysMeta = 
393
    		  TypeMarshaller.unmarshalTypeFromStream(
394
    				  SystemMetadata.class, 
395
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
396
    	  // need the guid-to-docid mapping
397
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
398
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
399
    	  }
400
      	  // save the system metadata
401
    	  logReplication.debug("Saving SystemMetadata to shared map: " + sysMeta.getIdentifier().getValue());
402
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
403
      }
404
   	  
405
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
406
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
407
      // Get home server of the docid
408
      String docHomeServer = docinfoHash.get("home_server");
409
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
410
     
411
      // dates
412
      String createdDateString = docinfoHash.get("date_created");
413
      String updatedDateString = docinfoHash.get("date_updated");
414
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
415
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
416
      
417
      //docid should include rev number too
418
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
419
                                              (String)docinfoHash.get("rev");*/
420
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
421
      String docType = docinfoHash.get("doctype");
422
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
423

    
424
      String parserBase = null;
425
      // this for eml2 and we need user eml2 parser
426
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
427
      {
428
         parserBase = DocumentImpl.EML200;
429
      }
430
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
431
      {
432
        parserBase = DocumentImpl.EML200;
433
      }
434
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
435
      {
436
        parserBase = DocumentImpl.EML210;
437
      }
438
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
439
      {
440
        parserBase = DocumentImpl.EML210;
441
      }
442
      // Write the document into local host
443
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false, false);
444
      String newDocid = wrapper.writeReplication(dbConn,
445
                              newxmldoc,
446
                              docinfoHash.get("public_access"),
447
                              null,  /* the dtd text */
448
                              actions,
449
                              accNumber,
450
                              null, //docinfoHash.get("user_owner"),                              
451
                              null, /* null for groups[] */
452
                              docHomeServer,
453
                              remoteserver, tableName, true,// true is for time replication 
454
                              createdDate,
455
                              updatedDate);
456
      
457
      //set the user information
458
      String user = (String) docinfoHash.get("user_owner");
459
      String updated = (String) docinfoHash.get("user_updated");
460
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
461
      
462
      //process extra access rules 
463
      try {
464
      	// check if we had a guid -> docid mapping
465
      	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
466
      	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
467
      	IdentifierManager.getInstance().getGUID(docid, rev);
468
      	// no need to create the mapping if we have it
469
      } catch (McdbDocNotFoundException mcdbe) {
470
      	// create mapping if we don't
471
      	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
472
      }
473
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
474
      if (xmlAccessDAOList != null) {
475
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
476
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
477
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
478
      			acfsf.insertPermissions(xmlAccessDAO);
479
      		}
480
          }
481
      }
482
      
483
      
484
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
485
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
486
      {
487
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
488
                                     " into "+tableName + " from " +
489
                                         remoteserver);
490
        DOCINSERTNUMBER++;
491
      }
492
      else
493
      {
494
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
495
                  " into "+tableName + " from " +
496
                      remoteserver);
497
          REVINSERTNUMBER++;
498
      }
499
      String ip = getIpFromURL(u);
500
      EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
501
      
502

    
503
    }//try
504
    catch(Exception e)
505
    {
506
        
507
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
508
        {
509
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
510
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
511
                                       " into "+tableName + " from " +
512
                                           remoteserver + " because "+e.getMessage());
513
          DOCERRORNUMBER++;
514
        }
515
        else
516
        {
517
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
518
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
519
                    " into "+tableName + " from " +
520
                        remoteserver +" because "+e.getMessage());
521
            REVERRORNUMBER++;
522
        }
523
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
524
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
525
                                      " into db because " + e.getMessage(), e);
526
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
527
    		  + "writing Replication: " +e.getMessage());
528
    }
529
    finally
530
    {
531
       //return DBConnection
532
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
533
    }//finally
534
    logMetacat.info("replication.create localId:" + accNumber);
535
  }
536

    
537

    
538

    
539
  /* Handle replicate single xml document*/
540
  private void handleSingleDataFile(String remoteserver, String actions,
541
                                    String accNumber, String tableName)
542
               throws HandlerException
543
  {
544
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
545
    DBConnection dbConn = null;
546
    int serialNumber = -1;
547
    try
548
    {
549
      // Get DBConnection from pool
550
      dbConn=DBConnectionPool.
551
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
552
      serialNumber=dbConn.getCheckOutSerialNumber();
553
      // Try get docid info from remote server
554
      DocInfoHandler dih = new DocInfoHandler();
555
      XMLReader docinfoParser = initParser(dih);
556
      String docInfoURLString = "https://" + remoteserver +
557
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
558
                  "&action=getdocumentinfo&docid="+accNumber;
559
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
560
      URL docinfoUrl = new URL(docInfoURLString);
561

    
562
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
563
      
564
      // strip out the system metadata portion
565
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
566
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);  
567
   	  
568
   	  // process system metadata
569
      if (systemMetadataXML != null) {
570
    	  SystemMetadata sysMeta = 
571
    		TypeMarshaller.unmarshalTypeFromStream(
572
    				  SystemMetadata.class, 
573
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
574
    	  // need the guid-to-docid mapping
575
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
576
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
577
    	  }
578
    	  // save the system metadata
579
    	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
580

    
581
      }
582
   	  
583
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
584
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
585
      
586
      // Get docid name (such as acl or dataset)
587
      String docName = docinfoHash.get("docname");
588
      // Get doc type (eml public id)
589
      String docType = docinfoHash.get("doctype");
590
      // Get docid home sever. it might be different to remoteserver
591
      // because of hub feature
592
      String docHomeServer = docinfoHash.get("home_server");
593
      String createdDateString = docinfoHash.get("date_created");
594
      String updatedDateString = docinfoHash.get("date_updated");
595
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
596
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
597
      //docid should include rev number too
598
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
599
                                              (String)docinfoHash.get("rev");*/
600

    
601
      String datafilePath = PropertyService.getProperty("application.datafilepath");
602
      // Get data file content
603
      String readDataURLString = "https://" + remoteserver + "?server="+
604
                                        MetacatUtil.getLocalReplicationServerName()+
605
                                            "&action=readdata&docid="+accNumber;
606
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
607
      URL u = new URL(readDataURLString);
608
      InputStream input = ReplicationService.getURLStream(u);
609
      //register data file into xml_documents table and wite data file
610
      //into file system
611
      if ( input != null)
612
      {
613
        DocumentImpl.writeDataFileInReplication(input,
614
                                                datafilePath,
615
                                                docName,docType,
616
                                                accNumber,
617
                                                null,
618
                                                docHomeServer,
619
                                                remoteserver,
620
                                                tableName,
621
                                                true, //true means timed replication
622
                                                createdDate,
623
                                                updatedDate);
624
                                         
625
        //set the user information
626
        String user = (String) docinfoHash.get("user_owner");
627
		String updated = (String) docinfoHash.get("user_updated");
628
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
629
        
630
        //process extra access rules
631
        try {
632
        	// check if we had a guid -> docid mapping
633
        	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
634
        	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
635
        	IdentifierManager.getInstance().getGUID(docid, rev);
636
        	// no need to create the mapping if we have it
637
        } catch (McdbDocNotFoundException mcdbe) {
638
        	// create mapping if we don't
639
        	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
640
        }
641
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
642
        if (xmlAccessDAOList != null) {
643
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
644
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
645
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
646
        			acfsf.insertPermissions(xmlAccessDAO);
647
        		}
648
            }
649
        }
650
        
651
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
652
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
653
                                    remote server);*/
654
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
655
        {
656
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
657
                                       " into "+tableName + " from " +
658
                                           remoteserver);
659
          DOCINSERTNUMBER++;
660
        }
661
        else
662
        {
663
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
664
                    " into "+tableName + " from " +
665
                        remoteserver);
666
            REVINSERTNUMBER++;
667
        }
668
        String ip = getIpFromURL(u);
669
        EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
670
        
671
      }//if
672
      else
673
      {
674
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
675
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
676
      }//else
677

    
678
    }//try
679
    catch(Exception e)
680
    {
681
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
682
                                      " because " +e.getMessage());*/
683
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
684
      {
685
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
686
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
687
                                     " into " + tableName + " from " +
688
                                         remoteserver + " because " + e.getMessage());
689
        DOCERRORNUMBER++;
690
      }
691
      else
692
      {
693
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
694
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
695
                  " into " + tableName + " from " +
696
                      remoteserver +" because "+ e.getMessage());
697
          REVERRORNUMBER++;
698
      }
699
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
700
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
701
                                      " because " + e.getMessage());
702
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
703
    		  + "writing Replication: " + e.getMessage());
704
    }
705
    finally
706
    {
707
       //return DBConnection
708
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
709
    }//finally
710
    logMetacat.info("replication.create localId:" + accNumber);
711
  }
712

    
713

    
714

    
715
  /* Handle delete single document*/
716
  private void handleDeleteSingleDocument(String docId, String notifyServer)
717
               throws HandlerException
718
  {
719
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
720
    DBConnection dbConn = null;
721
    int serialNumber = -1;
722
    try
723
    {
724
      // Get DBConnection from pool
725
      dbConn=DBConnectionPool.
726
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
727
      serialNumber=dbConn.getCheckOutSerialNumber();
728
      if(!alreadyDeleted(docId))
729
      {
730

    
731
         //because delete method docid should have rev number
732
         //so we just add one for it. This rev number is no sence.
733
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
734
         DocumentImpl.delete(accnum, null, null, notifyServer, false);
735
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
736
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
737
         URL u = new URL("https://"+notifyServer);
738
         String ip = getIpFromURL(u);
739
         EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, docId, "delete");
740
      }
741

    
742
    }//try
743
    catch(McdbDocNotFoundException e)
744
    {
745
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
746
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
747
                                 " in db because because " + e.getMessage());
748
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
749
    		  + "when handling document: " + e.getMessage());
750
    }
751
    catch(InsufficientKarmaException e)
752
    {
753
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
754
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
755
                                 " in db because because " + e.getMessage());
756
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
757
    		  + "when handling document: " + e.getMessage());
758
    }
759
    catch(SQLException e)
760
    {
761
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
762
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
763
                                 " in db because because " + e.getMessage());
764
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
765
    		  + "when handling document: " + e.getMessage());
766
    }
767
    catch(Exception e)
768
    {
769
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
770
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
771
                                 " in db because because " + e.getMessage());
772
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
773
    		  + "when handling document: " + e.getMessage());
774
    }
775
    finally
776
    {
777
       //return DBConnection
778
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
779
    }//finally
780
    logMetacat.info("replication.handleDeleteSingleDocument localId:" + docId);
781
  }
782

    
783
  /* Handle updateLastCheckTimForSingleServer*/
784
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
785
                                                  throws HandlerException
786
  {
787
    String server = repServer.getServerName();
788
    DBConnection dbConn = null;
789
    int serialNumber = -1;
790
    PreparedStatement pstmt = null;
791
    try
792
    {
793
      // Get DBConnection from pool
794
      dbConn=DBConnectionPool.
795
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
796
      serialNumber=dbConn.getCheckOutSerialNumber();
797

    
798
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
799
      // Get time from remote server
800
      URL dateurl = new URL("https://" + server + "?server="+
801
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
802
      String datexml = ReplicationService.getURLContent(dateurl);
803
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
804
      if (datexml != null && !datexml.equals("")) {
805
    	  
806
    	  // parse the ISO datetime
807
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
808
         Date updated = DateTimeMarshaller.deserializeDateToUTC(datestr);
809
         
810
         StringBuffer sql = new StringBuffer();
811
         sql.append("update xml_replication set last_checked = ? ");
812
         sql.append(" where server like ? ");
813
         pstmt = dbConn.prepareStatement(sql.toString());
814
         pstmt.setTimestamp(1, new Timestamp(updated.getTime()));
815
         pstmt.setString(2, server);
816
         
817
         pstmt.executeUpdate();
818
         dbConn.commit();
819
         pstmt.close();
820
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
821
                                      + server);
822
      }//if
823
      else
824
      {
825

    
826
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
827
                                  server + " in db because couldn't get time "
828
                                  );
829
         throw new Exception("Couldn't get time for server "+ server);
830
      }
831

    
832
    }//try
833
    catch(Exception e)
834
    {
835
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
836
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
837
                                server + " in db because because " + e.getMessage());
838
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
839
    		  + "Error updating last checked time: " + e.getMessage());
840
    }
841
    finally
842
    {
843
       //return DBConnection
844
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
845
    }//finally
846
  }
847
  
848
  	/**
849
	 * Handle replicate system metadata
850
	 * 
851
	 * @param remoteserver
852
	 * @param guid
853
	 * @throws HandlerException
854
	 */
855
	private void handleSystemMetadata(String remoteserver, String guid) 
856
		throws HandlerException {
857
		try {
858

    
859
			// Try get the system metadata from remote server
860
			String sysMetaURLStr = "https://" + remoteserver + "?server="
861
					+ MetacatUtil.getLocalReplicationServerName()
862
					+ "&action=getsystemmetadata&guid=" + guid;
863
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
864
			URL sysMetaUrl = new URL(sysMetaURLStr);
865
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
866
							+ sysMetaUrl.toString());
867
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
868

    
869
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
870

    
871
			// process system metadata
872
			if (systemMetadataXML != null) {
873
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
874
								new ByteArrayInputStream(systemMetadataXML
875
										.getBytes("UTF-8")));
876
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
877
			}
878

    
879
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
880
							+ guid);
881

    
882
			String ip = getIpFromURL(sysMetaUrl);
883
			EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
884

    
885
		} catch (Exception e) {
886
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
887
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
888
			logReplication
889
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
890
							+ guid + " into db because " + e.getMessage());
891
			throw new HandlerException(
892
					"ReplicationHandler.handleSystemMetadata - generic exception "
893
							+ "writing Replication: " + e.getMessage());
894
		}
895

    
896
	}
897

    
898
  /**
899
   * updates xml_catalog with entries from other servers.
900
   */
901
  private void updateCatalog()
902
  {
903
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
904
    // ReplicationServer object in server list
905
    ReplicationServer replServer = null;
906
    PreparedStatement pstmt = null;
907
    String server = null;
908

    
909

    
910
    // Go through each ReplicationServer object in sererlist
911
    for (int j=0; j<serverList.size(); j++)
912
    {
913
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
914
      Vector<String> publicId = new Vector<String>();
915
      try
916
      {
917
        // Get ReplicationServer object from server list
918
        replServer = serverList.serverAt(j);
919
        // Get server name from the ReplicationServer object
920
        server = replServer.getServerName();
921
        // Try to get catalog
922
        URL u = new URL("https://" + server + "?server="+
923
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
924
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
925
        String catxml = ReplicationService.getURLContent(u);
926

    
927
        // Make sure there are not error, no empty string
928
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
929
        {
930
          throw new Exception("Couldn't get catalog list form server " +server);
931
        }
932
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
933
        CatalogMessageHandler cmh = new CatalogMessageHandler();
934
        XMLReader catparser = initParser(cmh);
935
        catparser.parse(new InputSource(new StringReader(catxml)));
936
        //parse the returned catalog xml and put it into a vector
937
        remoteCatalog = cmh.getCatalogVect();
938

    
939
        // Make sure remoteCatalog is not empty
940
        if (remoteCatalog.isEmpty())
941
        {
942
          throw new Exception("Couldn't get catalog list form server " +server);
943
        }
944

    
945
        String localcatxml = ReplicationService.getCatalogXML();
946

    
947
        // Make sure local catalog is no empty
948
        if (localcatxml==null||localcatxml.equals(""))
949
        {
950
          throw new Exception("Couldn't get catalog list form server " +server);
951
        }
952

    
953
        cmh = new CatalogMessageHandler();
954
        catparser = initParser(cmh);
955
        catparser.parse(new InputSource(new StringReader(localcatxml)));
956
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
957

    
958
        //now we have the catalog from the remote server and this local server
959
        //we now need to compare the two and merge the differences.
960
        //the comparison is base on the public_id fields which is the 4th
961
        //entry in each row vector.
962
        publicId = new Vector<String>();
963
        for(int i=0; i<localCatalog.size(); i++)
964
        {
965
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
966
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
967
          publicId.add(new String((String)v.elementAt(3)));
968
        }
969
      }//try
970
      catch (Exception e)
971
      {
972
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
973
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
974
                                    server + " because " +e.getMessage());
975
      }//catch
976

    
977
      for(int i=0; i<remoteCatalog.size(); i++)
978
      {
979
         // DConnection
980
        DBConnection dbConn = null;
981
        // DBConnection checkout serial number
982
        int serialNumber = -1;
983
        try
984
        {
985
            dbConn=DBConnectionPool.
986
                  getDBConnection("ReplicationHandler.updateCatalog");
987
            serialNumber=dbConn.getCheckOutSerialNumber();
988
            Vector<String> v = remoteCatalog.elementAt(i);
989
            //logMetacat.debug("v2: " + v.toString());
990
            //logMetacat.debug("i: " + i);
991
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
992
            //logMetacat.debug("publicID: " + publicId.toString());
993
            logReplication.info
994
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
995
           if(!publicId.contains(v.elementAt(3)))
996
           { //so we don't have this public id in our local table so we need to
997
             //add it.
998
             //logMetacat.debug("in if");
999
             StringBuffer sql = new StringBuffer();
1000
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
1001
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
1002
             sql.append("?,?)");
1003
             //logMetacat.debug("sql: " + sql.toString());
1004
             pstmt = dbConn.prepareStatement(sql.toString());
1005
             pstmt.setString(1, (String)v.elementAt(0));
1006
             pstmt.setString(2, (String)v.elementAt(1));
1007
             pstmt.setString(3, (String)v.elementAt(2));
1008
             pstmt.setString(4, (String)v.elementAt(3));
1009
             pstmt.setString(5, (String)v.elementAt(4));
1010
             pstmt.execute();
1011
             pstmt.close();
1012
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
1013
                               (String)v.elementAt(3) + " from server"+server);
1014
           }
1015
        }
1016
        catch(Exception e)
1017
        {
1018
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1019
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
1020
                                    server + " because " +e.getMessage());
1021
        }//catch
1022
        finally
1023
        {
1024
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1025
        }//finally
1026
      }//for remote catalog
1027
    }//for server list
1028
    logReplication.info("End of updateCatalog");
1029
  }
1030

    
1031
  /**
1032
   * Method that returns true if docid has already been "deleted" from metacat.
1033
   * This method really implements a truth table for deleted documents
1034
   * The table is (a docid in one of the tables is represented by the X):
1035
   * xml_docs      xml_revs      deleted?
1036
   * ------------------------------------
1037
   *   X             X             FALSE
1038
   *   X             _             FALSE
1039
   *   _             X             TRUE
1040
   *   _             _             TRUE
1041
   */
1042
  private static boolean alreadyDeleted(String docid) throws HandlerException
1043
  {
1044
    DBConnection dbConn = null;
1045
    int serialNumber = -1;
1046
    PreparedStatement pstmt = null;
1047
    try
1048
    {
1049
      dbConn=DBConnectionPool.
1050
                  getDBConnection("ReplicationHandler.alreadyDeleted");
1051
      serialNumber=dbConn.getCheckOutSerialNumber();
1052
      boolean xml_docs = false;
1053
      boolean xml_revs = false;
1054

    
1055
      StringBuffer sb = new StringBuffer();
1056
      sb.append("select docid from xml_revisions where docid like ? ");
1057
      pstmt = dbConn.prepareStatement(sb.toString());
1058
      pstmt.setString(1, docid);
1059
      pstmt.execute();
1060
      ResultSet rs = pstmt.getResultSet();
1061
      boolean tablehasrows = rs.next();
1062
      if(tablehasrows)
1063
      {
1064
        xml_revs = true;
1065
      }
1066

    
1067
      sb = new StringBuffer();
1068
      sb.append("select docid from xml_documents where docid like '");
1069
      sb.append(docid).append("'");
1070
      pstmt.close();
1071
      pstmt = dbConn.prepareStatement(sb.toString());
1072
      //increase usage count
1073
      dbConn.increaseUsageCount(1);
1074
      pstmt.execute();
1075
      rs = pstmt.getResultSet();
1076
      tablehasrows = rs.next();
1077
      pstmt.close();
1078
      if(tablehasrows)
1079
      {
1080
        xml_docs = true;
1081
      }
1082

    
1083
      if(xml_docs && xml_revs)
1084
      {
1085
        return false;
1086
      }
1087
      else if(xml_docs && !xml_revs)
1088
      {
1089
        return false;
1090
      }
1091
      else if(!xml_docs && xml_revs)
1092
      {
1093
        return true;
1094
      }
1095
      else if(!xml_docs && !xml_revs)
1096
      {
1097
        return true;
1098
      }
1099
    }
1100
    catch(Exception e)
1101
    {
1102
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1103
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1104
                          e.getMessage());
1105
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1106
    		  + e.getMessage());
1107
    }
1108
    finally
1109
    {
1110
      try
1111
      {
1112
        pstmt.close();
1113
      }//try
1114
      catch (SQLException ee)
1115
      {
1116
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1117
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1118
                          "to close pstmt: "+ee.getMessage());
1119
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1120
      		  + ee.getMessage());
1121
      }//catch
1122
      finally
1123
      {
1124
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1125
      }//finally
1126
    }//finally
1127
    return false;
1128
  }
1129

    
1130

    
1131
  /**
1132
   * Method to initialize the message parser
1133
   */
1134
  public static XMLReader initParser(DefaultHandler dh)
1135
          throws HandlerException
1136
  {
1137
    XMLReader parser = null;
1138

    
1139
    try {
1140
      ContentHandler chandler = dh;
1141

    
1142
      // Get an instance of the parser
1143
      String parserName = PropertyService.getProperty("xml.saxparser");
1144
      parser = XMLReaderFactory.createXMLReader(parserName);
1145

    
1146
      // Turn off validation
1147
      parser.setFeature("http://xml.org/sax/features/validation", false);
1148

    
1149
      parser.setContentHandler((ContentHandler)chandler);
1150
      parser.setErrorHandler((ErrorHandler)chandler);
1151

    
1152
    } catch (SAXException se) {
1153
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1154
    		  + " initializing parser: " + se.getMessage());
1155
    } catch (PropertyNotFoundException pnfe) {
1156
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1157
      		  + " getting parser name: " + pnfe.getMessage());
1158
    } 
1159

    
1160
    return parser;
1161
  }
1162

    
1163
  /**
1164
	 * This method will combine given time string(in short format) to current
1165
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1166
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1167
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1168
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1169
	 * 
1170
	 * @param givenTime
1171
	 *            the format should be "10:00 AM " or "2:00 PM"
1172
	 * @return
1173
	 * @throws Exception
1174
	 */
1175
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1176
  {
1177
	  try {
1178
     Date givenDate = parseTime(givenTime);
1179
     Date newDate = null;
1180
     Date now = new Date();
1181
     String currentTimeString = getTimeString(now);
1182
     Date currentTime = parseTime(currentTimeString); 
1183
     if ( currentTime.getTime() >= givenDate.getTime())
1184
     {
1185
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1186
        String dateAndTime = getDateString(now) + " " + givenTime;
1187
        Date combinationDate = parseDateTime(dateAndTime);
1188
        // new date should plus 24 hours to make is the second day
1189
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1190
     }
1191
     else
1192
     {
1193
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1194
         String dateAndTime = getDateString(now) + " " + givenTime;
1195
         newDate = parseDateTime(dateAndTime);
1196
     }
1197
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1198
     return newDate;
1199
	  } catch (ParseException pe) {
1200
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1201
				  + "parsing error: "  + pe.getMessage());
1202
	  }
1203
  }
1204

    
1205
  /*
1206
	 * parse a given string to Time in short format. For example, given time is
1207
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1208
	 */
1209
  private static Date parseTime(String timeString) throws ParseException
1210
  {
1211
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1212
    Date time = format.parse(timeString); 
1213
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1214
                              +time.toString());
1215
    return time;
1216

    
1217
  }
1218
  
1219
  /*
1220
   * Parse a given string to date and time. Date format is long and time
1221
   * format is short.
1222
   */
1223
  private static Date parseDateTime(String timeString) throws ParseException
1224
  {
1225
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1226
    Date time = format.parse(timeString);
1227
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1228
                             time.toString());
1229
    return time;
1230
  }
1231
  
1232
  /*
1233
   * Get a date string from a Date object. The date format will be long
1234
   */
1235
  private static String getDateString(Date now)
1236
  {
1237
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1238
     String s = df.format(now);
1239
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1240
     return s;
1241
  }
1242
  
1243
  /*
1244
   * Get a time string from a Date object, the time format will be short
1245
   */
1246
  private static String getTimeString(Date now)
1247
  {
1248
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1249
     String s = df.format(now);
1250
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1251
     return s;
1252
  }
1253
  
1254
  
1255
  /*
1256
	 * This method will go through the docid list both in xml_Documents table
1257
	 * and in xml_revisions table @author tao
1258
	 */
1259
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1260
		boolean dataFile = false;
1261
		for (int j = 0; j < docList.size(); j++) {
1262
			// initial dataFile is false
1263
			dataFile = false;
1264
			// w is information for one document, information contain
1265
			// docid, rev, server or datafile.
1266
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1267
			// Check if the vector w contain "datafile"
1268
			// If it has, this document is data file
1269
			try {
1270
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1271
					dataFile = true;
1272
				}
1273
			} catch (PropertyNotFoundException pnfe) {
1274
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1275
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1276
						+ "Leaving as false: " + pnfe.getMessage());
1277
			}
1278
			// logMetacat.debug("w: " + w.toString());
1279
			// Get docid
1280
			String docid = (String) w.elementAt(0);
1281
			logReplication.info("docid: " + docid);
1282
			// Get revision number
1283
			int rev = Integer.parseInt((String) w.elementAt(1));
1284
			logReplication.info("rev: " + rev);
1285
			// Get remote server name (it is may not be doc home server because
1286
			// the new hub feature
1287
			String remoteServer = (String) w.elementAt(2);
1288
			remoteServer = remoteServer.trim();
1289

    
1290
			try {
1291
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1292
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1293
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1294
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1295
				} else {
1296
					continue;
1297
				}
1298

    
1299
			} catch (Exception e) {
1300
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1301
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1302
						+ " in time replication" + e.getMessage(), e);
1303
				continue;
1304
			}
1305
			
1306
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1307
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1308
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1309
	        }
1310
	        
1311
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1312
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1313
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1314
	        }
1315

    
1316
		}// for update docs
1317

    
1318
	}
1319
   
1320
   /*
1321
	 * This method will handle doc in xml_documents table.
1322
	 */
1323
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1324
                                        throws HandlerException
1325
   {
1326
       // compare the update rev and local rev to see what need happen
1327
       int localrev = -1;
1328
       String action = null;
1329
       boolean flag = false;
1330
       try
1331
       {
1332
    	 long docQueryStartTime = System.currentTimeMillis();
1333
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1334
         long docQueryEndTime = System.currentTimeMillis();
1335
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1336
         _xmlDocQueryCount++;
1337
       }
1338
       catch (SQLException e)
1339
       {
1340
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1341
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1342
                                " be found because " + e.getMessage());
1343
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1344
                 "written because error happend to find it's local revision");
1345
         DOCERRORNUMBER++;
1346
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1347
                 " be found: " + e.getMessage());
1348
       }
1349
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1350
                               localrev);
1351

    
1352
       //check the revs for an update because this document is in the
1353
       //local DB, it might be out of date.
1354
       if (localrev == -1)
1355
       {
1356
          // check if the revision is in the revision table
1357
    	   Vector<Integer> localRevVector = null;
1358
    	 try {
1359
        	 long revQueryStartTime = System.currentTimeMillis();
1360
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1361
             long revQueryEndTime = System.currentTimeMillis();
1362
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1363
             _xmlRevQueryCount++;
1364
    	 } catch (SQLException sqle) {
1365
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1366
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1367
    	 }
1368
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1369
         {
1370
             // this version was deleted, so don't need replicate
1371
             flag = false;
1372
         }
1373
         else
1374
         {
1375
           //insert this document as new because it is not in the local DB
1376
           action = "INSERT";
1377
           flag = true;
1378
         }
1379
       }
1380
       else
1381
       {
1382
         if(localrev == rev)
1383
         {
1384
           // Local meatacat has the same rev to remote host, don't need
1385
           // update and flag set false
1386
           flag = false;
1387
         }
1388
         else if(localrev < rev)
1389
         {
1390
           //this document needs to be updated so send an read request
1391
           action = "UPDATE";
1392
           flag = true;
1393
         }
1394
       }
1395
       
1396
       String accNumber = null;
1397
       try {
1398
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1399
       } catch (PropertyNotFoundException pnfe) {
1400
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1401
    			   + "account number separator : " + pnfe.getMessage());
1402
       }
1403
       // this is non-data file
1404
       if(flag && !dataFile)
1405
       {
1406
         try
1407
         {
1408
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1409
         }
1410
         catch(HandlerException he)
1411
         {
1412
           // skip this document
1413
           throw he;
1414
         }
1415
       }//if for non-data file
1416

    
1417
        // this is for data file
1418
       if(flag && dataFile)
1419
       {
1420
         try
1421
         {
1422
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1423
         }
1424
         catch(HandlerException he)
1425
         {
1426
           // skip this data file
1427
           throw he;
1428
         }
1429

    
1430
       }//for data file
1431
   }
1432
   
1433
   /*
1434
    * This method will handle doc in xml_documents table.
1435
    */
1436
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1437
                                        throws HandlerException
1438
   {
1439
       // compare the update rev and local rev to see what need happen
1440
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1441
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1442
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1443
       Vector<Integer> localrev = null;
1444
       String action = "INSERT";
1445
       boolean flag = false;
1446
       try
1447
       {
1448
      	 long revQueryStartTime = System.currentTimeMillis();
1449
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1450
         long revQueryEndTime = System.currentTimeMillis();
1451
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1452
         _xmlRevQueryCount++;
1453
       }
1454
       catch (SQLException sqle)
1455
       {
1456
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1457
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1458
                                " be found because " + sqle.getMessage());
1459
         REVERRORNUMBER++;
1460
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1461
        		 + sqle.getMessage());
1462
       }
1463
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1464
                               localrev.toString());
1465
       
1466
       // if the rev is not in the xml_revision, we need insert it
1467
       if (!localrev.contains(new Integer(rev)))
1468
       {
1469
           flag = true;    
1470
       }
1471
     
1472
       String accNumber = null;
1473
       try {
1474
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1475
       } catch (PropertyNotFoundException pnfe) {
1476
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1477
    			   + "account number separator : " + pnfe.getMessage());
1478
       }
1479
       // this is non-data file
1480
       if(flag && !dataFile)
1481
       {
1482
         try
1483
         {
1484
           
1485
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1486
         }
1487
         catch(HandlerException he)
1488
         {
1489
           // skip this document
1490
           throw he;
1491
         }
1492
       }//if for non-data file
1493

    
1494
        // this is for data file
1495
       if(flag && dataFile)
1496
       {
1497
         try
1498
         {
1499
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1500
         }
1501
         catch(HandlerException he)
1502
         {
1503
           // skip this data file
1504
           throw he;
1505
         }
1506

    
1507
       }//for data file
1508
   }
1509
   
1510
   /*
1511
    * Return a ip address for given url
1512
    */
1513
   private String getIpFromURL(URL url)
1514
   {
1515
	   String ip = null;
1516
	   try
1517
	   {
1518
	      InetAddress address = InetAddress.getByName(url.getHost());
1519
	      ip = address.getHostAddress();
1520
	   }
1521
	   catch(UnknownHostException e)
1522
	   {
1523
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1524
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1525
                   +e.getMessage());
1526
	   }
1527

    
1528
	   return ip;
1529
   }
1530
  
1531
}
1532

    
(3-3/7)