Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2011-11-17 11:19:09 -0800 (Thu, 17 Nov 2011) $'
10
 * '$Revision: 6666 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.sql.Timestamp;
39
import java.text.DateFormat;
40
import java.text.ParseException;
41
import java.util.Calendar;
42
import java.util.Date;
43
import java.util.Hashtable;
44
import java.util.TimerTask;
45
import java.util.Vector;
46

    
47
import javax.xml.bind.DatatypeConverter;
48

    
49
import org.apache.log4j.Logger;
50
import org.dataone.service.types.v1.SystemMetadata;
51
import org.dataone.service.util.DateTimeMarshaller;
52
import org.dataone.service.util.TypeMarshaller;
53
import org.xml.sax.ContentHandler;
54
import org.xml.sax.ErrorHandler;
55
import org.xml.sax.InputSource;
56
import org.xml.sax.SAXException;
57
import org.xml.sax.XMLReader;
58
import org.xml.sax.helpers.DefaultHandler;
59
import org.xml.sax.helpers.XMLReaderFactory;
60

    
61
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
62
import edu.ucsb.nceas.metacat.DBUtil;
63
import edu.ucsb.nceas.metacat.DocInfoHandler;
64
import edu.ucsb.nceas.metacat.DocumentImpl;
65
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
66
import edu.ucsb.nceas.metacat.EventLog;
67
import edu.ucsb.nceas.metacat.IdentifierManager;
68
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
69
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
70
import edu.ucsb.nceas.metacat.accesscontrol.XMLAccessDAO;
71
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
72
import edu.ucsb.nceas.metacat.database.DBConnection;
73
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
74
import edu.ucsb.nceas.metacat.database.DatabaseService;
75
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
76
import edu.ucsb.nceas.metacat.properties.PropertyService;
77
import edu.ucsb.nceas.metacat.shared.HandlerException;
78
import edu.ucsb.nceas.metacat.util.MetacatUtil;
79
import edu.ucsb.nceas.metacat.util.ReplicationUtil;
80
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
81

    
82

    
83

    
84
/**
85
 * This class handles deltaT replication checking.  Whenever this TimerTask
86
 * is fired it checks each server in xml_replication for updates and updates
87
 * the local db as needed.
88
 */
89
public class ReplicationHandler extends TimerTask
90
{
91
  int serverCheckCode = 1;
92
  ReplicationServerList serverList = null;
93
  //PrintWriter out;
94
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
95
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
96
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
97
  private static Logger logD1 = Logger.getLogger("DataOneLogger");
98
  
99
  private static int DOCINSERTNUMBER = 1;
100
  private static int DOCERRORNUMBER  = 1;
101
  private static int REVINSERTNUMBER = 1;
102
  private static int REVERRORNUMBER  = 1;
103
  
104
  private static int _xmlDocQueryCount = 0;
105
  private static int _xmlRevQueryCount = 0;
106
  private static long _xmlDocQueryTime = 0;
107
  private static long _xmlRevQueryTime = 0;
108
  
109
  
110
  public ReplicationHandler()
111
  {
112
    //this.out = o;
113
    serverList = new ReplicationServerList();
114
  }
115

    
116
  public ReplicationHandler(int serverCheckCode)
117
  {
118
    //this.out = o;
119
    this.serverCheckCode = serverCheckCode;
120
    serverList = new ReplicationServerList();
121
  }
122

    
123
  /**
124
   * Method that implements TimerTask.run().  It runs whenever the timer is
125
   * fired.
126
   */
127
  public void run()
128
  {
129
    //find out the last_checked time of each server in the server list and
130
    //send a query to each server to see if there are any documents in
131
    //xml_documents with an update_date > last_checked
132
	  
133
      //if serverList is null, metacat don't need to replication
134
      if (serverList==null||serverList.isEmpty())
135
      {
136
        return;
137
      }
138
      updateCatalog();
139
      update();
140
      //conn.close();
141
  }
142

    
143
  /**
144
   * Method that uses revision tagging for replication instead of update_date.
145
   */
146
  private void update()
147
  {
148
	  
149
	  _xmlDocQueryCount = 0;
150
	  _xmlRevQueryCount = 0;
151
	  _xmlDocQueryTime = 0;
152
	  _xmlRevQueryTime = 0;
153
    /*
154
     Pseudo-algorithm
155
     - request a doc list from each server in xml_replication
156
     - check the rev number of each of those documents agains the
157
       documents in the local database
158
     - pull any documents that have a lesser rev number on the local server
159
       from the remote server
160
     - delete any documents that still exist in the local xml_documents but
161
       are in the deletedDocuments tag of the remote host response.
162
     - update last_checked to keep track of the last time it was checked.
163
       (this info is theoretically not needed using this system but probably
164
       should be kept anyway)
165
    */
166

    
167
    ReplicationServer replServer = null; // Variable to store the
168
                                        // ReplicationServer got from
169
                                        // Server list
170
    String server = null; // Variable to store server name
171
//    String update;
172
    Vector<String> responses = new Vector<String>();
173
    URL u;
174
    long replicationStartTime = System.currentTimeMillis();
175
    long timeToGetServerList = 0;
176
    
177
    //Check for every server in server list to get updated list and put
178
    // them in to response
179
    long startTimeToGetServers = System.currentTimeMillis();
180
    for (int i=0; i<serverList.size(); i++)
181
    {
182
        // Get ReplicationServer object from server list
183
        replServer = serverList.serverAt(i);
184
        // Get server name from ReplicationServer object
185
        server = replServer.getServerName().trim();
186
        String result = null;
187
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
188
        // Send command to that server to get updated docid information
189
        try
190
        {
191
          u = new URL("https://" + server + "?server="
192
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
193
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
194
          result = ReplicationService.getURLContent(u);
195
        }
196
        catch (Exception e)
197
        {
198
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
199
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
200
                       "for server " + server + " because "+e.getMessage());
201
          continue;
202
        }
203

    
204
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
205
        //check if result have error or not, if has skip it.
206
        if (result.indexOf("<error>")!=-1 && result.indexOf("</error>")!=-1)
207
        {
208
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
209
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
210
                       "for server " + server + " because "+result);
211
          continue;
212
        }
213
        //Add result to vector
214
        responses.add(result);
215
    }
216
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
217

    
218
    //make sure that there is updated file list
219
    //If response is null, metacat don't need do anything
220
    if (responses==null || responses.isEmpty())
221
    {
222
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
223
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
224
                           "every server and failed to replicate");
225
        return;
226
    }
