Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that implements replication for metacat
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-12-01 15:27:58 -0800 (Fri, 01 Dec 2000) $'
11
 * '$Revision: 577 $'
12
 */
13

    
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.util.*;
17
import java.io.*;
18
import java.sql.*;
19
import java.net.*;
20
import java.lang.*;
21
import java.text.*;
22
import javax.servlet.*;
23
import javax.servlet.http.*;
24

    
25
import org.xml.sax.*;
26

    
27
public class MetacatReplication extends HttpServlet implements Runnable
28
{  
29
  private String deltaT;
30
  Timer replicationDaemon;
31
  private static MetaCatUtil util = new MetaCatUtil();
32
  private Vector fileLocks = new Vector();
33
  private Thread lockThread = null;
34
  
35
  /**
36
   * Initialize the servlet by creating appropriate database connections
37
   */
38
  public void init(ServletConfig config) throws ServletException 
39
  {
40
    //initialize db connections to handle any update requests
41
    MetaCatUtil util = new MetaCatUtil();
42
    deltaT = util.getOption("deltaT");
43
    //break off a thread to do the delta-T check
44
    replicationDaemon = new Timer(true);
45
    //replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0, 
46
    //                                      new Integer(deltaT).intValue() * 1000);
47
    //System.out.println("timer scheduled at: " + new Integer(deltaT).intValue() 
48
    //                   + " seconds");
49
    
50
  }
51
  
52
  public void destroy() 
53
  {
54
    replicationDaemon.cancel();
55
    System.out.println("Replication daemon cancelled.");
56
  }
57
  
58
  public void doGet (HttpServletRequest request, HttpServletResponse response)
59
                     throws ServletException, IOException 
60
  {
61
    // Process the data and send back the response
62
    handleGetOrPost(request, response);
63
  }
64

    
65
  public void doPost(HttpServletRequest request, HttpServletResponse response)
66
                     throws ServletException, IOException 
67
  {
68
    // Process the data and send back the response
69
    handleGetOrPost(request, response);
70
  }
71
  
72
  private void handleGetOrPost(HttpServletRequest request, 
73
                               HttpServletResponse response) 
74
                               throws ServletException, IOException 
75
  {
76
    PrintWriter out = response.getWriter();
77
    Hashtable params = new Hashtable();
78
    Enumeration paramlist = request.getParameterNames();
79
    
80
    while (paramlist.hasMoreElements()) 
81
    {
82
      String name = (String)paramlist.nextElement();
83
      String[] value = request.getParameterValues(name);
84
      params.put(name, value);  
85
    }
86
    
87
    if(params.containsKey("action"))
88
    {
89
      if(((String[])params.get("action"))[0].equals("stop"))
90
      { //stop the replication server
91
        replicationDaemon.cancel();
92
        out.println("Replication Handler Stopped");
93
        System.out.println("Replication Handler Stopped");
94
      }
95
      else if(((String[])params.get("action"))[0].equals("start"))
96
      { //start the replication server
97
        int rate;
98
        if(params.containsKey("rate"))
99
        {
100
          rate = new Integer(
101
                 new String(((String[])params.get("rate"))[0])).intValue();
102
          if(rate < 30)
103
          {
104
            out.println("Replication deltaT rate cannot be less than 30!");
105
            rate = 1000;
106
          }
107
        }
108
        else
109
        {
110
          rate = 1000;
111
        }
112
        
113
        out.println("New rate is: " + rate + " seconds.");
114
        replicationDaemon.cancel();
115
        replicationDaemon = new Timer(true);
116
        replicationDaemon.scheduleAtFixedRate(new ReplicationHandler(out), 0, 
117
                                              rate * 1000);
118
        out.println("Replication Handler Started");
119
        System.out.println("Replication Handler Started");
120
      }
121
      else if(((String[])params.get("action"))[0].equals("forcereplicate"))
122
      {
123
        handleForceReplicateRequest(out, params, response);
124
      }
125
      else if(((String[])params.get("action"))[0].equals("update"))
126
      { //request an update list from the server
127
        if(params.contains("servercheckcode"))
128
        { //the servercheckcode allows this server to check for updated 
129
          //file from other servers as well as just updated files from 
130
          //this server.
131
          System.out.println("metacatreplication: servercheckcode: " + 
132
                               ((int[])params.get("servercheckcode"))[0]);
133
          handleUpdateRequest(out, params, response, 
134
                              ((int[])params.get("servercheckcode"))[0]);
135
        }
136
        else
137
        {
138
          handleUpdateRequest2(out, params, response);
139
        }
140
      }
141
      else if(((String[])params.get("action"))[0].equals("read"))
142
      { //request a specific document from the server
143
        //note that this could be replaced by a call to metacatServlet
144
        //handleGetDocumentAction().
145
        handleGetDocumentRequest(out, params, response);
146
      }
147
      else if(((String[])params.get("action"))[0].equals("getlock"))
148
      {
149
        handleGetLockRequest(out, params, response);
150
      }
151
      else if(((String[])params.get("action"))[0].equals("getdocumentinfo"))
152
      {
153
        handleGetDocumentInfoRequest(out, params, response);
154
      }
155
      else if(((String[])params.get("action"))[0].equals("gettime"))
156
      {
157
        handleGetTimeRequest(out, params, response);
158
      }
159
    }
160
  }
161
  
162
  private void handleForceReplicateRequest(PrintWriter out, Hashtable params,
163
                                           HttpServletResponse response)
164
  {
165
    System.out.println("in handleforcereplicaterequest");
166
    String server = ((String[])params.get("server"))[0];
167
    String docid = ((String[])params.get("docid"))[0];
168
    String dbaction = "UPDATE";
169
    boolean override = false;
170
    int serverCode = 1;
171
    
172
    try
173
    {
174
      if(params.containsKey("dbaction"))
175
      {
176
        dbaction = ((String[])params.get("dbaction"))[0];
177
        serverCode = MetacatReplication.getServerCode(server);
178
        override = true;
179
      }
180
      System.out.println("action in forcereplicate is: " + dbaction);
181
      System.out.println("serverCode in forcereplicate is: " + serverCode);
182
      
183
      int serverCheckCode = MetacatReplication.getServerCode(server);
184
      URL u = new URL("http://" + server + "?action=read&docid=" + docid);
185
      System.out.println("sending message: " + u.toString());
186
      String xmldoc = MetacatReplication.getURLContent(u);
187
      URL docinfourl = new URL("http://" + server + 
188
                               "?action=getdocumentinfo&docid=" +
189
                               docid);
190
      System.out.println("sending message: " + docinfourl.toString());
191
      String docInfoStr = MetacatReplication.getURLContent(docinfourl);
192
      DocInfoHandler dih = new DocInfoHandler();
193
      XMLReader docinfoParser = ReplicationHandler.initParser(dih);
194
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
195
      Hashtable docinfoHash = dih.getDocInfo();
196
      String user = (String)docinfoHash.get("user_owner");
197
      String group = new String(user);
198
      Connection conn = util.openDBConnection();
199
      DocumentImpl.write(conn, new StringReader(xmldoc), null, dbaction, docid, 
200
                         user, group, serverCode, override);
201
      conn.close();
202
    }
203
    catch(Exception e)
204
    {
205
      System.out.println("error in metacatReplication.handleForceReplicate" +
206
                         "Request: " + e.getMessage());
207
    }
208
  }
209
  
210
  /**
211
   * Grants or denies a lock to a requesting host.
212
   * The servlet parameters of interrest are:
213
   * docid: the docid of the file the lock is being requested for
214
   * currentdate: the timestamp of the document on the remote server
215
   * 
216
   */
217
  private void handleGetLockRequest(PrintWriter out, Hashtable params,
218
                                    HttpServletResponse response)
219
  {
220
    System.out.println("in handlegetlockrequest");
221
    java.util.Date remoteDate = new java.util.Date();
222
    java.util.Date localDate = new java.util.Date();
223
    try
224
    {
225
      Connection conn = util.openDBConnection();
226
      String docid = ((String[])params.get("docid"))[0];
227
      String remoteDateStr = ((String[])params.get("updatedate"))[0];
228
      DocumentImpl requestDoc = new DocumentImpl(conn, docid);
229

    
230
      String localDateStr = requestDoc.getUpdateDate();
231
      SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
232
      ParsePosition pos = new ParsePosition(0);
233
      remoteDate = formatter.parse(remoteDateStr, pos);
234
      pos = new ParsePosition(0);
235
      localDate = formatter.parse(localDateStr, pos);
236

    
237
      if(remoteDate.compareTo(localDate) >= 0)
238
      {
239
        if(!fileLocks.contains(docid))
240
        { //grant the lock
241
          fileLocks.add(0, docid); //insert at the beginning of the queue Vector
242
          //send a message back to the the remote host authorizing the insert
243
          out.println("<lockgranted><docid>" +docid+ "</docid></lockgranted>");
244
          lockThread = new Thread(this);
245
          lockThread.setPriority(Thread.MIN_PRIORITY);
246
          lockThread.start();
247
        }
248
        else
249
        { //deny the lock
250
          out.println("<filelocked><docid>" + docid + "</docid></filelocked>");
251
        }
252
      }
253
      else
254
      {//deny the lock.
255
        out.println("<outdatedfile><docid>" + docid + "</docid></filelocked>");
256
      }
257
      conn.close();
258
    }
259
    catch(Exception e)
260
    {
261
      System.out.println("error requesting file lock: " + e.getMessage());
262
      e.printStackTrace(System.out);
263
    }
264
  }
265
  
266
  /**
267
   * Sends all of the xml_documents information encoded in xml to a requestor
268
   */
269
  private void handleGetDocumentInfoRequest(PrintWriter out, Hashtable params, 
270
                                        HttpServletResponse response)
271
  {
272
    String docid = ((String[])(params.get("docid")))[0];
273
    StringBuffer sb = new StringBuffer();
274
    try
275
    {
276
      Connection conn = util.openDBConnection();
277
      DocumentImpl doc = new DocumentImpl(conn, docid);
278
      sb.append("<documentinfo><docid>").append(docid);
279
      sb.append("</docid><docname>").append(doc.getDocname());
280
      sb.append("</docname><doctype>").append(doc.getDoctype());
281
      sb.append("</doctype><doctitle>").append(doc.getDocTitle());
282
      sb.append("</doctitle><user_owner>").append(doc.getUserowner());
283
      sb.append("</user_owner><user_updated>").append(doc.getUserupdated());
284
      sb.append("</user_updated><public_access>").append(doc.getPublicaccess());
285
      sb.append("</public_access></documentinfo>");
286
      response.setContentType("text/xml");
287
      out.println(sb.toString());
288
      conn.close();
289
    }
290
    catch (Exception e)
291
    {
292
      System.out.println("error in metacatReplication.handlegetdocumentinforequest: " + 
293
      e.getMessage());
294
    }
295
    
296
  }
297
  
298
  /**
299
   * Sends a document to a remote host
300
   */
301
  private void handleGetDocumentRequest(PrintWriter out, Hashtable params, 
302
                                        HttpServletResponse response)
303
  {
304
    try
305
    {
306
      String docid = ((String[])(params.get("docid")))[0];
307
      System.out.println("incoming get request for document: " +
308
                         docid);
309
      Connection conn = util.openDBConnection();
310
      DocumentImpl di = new DocumentImpl(conn, docid);
311
      response.setContentType("text/xml");
312
      out.print(di.toString());
313
      conn.close();
314
    }
315
    catch(Exception e)
316
    {
317
      System.out.println("error getting document: " + e.getMessage());
318
    }
319
    
320
  }
321
  
322
  /**
323
   * Does default handling for update requests
324
   */
325
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
326
                                   HttpServletResponse response)
