Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that implements replication for metacat
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-12-06 13:26:52 -0800 (Wed, 06 Dec 2000) $'
11
 * '$Revision: 584 $'
12
 */
13

    
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.util.*;
17
import java.io.*;
18
import java.sql.*;
19
import java.net.*;
20
import java.lang.*;
21
import java.text.*;
22
import javax.servlet.*;
23
import javax.servlet.http.*;
24

    
25
import org.xml.sax.*;
26

    
27
public class MetacatReplication extends HttpServlet implements Runnable
28
{  
29
  private String deltaT;
30
  Timer replicationDaemon;
31
  private static MetaCatUtil util = new MetaCatUtil();
32
  private Vector fileLocks = new Vector();
33
  private Thread lockThread = null;
34
  
35
  /**
36
   * Initialize the servlet by creating appropriate database connections
37
   */
38
  public void init(ServletConfig config) throws ServletException 
39
  {
40
    //initialize db connections to handle any update requests
41
    MetaCatUtil util = new MetaCatUtil();
42
    deltaT = util.getOption("deltaT");
43
    //the default deltaT can be set from metacat.properties
44
    //create a thread to do the delta-T check but don't execute it yet
45
    replicationDaemon = new Timer(true);
46
  }
47
  
48
  public void destroy() 
49
  {
50
    replicationDaemon.cancel();
51
    System.out.println("Replication daemon cancelled.");
52
  }
53
  
54
  public void doGet (HttpServletRequest request, HttpServletResponse response)
55
                     throws ServletException, IOException 
56
  {
57
    // Process the data and send back the response
58
    handleGetOrPost(request, response);
59
  }
60

    
61
  public void doPost(HttpServletRequest request, HttpServletResponse response)
62
                     throws ServletException, IOException 
63
  {
64
    // Process the data and send back the response
65
    handleGetOrPost(request, response);
66
  }
67
  
68
  private void handleGetOrPost(HttpServletRequest request, 
69
                               HttpServletResponse response) 
70
                               throws ServletException, IOException 
71
  {
72
    PrintWriter out = response.getWriter();
73
    Hashtable params = new Hashtable();
74
    Enumeration paramlist = request.getParameterNames();
75
    
76
    while (paramlist.hasMoreElements()) 
77
    {
78
      String name = (String)paramlist.nextElement();
79
      String[] value = request.getParameterValues(name);
80
      params.put(name, value);  
81
    }
82
    
83
    if(params.containsKey("action"))
84
    {
85
      if(((String[])params.get("action"))[0].equals("stop"))
86
      { //stop the replication server
87
        replicationDaemon.cancel();
88
        out.println("Replication Handler Stopped");
89
        System.out.println("Replication Handler Stopped");
90
        MetacatReplication.replLog("deltaT handler stopped");
91
      }
92
      else if(((String[])params.get("action"))[0].equals("start"))
93
      { //start the replication server
94
        int rate;
95
        if(params.containsKey("rate"))
96
        {
97
          rate = new Integer(
98
                 new String(((String[])params.get("rate"))[0])).intValue();
99
          if(rate < 30)
100
          {
101
            out.println("Replication deltaT rate cannot be less than 30!");
102
            //deltaT<30 is a timing mess!
103
            rate = 1000;
104
          }
105
        }
106
        else
107
        {
108
          rate = 1000;
109
        }
110
        
111
        out.println("New rate is: " + rate + " seconds.");
112
        replicationDaemon.cancel();
113
        replicationDaemon = new Timer(true);
114
        replicationDaemon.scheduleAtFixedRate(new ReplicationHandler(out), 0, 
115
                                              rate * 1000);
116
        MetacatReplication.replLog("deltaT handler started with rate=" + 
117
                                    rate + " seconds");
118
        out.println("Replication Handler Started");
119
        System.out.println("Replication Handler Started");
120
      }
121
      else if(((String[])params.get("action"))[0].equals("forcereplicate"))
122
      {
123
        handleForceReplicateRequest(out, params, response);
124
      }
125
      else if(((String[])params.get("action"))[0].equals("update"))
126
      { //request an update list from the server
127
        handleUpdateRequest(out, params, response);
128
      }
129
      else if(((String[])params.get("action"))[0].equals("read"))
130
      { //request a specific document from the server
131
        //note that this could be replaced by a call to metacatServlet
132
        //handleGetDocumentAction().
133
        handleGetDocumentRequest(out, params, response);
134
      }
135
      else if(((String[])params.get("action"))[0].equals("getlock"))
136
      {
137
        handleGetLockRequest(out, params, response);
138
      }
139
      else if(((String[])params.get("action"))[0].equals("getdocumentinfo"))
140
      {
141
        handleGetDocumentInfoRequest(out, params, response);
142
      }
143
      else if(((String[])params.get("action"))[0].equals("gettime"))
144
      {
145
        handleGetTimeRequest(out, params, response);
146
      }
147
    }
148
  }
149
  
150
  /**
151
   * when a forcereplication request comes in, this method sends a read request
152
   * to the requesting server for the specified docid.
153
   */
154
  private void handleForceReplicateRequest(PrintWriter out, Hashtable params,
155
                                           HttpServletResponse response)
156
  {
157
    //System.out.println("in handleforcereplicaterequest");
158
    String server = ((String[])params.get("server"))[0];
159
    //the server that the request came from
160
    String docid = ((String[])params.get("docid"))[0];
161
    //the docid of the document to get
162
    String dbaction = "UPDATE";
163
    //default action is update
164
    boolean override = false;
165
    int serverCode = 1;
166
    
167
    try
168
    {
169
      if(params.containsKey("dbaction"))
170
      { //if the url contains a dbaction then the default action is overridden
171
        dbaction = ((String[])params.get("dbaction"))[0];
172
        serverCode = MetacatReplication.getServerCode(server);
173
        override = true; //we are now overriding the default action
174
      }
175
      //System.out.println("action in forcereplicate is: " + dbaction);
176
      //System.out.println("serverCode in forcereplicate is: " + serverCode);
177
      MetacatReplication.replLog("force replication request from " + server); 
178
      
179
      int serverCheckCode = MetacatReplication.getServerCode(server);
180
      URL u = new URL("http://" + server + "?action=read&docid=" + docid);
181
      //System.out.println("sending message: " + u.toString());
182
      String xmldoc = MetacatReplication.getURLContent(u);
183
      //get the document to write
184
      URL docinfourl = new URL("http://" + server + 
185
                               "?action=getdocumentinfo&docid=" +
186
                               docid);
187
      //we need to get the document's info so we can set the correct user
188
      //and group once we get the document and write it to our DB
189
      //System.out.println("sending message: " + docinfourl.toString());
190
      String docInfoStr = MetacatReplication.getURLContent(docinfourl);
191
      DocInfoHandler dih = new DocInfoHandler();
192
      //dih is the parser for the docinfo xml format
193
      XMLReader docinfoParser = ReplicationHandler.initParser(dih);
194
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
195
      Hashtable docinfoHash = dih.getDocInfo();
196
      String user = (String)docinfoHash.get("user_owner");
197
      String group = new String(user);
198
      //right now the user and group are the same.
199
      Connection conn = util.openDBConnection();
200
      DocumentImpl.write(conn, new StringReader(xmldoc), null, dbaction, docid, 
201
                         user, group, serverCode, override);
202
      MetacatReplication.replLog("document " + docid + " added to DB with " +
203
                                 "action " + dbaction);
204
      conn.close();
205
    }
206
    catch(Exception e)
207
    {
208
      System.out.println("error in metacatReplication.handleForceReplicate" +
209
                         "Request: " + e.getMessage());
210
    }
211
  }
212
  
213
  /**
214
   * Grants or denies a lock to a requesting host.
215
   * The servlet parameters of interrest are:
216
   * docid: the docid of the file the lock is being requested for
217
   * currentdate: the timestamp of the document on the remote server
218
   * 
219
   */
220
  private void handleGetLockRequest(PrintWriter out, Hashtable params,
221
                                    HttpServletResponse response)
222
  {
223
    try
224
    {
225
      Connection conn = util.openDBConnection();
226
      String docid = ((String[])params.get("docid"))[0];
227
      String remoteRev = ((String[])params.get("updaterev"))[0];
228
      DocumentImpl requestDoc = new DocumentImpl(conn, docid);
229
      MetacatReplication.replLog("lock request for " + docid);
230
      int localRevInt = requestDoc.getRev();
231
      int remoteRevInt = Integer.parseInt(remoteRev);
232
      
233
      if(remoteRevInt >= localRevInt)
234
      {
235
        if(!fileLocks.contains(docid))
236
        { //grant the lock if it is not already locked
237
          fileLocks.add(0, docid); //insert at the beginning of the queue Vector
238
          //send a message back to the the remote host authorizing the insert
239
          out.println("<lockgranted><docid>" +docid+ "</docid></lockgranted>");
240
          lockThread = new Thread(this);
241
          lockThread.setPriority(Thread.MIN_PRIORITY);
242
          lockThread.start();
243
          MetacatReplication.replLog("lock granted for " + docid);
244
        }
245
        else
246
        { //deny the lock
247
          out.println("<filelocked><docid>" + docid + "</docid></filelocked>");
248
          MetacatReplication.replLog("lock denied for " + docid + 
249
                                     "reason: file already locked");
250
        }
251
      }
252
      else
253
      {//deny the lock.
254
        out.println("<outdatedfile><docid>" + docid + "</docid></filelocked>");
255
        MetacatReplication.replLog("lock denied for " + docid + 
256
                                   "reason: client has outdated file");
257
      }
258
      conn.close();
259
    }
260
    catch(Exception e)
261
    {
262
      System.out.println("error requesting file lock: " + e.getMessage());
263
      e.printStackTrace(System.out);
264
    }
265
  }
266
  
267
  /**
268
   * Sends all of the xml_documents information encoded in xml to a requestor
269
   * the format is:
270
   * <!ELEMENT documentinfo (docid, docname, doctype, doctitle, user_owner,
271
   *                         user_updated, public_access, rev)
272
   * all of the subelements of document info are #PCDATA
273
   */
274
  private void handleGetDocumentInfoRequest(PrintWriter out, Hashtable params, 
275
                                        HttpServletResponse response)
276
  {
277
    String docid = ((String[])(params.get("docid")))[0];
278
    StringBuffer sb = new StringBuffer();
279
    try
280
    {
281
      Connection conn = util.openDBConnection();
282
      DocumentImpl doc = new DocumentImpl(conn, docid);
283
      sb.append("<documentinfo><docid>").append(docid);
284
      sb.append("</docid><docname>").append(doc.getDocname());
285
      sb.append("</docname><doctype>").append(doc.getDoctype());
286
      sb.append("</doctype><doctitle>").append(doc.getDocTitle());
287
      sb.append("</doctitle><user_owner>").append(doc.getUserowner());
288
      sb.append("</user_owner><user_updated>").append(doc.getUserupdated());
289
      sb.append("</user_updated><public_access>").append(doc.getPublicaccess());
290
      sb.append("</public_access><rev>").append(doc.getRev());
291
      sb.append("</rev></documentinfo>");
292
      response.setContentType("text/xml");
293
      out.println(sb.toString());
294
      conn.close();
295
    }
296
    catch (Exception e)
297
    {
298
      System.out.println("error in metacatReplication.handlegetdocumentinforequest: " + 
299
      e.getMessage());
300
    }
301
    
302
  }
303
  
304
  /**
305
   * Sends a document to a remote host
306
   */
307
  private void handleGetDocumentRequest(PrintWriter out, Hashtable params, 
308
                                        HttpServletResponse response)
309
  {
310
    try
311
    {
312
      String docid = ((String[])(params.get("docid")))[0];
313
      //System.out.println("incoming get request for document: " + docid);
314
      Connection conn = util.openDBConnection();
315
      DocumentImpl di = new DocumentImpl(conn, docid);
316
      response.setContentType("text/xml");
317
      out.print(di.toString());
318
      conn.close();
319
      MetacatReplication.replLog("document " + docid + " sent");
320
    }
321
    catch(Exception e)
322
    {
323
      System.out.println("error getting document: " + e.getMessage());
324
    }
325
    
326
  }
327
  
328
  /**
329
   * Sends a list of all of the documents on this sever along with their
330
   * revision numbers.  
331
   * The format is:
332
   * <!ELEMENT replication (server, updates)>
333
   * <!ELEMENT server (#PCDATA)>
334
   * <!ELEMENT updates ((updatedDocument | deleteDocument)*)>
335
   * <!ELEMENT updatedDocument (docid, rev)>
336
   * <!ELEMENT deletedDocument (docid, rev)>
337
   * <!ELEMENT docid (#PCDATA)>
338
   * <!ELEMENT rev (#PCDATA)>
339
   * note that the rev in deletedDocument is always empty.  I just left
340
   * it in there to make the parser implementation easier.
341
   */
342
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
343
                                    HttpServletResponse response)
