Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *
8
 *   '$Author: tao $'
9
 *     '$Date: 2016-09-09 21:37:43 -0700 (Fri, 09 Sep 2016) $'
10
 * '$Revision: 9958 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat.replication;
28

    
29
import java.io.ByteArrayInputStream;
30
import java.io.InputStream;
31
import java.io.StringReader;
32
import java.net.InetAddress;
33
import java.net.URL;
34
import java.net.UnknownHostException;
35
import java.sql.PreparedStatement;
36
import java.sql.ResultSet;
37
import java.sql.SQLException;
38
import java.sql.Timestamp;
39
import java.text.DateFormat;
40
import java.text.ParseException;
41
import java.util.Date;
42
import java.util.Hashtable;
43
import java.util.Iterator;
44
import java.util.TimerTask;
45
import java.util.Vector;
46

    
47
import org.apache.commons.io.IOUtils;
48
import org.apache.log4j.Logger;
49
import org.dataone.service.types.v2.SystemMetadata;
50
import org.dataone.service.util.DateTimeMarshaller;
51
import org.dataone.service.util.TypeMarshaller;
52
import org.xml.sax.ContentHandler;
53
import org.xml.sax.ErrorHandler;
54
import org.xml.sax.InputSource;
55
import org.xml.sax.SAXException;
56
import org.xml.sax.XMLReader;
57
import org.xml.sax.helpers.DefaultHandler;
58
import org.xml.sax.helpers.XMLReaderFactory;
59

    
60
import edu.ucsb.nceas.metacat.CatalogMessageHandler;
61
import edu.ucsb.nceas.metacat.DBUtil;
62
import edu.ucsb.nceas.metacat.DocumentImpl;
63
import edu.ucsb.nceas.metacat.DocumentImplWrapper;
64
import edu.ucsb.nceas.metacat.EventLog;
65
import edu.ucsb.nceas.metacat.IdentifierManager;
66
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
67
import edu.ucsb.nceas.metacat.SchemaLocationResolver;
68
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlForSingleFile;
69
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
70
import edu.ucsb.nceas.metacat.database.DBConnection;
71
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
72
import edu.ucsb.nceas.metacat.dataone.hazelcast.HazelcastService;
73
import edu.ucsb.nceas.metacat.index.MetacatSolrIndex;
74
import edu.ucsb.nceas.metacat.properties.PropertyService;
75
import edu.ucsb.nceas.metacat.shared.HandlerException;
76
import edu.ucsb.nceas.metacat.util.DocumentUtil;
77
import edu.ucsb.nceas.metacat.util.MetacatUtil;
78
import edu.ucsb.nceas.metacat.util.ReplicationUtil;
79
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
80
import edu.ucsb.nceas.utilities.access.DocInfoHandler;
81
import edu.ucsb.nceas.utilities.access.XMLAccessDAO;
82

    
83

    
84

    
85
/**
86
 * This class handles deltaT replication checking.  Whenever this TimerTask
87
 * is fired it checks each server in xml_replication for updates and updates
88
 * the local db as needed.
89
 */
90
public class ReplicationHandler extends TimerTask
91
{
92
  int serverCheckCode = 1;
93
  ReplicationServerList serverList = null;
94
  //PrintWriter out;
95
//  private static final AbstractDatabase dbAdapter = MetacatUtil.dbAdapter;
96
  private static Logger logReplication = Logger.getLogger("ReplicationLogging");
97
  private static Logger logMetacat = Logger.getLogger(ReplicationHandler.class);
98
  
99
  private static int DOCINSERTNUMBER = 1;
100
  private static int DOCERRORNUMBER  = 1;
101
  private static int REVINSERTNUMBER = 1;
102
  private static int REVERRORNUMBER  = 1;
103
  
104
  private static int _xmlDocQueryCount = 0;
105
  private static int _xmlRevQueryCount = 0;
106
  private static long _xmlDocQueryTime = 0;
107
  private static long _xmlRevQueryTime = 0;
108
  
109
  
110
  public ReplicationHandler()
111
  {
112
    //this.out = o;
113
    serverList = new ReplicationServerList();
114
  }
115

    
116
  public ReplicationHandler(int serverCheckCode)
117
  {
118
    //this.out = o;
119
    this.serverCheckCode = serverCheckCode;
120
    serverList = new ReplicationServerList();
121
  }
122

    
123
  /**
124
   * Method that implements TimerTask.run().  It runs whenever the timer is
125
   * fired.
126
   */
127
  public void run()
128
  {
129
    //find out the last_checked time of each server in the server list and
130
    //send a query to each server to see if there are any documents in
131
    //xml_documents with an update_date > last_checked
132
	  
133
      //if serverList is null, metacat don't need to replication
134
      if (serverList==null||serverList.isEmpty())
135
      {
136
        return;
137
      }
138
      updateCatalog();
139
      update();
140
      //conn.close();
141
  }
142

    
143
  /**
144
   * Method that uses revision tagging for replication instead of update_date.
145
   */
146
  private void update()
147
  {
148
    Vector<InputStream> responses = new Vector<InputStream>();
149
    try {
150

    
151
        _xmlDocQueryCount = 0;
152
        _xmlRevQueryCount = 0;
153
        _xmlDocQueryTime = 0;
154
        _xmlRevQueryTime = 0;
155
        /*
156
     Pseudo-algorithm
157
     - request a doc list from each server in xml_replication
158
     - check the rev number of each of those documents agains the
159
       documents in the local database
160
     - pull any documents that have a lesser rev number on the local server
161
       from the remote server
162
     - delete any documents that still exist in the local xml_documents but
163
       are in the deletedDocuments tag of the remote host response.
164
     - update last_checked to keep track of the last time it was checked.
165
       (this info is theoretically not needed using this system but probably
166
       should be kept anyway)
167
         */
168

    
169
        ReplicationServer replServer = null; // Variable to store the
170
        // ReplicationServer got from
171
        // Server list
172
        String server = null; // Variable to store server name
173
        //    String update;
174

    
175
        URL u;
176
        long replicationStartTime = System.currentTimeMillis();
177
        long timeToGetServerList = 0;
178

    
179
        //Check for every server in server list to get updated list and put
180
        // them in to response
181
        long startTimeToGetServers = System.currentTimeMillis();
182
        for (int i=0; i<serverList.size(); i++)
183
        {
184
            // Get ReplicationServer object from server list
185
            replServer = serverList.serverAt(i);
186
            // Get server name from ReplicationServer object
187
            server = replServer.getServerName().trim();
188
            InputStream result = null;
189
            logReplication.info("ReplicationHandler.update - full update started to: " + server);
190
            // Send command to that server to get updated docid information
191
            try
192
            {
193
                u = new URL("https://" + server + "?server="
194
                        +MetacatUtil.getLocalReplicationServerName()+"&action=update");
195
                logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
196
                result = ReplicationService.getURLStream(u);
197
            }
198
            catch (Exception e)
199
            {
200
                logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
201
                logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
202
                        "for server " + server + " because "+e.getMessage());
203
                continue;
204
            }
205

    
206
            //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
207
            //check if result have error or not, if has skip it.
208
            // TODO: check for error in stream
209
            //if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) {
210
            if (result == null) {
211
                logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
212
                logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
213
                        "for server " + server + " because "+result);
214
                continue;
215
            }
216
            //Add result to vector
217
            responses.add(result);
218
        }
219
        timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
220

    
221
        //make sure that there is updated file list
222
        //If response is null, metacat don't need do anything
223
        if (responses==null || responses.isEmpty())
224
        {
225
            logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
226
            logReplication.info( "ReplicationHandler.update - No updated doc list for "+
227
                    "every server and failed to replicate");
228
            return;
229
        }
230

    
231

    
232
        //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
233
        //               "document information: "+ responses.toString());
234

    
235
        long totalServerListParseTime = 0;
236
        // go through response vector(it contains updated vector and delete vector
237
        for(int i=0; i<responses.size(); i++)
238
        {
239
            long startServerListParseTime = System.currentTimeMillis();
240
            XMLReader parser;
241
            ReplMessageHandler message = new ReplMessageHandler();
242
            try
243
            {
244
                parser = initParser(message);
245
            }
246
            catch (Exception e)
247
            {
248
                logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
249
                logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
250
                        " initParser for message and " +e.getMessage());
251
                // stop replication
252
                return;
253
            }
254

    
255
            try
256
            {
257
                parser.parse(new InputSource(responses.elementAt(i)));
258
            }
259
            catch(Exception e)
260
            {
261
                logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
262
                logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
263
                        "because "+ e.getMessage());
264
                continue;
265
            }