227

    
228

    
229
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
230
    //               "document information: "+ responses.toString());
231
    
232
    long totalServerListParseTime = 0;
233
    // go through response vector(it contains updated vector and delete vector
234
    for(int i=0; i<responses.size(); i++)
235
    {
236
    	long startServerListParseTime = System.currentTimeMillis();
237
    	XMLReader parser;
238
    	ReplMessageHandler message = new ReplMessageHandler();
239
    	try
240
        {
241
          parser = initParser(message);
242
        }
243
        catch (Exception e)
244
        {
245
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
246
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
247
                                " initParser for message and " +e.getMessage());
248
           // stop replication
249
           return;
250
        }
251
    	
252
        try
253
        {
254
          parser.parse(new InputSource(
255
                     new StringReader(
256
                     (String)(responses.elementAt(i)))));
257
        }
258
        catch(Exception e)
259
        {
260
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
261
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
262
                                   "because "+ e.getMessage());
263
          continue;
264
        }
265
        //v is the list of updated documents
266
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
267
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
268
        //d is the list of deleted documents
269
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
270
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
271
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
272
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
273
        // go though every element in updated document vector
274
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
275
        //handle deleted docs
276
        for(int k=0; k<deleteList.size(); k++)
277
        { //delete the deleted documents;
278
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
279
          String docId = (String)w.elementAt(0);
280
          try
281
          {
282
            handleDeleteSingleDocument(docId, server);
283
          }
284
          catch (Exception ee)
285
          {
286
            continue;
287
          }
288
        }//for delete docs
289
        
290
        // handle replicate doc in xml_revision
291
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
292
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
293
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
294
        DOCINSERTNUMBER = 1;
295
        DOCERRORNUMBER  = 1;
296
        REVINSERTNUMBER = 1;
297
        REVERRORNUMBER  = 1;
298
        
299
        // handle system metadata
300
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
301
        for(int k = 0; k < systemMetadataList.size(); k++) { 
302
        	Vector<String> w = systemMetadataList.elementAt(k);
303
        	String guid = (String) w.elementAt(0);
304
        	String remoteserver = (String) w.elementAt(1);
305
        	try {
306
        		handleSystemMetadata(remoteserver, guid);
307
        	}
308
        	catch (Exception ee) {
309
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
310
        		continue;
311
        	}
312
        }
313
        
314
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
315
    }//for response
316

    
317
    //updated last_checked
318
    for (int i=0;i<serverList.size(); i++)
319
    {
320
       // Get ReplicationServer object from server list
321
       replServer = serverList.serverAt(i);
322
       try
323
       {
324
         updateLastCheckTimeForSingleServer(replServer);
325
       }
326
       catch(Exception e)
327
       {
328
         continue;
329
       }
330
    }//for
331
    
332
    long replicationEndTime = System.currentTimeMillis();
333
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
334
    		(replicationEndTime - replicationStartTime));
335
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
336
    		timeToGetServerList);
337
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
338
    		totalServerListParseTime);
339
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
340
    		_xmlDocQueryCount);
341
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
342
    		_xmlDocQueryTime + " ms");
343
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
344
    		_xmlRevQueryCount);
345
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
346
    		_xmlRevQueryTime + " ms");;
347

    
348
  }//update
349

    
350
  /* Handle replicate single xml document*/
351
  private void handleSingleXMLDocument(String remoteserver, String actions,
352
                                       String accNumber, String tableName)
353
               throws HandlerException