327
  {
328
    handleUpdateRequest(out, params, response, 1);
329
  }
330

    
331
  /**
332
   * Initiates an update of all server in the xml_replication table.
333
   * the remote host sends an update date.  The local host (this method)
334
   * queries the db for any document that was updated after the update date
335
   * and returns a list of those documents to the remote host.  It also 
336
   * sends back a list of the files that were locally deleted.
337
   
338
   * serverCheckCode allows the requestor to request documents from this 
339
   * server that reside on a different server.  Normally, this server
340
   * should only replicate files that it owns but in the case of updating 
341
   * a file that does not belong to this server, it is needed.  See 
342
   * DocumentImpl.write().
343
   */
344
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
345
                                   HttpServletResponse response, 
346
                                   int serverCheckCode)
347
  { 
348
    System.out.println("incoming update request for dt/time " + 
349
                       ((String[])params.get("date"))[0] +
350
                       " from external metacat");
351
    response.setContentType("text/xml");
352
    StringBuffer returnXML = new StringBuffer();
353
    returnXML.append("<?xml version=\"1.0\"?><replication>");
354
    
355
    StringBuffer sql = new StringBuffer();
356
    String updateStr = ((String[])params.get("date"))[0];
357
    updateStr = updateStr.replace('+', ' ');
358
    //pseudo algorithm:
359
    ///////////////////////////////////////////////////////////////////////
360
    //get the date/time from the requestor, query the db for any documents
361
    //that have an update date later than the requested date and send
362
    //those docids back to the requestor.  If there are no newer documents
363
    //then send back a null message.
364
    ///////////////////////////////////////////////////////////////////////
365
    
366
    //Timestamp update = Timestamp.valueOf(updateStr);
367
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
368
    java.util.Date update = new java.util.Date();
369
    ParsePosition pos = new ParsePosition(0);
370
    update = formatter.parse(updateStr, pos);
371
    String dateString = formatter.format(update);
372
    sql.append("select docid, date_updated, server_location from ");
373
    sql.append("xml_documents where date_updated > ");
374
    sql.append("to_date('").append(dateString);
375
    sql.append("','YY-MM-DD HH24:MI:SS')");
376
    //System.out.println("sql: " + sql.toString());
377
    
378
    //get any recently deleted documents
379
    StringBuffer delsql = new StringBuffer();
380
    delsql.append("select docid, date_updated, server_location from ");
381
    delsql.append("xml_revisions where docid not in (select docid from ");
382
    delsql.append("xml_documents) and date_updated > to_date('");
383
    delsql.append(dateString).append("','YY-MM-DD HH24:MI:SS')");
384

    
385
    try
386
    {
387
      Connection conn = util.openDBConnection();
388
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
389
      pstmt.execute();
390
      ResultSet rs = pstmt.getResultSet();
391
      boolean tablehasrows = rs.next();
392
      
393
      //if a '1' should not represent localhost, add code here to query
394
      //xml_replication for the proper serverid number
395
      
396
      returnXML.append("<server>").append(util.getOption("server"));
397
      returnXML.append(util.getOption("replicationpath"));
398
      returnXML.append("</server><updates>");
399
      while(tablehasrows)
400
      {
401
        String docid = rs.getString(1);
402
        String dateUpdated = rs.getString(2);
403
        int serverCode = rs.getInt(3);
404
        if(serverCode == serverCheckCode)
405
        { //check that this document is from this server.
406
          //servers only replicate their own documents!
407
          returnXML.append("<updatedDocument><docid>").append(docid);
408
          returnXML.append("</docid><date_updated>").append(dateUpdated);
409
          returnXML.append("</date_updated></updatedDocument>");
410
        }
411
        tablehasrows = rs.next();
412
      }
413
      
414
      pstmt = conn.prepareStatement(delsql.toString());
415
      pstmt.execute();
416
      rs = pstmt.getResultSet();
417
      tablehasrows = rs.next();
418
      while(tablehasrows)
419
      { //handle the deleted documents
420
        String docid = rs.getString(1);
421
        String dateUpdated = rs.getString(2);
422
        int serverCode = rs.getInt(3);
423
        if(serverCode == 1)
424
        {
425
          returnXML.append("<deletedDocument><docid>").append(docid);
426
          returnXML.append("</docid><date_updated>").append(dateUpdated);
427
          returnXML.append("</date_updated></deletedDocument>");
428
        }
429
        tablehasrows = rs.next();
430
      }
431
      
432
      returnXML.append("</updates></replication>");
433
      conn.close();
434
      //System.out.println(returnXML.toString());
435
      out.print(returnXML.toString());
436
    }
437
    catch(Exception e)
438
    {
439
      System.out.println("Exception in metacatReplication: " + e.getMessage());
440
    }
441
  }