266
            finally 
267
            {
268
                IOUtils.closeQuietly(responses.elementAt(i));
269
            }
270
            //v is the list of updated documents
271
            Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
272
            logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
273
            //d is the list of deleted documents
274
            Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
275
            logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
276
            logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
277
            logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
278
            // go though every element in updated document vector
279
            handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
280
            //handle deleted docs
281
            for(int k=0; k<deleteList.size(); k++)
282
            { //delete the deleted documents;
283
                Vector<String> w = new Vector<String>(deleteList.elementAt(k));
284
                String docId = (String)w.elementAt(0);
285
                try
286
                {
287
                    handleDeleteSingleDocument(docId, server);
288
                }
289
                catch (Exception ee)
290
                {
291
                    continue;
292
                }
293
            }//for delete docs
294

    
295
            // handle replicate doc in xml_revision
296
            Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
297
            logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
298
            handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
299
            DOCINSERTNUMBER = 1;
300
            DOCERRORNUMBER  = 1;
301
            REVINSERTNUMBER = 1;
302
            REVERRORNUMBER  = 1;
303

    
304
            // handle system metadata
305
            Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
306
            for(int k = 0; k < systemMetadataList.size(); k++) { 
307
                Vector<String> w = systemMetadataList.elementAt(k);
308
                String guid = (String) w.elementAt(0);
309
                String remoteserver = (String) w.elementAt(1);
310
                try {
311
                    handleSystemMetadata(remoteserver, guid);
312
                }
313
                catch (Exception ee) {
314
                    logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
315
                    continue;
316
                }
317
            }
318

    
319
            totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
320
        }//for response
321

    
322
        //updated last_checked
323
        for (int i=0;i<serverList.size(); i++)
324
        {
325
            // Get ReplicationServer object from server list
326
            replServer = serverList.serverAt(i);
327
            try
328
            {
329
                updateLastCheckTimeForSingleServer(replServer);
330
            }
331
            catch(Exception e)
332
            {
333
                continue;
334
            }
335
        }//for
336

    
337
        long replicationEndTime = System.currentTimeMillis();
338
        logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
339
                (replicationEndTime - replicationStartTime));
340
        logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
341
                timeToGetServerList);
342
        logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
343
                totalServerListParseTime);
344
        logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
345
                _xmlDocQueryCount);
346
        logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
347
                _xmlDocQueryTime + " ms");
348
        logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
349
                _xmlRevQueryCount);
350
        logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
351
                _xmlRevQueryTime + " ms");;
352

    
353
    } finally { // need to close all inputstreams unconditionally
354
        Iterator<InputStream> isit = responses.iterator();
355
        while (isit.hasNext()) {
356
            IOUtils.closeQuietly(isit.next());
357
        }
358
    }
359
    }//update
360

    
361
  /* Handle replicate single xml document*/
362
  private void handleSingleXMLDocument(String remoteserver, String actions,
363
                                       String accNumber, String tableName)
364
               throws HandlerException
365
  {
366
    DBConnection dbConn = null;
367
    int serialNumber = -1;
368
    try
369
    {
370
      // Get DBConnection from pool
371
      dbConn=DBConnectionPool.
372
                  getDBConnection("ReplicationHandler.handleSingleXMLDocument");
373
      serialNumber=dbConn.getCheckOutSerialNumber();
374
      //if the document needs to be updated or inserted, this is executed
375
      String readDocURLString = "https://" + remoteserver + "?server="+
376
              MetacatUtil.getLocalReplicationServerName()+"&action=read&docid="+accNumber;
377
      readDocURLString = MetacatUtil.replaceWhiteSpaceForURL(readDocURLString);
378
      URL u = new URL(readDocURLString);
379

    
380
      // Get docid content
381
      byte[] xmlBytes = ReplicationService.getURLBytes(u);
382
      String newxmldoc = new String(xmlBytes, "UTF-8");
383
      // If couldn't get skip it
384
      if ( newxmldoc.indexOf("<error>")!= -1 && newxmldoc.indexOf("</error>")!=-1)
385
      {
386
         throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - " + newxmldoc);
387
      }
388
      //logReplication.info("xml documnet:");
389
      //logReplication.info(newxmldoc);
390

    
391
      // Try get the docid info from remote server
392
      DocInfoHandler dih = new DocInfoHandler();
393
      XMLReader docinfoParser = initParser(dih);
394
      String docInfoURLStr = "https://" + remoteserver +
395
                       "?server="+MetacatUtil.getLocalReplicationServerName()+
396
                       "&action=getdocumentinfo&docid="+accNumber;
397
      docInfoURLStr = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLStr);
398
      URL docinfoUrl = new URL(docInfoURLStr);
399
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Sending message: " + docinfoUrl.toString());
400
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
401
      
402
      // strip out the system metadata portion
403
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
404
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);
405
   	  SystemMetadata sysMeta = null;
406
   	  // process system metadata if we have it
407
      if (systemMetadataXML != null) {
408
    	  sysMeta = 
409
    		  TypeMarshaller.unmarshalTypeFromStream(
410
    				  SystemMetadata.class, 
411
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
412
    	  // need the guid-to-docid mapping
413
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
414
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
415
    	  }
416
      	  // save the system metadata
417
    	  logReplication.debug("Saving SystemMetadata to shared map: " + sysMeta.getIdentifier().getValue());
418
      	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
419
      	  
420
      }
421
   	  
422
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
423
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
424
      // Get home server of the docid
425
      String docHomeServer = docinfoHash.get("home_server");
426
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doc home server in repl: "+docHomeServer);
427
     
428
      // dates
429
      String createdDateString = docinfoHash.get("date_created");
430
      String updatedDateString = docinfoHash.get("date_updated");
431
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
432
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
433
      
434
      //docid should include rev number too
435
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
436
                                              (String)docinfoHash.get("rev");*/
437
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - docid in repl: "+accNumber);
438
      String docType = docinfoHash.get("doctype");
439
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - doctype in repl: "+docType);
440

    
441
      String parserBase = null;
442
      // this for eml2 and we need user eml2 parser
443
      if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_0NAMESPACE))
444
      {
445
         parserBase = DocumentImpl.EML200;
446
      }
447
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_0_1NAMESPACE))
448
      {
449
        parserBase = DocumentImpl.EML200;
450
      }
451
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_0NAMESPACE))
452
      {
453
        parserBase = DocumentImpl.EML210;
454
      }
455
      else if (docType != null && (docType.trim()).equals(DocumentImpl.EML2_1_1NAMESPACE))
456
      {
457
        parserBase = DocumentImpl.EML210;
458
      }
459
      
460
      /*String formatId = null;
461
      //get the format id from the system metadata 
462
      if(sysMeta != null && sysMeta.getFormatId() != null) {
463
          logMetacat.debug("ReplicationService.handleForceReplicateRequest - the format id will be got from the system metadata for the object "+accNumber);
464
          formatId = sysMeta.getFormatId().getValue();
465
      }*/
466
      // Write the document into local host