344
  {
345
    try
346
    {
347
      //System.out.println("received update request");
348
      StringBuffer docsql = new StringBuffer();
349
      StringBuffer doclist = new StringBuffer();
350
      
351
      //get all docs that reside on this server
352
      doclist.append("<?xml version=\"1.0\"?><replication>");
353
      doclist.append("<server>").append(util.getOption("server"));
354
      doclist.append(util.getOption("replicationpath"));
355
      doclist.append("</server><updates>");
356
      
357
      docsql.append("select docid, rev from xml_documents where "); 
358
      docsql.append("server_location = 1");
359
      
360
      //get any deleted documents
361
      StringBuffer delsql = new StringBuffer();
362
      delsql.append("select distinct docid from ");
363
      delsql.append("xml_revisions where docid not in (select docid from ");
364
      delsql.append("xml_documents) and server_location = 1");
365
      
366
      Connection conn = util.openDBConnection();
367
      PreparedStatement pstmt = conn.prepareStatement(docsql.toString());
368
      pstmt.execute();
369
      ResultSet rs = pstmt.getResultSet();
370
      boolean tablehasrows = rs.next();
371
      while(tablehasrows)
372
      {
373
        doclist.append("<updatedDocument>");
374
        doclist.append("<docid>").append(rs.getString(1));
375
        doclist.append("</docid><rev>").append(rs.getInt(2));
376
        doclist.append("</rev>");
377
        doclist.append("</updatedDocument>");
378
        tablehasrows = rs.next();
379
      }
380
      
381
      pstmt = conn.prepareStatement(delsql.toString());
382
      pstmt.execute();
383
      rs = pstmt.getResultSet();
384
      tablehasrows = rs.next();
385
      while(tablehasrows)
386
      { //handle the deleted documents
387
        doclist.append("<deletedDocument><docid>").append(rs.getString(1));
388
        doclist.append("</docid><rev></rev></deletedDocument>");
389
        //note that rev is always empty for deleted docs
390
        tablehasrows = rs.next();
391
      }
392
      
393
      doclist.append("</updates></replication>");
394
      //System.out.println("doclist: " + doclist.toString());
395
      conn.close();
396
      response.setContentType("text/xml");
397
      out.println(doclist.toString());
398
      MetacatReplication.replLog("update request handled");
399
    }
400
    catch(Exception e)
401
    {
402
      System.out.println("error in handleupdaterequest2: " + e.getMessage());
403
      e.printStackTrace(System.out);
404
    }
405
    
406
  }