442
  
443
  /**
444
   * Sends an update list based on rev numbers instead of dates.
445
   */
446
  private void handleUpdateRequest2(PrintWriter out, Hashtable params, 
447
                                    HttpServletResponse response)
448
  {
449
    System.out.println("in handleUpdateRequest2");
450
    try
451
    {
452
      System.out.println("received update request");
453
      StringBuffer docsql = new StringBuffer();
454
      StringBuffer doclist = new StringBuffer();
455
      
456
      //get all docs that reside on this server
457
      doclist.append("<?xml version=\"1.0\"?><replication>");
458
      doclist.append("<server>").append(util.getOption("server"));
459
      doclist.append(util.getOption("replicationpath"));
460
      doclist.append("</server><updates>");
461
      
462
      docsql.append("select docid, rev from xml_documents where "); 
463
      docsql.append("server_location = 1");
464
      
465
      //get any deleted documents
466
      StringBuffer delsql = new StringBuffer();
467
      delsql.append("select distinct docid from ");
468
      delsql.append("xml_revisions where docid not in (select docid from ");
469
      delsql.append("xml_documents) and server_location = 1");
470
      
471
      Connection conn = util.openDBConnection();
472
      PreparedStatement pstmt = conn.prepareStatement(docsql.toString());
473
      pstmt.execute();
474
      ResultSet rs = pstmt.getResultSet();
475
      boolean tablehasrows = rs.next();
476
      while(tablehasrows)
477
      {
478
        doclist.append("<updatedDocument>");
479
        doclist.append("<docid>").append(rs.getString(1));
480
        doclist.append("</docid><rev>").append(rs.getInt(2));
481
        doclist.append("</rev>");
482
        doclist.append("</updatedDocument>");
483
        tablehasrows = rs.next();
484
      }
485
      
486
      pstmt = conn.prepareStatement(delsql.toString());
487
      pstmt.execute();
488
      rs = pstmt.getResultSet();
489
      tablehasrows = rs.next();
490
      while(tablehasrows)
491
      { //handle the deleted documents
492
        doclist.append("<deletedDocument><docid>").append(rs.getString(1));
493
        doclist.append("</docid><rev></rev></deletedDocument>");
494
        tablehasrows = rs.next();
495
      }
496
      
497
      doclist.append("</updates></replication>");
498
      //System.out.println("doclist: " + doclist.toString());
499
      conn.close();
500
      response.setContentType("text/xml");
501
      out.println(doclist.toString());
502
    }
503
    catch(Exception e)
504
    {
505
      System.out.println("error in handleupdaterequest2: " + e.getMessage());
506
      e.printStackTrace(System.out);
507
    }
508
    
509
  }