467
      DocumentImplWrapper wrapper = new DocumentImplWrapper(parserBase, false, false);
468
      String newDocid = wrapper.writeReplication(dbConn,
469
                              newxmldoc, xmlBytes,
470
                              docinfoHash.get("public_access"),
471
                              null,  /* the dtd text */
472
                              actions,
473
                              accNumber,
474
                              null, //docinfoHash.get("user_owner"),                              
475
                              null, /* null for groups[] */
476
                              docHomeServer,
477
                              remoteserver, tableName, true,// true is for time replication 
478
                              createdDate,
479
                              updatedDate);
480
      
481
      if(sysMeta != null) {
482
			// submit for indexing. When the doc writing process fails, the index process will fail as well. But this failure
483
			// will not interrupt the process.
484
			try {
485
				MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
486
			} catch (Exception ee) {
487
				logReplication.warn("ReplicationService.handleForceReplicateRequest - couldn't index the doc since "+ee.getMessage());
488
			}
489
          
490
		}
491
      
492
      //set the user information
493
      String user = (String) docinfoHash.get("user_owner");
494
      String updated = (String) docinfoHash.get("user_updated");
495
      ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
496
      
497
      //process extra access rules 
498
      try {
499
      	// check if we had a guid -> docid mapping
500
      	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
501
      	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
502
      	IdentifierManager.getInstance().getGUID(docid, rev);
503
      	// no need to create the mapping if we have it
504
      } catch (McdbDocNotFoundException mcdbe) {
505
      	// create mapping if we don't
506
      	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
507
      }
508
      Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
509
      if (xmlAccessDAOList != null) {
510
      	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
511
      	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
512
      		if (!acfsf.accessControlExists(xmlAccessDAO)) {
513
      			acfsf.insertPermissions(xmlAccessDAO);
514
      		}
515
          }
516
      }
517
      
518
      
519
      logReplication.info("ReplicationHandler.handleSingleXMLDocument - Successfully replicated doc " + accNumber);
520
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
521
      {
522
        logReplication.info("ReplicationHandler.handleSingleXMLDocument - " + DOCINSERTNUMBER + " Wrote xml doc " + accNumber +
523
                                     " into "+tableName + " from " +
524
                                         remoteserver);
525
        DOCINSERTNUMBER++;
526
      }
527
      else
528
      {
529
          logReplication.info("ReplicationHandler.handleSingleXMLDocument - " +REVINSERTNUMBER + " Wrote xml doc " + accNumber +
530
                  " into "+tableName + " from " +
531
                      remoteserver);
532
          REVINSERTNUMBER++;
533
      }
534
      String ip = getIpFromURL(u);
535
      EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
536
      
537

    
538
    }//try
539
    catch(Exception e)
540
    {
541
        
542
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
543
        {
544
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
545
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +DOCERRORNUMBER + " Failed to write xml doc " + accNumber +
546
                                       " into "+tableName + " from " +
547
                                           remoteserver + " because "+e.getMessage());
548
          DOCERRORNUMBER++;
549
        }
550
        else
551
        {
552
        	logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
553
        	logReplication.error("ReplicationHandler.handleSingleXMLDocument - " +REVERRORNUMBER + " Failed to write xml doc " + accNumber +
554
                    " into "+tableName + " from " +
555
                        remoteserver +" because "+e.getMessage());
556
            REVERRORNUMBER++;
557
        }
558
        logMetacat.error("ReplicationHandler.handleSingleXMLDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
559
        logReplication.error("ReplicationHandler.handleSingleXMLDocument - Failed to write doc " + accNumber +
560
                                      " into db because " + e.getMessage(), e);
561
      throw new HandlerException("ReplicationHandler.handleSingleXMLDocument - generic exception " 
562
    		  + "writing Replication: " +e.getMessage());
563
    }
564
    finally
565
    {
566
       //return DBConnection
567
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
568
    }//finally
569
    logMetacat.info("replication.create localId:" + accNumber);
570
  }
571

    
572

    
573

    
574
  /* Handle replicate single xml document*/
575
  private void handleSingleDataFile(String remoteserver, String actions,
576
                                    String accNumber, String tableName)
577
               throws HandlerException