354
  {
355
    DBConnection dbConn = null;
356
    int serialNumber = -1;
357
    try
358
    {
359
      // Get DBConnection from pool
360
      dbConn=DBConnectionPool.
361
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
362
      serialNumber=dbConn.getCheckOutSerialNumber();
363
      //if the document needs to be updated or inserted, this is executed
364
      String readDocURLString = "https://" + remoteserver + "?server="+
365
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
366
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
367
      URL u = new URL(readDocURLString);
368

    
369
      // Get docid content
370
      String newxmldoc = ReplicationService.getURLContent(u);
371
      // If couldn't get skip it
372
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
373
      {
374
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
375
      }
376
      //logReplication.info("xml documnet:");
377
      //logReplication.info(newxmldoc);
378

    
379
      // Try get the docid info from remote server
380
      DocInfoHandler dih = new DocInfoHandler();
381
      XMLReader docinfoParser = initParser(dih);
382
      String docInfoURLStr = "https://" + remoteserver +
383
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
384
                       "&action=getdocumentinfo&docid="+accNumber;
385
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
386
      URL docinfoUrl = new URL(docInfoURLStr);
387
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
388
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
389
      
390
      // strip out the system metadata portion
391
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
392
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);
393
      
394
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
395
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
396
      // Get home server of the docid
397
      String docHomeServer = docinfoHash.get("home_server");
398
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
399
     
400
      // dates
401
      String createdDateString = docinfoHash.get("date_created");
402
      String updatedDateString = docinfoHash.get("date_updated");
403
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
404
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
405
      
406
      //docid should include rev number too
407
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
408
                                              (String)docinfoHash.get("rev");*/
409
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
410
      String docType = docinfoHash.get("doctype");
411
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
412

    
413
      String parserBase = null;
414
      // this for eml2 and we need user eml2 parser
415
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
416
      {
417
         parserBase = DocumentImpl.EML200;
418
      }
419
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
420
      {
421
        parserBase = DocumentImpl.EML200;
422
      }
423
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
424
      {
425
        parserBase = DocumentImpl.EML210;
426
      }
427
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
428
      {
429
        parserBase = DocumentImpl.EML210;
430
      }
431
      // Write the document into local host
432
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false);
433
      String newDocid = wrapper.writeReplication(dbConn,
434
                              newxmldoc,
435
                              docinfoHash.get("public_access"),
436
                              null,  /* the dtd text */
437
                              actions,
438
                              accNumber,
439
                              null, //docinfoHash.get("user_owner"),                              
440
                              null, /* null for groups[] */
441
                              docHomeServer,
442
                              remoteserver, tableName, true,// true is for time replication 
443
                              createdDate,
444
                              updatedDate);
445
      
446
      //set the user information
447
      String user = (String) docinfoHash.get("user_owner");
448
      String updated = (String) docinfoHash.get("user_updated");
449
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
450
      
451
      //process extra access rules 
452
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
453
      if (xmlAccessDAOList != null) {
454
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
455
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
456
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
457
      			acfsf.insertPermissions(xmlAccessDAO);
458
      		}
459
          }
460
      }
461
      
462
      // process system metadata
463
      if (systemMetadataXML != null) {
464
    	  SystemMetadata sysMeta = 
465
    		  TypeMarshaller.unmarshalTypeFromStream(
466
    				  SystemMetadata.class, 
467
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
468
    	  // need the guid-to-docid mapping
469
      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
470
      	  // save the system metadata
471
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
472
      }
473
      
474
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
475
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
476
      {
477
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
478
                                     " into "+tableName + " from " +
479
                                         remoteserver);
480
        DOCINSERTNUMBER++;
481
      }
482
      else
483
      {
484
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
485
                  " into "+tableName + " from " +
486
                      remoteserver);
487
          REVINSERTNUMBER++;
488
      }
489
      String ip = getIpFromURL(u);
490
      EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
491
      
492

    
493
    }//try
494
    catch(Exception e)
495
    {
496
        
497
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
498
        {
499
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
500
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
501
                                       " into "+tableName + " from " +
502
                                           remoteserver + " because "+e.getMessage());
503
          DOCERRORNUMBER++;
504
        }
505
        else
506
        {
507
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
508
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
509
                    " into "+tableName + " from " +
510
                        remoteserver +" because "+e.getMessage());
511
            REVERRORNUMBER++;
512
        }
513
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
514
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
515
                                      " into db because " +e.getMessage());
516
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
517
    		  + "writing Replication: " +e.getMessage());
518
    }
519
    finally
520
    {
521
       //return DBConnection
522
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
523
    }//finally
524
    logD1.info("replication.create localId:" + accNumber);
525
  }
526

    
527

    
528

    
529
  /* Handle replicate single xml document*/
530
  private void handleSingleDataFile(String remoteserver, String actions,
531
                                    String accNumber, String tableName)
532
               throws HandlerException