407
  
408
  /**
409
   * Sends the current system date to the remote server.  Using this action
410
   * for replication gets rid of any problems with syncronizing clocks 
411
   * because a time specific to a document is always kept on its home server.
412
   */
413
  private void handleGetTimeRequest(PrintWriter out, Hashtable params, 
414
                                    HttpServletResponse response)
415
  {
416
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
417
    java.util.Date localtime = new java.util.Date();
418
    String dateString = formatter.format(localtime);
419
    response.setContentType("text/xml");
420
    
421
    out.println("<timestamp>" + dateString + "</timestamp>");
422
  }
423
  
424
  /**
425
   * this method handles the timeout for a file lock.  when a lock is 
426
   * granted it is granted for 30 seconds.  When this thread runs out
427
   * it deletes the docid from the queue, thus eliminating the lock.
428
   */
429
  public void run()
430
  {
431
    try
432
    {
433
      //System.out.println("thread started for docid: " + 
434
      //                   (String)fileLocks.elementAt(0));
435
      Thread.sleep(30000); //the lock will expire in 30 seconds
436
      //System.out.println("thread for docid: " + 
437
      //                   (String)fileLocks.elementAt(fileLocks.size() - 1) + 
438
      //                   " exiting.");
439
      fileLocks.remove(fileLocks.size() - 1);
440
      //fileLocks is treated as a FIFO queue.  If there are more than one lock
441
      //in the vector, the first one inserted will be removed.
442
    }
443
    catch(Exception e)
444
    {
445
      System.out.println("error in file lock thread: " + e.getMessage());
446
    }
447
  }