578
  {
579
    logReplication.info("ReplicationHandler.handleSingleDataFile - Try to replicate data file: " + accNumber);
580
    DBConnection dbConn = null;
581
    int serialNumber = -1;
582
    InputStream input = null;
583
    try
584
    {
585
      // Get DBConnection from pool
586
      dbConn=DBConnectionPool.
587
                  getDBConnection("ReplicationHandler.handleSinlgeDataFile");
588
      serialNumber=dbConn.getCheckOutSerialNumber();
589
      // Try get docid info from remote server
590
      DocInfoHandler dih = new DocInfoHandler();
591
      XMLReader docinfoParser = initParser(dih);
592
      String docInfoURLString = "https://" + remoteserver +
593
                  "?server="+MetacatUtil.getLocalReplicationServerName()+
594
                  "&action=getdocumentinfo&docid="+accNumber;
595
      docInfoURLString = MetacatUtil.replaceWhiteSpaceForURL(docInfoURLString);
596
      URL docinfoUrl = new URL(docInfoURLString);
597

    
598
      String docInfoStr = ReplicationService.getURLContent(docinfoUrl);
599
      
600
      // strip out the system metadata portion
601
      String systemMetadataXML = ReplicationUtil.getSystemMetadataContent(docInfoStr);
602
   	  docInfoStr = ReplicationUtil.getContentWithoutSystemMetadata(docInfoStr);  
603
   	  
604
   	  // process system metadata
605
      if (systemMetadataXML != null) {
606
    	  SystemMetadata sysMeta = 
607
    		TypeMarshaller.unmarshalTypeFromStream(
608
    				  SystemMetadata.class, 
609
    				  new ByteArrayInputStream(systemMetadataXML.getBytes("UTF-8")));
610
    	  // need the guid-to-docid mapping
611
    	  if (!IdentifierManager.getInstance().mappingExists(sysMeta.getIdentifier().getValue())) {
612
	      	  IdentifierManager.getInstance().createMapping(sysMeta.getIdentifier().getValue(), accNumber);
613
    	  }
614
    	  // save the system metadata
615
    	  HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
616
    	  // submit for indexing
617
          MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
618

    
619
      }
620
   	  
621
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
622
      Hashtable<String, String> docinfoHash = dih.getDocInfo();
623
      
624
      // Get docid name (such as acl or dataset)
625
      String docName = docinfoHash.get("docname");
626
      // Get doc type (eml public id)
627
      String docType = docinfoHash.get("doctype");
628
      // Get docid home sever. it might be different to remoteserver
629
      // because of hub feature
630
      String docHomeServer = docinfoHash.get("home_server");
631
      String createdDateString = docinfoHash.get("date_created");
632
      String updatedDateString = docinfoHash.get("date_updated");
633
      Date createdDate = DateTimeMarshaller.deserializeDateToUTC(createdDateString);
634
      Date updatedDate = DateTimeMarshaller.deserializeDateToUTC(updatedDateString);
635
      //docid should include rev number too
636
      /*String accnum=docId+util.getProperty("document.accNumSeparator")+
637
                                              (String)docinfoHash.get("rev");*/
638

    
639
      String datafilePath = PropertyService.getProperty("application.datafilepath");
640
      // Get data file content
641
      String readDataURLString = "https://" + remoteserver + "?server="+
642
                                        MetacatUtil.getLocalReplicationServerName()+
643
                                            "&action=readdata&docid="+accNumber;
644
      readDataURLString = MetacatUtil.replaceWhiteSpaceForURL(readDataURLString);
645
      URL u = new URL(readDataURLString);
646
      input = ReplicationService.getURLStream(u);
647
      //register data file into xml_documents table and wite data file
648
      //into file system
649
      if ( input != null)
650
      {
651
        DocumentImpl.writeDataFileInReplication(input,
652
                                                datafilePath,
653
                                                docName,docType,
654
                                                accNumber,
655
                                                null,
656
                                                docHomeServer,
657
                                                remoteserver,
658
                                                tableName,
659
                                                true, //true means timed replication
660
                                                createdDate,
661
                                                updatedDate);
662
                                         
663
        //set the user information
664
        String user = (String) docinfoHash.get("user_owner");
665
		String updated = (String) docinfoHash.get("user_updated");
666
        ReplicationService.updateUserOwner(dbConn, accNumber, user, updated);
667
        
668
        //process extra access rules
669
        try {
670
        	// check if we had a guid -> docid mapping
671
        	String docid = DocumentUtil.getDocIdFromAccessionNumber(accNumber);
672
        	int rev = DocumentUtil.getRevisionFromAccessionNumber(accNumber);
673
        	IdentifierManager.getInstance().getGUID(docid, rev);
674
        	// no need to create the mapping if we have it
675
        } catch (McdbDocNotFoundException mcdbe) {
676
        	// create mapping if we don't
677
        	IdentifierManager.getInstance().createMapping(accNumber, accNumber);
678
        }
679
        Vector<XMLAccessDAO> xmlAccessDAOList = dih.getAccessControlList();
680
        if (xmlAccessDAOList != null) {
681
        	AccessControlForSingleFile acfsf = new AccessControlForSingleFile(accNumber);
682
        	for (XMLAccessDAO xmlAccessDAO : xmlAccessDAOList) {
683
        		if (!acfsf.accessControlExists(xmlAccessDAO)) {
684
        			acfsf.insertPermissions(xmlAccessDAO);
685
        		}
686
            }
687
        }
688
        
689
        logReplication.info("ReplicationHandler.handleSingleDataFile - Successfully to write datafile " + accNumber);
690
        /*MetacatReplication.replLog("wrote datafile " + accNumber + " from " +
691
                                    remote server);*/
692
        if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
693
        {
694
          logReplication.info("ReplicationHandler.handleSingleDataFile - " + DOCINSERTNUMBER + " Wrote data file" + accNumber +
695
                                       " into "+tableName + " from " +
696
                                           remoteserver);
697
          DOCINSERTNUMBER++;
698
        }
699
        else
700
        {
701
            logReplication.info("ReplicationHandler.handleSingleDataFile - " + REVINSERTNUMBER + " Wrote data file" + accNumber +
702
                    " into "+tableName + " from " +
703
                        remoteserver);
704
            REVINSERTNUMBER++;
705
        }
706
        String ip = getIpFromURL(u);
707
        EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, accNumber, actions);
708
        
709
      }//if
710
      else
711
      {
712
         logReplication.info("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
713
         throw new HandlerException("ReplicationHandler.handleSingleDataFile - Couldn't open the data file: " + accNumber);
714
      }//else
715

    
716
    }//try
717
    catch(Exception e)
718
    {
719
      /*MetacatReplication.replErrorLog("Failed to try wrote data file " + accNumber +
720
                                      " because " +e.getMessage());*/
721
      if (tableName.equals(DocumentImpl.DOCUMENTTABLE))
722
      {
723
    	logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
724
    	logReplication.error("ReplicationHandler.handleSingleDataFile - " + DOCERRORNUMBER + " Failed to write data file " + accNumber +
725
                                     " into " + tableName + " from " +
726
                                         remoteserver + " because " + e.getMessage());
727
        DOCERRORNUMBER++;
728
      }
729
      else
730
      {
731
    	  logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
732
    	  logReplication.error("ReplicationHandler.handleSingleDataFile - " + REVERRORNUMBER + " Failed to write data file" + accNumber +
733
                  " into " + tableName + " from " +
734
                      remoteserver +" because "+ e.getMessage());
735
          REVERRORNUMBER++;
736
      }
737
      logMetacat.error("ReplicationHandler.handleSingleDataFile - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
738
      logReplication.error("ReplicationHandler.handleSingleDataFile - Failed to try wrote datafile " + accNumber +
739
                                      " because " + e.getMessage());
740
      throw new HandlerException("ReplicationHandler.handleSingleDataFile - generic exception " 
741
    		  + "writing Replication: " + e.getMessage());
742
    }
743
    finally
744
    {
745
       IOUtils.closeQuietly(input);
746
       //return DBConnection
747
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
748

    
749
     
750
    }//finally
751
    logMetacat.info("replication.create localId:" + accNumber);
752
  }
753

    
754

    
755

    
756
  /* Handle delete single document*/
757
  private void handleDeleteSingleDocument(String docId, String notifyServer)
758
               throws HandlerException
759
  {
760
    logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Try delete doc: "+docId);
761
    DBConnection dbConn = null;
762
    int serialNumber = -1;
763
    try
764
    {
765
      // Get DBConnection from pool
766
      dbConn=DBConnectionPool.
767
                  getDBConnection("ReplicationHandler.handleDeleteSingleDoc");
768
      serialNumber=dbConn.getCheckOutSerialNumber();
769
      if(!alreadyDeleted(docId))
770
      {
771

    
772
         //because delete method docid should have rev number
773
         //so we just add one for it. This rev number is no sence.
774
         String accnum=docId+PropertyService.getProperty("document.accNumSeparator")+"1";
775
         DocumentImpl.delete(accnum, null, null, notifyServer, false);
776
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Successfully deleted doc " + docId);
777
         logReplication.info("ReplicationHandler.handleDeleteSingleDocument - Doc " + docId + " deleted");
778
         URL u = new URL("https://"+notifyServer);
779
         String ip = getIpFromURL(u);
780
         EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, docId, "delete");
781
      }
782

    
783
    }//try
784
    catch(McdbDocNotFoundException e)
785
    {
786
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
787
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
788
                                 " in db because because " + e.getMessage());
789
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
790
    		  + "when handling document: " + e.getMessage());
791
    }
792
    catch(InsufficientKarmaException e)
793
    {
794
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
795
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
796
                                 " in db because because " + e.getMessage());
797
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
798
    		  + "when handling document: " + e.getMessage());
799
    }
800
    catch(SQLException e)
801
    {
802
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
803
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
804
                                 " in db because because " + e.getMessage());
805
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
806
    		  + "when handling document: " + e.getMessage());
807
    }
808
    catch(Exception e)
809
    {
810
      logMetacat.error("ReplicationHandler.handleDeleteSingleDocument - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
811
      logReplication.error("ReplicationHandler.handleDeleteSingleDocument - Failed to delete doc " + docId +
812
                                 " in db because because " + e.getMessage());
813
      throw new HandlerException("ReplicationHandler.handleDeleteSingleDocument - generic exception " 
814
    		  + "when handling document: " + e.getMessage());
815
    }
816
    finally
817
    {
818
       //return DBConnection
819
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
820
    }//finally
821
    logMetacat.info("replication.handleDeleteSingleDocument localId:" + docId);
822
  }
823

    
824
  /* Handle updateLastCheckTimForSingleServer*/
825
  private void updateLastCheckTimeForSingleServer(ReplicationServer repServer)
826
                                                  throws HandlerException