533
  {
534
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
535
    DBConnection dbConn = null;
536
    int serialNumber = -1;
537
    try
538
    {
539
      // Get DBConnection from pool
540
      dbConn=DBConnectionPool.
541
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
542
      serialNumber=dbConn.getCheckOutSerialNumber();
543
      // Try get docid info from remote server
544
      DocInfoHandler dih = new DocInfoHandler();
545
      XMLReader docinfoParser = initParser(dih);
546
      String docInfoURLString = "https://" + remoteserver +
547
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
548
                  "&action=getdocumentinfo&docid="+accNumber;
549
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
550
      URL docinfoUrl = new URL(docInfoURLString);
551

    
552
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
553
      
554
      // strip out the system metadata portion
555
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
556
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);  
557
   	  
558
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
559
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
560
      
561
      // Get docid name (such as acl or dataset)
562
      String docName = docinfoHash.get("docname");
563
      // Get doc type (eml public id)
564
      String docType = docinfoHash.get("doctype");
565
      // Get docid home sever. it might be different to remoteserver
566
      // because of hub feature
567
      String docHomeServer = docinfoHash.get("home_server");
568
      String createdDateString = docinfoHash.get("date_created");
569
      String updatedDateString = docinfoHash.get("date_updated");
570
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
571
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
572
      //docid should include rev number too
573
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
574
                                              (String)docinfoHash.get("rev");*/
575

    
576
      String datafilePath = PropertyService.getProperty("application.datafilepath");
577
      // Get data file content
578
      String readDataURLString = "https://" + remoteserver + "?server="+
579
                                        MetacatUtil.getLocalReplicationServerName()+
580
                                            "&action=readdata&docid="+accNumber;
581
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
582
      URL u = new URL(readDataURLString);
583
      InputStream input = ReplicationService.getURLStream(u);
584
      //register data file into xml_documents table and wite data file
585
      //into file system
586
      if ( input != null)
587
      {
588
        DocumentImpl.writeDataFileInReplication(input,
589
                                                datafilePath,
590
                                                docName,docType,
591
                                                accNumber,
592
                                                null,
593
                                                docHomeServer,
594
                                                remoteserver,
595
                                                tableName,
596
                                                true, //true means timed replication
597
                                                createdDate,
598
                                                updatedDate);
599
                                         
600
        //set the user information
601
        String user = (String) docinfoHash.get("user_owner");
602
		String updated = (String) docinfoHash.get("user_updated");
603
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
604
        
605
        //process extra access rules
606
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
607
        if (xmlAccessDAOList != null) {
608
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
609
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
610
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
611
        			acfsf.insertPermissions(xmlAccessDAO);
612
        		}
613
            }
614
        }
615
        
616
        // process system metadata
617
        if (systemMetadataXML != null) {
618
      	  SystemMetadata sysMeta = 
619
      		TypeMarshaller.unmarshalTypeFromStream(
620
      				  SystemMetadata.class, 
621
      				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
622
      	  // need the guid-to-docid mapping
623
      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
624
      	  // save the system metadata
625
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
626

    
627
        }
628
        
629
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
630
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
631
                                    remote server);*/
632
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
633
        {
634
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
635
                                       " into "+tableName + " from " +
636
                                           remoteserver);
637
          DOCINSERTNUMBER++;
638
        }
639
        else
640
        {
641
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
642
                    " into "+tableName + " from " +
643
                        remoteserver);
644
            REVINSERTNUMBER++;
645
        }
646
        String ip = getIpFromURL(u);
647
        EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
648
        
649
      }//if
650
      else
651
      {
652
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
653
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
654
      }//else
655

    
656
    }//try
657
    catch(Exception e)
658
    {
659
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
660
                                      " because " +e.getMessage());*/
661
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
662
      {
663
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
664
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
665
                                     " into " + tableName + " from " +
666
                                         remoteserver + " because " + e.getMessage());
667
        DOCERRORNUMBER++;
668
      }
669
      else
670
      {
671
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
672
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
673
                  " into " + tableName + " from " +
674
                      remoteserver +" because "+ e.getMessage());
675
          REVERRORNUMBER++;
676
      }
677
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
678
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
679
                                      " because " + e.getMessage());
680
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
681
    		  + "writing Replication: " + e.getMessage());
682
    }
683
    finally
684
    {
685
       //return DBConnection
686
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
687
    }//finally
688
    logD1.info("replication.create localId:" + accNumber);
689
  }
690

    
691

    
692

    
693
  /* Handle delete single document*/
694
  private void handleDeleteSingleDocument(String docId, String notifyServer)
695
               throws HandlerException
696
  {
697
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
698
    DBConnection dbConn = null;
699
    int serialNumber = -1;
700
    try
701
    {
702
      // Get DBConnection from pool
703
      dbConn=DBConnectionPool.
704
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
705
      serialNumber=dbConn.getCheckOutSerialNumber();
706
      if(!alreadyDeleted(docId))
707
      {
708

    
709
         //because delete method docid should have rev number
710
         //so we just add one for it. This rev number is no sence.
711
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
712
         DocumentImpl.delete(accnum, null, null, notifyServer);
713
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
714
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
715
         URL u = new URL("https://"+notifyServer);
716
         String ip = getIpFromURL(u);
717
         EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, docId, "delete");
718
      }
719

    
720
    }//try
721
    catch(McdbDocNotFoundException e)
722
    {
723
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
724
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
725
                                 " in db because because " + e.getMessage());
726
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
727
    		  + "when handling document: " + e.getMessage());
728
    }
729
    catch(InsufficientKarmaException e)
730
    {
731
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
732
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
733
                                 " in db because because " + e.getMessage());
734
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
735
    		  + "when handling document: " + e.getMessage());
736
    }
737
    catch(SQLException e)
738
    {
739
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
740
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
741
                                 " in db because because " + e.getMessage());
742
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
743
    		  + "when handling document: " + e.getMessage());
744
    }
745
    catch(Exception e)
746
    {
747
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
748
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
749
                                 " in db because because " + e.getMessage());
750
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
751
    		  + "when handling document: " + e.getMessage());
752
    }
753
    finally
754
    {
755
       //return DBConnection
756
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
757
    }//finally
758
    logD1.info("replication.handleDeleteSingleDocument localId:" + docId);
759
  }
760

    
761
  /* Handle updateLastCheckTimForSingleServer*/
762
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
763
                                                  throws HandlerException