510
  
511
  /**
512
   * Sends the current system date to the remote server.  Using this action
513
   * for replication gets rid of any problems with syncronizing clocks 
514
   * because a time specific to a document is always kept on its home server.
515
   */
516
  private void handleGetTimeRequest(PrintWriter out, Hashtable params, 
517
                                    HttpServletResponse response)
518
  {
519
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
520
    java.util.Date localtime = new java.util.Date();
521
    String dateString = formatter.format(localtime);
522
    response.setContentType("text/xml");
523
    
524
    out.println("<timestamp>" + dateString + "</timestamp>");
525
  }
526
  
527
  /**
528
   * This method is what is run when a seperate thread is broken off to handle
529
   * inserting a document from a remote replicated server.
530
   */
531
  public void run()
532
  {
533
    try
534
    {
535
      System.out.println("thread started for docid: " + 
536
                         (String)fileLocks.elementAt(0));
537
      Thread.sleep(30000); //the lock will expire in 30 seconds
538
      System.out.println("thread for docid: " + 
539
                         (String)fileLocks.elementAt(fileLocks.size() - 1) + 
540
                         " exiting.");
541
      fileLocks.remove(fileLocks.size() - 1);
542
      //fileLocks is treated as a FIFO queue.  If there are more than one lock
543
      //in the vector, the first one inserted will be removed.
544
    }
545
    catch(Exception e)
546
    {
547
      System.out.println("error in file lock thread: " + e.getMessage());
548
    }
549
  }