827
  {
828
    String server = repServer.getServerName();
829
    DBConnection dbConn = null;
830
    int serialNumber = -1;
831
    PreparedStatement pstmt = null;
832
    try
833
    {
834
      // Get DBConnection from pool
835
      dbConn=DBConnectionPool.
836
             getDBConnection("ReplicationHandler.updateLastCheckTimeForServer");
837
      serialNumber=dbConn.getCheckOutSerialNumber();
838

    
839
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Try to update last_check for server: "+server);
840
      // Get time from remote server
841
      URL dateurl = new URL("https://" + server + "?server="+
842
      MetacatUtil.getLocalReplicationServerName()+"&action=gettime");
843
      String datexml = ReplicationService.getURLContent(dateurl);
844
      logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - datexml: "+datexml);
845
      if (datexml != null && !datexml.equals("")) {
846
    	  
847
    	  // parse the ISO datetime
848
         String datestr = datexml.substring(11, datexml.indexOf('<', 11));
849
         Date updated = DateTimeMarshaller.deserializeDateToUTC(datestr);
850
         
851
         StringBuffer sql = new StringBuffer();
852
         sql.append("update xml_replication set last_checked = ? ");
853
         sql.append(" where server like ? ");
854
         pstmt = dbConn.prepareStatement(sql.toString());
855
         pstmt.setTimestamp(1, new Timestamp(updated.getTime()));
856
         pstmt.setString(2, server);
857
         
858
         pstmt.executeUpdate();
859
         dbConn.commit();
860
         pstmt.close();
861
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - last_checked updated to "+datestr+" on "
862
                                      + server);
863
      }//if
864
      else
865
      {
866

    
867
         logReplication.info("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server "  +
868
                                  server + " in db because couldn't get time "
869
                                  );
870
         throw new Exception("Couldn't get time for server "+ server);
871
      }
872

    
873
    }//try
874
    catch(Exception e)
875
    {
876
      logMetacat.error("ReplicationHandler.updateLastCheckTimeForSingleServer - " + ReplicationService.METACAT_REPL_ERROR_MSG); 
877
      logReplication.error("ReplicationHandler.updateLastCheckTimeForSingleServer - Failed to update last_checked for server " +
878
                                server + " in db because because " + e.getMessage());
879
      throw new HandlerException("ReplicationHandler.updateLastCheckTimeForSingleServer - " 
880
    		  + "Error updating last checked time: " + e.getMessage());
881
    }
882
    finally
883
    {
884
       //return DBConnection
885
       DBConnectionPool.returnDBConnection(dbConn, serialNumber);
886
    }//finally
887
  }
888
  
889
  	/**
890
	 * Handle replicate system metadata
891
	 * 
892
	 * @param remoteserver
893
	 * @param guid
894
	 * @throws HandlerException
895
	 */
896
	private void handleSystemMetadata(String remoteserver, String guid) 
897
		throws HandlerException {
898
		try {
899

    
900
			// Try get the system metadata from remote server
901
			String sysMetaURLStr = "https://" + remoteserver + "?server="
902
					+ MetacatUtil.getLocalReplicationServerName()
903
					+ "&action=getsystemmetadata&guid=" + guid;
904
			sysMetaURLStr = MetacatUtil.replaceWhiteSpaceForURL(sysMetaURLStr);
905
			URL sysMetaUrl = new URL(sysMetaURLStr);
906
			logReplication.info("ReplicationHandler.handleSystemMetadata - Sending message: "
907
							+ sysMetaUrl.toString());
908
			String systemMetadataXML = ReplicationService.getURLContent(sysMetaUrl);
909

    
910
			logReplication.info("ReplicationHandler.handleSystemMetadata - guid in repl: " + guid);
911

    
912
			// process system metadata
913
			if (systemMetadataXML != null) {
914
				SystemMetadata sysMeta = TypeMarshaller.unmarshalTypeFromStream(SystemMetadata.class,
915
								new ByteArrayInputStream(systemMetadataXML
916
										.getBytes("UTF-8")));
917
				HazelcastService.getInstance().getSystemMetadataMap().put(sysMeta.getIdentifier(), sysMeta);
918
				// submit for indexing
919
                MetacatSolrIndex.getInstance().submit(sysMeta.getIdentifier(), sysMeta, null, true);
920
			}
921

    
922
			logReplication.info("ReplicationHandler.handleSystemMetadata - Successfully replicated system metadata for guid: "
923
							+ guid);
924

    
925
			String ip = getIpFromURL(sysMetaUrl);
926
			EventLog.getInstance().log(ip, null, ReplicationService.REPLICATIONUSER, guid, "systemMetadata");
927

    
928
		} catch (Exception e) {
929
			logMetacat.error("ReplicationHandler.handleSystemMetadata - "
930
					+ ReplicationService.METACAT_REPL_ERROR_MSG);
931
			logReplication
932
					.error("ReplicationHandler.handleSystemMetadata - Failed to write system metadata "
933
							+ guid + " into db because " + e.getMessage());
934
			throw new HandlerException(
935
					"ReplicationHandler.handleSystemMetadata - generic exception "
936
							+ "writing Replication: " + e.getMessage());
937
		}
938

    
939
	}
940

    
941
  /**
942
   * updates xml_catalog with entries from other servers.
943
   */
944
  private void updateCatalog()