764
  {
765
    String server = repServer.getServerName();
766
    DBConnection dbConn = null;
767
    int serialNumber = -1;
768
    PreparedStatement pstmt = null;
769
    try
770
    {
771
      // Get DBConnection from pool
772
      dbConn=DBConnectionPool.
773
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
774
      serialNumber=dbConn.getCheckOutSerialNumber();
775

    
776
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
777
      // Get time from remote server
778
      URL dateurl = new URL("https://" + server + "?server="+
779
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
780
      String datexml = ReplicationService.getURLContent(dateurl);
781
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
782
      if (datexml != null && !datexml.equals("")) {
783
    	  
784
    	  // parse the ISO datetime
785
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
786
         Date updated = DateTimeMarshaller.deserializeDateToUTC(datestr);
787
         
788
         StringBuffer sql = new StringBuffer();
789
         sql.append("update xml_replication set last_checked = ? ");
790
         sql.append(" where server like ? ");
791
         pstmt = dbConn.prepareStatement(sql.toString());
792
         pstmt.setTimestamp(1, new Timestamp(updated.getTime()));
793
         pstmt.setString(2, server);
794
         
795
         pstmt.executeUpdate();
796
         dbConn.commit();
797
         pstmt.close();
798
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
799
                                      + server);
800
      }//if
801
      else
802
      {
803

    
804
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
805
                                  server + " in db because couldn't get time "
806
                                  );
807
         throw new Exception("Couldn't get time for server "+ server);
808
      }
809

    
810
    }//try
811
    catch(Exception e)
812
    {
813
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
814
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
815
                                server + " in db because because " + e.getMessage());
816
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
817
    		  + "Error updating last checked time: " + e.getMessage());
818
    }
819
    finally
820
    {
821
       //return DBConnection
822
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
823
    }//finally
824
  }
825
  
826
  	/**
827
	 * Handle replicate system metadata
828
	 * 
829
	 * @param remoteserver
830
	 * @param guid
831
	 * @throws HandlerException
832
	 */
833
	private void handleSystemMetadata(String remoteserver, String guid) 
834
		throws HandlerException {
835
		try {
836

    
837
			// Try get the system metadata from remote server
838
			String sysMetaURLStr = "https://" + remoteserver + "?server="
839
					+ MetacatUtil.getLocalReplicationServerName()
840
					+ "&action=getsystemmetadata&guid=" + guid;
841
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
842
			URL sysMetaUrl = new URL(sysMetaURLStr);
843
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
844
							+ sysMetaUrl.toString());
845
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
846

    
847
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
848

    
849
			// process system metadata
850
			if (systemMetadataXML != null) {
851
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
852
								new ByteArrayInputStream(systemMetadataXML
853
										.getBytes("UTF-8")));
854
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
855
			}
856

    
857
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
858
							+ guid);
859

    
860
			String ip = getIpFromURL(sysMetaUrl);
861
			EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
862

    
863
		} catch (Exception e) {
864
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
865
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
866
			logReplication
867
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
868
							+ guid + " into db because " + e.getMessage());
869
			throw new HandlerException(
870
					"ReplicationHandler.handleSystemMetadata - generic exception "
871
							+ "writing Replication: " + e.getMessage());
872
		}
873

    
874
	}
875

    
876
  /**
877
   * updates xml_catalog with entries from other servers.
878
   */
879
  private void updateCatalog()
880
  {
881
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
882
    // ReplicationServer object in server list
883
    ReplicationServer replServer = null;
884
    PreparedStatement pstmt = null;
885
    String server = null;
886

    
887

    
888
    // Go through each ReplicationServer object in sererlist
889
    for (int j=0; j<serverList.size(); j++)
890
    {
891
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
892
      Vector<String> publicId = new Vector<String>();
893
      try
894
      {
895
        // Get ReplicationServer object from server list
896
        replServer = serverList.serverAt(j);
897
        // Get server name from the ReplicationServer object
898
        server = replServer.getServerName();
899
        // Try to get catalog
900
        URL u = new URL("https://" + server + "?server="+
901
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
902
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
903
        String catxml = ReplicationService.getURLContent(u);
904

    
905
        // Make sure there are not error, no empty string
906
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
907
        {
908
          throw new Exception("Couldn't get catalog list form server " +server);
909
        }
910
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
911
        CatalogMessageHandler cmh = new CatalogMessageHandler();
912
        XMLReader catparser = initParser(cmh);
913
        catparser.parse(new InputSource(new StringReader(catxml)));
914
        //parse the returned catalog xml and put it into a vector
915
        remoteCatalog = cmh.getCatalogVect();
916

    
917
        // Make sure remoteCatalog is not empty
918
        if (remoteCatalog.isEmpty())
919
        {
920
          throw new Exception("Couldn't get catalog list form server " +server);
921
        }
922

    
923
        String localcatxml = ReplicationService.getCatalogXML();
924

    
925
        // Make sure local catalog is no empty
926
        if (localcatxml==null||localcatxml.equals(""))
927
        {
928
          throw new Exception("Couldn't get catalog list form server " +server);
929
        }
930

    
931
        cmh = new CatalogMessageHandler();
932
        catparser = initParser(cmh);
933
        catparser.parse(new InputSource(new StringReader(localcatxml)));
934
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
935

    
936
        //now we have the catalog from the remote server and this local server
937
        //we now need to compare the two and merge the differences.
938
        //the comparison is base on the public_id fields which is the 4th
939
        //entry in each row vector.
940
        publicId = new Vector<String>();
941
        for(int i=0; i<localCatalog.size(); i++)
942
        {
943
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
944
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
945
          publicId.add(new String((String)v.elementAt(3)));
946
        }
947
      }//try
948
      catch (Exception e)
949
      {
950
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
951
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
952
                                    server + " because " +e.getMessage());
953
      }//catch
954

    
955
      for(int i=0; i<remoteCatalog.size(); i++)
956
      {
957
         // DConnection
958
        DBConnection dbConn = null;
959
        // DBConnection checkout serial number
960
        int serialNumber = -1;
961
        try
962
        {
963
            dbConn=DBConnectionPool.
964
                  getDBConnection("ReplicationHandler.updateCatalog");
965
            serialNumber=dbConn.getCheckOutSerialNumber();
966
            Vector<String> v = remoteCatalog.elementAt(i);
967
            //logMetacat.debug("v2: " + v.toString());
968
            //logMetacat.debug("i: " + i);
969
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
970
            //logMetacat.debug("publicID: " + publicId.toString());
971
            logReplication.info
972
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
973
           if(!publicId.contains(v.elementAt(3)))
974
           { //so we don't have this public id in our local table so we need to
975
             //add it.
976
             //logMetacat.debug("in if");
977
             StringBuffer sql = new StringBuffer();
978
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
979
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
980
             sql.append("?,?)");
981
             //logMetacat.debug("sql: " + sql.toString());
982
             pstmt = dbConn.prepareStatement(sql.toString());
983
             pstmt.setString(1, (String)v.elementAt(0));
984
             pstmt.setString(2, (String)v.elementAt(1));
985
             pstmt.setString(3, (String)v.elementAt(2));
986
             pstmt.setString(4, (String)v.elementAt(3));
987
             pstmt.setString(5, (String)v.elementAt(4));
988
             pstmt.execute();
989
             pstmt.close();
990
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
991
                               (String)v.elementAt(3) + " from server"+server);
992
           }
993
        }
994
        catch(Exception e)
995
        {
996
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
997
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
998
                                    server + " because " +e.getMessage());
999
        }//catch
1000
        finally
1001
        {
1002
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1003
        }//finally
1004
      }//for remote catalog