550
  
551
  /**
552
   * Returns the name of a server given a serverCode
553
   * @param serverCode the serverid of the server
554
   * @return the servername or null if the specified serverCode does not
555
   *         exist.
556
   */
557
  public static String getServer(int serverCode)
558
  {
559
    //System.out.println("serverid: " + serverCode);
560
    try
561
    {
562
      Connection conn = util.openDBConnection();
563
      String sql = new String("select server from " +
564
                              "xml_replication where serverid = " + 
565
                              serverCode);
566
      PreparedStatement pstmt = conn.prepareStatement(sql);
567
      //System.out.println("getserver sql: " + sql);
568
      pstmt.execute();
569
      ResultSet rs = pstmt.getResultSet();
570
      boolean tablehasrows = rs.next();
571
      if(tablehasrows)
572
      {
573
        //System.out.println("server: " + rs.getString(1));
574
        return rs.getString(1);
575
      }
576
      conn.close();
577
    }
578
    catch(Exception e)
579
    {
580
      System.out.println("Error in MetacatReplication.getServer: " + 
581
                          e.getMessage());
582
    }
583
    return null;
584
      //return null if the server does not exist
585
  }
586
  
587
  /**
588
   * Returns a server code given a server name
589
   * @param server the name of the server
590
   * @return integer > 0 representing the code of the server, 0 if the server
591
   *  does not exist.
592
   */