448
  
449
  /**
450
   * Returns the name of a server given a serverCode
451
   * @param serverCode the serverid of the server
452
   * @return the servername or null if the specified serverCode does not
453
   *         exist.
454
   */
455
  public static String getServer(int serverCode)
456
  {
457
    //System.out.println("serverid: " + serverCode);
458
    try
459
    {
460
      Connection conn = util.openDBConnection();
461
      String sql = new String("select server from " +
462
                              "xml_replication where serverid = " + 
463
                              serverCode);
464
      PreparedStatement pstmt = conn.prepareStatement(sql);
465
      //System.out.println("getserver sql: " + sql);
466
      pstmt.execute();
467
      ResultSet rs = pstmt.getResultSet();
468
      boolean tablehasrows = rs.next();
469
      if(tablehasrows)
470
      {
471
        //System.out.println("server: " + rs.getString(1));
472
        return rs.getString(1);
473
      }
474
      conn.close();
475
    }
476
    catch(Exception e)
477
    {
478
      System.out.println("Error in MetacatReplication.getServer: " + 
479
                          e.getMessage());
480
    }
481
    return null;
482
      //return null if the server does not exist
483
  }
484
  
485
  /**
486
   * Returns a server code given a server name
487
   * @param server the name of the server
488
   * @return integer > 0 representing the code of the server, 0 if the server
489
   *  does not exist.
490
   */