945
  {
946
    logReplication.info("ReplicationHandler.updateCatalog - Start of updateCatalog");
947
    // ReplicationServer object in server list
948
    ReplicationServer replServer = null;
949
    PreparedStatement pstmt = null;
950
    String server = null;
951

    
952

    
953
    // Go through each ReplicationServer object in sererlist
954
    for (int j=0; j<serverList.size(); j++)
955
    {
956
      Vector<Vector<String>> remoteCatalog = new Vector<Vector<String>>();
957
      Vector<String> publicId = new Vector<String>();
958
      try
959
      {
960
        // Get ReplicationServer object from server list
961
        replServer = serverList.serverAt(j);
962
        // Get server name from the ReplicationServer object
963
        server = replServer.getServerName();
964
        // Try to get catalog
965
        URL u = new URL("https://" + server + "?server="+
966
        MetacatUtil.getLocalReplicationServerName()+"&action=getcatalog");
967
        logReplication.info("ReplicationHandler.updateCatalog - sending message " + u.toString());
968
        String catxml = ReplicationService.getURLContent(u);
969

    
970
        // Make sure there are not error, no empty string
971
        if (catxml.indexOf("error")!=-1 || catxml==null||catxml.equals(""))
972
        {
973
          throw new Exception("Couldn't get catalog list form server " +server);
974
        }
975
        logReplication.debug("ReplicationHandler.updateCatalog - catxml: " + catxml);
976
        CatalogMessageHandler cmh = new CatalogMessageHandler();
977
        XMLReader catparser = initParser(cmh);
978
        catparser.parse(new InputSource(new StringReader(catxml)));
979
        //parse the returned catalog xml and put it into a vector
980
        remoteCatalog = cmh.getCatalogVect();
981

    
982
        // Make sure remoteCatalog is not empty
983
        if (remoteCatalog.isEmpty())
984
        {
985
          throw new Exception("Couldn't get catalog list form server " +server);
986
        }
987

    
988
        String localcatxml = ReplicationService.getCatalogXML();
989

    
990
        // Make sure local catalog is no empty
991
        if (localcatxml==null||localcatxml.equals(""))
992
        {
993
          throw new Exception("Couldn't get catalog list form server " +server);
994
        }
995

    
996
        cmh = new CatalogMessageHandler();
997
        catparser = initParser(cmh);
998
        catparser.parse(new InputSource(new StringReader(localcatxml)));
999
        Vector<Vector<String>> localCatalog = cmh.getCatalogVect();
1000

    
1001
        //now we have the catalog from the remote server and this local server
1002
        //we now need to compare the two and merge the differences.
1003
        //the comparison is base on the public_id fields which is the 4th
1004
        //entry in each row vector.
1005
        publicId = new Vector<String>();
1006
        for(int i=0; i<localCatalog.size(); i++)
1007
        {
1008
          Vector<String> v = new Vector<String>(localCatalog.elementAt(i));
1009
          logReplication.info("ReplicationHandler.updateCatalog - v1: " + v.toString());
1010
          publicId.add(new String((String)v.elementAt(3)));
1011
        }
1012
      }//try
1013
      catch (Exception e)
1014
      {
1015
        logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1016
        logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
1017
                                    server + " because " +e.getMessage());
1018
      }//catch
1019

    
1020
      for(int i=0; i<remoteCatalog.size(); i++)
1021
      {
1022
         // DConnection
1023
        DBConnection dbConn = null;
1024
        // DBConnection checkout serial number
1025
        int serialNumber = -1;
1026
        try
1027
        {
1028
            dbConn=DBConnectionPool.
1029
                  getDBConnection("ReplicationHandler.updateCatalog");
1030
            serialNumber=dbConn.getCheckOutSerialNumber();
1031
            Vector<String> v = remoteCatalog.elementAt(i);
1032
            //logMetacat.debug("v2: " + v.toString());
1033
            //logMetacat.debug("i: " + i);
1034
            //logMetacat.debug("remoteCatalog.size(): " + remoteCatalog.size());
1035
            //logMetacat.debug("publicID: " + publicId.toString());
1036
            logReplication.info
1037
                              ("ReplicationHandler.updateCatalog - v.elementAt(3): " + (String)v.elementAt(3));
1038
           /*if(!publicId.contains(v.elementAt(3)))
1039
           { //so we don't have this public id in our local table so we need to
1040
             //add it.
1041
        	   
1042
        	   // check where it is pointing first, before adding
1043
        	   String entryType = (String)v.elementAt(0);
1044
        	   if (entryType.equals(DocumentImpl.SCHEMA)) {
1045
	        	   String nameSpace = (String)v.elementAt(3);
1046
	        	   String schemaLocation = (String)v.elementAt(4);
1047
	        	   SchemaLocationResolver slr = new SchemaLocationResolver(nameSpace, schemaLocation);
1048
	        	   try {
1049
	        		   slr.resolveNameSpace();
1050
	        	   } catch (Exception e) {
1051
	        		   String msg = "Could not save remote schema to xml catalog. " + "nameSpace: " + nameSpace + " location: " + schemaLocation;
1052
	        		   logMetacat.error(msg, e);
1053
	        		   logReplication.error(msg, e);
1054
	        	   }
1055
	        	   // skip whatever else we were going to do
1056
	        	   continue;
1057
        	   }
1058
        	   
1059
             //logMetacat.debug("in if");
1060
             StringBuffer sql = new StringBuffer();
1061
             sql.append("insert into xml_catalog (entry_type, source_doctype, ");
1062
             sql.append("target_doctype, public_id, system_id) values (?,?,?,");
1063
             sql.append("?,?)");
1064
             //logMetacat.debug("sql: " + sql.toString());
1065
             pstmt = dbConn.prepareStatement(sql.toString());
1066
             pstmt.setString(1, (String)v.elementAt(0));
1067
             pstmt.setString(2, (String)v.elementAt(1));
1068
             pstmt.setString(3, (String)v.elementAt(2));
1069
             pstmt.setString(4, (String)v.elementAt(3));
1070
             pstmt.setString(5, (String)v.elementAt(4));
1071
             pstmt.execute();
1072
             pstmt.close();
1073
             logReplication.info("ReplicationHandler.updateCatalog - Success fully to insert new publicid "+
1074
                               (String)v.elementAt(3) + " from server"+server);
1075
           }*/
1076
        }
1077
        catch(Exception e)
1078
        {
1079
           logMetacat.error("ReplicationHandler.updateCatalog - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1080
           logReplication.error("ReplicationHandler.updateCatalog - Failed to update catalog for server "+
1081
                                    server + " because " +e.getMessage());
1082
        }//catch
1083
        finally
1084
        {
1085
           DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1086
        }//finally
1087
      }//for remote catalog
1088
    }//for server list
1089
    logReplication.info("End of updateCatalog");
1090
  }
1091

    
1092
  /**
1093
   * Method that returns true if docid has already been "deleted" from metacat.
1094
   * This method really implements a truth table for deleted documents
1095
   * The table is (a docid in one of the tables is represented by the X):
1096
   * xml_docs      xml_revs      deleted?
1097
   * ------------------------------------
1098
   *   X             X             FALSE
1099
   *   X             _             FALSE
1100
   *   _             X             TRUE
1101
   *   _             _             TRUE
1102
   */
1103
  private static boolean alreadyDeleted(String docid) throws HandlerException