1005
    }//for server list
1006
    logReplication.info("End of updateCatalog");
1007
  }
1008

    
1009
  /**
1010
   * Method that returns true if docid has already been "deleted" from metacat.
1011
   * This method really implements a truth table for deleted documents
1012
   * The table is (a docid in one of the tables is represented by the X):
1013
   * xml_docs      xml_revs      deleted?
1014
   * ------------------------------------
1015
   *   X             X             FALSE
1016
   *   X             _             FALSE
1017
   *   _             X             TRUE
1018
   *   _             _             TRUE
1019
   */
1020
  private static boolean alreadyDeleted(String docid) throws HandlerException
1021
  {
1022
    DBConnection dbConn = null;
1023
    int serialNumber = -1;
1024
    PreparedStatement pstmt = null;
1025
    try
1026
    {
1027
      dbConn=DBConnectionPool.
1028
                  getDBConnection("ReplicationHandler.alreadyDeleted");
1029
      serialNumber=dbConn.getCheckOutSerialNumber();
1030
      boolean xml_docs = false;
1031
      boolean xml_revs = false;
1032

    
1033
      StringBuffer sb = new StringBuffer();
1034
      sb.append("select docid from xml_revisions where docid like ? ");
1035
      pstmt = dbConn.prepareStatement(sb.toString());
1036
      pstmt.setString(1, docid);
1037
      pstmt.execute();
1038
      ResultSet rs = pstmt.getResultSet();
1039
      boolean tablehasrows = rs.next();
1040
      if(tablehasrows)
1041
      {
1042
        xml_revs = true;
1043
      }
1044

    
1045
      sb = new StringBuffer();
1046
      sb.append("select docid from xml_documents where docid like '");
1047
      sb.append(docid).append("'");
1048
      pstmt.close();
1049
      pstmt = dbConn.prepareStatement(sb.toString());
1050
      //increase usage count
1051
      dbConn.increaseUsageCount(1);
1052
      pstmt.execute();
1053
      rs = pstmt.getResultSet();
1054
      tablehasrows = rs.next();
1055
      pstmt.close();
1056
      if(tablehasrows)
1057
      {
1058
        xml_docs = true;
1059
      }
1060

    
1061
      if(xml_docs && xml_revs)
1062
      {
1063
        return false;
1064
      }
1065
      else if(xml_docs && !xml_revs)
1066
      {
1067
        return false;
1068
      }
1069
      else if(!xml_docs && xml_revs)
1070
      {
1071
        return true;
1072
      }
1073
      else if(!xml_docs && !xml_revs)
1074
      {
1075
        return true;
1076
      }
1077
    }
1078
    catch(Exception e)
1079
    {
1080
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1081
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1082
                          e.getMessage());
1083
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1084
    		  + e.getMessage());
1085
    }
1086
    finally
1087
    {
1088
      try
1089
      {
1090
        pstmt.close();
1091
      }//try
1092
      catch (SQLException ee)
1093
      {
1094
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1095
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1096
                          "to close pstmt: "+ee.getMessage());
1097
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1098
      		  + ee.getMessage());
1099
      }//catch
1100
      finally
1101
      {
1102
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1103
      }//finally
1104
    }//finally
1105
    return false;
1106
  }
1107

    
1108

    
1109
  /**
1110
   * Method to initialize the message parser
1111
   */