491
  public static int getServerCode(String server) throws Exception
492
  {
493
    try
494
    {
495
      Connection conn = util.openDBConnection();
496
      PreparedStatement pstmt = conn.prepareStatement("select serverid from " +
497
                                         "xml_replication where server " +
498
                                         "like '" + server + "'");
499
      pstmt.execute();
500
      ResultSet rs = pstmt.getResultSet();
501
      boolean tablehasrows = rs.next();
502
      int serverCode = 0;
503
      if(tablehasrows)
504
      {
505
         return rs.getInt(1);
506
      }
507
      else
508
      {
509
        return 0;
510
      }
511
    }
512
    catch(Exception e)
513
    {
514
      throw e;
515
    }
516
  }
517
  
518
  /**
519
   * This method returns the content of a url
520
   * @param u the url to return the content from
521
   * @return a string representing the content of the url
522
   * @throws java.io.IOException
523
   */
524
  public static String getURLContent(URL u) throws java.io.IOException
525
  {
526
    //System.out.println("url: " + u.toString());
527
    char istreamChar;
528
    int istreamInt;
529
    InputStreamReader istream = new InputStreamReader(u.openStream());
530
    StringBuffer serverResponse = new StringBuffer();
531
    while((istreamInt = istream.read()) != -1)
532
    {
533
      istreamChar = (char)istreamInt;
534
      serverResponse.append(istreamChar);
535
    }
536
    
537
    return serverResponse.toString();
538
  }
539
  
540
  /**
541
   * Method for writing replication messages to a log file specified in 
542
   * metacat.properties
543
   */
544
  public static void replLog(String message)
545
  {
546
    try
547
    {
548
      FileOutputStream fos = new FileOutputStream(
549
                                 util.getOption("replicationlog"), true);
550
      PrintWriter pw = new PrintWriter(fos);
551
      SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
552
      java.util.Date localtime = new java.util.Date();
553
      String dateString = formatter.format(localtime);
554
      dateString += " :: " + message;
555
      //time stamp each entry
556
      pw.println(dateString);
557
      pw.flush();
558
    }
559
    catch(Exception e)
560
    {
561
      System.out.println("error writing to replication log");
562
      e.printStackTrace(System.out);
563
    }
564
  }
565
}
(29-29/38)