593
  public static int getServerCode(String server) throws Exception
594
  {
595
    try
596
    {
597
      Connection conn = util.openDBConnection();
598
      PreparedStatement pstmt = conn.prepareStatement("select serverid from " +
599
                                         "xml_replication where server " +
600
                                         "like '" + server + "'");
601
      pstmt.execute();
602
      ResultSet rs = pstmt.getResultSet();
603
      boolean tablehasrows = rs.next();
604
      int serverCode = 0;
605
      if(tablehasrows)
606
      {
607
         return rs.getInt(1);
608
      }
609
      else
610
      {
611
        return 0;
612
      }
613
    }
614
    catch(Exception e)
615
    {
616
      throw e;
617
    }
618
  }
619
  
620
  /**
621
   * This method returns the content of a url
622
   * @param u the url to return the content from
623
   * @return a string representing the content of the url
624
   * @throws java.io.IOException
625
   */
626
  public static String getURLContent(URL u) throws java.io.IOException
627
  {
628
    //System.out.println("url: " + u.toString());
629
    char istreamChar;
630
    int istreamInt;
631
    InputStreamReader istream = new InputStreamReader(u.openStream());
632
    StringBuffer serverResponse = new StringBuffer();
633
    while((istreamInt = istream.read()) != -1)
634
    {
635
      istreamChar = (char)istreamInt;
636
      serverResponse.append(istreamChar);
637
    }
638
    
639
    return serverResponse.toString();
640
  }
641
}
(27-27/36)