1104
  {
1105
    DBConnection dbConn = null;
1106
    int serialNumber = -1;
1107
    PreparedStatement pstmt = null;
1108
    try
1109
    {
1110
      dbConn=DBConnectionPool.
1111
                  getDBConnection("ReplicationHandler.alreadyDeleted");
1112
      serialNumber=dbConn.getCheckOutSerialNumber();
1113
      boolean xml_docs = false;
1114
      boolean xml_revs = false;
1115

    
1116
      StringBuffer sb = new StringBuffer();
1117
      sb.append("select docid from xml_revisions where docid like ? ");
1118
      pstmt = dbConn.prepareStatement(sb.toString());
1119
      pstmt.setString(1, docid);
1120
      pstmt.execute();
1121
      ResultSet rs = pstmt.getResultSet();
1122
      boolean tablehasrows = rs.next();
1123
      if(tablehasrows)
1124
      {
1125
        xml_revs = true;
1126
      }
1127

    
1128
      sb = new StringBuffer();
1129
      sb.append("select docid from xml_documents where docid like '");
1130
      sb.append(docid).append("'");
1131
      pstmt.close();
1132
      pstmt = dbConn.prepareStatement(sb.toString());
1133
      //increase usage count
1134
      dbConn.increaseUsageCount(1);
1135
      pstmt.execute();
1136
      rs = pstmt.getResultSet();
1137
      tablehasrows = rs.next();
1138
      pstmt.close();
1139
      if(tablehasrows)
1140
      {
1141
        xml_docs = true;
1142
      }
1143

    
1144
      if(xml_docs && xml_revs)
1145
      {
1146
        return false;
1147
      }
1148
      else if(xml_docs && !xml_revs)
1149
      {
1150
        return false;
1151
      }
1152
      else if(!xml_docs && xml_revs)
1153
      {
1154
        return true;
1155
      }
1156
      else if(!xml_docs && !xml_revs)
1157
      {
1158
        return true;
1159
      }
1160
    }
1161
    catch(Exception e)
1162
    {
1163
      logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1164
      logReplication.error("ReplicationHandler.alreadyDeleted - general error in alreadyDeleted: " +
1165
                          e.getMessage());
1166
      throw new HandlerException("ReplicationHandler.alreadyDeleted - general error: " 
1167
    		  + e.getMessage());
1168
    }
1169
    finally
1170
    {
1171
      try
1172
      {
1173
        pstmt.close();
1174
      }//try
1175
      catch (SQLException ee)
1176
      {
1177
    	logMetacat.error("ReplicationHandler.alreadyDeleted - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1178
        logReplication.error("ReplicationHandler.alreadyDeleted - Error in replicationHandler.alreadyDeleted "+
1179
                          "to close pstmt: "+ee.getMessage());
1180
        throw new HandlerException("ReplicationHandler.alreadyDeleted - SQL error when closing prepared statement: " 
1181
      		  + ee.getMessage());
1182
      }//catch
1183
      finally
1184
      {
1185
        DBConnectionPool.returnDBConnection(dbConn, serialNumber);
1186
      }//finally
1187
    }//finally
1188
    return false;
1189
  }
1190

    
1191

    
1192
  /**
1193
   * Method to initialize the message parser
1194
   */
1195
  public static XMLReader initParser(DefaultHandler dh)
1196
          throws HandlerException
1197
  {
1198
    XMLReader parser = null;
1199

    
1200
    try {
1201
      ContentHandler chandler = dh;
1202

    
1203
      // Get an instance of the parser
1204
      String parserName = PropertyService.getProperty("xml.saxparser");
1205
      parser = XMLReaderFactory.createXMLReader(parserName);
1206

    
1207
      // Turn off validation
1208
      parser.setFeature("http://xml.org/sax/features/validation", false);
1209

    
1210
      parser.setContentHandler((ContentHandler)chandler);
1211
      parser.setErrorHandler((ErrorHandler)chandler);
1212

    
1213
    } catch (SAXException se) {
1214
      throw new HandlerException("ReplicationHandler.initParser - Sax error when " 
1215
    		  + " initializing parser: " + se.getMessage());
1216
    } catch (PropertyNotFoundException pnfe) {
1217
        throw new HandlerException("ReplicationHandler.initParser - Property error when " 
1218
      		  + " getting parser name: " + pnfe.getMessage());
1219
    } 
1220

    
1221
    return parser;
1222
  }
1223

    
1224
  /**
1225
	 * This method will combine given time string(in short format) to current
1226
	 * date. If the given time (e.g 10:00 AM) passed the current time (e.g 2:00
1227
	 * PM Aug 21, 2005), then the time will set to second day, 10:00 AM Aug 22,
1228
	 * 2005. If the given time (e.g 10:00 AM) haven't passed the current time
1229
	 * (e.g 8:00 AM Aug 21, 2005) The time will set to be 10:00 AM Aug 21, 2005.
1230
	 * 
1231
	 * @param givenTime
1232
	 *            the format should be "10:00 AM " or "2:00 PM"
1233
	 * @return
1234
	 * @throws Exception
1235
	 */
1236
	public static Date combinateCurrentDateAndGivenTime(String givenTime) throws HandlerException
1237
  {
1238
	  try {
1239
     Date givenDate = parseTime(givenTime);
1240
     Date newDate = null;
1241
     Date now = new Date();
1242
     String currentTimeString = getTimeString(now);
1243
     Date currentTime = parseTime(currentTimeString); 
1244
     if ( currentTime.getTime() >= givenDate.getTime())
1245
     {
1246
        logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today already pass the given time, we should set it as tomorrow");
1247
        String dateAndTime = getDateString(now) + " " + givenTime;
1248
        Date combinationDate = parseDateTime(dateAndTime);
1249
        // new date should plus 24 hours to make is the second day
1250
        newDate = new Date(combinationDate.getTime()+24*3600*1000);
1251
     }
1252
     else
1253
     {
1254
         logReplication.info("ReplicationHandler.combinateCurrentDateAndGivenTime - Today haven't pass the given time, we should it as today");
1255
         String dateAndTime = getDateString(now) + " " + givenTime;
1256
         newDate = parseDateTime(dateAndTime);
1257
     }
1258
     logReplication.warn("ReplicationHandler.combinateCurrentDateAndGivenTime - final setting time is "+ newDate.toString());
1259
     return newDate;
1260
	  } catch (ParseException pe) {
1261
		  throw new HandlerException("ReplicationHandler.combinateCurrentDateAndGivenTime - "
1262
				  + "parsing error: "  + pe.getMessage());
1263
	  }
1264
  }
1265

    
1266
  /*
1267
	 * parse a given string to Time in short format. For example, given time is
1268
	 * 10:00 AM, the date will be return as Jan 1 1970, 10:00 AM
1269
	 */
1270
  private static Date parseTime(String timeString) throws ParseException
1271
  {
1272
    DateFormat format = DateFormat.getTimeInstance(DateFormat.SHORT);
1273
    Date time = format.parse(timeString); 
1274
    logReplication.info("ReplicationHandler.parseTime - Date string is after parse a time string "
1275
                              +time.toString());
1276
    return time;
1277

    
1278
  }
1279
  
1280
  /*
1281
   * Parse a given string to date and time. Date format is long and time
1282
   * format is short.
1283
   */
1284
  private static Date parseDateTime(String timeString) throws ParseException
1285
  {
1286
    DateFormat format = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.SHORT);
1287
    Date time = format.parse(timeString);
1288
    logReplication.info("ReplicationHandler.parseDateTime - Date string is after parse a time string "+
1289
                             time.toString());
1290
    return time;
1291
  }
1292
  
1293
  /*
1294
   * Get a date string from a Date object. The date format will be long
1295
   */
1296
  private static String getDateString(Date now)
1297
  {
1298
     DateFormat df = DateFormat.getDateInstance(DateFormat.LONG);
1299
     String s = df.format(now);
1300
     logReplication.info("ReplicationHandler.getDateString - Today is " + s);
1301
     return s;
1302
  }
1303
  
1304
  /*
1305
   * Get a time string from a Date object, the time format will be short
1306
   */
1307
  private static String getTimeString(Date now)
1308
  {
1309
     DateFormat df = DateFormat.getTimeInstance(DateFormat.SHORT);
1310
     String s = df.format(now);
1311
     logReplication.info("ReplicationHandler.getTimeString - Time is " + s);
1312
     return s;
1313
  }
1314
  
1315
  
1316
  /*
1317
	 * This method will go through the docid list both in xml_Documents table
1318
	 * and in xml_revisions table @author tao
1319
	 */
1320
	private void handleDocList(Vector<Vector<String>> docList, String tableName) {
1321
		boolean dataFile = false;
1322
		for (int j = 0; j < docList.size(); j++) {
1323
			// initial dataFile is false
1324
			dataFile = false;
1325
			// w is information for one document, information contain
1326
			// docid, rev, server or datafile.
1327
			Vector<String> w = new Vector<String>(docList.elementAt(j));
1328
			// Check if the vector w contain "datafile"
1329
			// If it has, this document is data file
1330
			try {
1331
				if (w.contains((String) PropertyService.getProperty("replication.datafileflag"))) {
1332
					dataFile = true;
1333
				}
1334
			} catch (PropertyNotFoundException pnfe) {
1335
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1336
				logReplication.error("ReplicationHandler.handleDocList - Could not retrieve data file flag property.  "
1337
						+ "Leaving as false: " + pnfe.getMessage());
1338
			}
1339
			// logMetacat.debug("w: " + w.toString());
1340
			// Get docid
1341
			String docid = (String) w.elementAt(0);
1342
			logReplication.info("docid: " + docid);
1343
			// Get revision number
1344
			int rev = Integer.parseInt((String) w.elementAt(1));
1345
			logReplication.info("rev: " + rev);
1346
			// Get remote server name (it is may not be doc home server because
1347
			// the new hub feature
1348
			String remoteServer = (String) w.elementAt(2);
1349
			remoteServer = remoteServer.trim();
1350

    
1351
			try {
1352
				if (tableName.equals(DocumentImpl.DOCUMENTTABLE)) {
1353
					handleDocInXMLDocuments(docid, rev, remoteServer, dataFile);
1354
				} else if (tableName.equals(DocumentImpl.REVISIONTABLE)) {
1355
					handleDocInXMLRevisions(docid, rev, remoteServer, dataFile);
1356
				} else {
1357
					continue;
1358
				}
1359

    
1360
			} catch (Exception e) {
1361
				logMetacat.error("ReplicationHandler.handleDocList - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1362
				logReplication.error("ReplicationHandler.handleDocList - error to handle update doc in " + tableName
1363
						+ " in time replication" + e.getMessage(), e);
1364
				continue;
1365
			}
1366
			
1367
	        if (_xmlDocQueryCount > 0 && (_xmlDocQueryCount % 100) == 0) {
1368
	        	logMetacat.debug("ReplicationHandler.update - xml_doc query count: " + _xmlDocQueryCount + 
1369
	        			", xml_doc avg query time: " + (_xmlDocQueryTime / _xmlDocQueryCount));
1370
	        }
1371
	        
1372
	        if (_xmlRevQueryCount > 0 && (_xmlRevQueryCount % 100) == 0) {
1373
	        	logMetacat.debug("ReplicationHandler.update - xml_rev query count: " + _xmlRevQueryCount + 
1374
	        			", xml_rev avg query time: " + (_xmlRevQueryTime / _xmlRevQueryCount));
1375
	        }
1376

    
1377
		}// for update docs
1378

    
1379
	}
1380
   
1381
   /*
1382
	 * This method will handle doc in xml_documents table.
1383
	 */
1384
   private void handleDocInXMLDocuments(String docid, int rev, String remoteServer, boolean dataFile) 
1385
                                        throws HandlerException
1386
   {
1387
       // compare the update rev and local rev to see what need happen
1388
       int localrev = -1;
1389
       String action = null;
1390
       boolean flag = false;
1391
       try
1392
       {
1393
    	 long docQueryStartTime = System.currentTimeMillis();
1394
         localrev = DBUtil.getLatestRevisionInDocumentTable(docid);
1395
         long docQueryEndTime = System.currentTimeMillis();
1396
         _xmlDocQueryTime += (docQueryEndTime - docQueryStartTime);
1397
         _xmlDocQueryCount++;
1398
       }
1399
       catch (SQLException e)
1400
       {
1401
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1402
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1403
                                " be found because " + e.getMessage());
1404
         logReplication.error("ReplicationHandler.handleDocInXMLDocuments - " + DOCERRORNUMBER+"Docid "+ docid + " could not be "+
1405
                 "written because error happend to find it's local revision");
1406
         DOCERRORNUMBER++;
1407
         throw new HandlerException ("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " could not "+
1408
                 " be found: " + e.getMessage());
1409
       }
1410
       logReplication.info("ReplicationHandler.handleDocInXMLDocuments - Local rev for docid "+ docid + " is "+
1411
                               localrev);
1412

    
1413
       //check the revs for an update because this document is in the
1414
       //local DB, it might be out of date.
1415
       if (localrev == -1)
1416
       {
1417
          // check if the revision is in the revision table
1418
    	   Vector<Integer> localRevVector = null;
1419
    	 try {
1420
        	 long revQueryStartTime = System.currentTimeMillis();
1421
    		 localRevVector = DBUtil.getRevListFromRevisionTable(docid);
1422
             long revQueryEndTime = System.currentTimeMillis();
1423
             _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1424
             _xmlRevQueryCount++;
1425
    	 } catch (SQLException sqle) {
1426
    		 throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - SQL error " 
1427
    				 + " when getting rev list for docid: " + docid + " : " + sqle.getMessage());
1428
    	 }
1429
         if (localRevVector != null && localRevVector.contains(new Integer(rev)))
1430
         {
1431
             // this version was deleted, so don't need replicate
1432
             flag = false;
1433
         }
1434
         else
1435
         {
1436
           //insert this document as new because it is not in the local DB
1437
           action = "INSERT";
1438
           flag = true;
1439
         }
1440
       }