1112
  public static XMLReader initParser(DefaultHandler dh)
1113
          throws HandlerException
1114
  {
1115
    XMLReader parser = null;
1116

    
1117
    try {
1118
      ContentHandler chandler = dh;
1119

    
1120
      // Get an instance of the parser
1121
      String parserName = PropertyService.getProperty("xml.saxparser");
1122
      parser = XMLReaderFactory.createXMLReader(parserName);
1123

    
1124
      // Turn off validation
1125
      parser.setFeature("http://xml.org/sax/features/validation", false);
1126

    
1127
      parser.setContentHandler((ContentHandler)chandler);
1128
      parser.setErrorHandler((ErrorHandler)chandler);
1129

    
1130
    } catch (SAXException se) {
1131
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1132
    		  + " initializing parser: " + se.getMessage());
1133
    } catch (PropertyNotFoundException pnfe) {
1134
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1135
      		  + " getting parser name: " + pnfe.getMessage());
1136
    } 
1137

    
1138
    return parser;
1139
  }
1140

    
1141
  /**
1142
	 * This method will combine given time string(in short format) to current
1143
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1144
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1145
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1146
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1147
	 * 
1148
	 * @param givenTime
1149
	 *            the format should be "10:00 AM " or "2:00 PM"
1150
	 * @return
1151
	 * @throws Exception
1152
	 */
1153
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1154
  {
1155
	  try {
1156
     Date givenDate = parseTime(givenTime);
1157
     Date newDate = null;
1158
     Date now = new Date();
1159
     String currentTimeString = getTimeString(now);
1160
     Date currentTime = parseTime(currentTimeString); 
1161
     if ( currentTime.getTime() >= givenDate.getTime())
1162
     {
1163
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1164
        String dateAndTime = getDateString(now) + " " + givenTime;
1165
        Date combinationDate = parseDateTime(dateAndTime);
1166
        // new date should plus 24 hours to make is the second day
1167
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1168
     }
1169
     else
1170
     {
1171
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1172
         String dateAndTime = getDateString(now) + " " + givenTime;
1173
         newDate = parseDateTime(dateAndTime);
1174
     }
1175
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1176
     return newDate;
1177
	  } catch (ParseException pe) {
1178
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1179
				  + "parsing error: "  + pe.getMessage());
1180
	  }
1181
  }
1182

    
1183
  /*
1184
	 * parse a given string to Time in short format. For example, given time is
1185
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1186
	 */
1187
  private static Date parseTime(String timeString) throws ParseException
1188
  {
1189
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1190
    Date time = format.parse(timeString); 
1191
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1192
                              +time.toString());
1193
    return time;
1194

    
1195
  }
1196
  
1197
  /*
1198
   * Parse a given string to date and time. Date format is long and time
1199
   * format is short.
1200
   */
1201
  private static Date parseDateTime(String timeString) throws ParseException
1202
  {
1203
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1204
    Date time = format.parse(timeString);
1205
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1206
                             time.toString());
1207
    return time;
1208
  }
1209
  
1210
  /*
1211
   * Get a date string from a Date object. The date format will be long
1212
   */
1213
  private static String getDateString(Date now)
1214
  {
1215
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1216
     String s = df.format(now);
1217
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1218
     return s;
1219
  }
1220
  
1221
  /*
1222
   * Get a time string from a Date object, the time format will be short
1223
   */
1224
  private static String getTimeString(Date now)
1225
  {
1226
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1227
     String s = df.format(now);
1228
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1229
     return s;
1230
  }
1231
  
1232
  
1233
  /*
1234
	 * This method will go through the docid list both in xml_Documents table
1235
	 * and in xml_revisions table @author tao
1236
	 */
1237
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1238
		boolean dataFile = false;
1239
		for (int j = 0; j < docList.size(); j++) {
1240
			// initial dataFile is false
1241
			dataFile = false;
1242
			// w is information for one document, information contain
1243
			// docid, rev, server or datafile.
1244
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1245
			// Check if the vector w contain "datafile"
1246
			// If it has, this document is data file
1247
			try {
1248
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1249
					dataFile = true;
1250
				}
1251
			} catch (PropertyNotFoundException pnfe) {
1252
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1253
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1254
						+ "Leaving as false: " + pnfe.getMessage());
1255
			}
1256
			// logMetacat.debug("w: " + w.toString());
1257
			// Get docid
1258
			String docid = (String) w.elementAt(0);
1259
			logReplication.info("docid: " + docid);
1260
			// Get revision number
1261
			int rev = Integer.parseInt((String) w.elementAt(1));
1262
			logReplication.info("rev: " + rev);
1263
			// Get remote server name (it is may not be doc home server because
1264
			// the new hub feature
1265
			String remoteServer = (String) w.elementAt(2);
1266
			remoteServer = remoteServer.trim();
1267

    
1268
			try {
1269
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1270
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1271
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1272
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1273
				} else {
1274
					continue;
1275
				}
1276

    
1277
			} catch (Exception e) {
1278
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1279
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1280
						+ " in time replication" + e.getMessage());
1281
				continue;
1282
			}
1283
			
1284
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1285
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1286
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1287
	        }
1288
	        
1289
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1290
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1291
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1292
	        }
1293

    
1294
		}// for update docs
1295

    
1296
	}
1297
   
1298
   /*
1299
	 * This method will handle doc in xml_documents table.
1300
	 */