1441
       else
1442
       {
1443
         if(localrev == rev)
1444
         {
1445
           // Local meatacat has the same rev to remote host, don't need
1446
           // update and flag set false
1447
           flag = false;
1448
         }
1449
         else if(localrev < rev)
1450
         {
1451
           //this document needs to be updated so send an read request
1452
           action = "UPDATE";
1453
           flag = true;
1454
         }
1455
       }
1456
       
1457
       String accNumber = null;
1458
       try {
1459
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1460
       } catch (PropertyNotFoundException pnfe) {
1461
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLDocuments - error getting " 
1462
    			   + "account number separator : " + pnfe.getMessage());
1463
       }
1464
       // this is non-data file
1465
       if(flag && !dataFile)
1466
       {
1467
         try
1468
         {
1469
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1470
         }
1471
         catch(HandlerException he)
1472
         {
1473
           // skip this document
1474
           throw he;
1475
         }
1476
       }//if for non-data file
1477

    
1478
        // this is for data file
1479
       if(flag && dataFile)
1480
       {
1481
         try
1482
         {
1483
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.DOCUMENTTABLE);
1484
         }
1485
         catch(HandlerException he)
1486
         {
1487
           // skip this data file
1488
           throw he;
1489
         }
1490

    
1491
       }//for data file
1492
   }
1493
   
1494
   /*
1495
    * This method will handle doc in xml_documents table.
1496
    */
1497
   private void handleDocInXMLRevisions(String docid, int rev, String remoteServer, boolean dataFile) 
1498
                                        throws HandlerException
1499
   {
1500
       // compare the update rev and local rev to see what need happen
1501
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - In handle repliation revsion table");
1502
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - the docid is "+ docid);
1503
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - The rev is "+rev);
1504
       Vector<Integer> localrev = null;
1505
       String action = "INSERT";
1506
       boolean flag = false;
1507
       try
1508
       {
1509
      	 long revQueryStartTime = System.currentTimeMillis();
1510
         localrev = DBUtil.getRevListFromRevisionTable(docid);
1511
         long revQueryEndTime = System.currentTimeMillis();
1512
         _xmlRevQueryTime += (revQueryEndTime - revQueryStartTime);
1513
         _xmlRevQueryCount++;
1514
       }
1515
       catch (SQLException sqle)
1516
       {
1517
    	 logMetacat.error("ReplicationHandler.handleDocInXMLDocuments - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1518
         logReplication.error("ReplicationHandler.handleDocInXMLRevisions - Local rev for docid "+ docid + " could not "+
1519
                                " be found because " + sqle.getMessage());
1520
         REVERRORNUMBER++;
1521
         throw new HandlerException ("ReplicationHandler.handleDocInXMLRevisions - SQL exception getting rev list: " 
1522
        		 + sqle.getMessage());
1523
       }
1524
       logReplication.info("ReplicationHandler.handleDocInXMLRevisions - rev list in xml_revision table for docid "+ docid + " is "+
1525
                               localrev.toString());
1526
       
1527
       // if the rev is not in the xml_revision, we need insert it
1528
       if (!localrev.contains(new Integer(rev)))
1529
       {
1530
           flag = true;    
1531
       }
1532
     
1533
       String accNumber = null;
1534
       try {
1535
    	   accNumber = docid + PropertyService.getProperty("document.accNumSeparator") + rev;
1536
       } catch (PropertyNotFoundException pnfe) {
1537
    	   throw new HandlerException("ReplicationHandler.handleDocInXMLRevisions - error getting " 
1538
    			   + "account number separator : " + pnfe.getMessage());
1539
       }
1540
       // this is non-data file
1541
       if(flag && !dataFile)
1542
       {
1543
         try
1544
         {
1545
           
1546
           handleSingleXMLDocument(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1547
         }
1548
         catch(HandlerException he)
1549
         {
1550
           // skip this document
1551
           throw he;
1552
         }
1553
       }//if for non-data file
1554

    
1555
        // this is for data file
1556
       if(flag && dataFile)
1557
       {
1558
         try
1559
         {
1560
           handleSingleDataFile(remoteServer, action, accNumber, DocumentImpl.REVISIONTABLE);
1561
         }
1562
         catch(HandlerException he)
1563
         {
1564
           // skip this data file
1565
           throw he;
1566
         }
1567

    
1568
       }//for data file
1569
   }
1570
   
1571
   /*
1572
    * Return a ip address for given url
1573
    */
1574
   private String getIpFromURL(URL url)
1575
   {
1576
	   String ip = null;
1577
	   try
1578
	   {
1579
	      InetAddress address = InetAddress.getByName(url.getHost());
1580
	      ip = address.getHostAddress();
1581
	   }
1582
	   catch(UnknownHostException e)
1583
	   {
1584
		   logMetacat.error("ReplicationHandler.getIpFromURL - " + ReplicationService.METACAT_REPL_ERROR_MSG);                         
1585
		   logReplication.error("ReplicationHandler.getIpFromURL - Error in get ip address for host: "
1586
                   +e.getMessage());
1587
	   }
1588

    
1589
	   return ip;
1590
   }
1591
  
1592
}
1593

    
(3-3/7)