1301
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1302
                                        throws HandlerException
1303
   {
1304
       // compare the update rev and local rev to see what need happen
1305
       int localrev = -1;
1306
       String action = null;
1307
       boolean flag = false;
1308
       try
1309
       {
1310
    	 long docQueryStartTime = System.currentTimeMillis();
1311
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1312
         long docQueryEndTime = System.currentTimeMillis();
1313
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1314
         _xmlDocQueryCount++;
1315
       }
1316
       catch (SQLException e)
1317
       {
1318
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1319
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1320
                                " be found because " + e.getMessage());
1321
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1322
                 "written because error happend to find it's local revision");
1323
         DOCERRORNUMBER++;
1324
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1325
                 " be found: " + e.getMessage());
1326
       }
1327
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1328
                               localrev);
1329

    
1330
       //check the revs for an update because this document is in the
1331
       //local DB, it might be out of date.
1332
       if (localrev == -1)
1333
       {
1334
          // check if the revision is in the revision table
1335
    	   Vector<Integer> localRevVector = null;
1336
    	 try {
1337
        	 long revQueryStartTime = System.currentTimeMillis();
1338
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1339
             long revQueryEndTime = System.currentTimeMillis();
1340
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1341
             _xmlRevQueryCount++;
1342
    	 } catch (SQLException sqle) {
1343
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1344
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1345
    	 }
1346
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1347
         {
1348
             // this version was deleted, so don't need replicate
1349
             flag = false;
1350
         }
1351
         else
1352
         {
1353
           //insert this document as new because it is not in the local DB
1354
           action = "INSERT";
1355
           flag = true;
1356
         }
1357
       }
1358
       else
1359
       {
1360
         if(localrev == rev)
1361
         {
1362
           // Local meatacat has the same rev to remote host, don't need
1363
           // update and flag set false
1364
           flag = false;
1365
         }
1366
         else if(localrev < rev)
1367
         {
1368
           //this document needs to be updated so send an read request
1369
           action = "UPDATE";
1370
           flag = true;
1371
         }
1372
       }
1373
       
1374
       String accNumber = null;
1375
       try {
1376
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1377
       } catch (PropertyNotFoundException pnfe) {
1378
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1379
    			   + "account number separator : " + pnfe.getMessage());
1380
       }
1381
       // this is non-data file
1382
       if(flag && !dataFile)
1383
       {
1384
         try
1385
         {
1386
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1387
         }
1388
         catch(HandlerException he)
1389
         {
1390
           // skip this document
1391
           throw he;
1392
         }
1393
       }//if for non-data file
1394

    
1395
        // this is for data file
1396
       if(flag && dataFile)
1397
       {
1398
         try
1399
         {
1400
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1401
         }
1402
         catch(HandlerException he)
1403
         {
1404
           // skip this data file
1405
           throw he;
1406
         }
1407

    
1408
       }//for data file
1409
   }
1410
   
1411
   /*
1412
    * This method will handle doc in xml_documents table.
1413
    */
1414
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1415
                                        throws HandlerException
1416
   {
1417
       // compare the update rev and local rev to see what need happen
1418
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1419
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1420
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1421
       Vector<Integer> localrev = null;
1422
       String action = "INSERT";
1423
       boolean flag = false;
1424
       try
1425
       {
1426
      	 long revQueryStartTime = System.currentTimeMillis();
1427
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1428
         long revQueryEndTime = System.currentTimeMillis();
1429
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1430
         _xmlRevQueryCount++;
1431
       }
1432
       catch (SQLException sqle)
1433
       {
1434
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1435
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1436
                                " be found because " + sqle.getMessage());
1437
         REVERRORNUMBER++;
1438
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1439
        		 + sqle.getMessage());
1440
       }
1441
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1442
                               localrev.toString());
1443
       
1444
       // if the rev is not in the xml_revision, we need insert it
1445
       if (!localrev.contains(new Integer(rev)))
1446
       {
1447
           flag = true;    
1448
       }
1449
     
1450
       String accNumber = null;
1451
       try {
1452
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1453
       } catch (PropertyNotFoundException pnfe) {
1454
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1455
    			   + "account number separator : " + pnfe.getMessage());
1456
       }
1457
       // this is non-data file
1458
       if(flag && !dataFile)
1459
       {
1460
         try
1461
         {
1462
           
1463
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1464
         }
1465
         catch(HandlerException he)
1466
         {
1467
           // skip this document
1468
           throw he;
1469
         }
1470
       }//if for non-data file
1471

    
1472
        // this is for data file
1473
       if(flag && dataFile)
1474
       {
1475
         try
1476
         {
1477
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1478
         }
1479
         catch(HandlerException he)
1480
         {
1481
           // skip this data file
1482
           throw he;
1483
         }
1484

    
1485
       }//for data file
1486
   }
1487
   
1488
   /*
1489
    * Return a ip address for given url
1490
    */
1491
   private String getIpFromURL(URL url)
1492
   {
1493
	   String ip = null;
1494
	   try
1495
	   {
1496
	      InetAddress address = InetAddress.getByName(url.getHost());
1497
	      ip = address.getHostAddress();
1498
	   }
1499
	   catch(UnknownHostException e)
1500
	   {
1501
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1502
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1503
                   +e.getMessage());
1504
	   }
1505

    
1506
	   return ip;
1507
   }
1508
  
1509
}
1510

    
(4-4/8)