Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that implements replication for metacat
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-11-29 15:18:22 -0800 (Wed, 29 Nov 2000) $'
11
 * '$Revision: 573 $'
12
 */
13

    
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.util.*;
17
import java.io.*;
18
import java.sql.*;
19
import java.net.*;
20
import java.lang.*;
21
import java.text.*;
22
import javax.servlet.*;
23
import javax.servlet.http.*;
24
import oracle.xml.parser.v2.*;
25
import org.xml.sax.*;
26

    
27
public class MetacatReplication extends HttpServlet implements Runnable
28
{  
29
  private String deltaT;
30
  Timer replicationDaemon;
31
  private static MetaCatUtil util = new MetaCatUtil();
32
  private Vector fileLocks = new Vector();
33
  private Thread lockThread = null;
34
  
35
  /**
36
   * Initialize the servlet by creating appropriate database connections
37
   */
38
  public void init(ServletConfig config) throws ServletException 
39
  {
40
    //initialize db connections to handle any update requests
41
    MetaCatUtil util = new MetaCatUtil();
42
    deltaT = util.getOption("deltaT");
43
    //break off a thread to do the delta-T check
44
    replicationDaemon = new Timer(true);
45
    //replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0, 
46
    //                                      new Integer(deltaT).intValue() * 1000);
47
    //System.out.println("timer scheduled at: " + new Integer(deltaT).intValue() 
48
    //                   + " seconds");
49
    
50
  }
51
  
52
  public void destroy() 
53
  {
54
    replicationDaemon.cancel();
55
    System.out.println("Replication daemon cancelled.");
56
  }
57
  
58
  public void doGet (HttpServletRequest request, HttpServletResponse response)
59
                     throws ServletException, IOException 
60
  {
61
    // Process the data and send back the response
62
    handleGetOrPost(request, response);
63
  }
64

    
65
  public void doPost(HttpServletRequest request, HttpServletResponse response)
66
                     throws ServletException, IOException 
67
  {
68
    // Process the data and send back the response
69
    handleGetOrPost(request, response);
70
  }
71
  
72
  private void handleGetOrPost(HttpServletRequest request, 
73
                               HttpServletResponse response) 
74
                               throws ServletException, IOException 
75
  {
76
System.out.println("in metacatreplication");
77
    PrintWriter out = response.getWriter();
78
    Hashtable params = new Hashtable();
79
    Enumeration paramlist = request.getParameterNames();
80
    
81
    while (paramlist.hasMoreElements()) 
82
    {
83
      String name = (String)paramlist.nextElement();
84
      String[] value = request.getParameterValues(name);
85
      params.put(name, value);  
86
    }
87
    
88
    if(params.containsKey("action"))
89
    {
90
      if(((String[])params.get("action"))[0].equals("stop"))
91
      { //stop the replication server
92
        replicationDaemon.cancel();
93
        out.println("Replication Handler Stopped");
94
        System.out.println("Replication Handler Stopped");
95
      }
96
      else if(((String[])params.get("action"))[0].equals("start"))
97
      { //start the replication server
98
        int rate;
99
        if(params.containsKey("rate"))
100
        {
101
          rate = new Integer(
102
                 new String(((String[])params.get("rate"))[0])).intValue();
103
          if(rate < 30)
104
          {
105
            out.println("Replication deltaT rate cannot be less than 30!");
106
            rate = 1000;
107
          }
108
        }
109
        else
110
        {
111
          rate = 1000;
112
        }
113
        
114
        out.println("New rate is: " + rate + " seconds.");
115
        replicationDaemon.cancel();
116
        replicationDaemon = new Timer(true);
117
        replicationDaemon.scheduleAtFixedRate(new ReplicationHandler(out), 0, 
118
                                              rate * 1000);
119
        out.println("Replication Handler Started");
120
        System.out.println("Replication Handler Started");
121
      }
122
      else if(((String[])params.get("action"))[0].equals("forcereplicate"))
123
      {
124
        String server = ((String[])params.get("server"))[0];
125
        int serverCheckCode = 0;
126
        try
127
        {
128
          serverCheckCode = MetacatReplication.getServerCode(server);
129
        }
130
        catch(Exception e)
131
        {
132
          System.out.println("error in metacatReplication.handleUpdateRequest"+
133
                             ": could not get server code");
134
        }
135
        replicationDaemon.schedule(new ReplicationHandler(out, serverCheckCode),
136
                                   2000);
137
      }
138
      else if(((String[])params.get("action"))[0].equals("update"))
139
      { //request an update list from the server
140
        if(params.contains("servercheckcode"))
141
        {
142
          System.out.println("metacatreplication: servercheckcode: " + 
143
                               ((int[])params.get("servercheckcode"))[0]);
144
          handleUpdateRequest(out, params, response, 
145
                              ((int[])params.get("servercheckcode"))[0]);
146
        }
147
        else
148
        {
149
          handleUpdateRequest(out, params, response);
150
        }
151
      }
152
      else if(((String[])params.get("action"))[0].equals("read"))
153
      { //request a specific document from the server
154
        //note that this could be replaced by a call to metacatServlet
155
        //handleGetDocumentAction().
156
        handleGetDocumentRequest(out, params, response);
157
      }
158
      else if(((String[])params.get("action"))[0].equals("getlock"))
159
      {
160
        handleGetLockRequest(out, params, response);
161
      }
162
      else if(((String[])params.get("action"))[0].equals("getdocumentinfo"))
163
      {
164
        handleGetDocumentInfoRequest(out, params, response);
165
      }
166
      else if(((String[])params.get("action"))[0].equals("gettime"))
167
      {
168
        handleGetTimeRequest(out, params, response);
169
      }
170
    }
171
  }
172
  
173
  /**
174
   * Grants or denies a lock to a requesting host.
175
   * The servlet parameters of interrest are:
176
   * docid: the docid of the file the lock is being requested for
177
   * currentdate: the timestamp of the document on the remote server
178
   * 
179
   */
180
  private void handleGetLockRequest(PrintWriter out, Hashtable params,
181
                                    HttpServletResponse response)
182
  {
183
    java.util.Date remoteDate = new java.util.Date();
184
    java.util.Date localDate = new java.util.Date();
185
    try
186
    {
187
      Connection conn = util.openDBConnection();
188
      String docid = ((String[])params.get("docid"))[0];
189
      String remoteDateStr = ((String[])params.get("updatedate"))[0];
190
      DocumentImpl requestDoc = new DocumentImpl(conn, docid);
191

    
192
      String localDateStr = requestDoc.getUpdateDate();
193
      SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
194
      ParsePosition pos = new ParsePosition(0);
195
      remoteDate = formatter.parse(remoteDateStr, pos);
196
      pos = new ParsePosition(0);
197
      localDate = formatter.parse(localDateStr, pos);
198

    
199
      if(remoteDate.compareTo(localDate) >= 0)
200
      {
201
        if(!fileLocks.contains(docid))
202
        { //grant the lock
203
          fileLocks.add(0, docid); //insert at the beginning of the queue Vector
204
          //send a message back to the the remote host authorizing the insert
205
          out.println("<lockgranted><docid>" +docid+ "</docid></lockgranted>");
206
          lockThread = new Thread(this);
207
          lockThread.setPriority(Thread.MIN_PRIORITY);
208
          lockThread.start();
209
        }
210
        else
211
        { //deny the lock
212
          out.println("<filelocked><docid>" + docid + "</docid></filelocked>");
213
        }
214
      }
215
      else
216
      {//deny the lock.
217
        out.println("<outdatedfile><docid>" + docid + "</docid></filelocked>");
218
      }
219
      conn.close();
220
    }
221
    catch(Exception e)
222
    {
223
      System.out.println("error requesting file lock: " + e.getMessage());
224
      e.printStackTrace(System.out);
225
    }
226
  }
227
  
228
  /**
229
   * Sends all of the xml_documents information encoded in xml to a requestor
230
   */
231
  private void handleGetDocumentInfoRequest(PrintWriter out, Hashtable params, 
232
                                        HttpServletResponse response)
233
  {
234
    String docid = ((String[])(params.get("docid")))[0];
235
    StringBuffer sb = new StringBuffer();
236
    try
237
    {
238
      Connection conn = util.openDBConnection();
239
      DocumentImpl doc = new DocumentImpl(conn, docid);
240
      sb.append("<documentinfo><docid>").append(docid);
241
      sb.append("</docid><docname>").append(doc.getDocname());
242
      sb.append("</docname><doctype>").append(doc.getDoctype());
243
      sb.append("</doctype><doctitle>").append(doc.getDocTitle());
244
      sb.append("</doctitle><user_owner>").append(doc.getUserowner());
245
      sb.append("</user_owner><user_updated>").append(doc.getUserupdated());
246
      sb.append("</user_updated><public_access>").append(doc.getPublicaccess());
247
      sb.append("</public_access></documentinfo>");
248
      response.setContentType("text/xml");
249
      out.println(sb.toString());
250
      conn.close();
251
    }
252
    catch (Exception e)
253
    {
254
      System.out.println("error in metacatReplication.handlegetdocumentinforequest: " + 
255
      e.getMessage());
256
    }
257
    
258
  }
259
  
260
  /**
261
   * Sends a document to a remote host
262
   */
263
  private void handleGetDocumentRequest(PrintWriter out, Hashtable params, 
264
                                        HttpServletResponse response)
265
  {
266
    try
267
    {
268
      String docid = ((String[])(params.get("docid")))[0];
269
      System.out.println("incoming get request for document: " +
270
                         docid);
271
      Connection conn = util.openDBConnection();
272
      DocumentImpl di = new DocumentImpl(conn, docid);
273
      response.setContentType("text/xml");
274
      out.print(di.toString());
275
      conn.close();
276
    }
277
    catch(Exception e)
278
    {
279
      System.out.println("error getting document: " + e.getMessage());
280
    }
281
    
282
  }
283
  
284
  /**
285
   * Does default handling for update requests
286
   */
287
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
288
                                   HttpServletResponse response)
289
  {
290
    handleUpdateRequest(out, params, response, 1);
291
  }
292

    
293
  /**
294
   * Initiates an update of all server in the xml_replication table.
295
   * the remote host sends an update date.  The local host (this method)
296
   * queries the db for any document that was updated after the update date
297
   * and returns a list of those documents to the remote host.  It also 
298
   * sends back a list of the files that were locally deleted.
299
   
300
   * serverCheckCode allows the requestor to request documents from this 
301
   * server that reside on a different server.  Normally, this server
302
   * should only replicate files that it owns but in the case of updating 
303
   * a file that does not belong to this server, it is needed.  See 
304
   * DocumentImpl.write().
305
   */
306
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
307
                                   HttpServletResponse response, 
308
                                   int serverCheckCode)
309
  { 
310
    System.out.println("incoming update request for dt/time " + 
311
                       ((String[])params.get("date"))[0] +
312
                       " from external metacat");
313
    response.setContentType("text/xml");
314
    StringBuffer returnXML = new StringBuffer();
315
    returnXML.append("<?xml version=\"1.0\"?><replication>");
316
    
317
    StringBuffer sql = new StringBuffer();
318
    String updateStr = ((String[])params.get("date"))[0];
319
    updateStr = updateStr.replace('+', ' ');
320
    //pseudo algorithm:
321
    ///////////////////////////////////////////////////////////////////////
322
    //get the date/time from the requestor, query the db for any documents
323
    //that have an update date later than the requested date and send
324
    //those docids back to the requestor.  If there are no newer documents
325
    //then send back a null message.
326
    ///////////////////////////////////////////////////////////////////////
327
    
328
    //Timestamp update = Timestamp.valueOf(updateStr);
329
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
330
    java.util.Date update = new java.util.Date();
331
    ParsePosition pos = new ParsePosition(0);
332
    update = formatter.parse(updateStr, pos);
333
    String dateString = formatter.format(update);
334
    sql.append("select docid, date_updated, server_location from ");
335
    sql.append("xml_documents where date_updated > ");
336
    sql.append("to_date('").append(dateString);
337
    sql.append("','YY-MM-DD HH24:MI:SS')");
338
    //System.out.println("sql: " + sql.toString());
339
    
340
    //get any recently deleted documents
341
    StringBuffer delsql = new StringBuffer();
342
    delsql.append("select docid, date_updated, server_location from ");
343
    delsql.append("xml_revisions where docid not in (select docid from ");
344
    delsql.append("xml_documents) and date_updated > to_date('");
345
    delsql.append(dateString).append("','YY-MM-DD HH24:MI:SS')");
346

    
347
    try
348
    {
349
      Connection conn = util.openDBConnection();
350
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
351
      pstmt.execute();
352
      ResultSet rs = pstmt.getResultSet();
353
      boolean tablehasrows = rs.next();
354
      
355
      //if a '1' should not represent localhost, add code here to query
356
      //xml_replication for the proper serverid number
357
      
358
      returnXML.append("<server>").append(util.getOption("server"));
359
      returnXML.append(util.getOption("replicationpath"));
360
      returnXML.append("</server><updates>");
361
      while(tablehasrows)
362
      {
363
        String docid = rs.getString(1);
364
        String dateUpdated = rs.getString(2);
365
        int serverCode = rs.getInt(3);
366
        if(serverCode == serverCheckCode)
367
        { //check that this document is from this server.
368
          //servers only replicate their own documents!
369
          returnXML.append("<updatedDocument><docid>").append(docid);
370
          returnXML.append("</docid><date_updated>").append(dateUpdated);
371
          returnXML.append("</date_updated></updatedDocument>");
372
        }
373
        tablehasrows = rs.next();
374
      }
375
      
376
      pstmt = conn.prepareStatement(delsql.toString());
377
      pstmt.execute();
378
      rs = pstmt.getResultSet();
379
      tablehasrows = rs.next();
380
      while(tablehasrows)
381
      { //handle the deleted documents
382
        String docid = rs.getString(1);
383
        String dateUpdated = rs.getString(2);
384
        int serverCode = rs.getInt(3);
385
        if(serverCode == 1)
386
        {
387
          returnXML.append("<deletedDocument><docid>").append(docid);
388
          returnXML.append("</docid><date_updated>").append(dateUpdated);
389
          returnXML.append("</date_updated></deletedDocument>");
390
        }
391
        tablehasrows = rs.next();
392
      }
393
      
394
      returnXML.append("</updates></replication>");
395
      conn.close();
396
      //System.out.println(returnXML.toString());
397
      out.print(returnXML.toString());
398
    }
399
    catch(Exception e)
400
    {
401
      System.out.println("Exception in metacatReplication: " + e.getMessage());
402
    }
403
  }
404
  
405
  /**
406
   * Sends the current system date to the remote server.  Using this action
407
   * for replication gets rid of any problems with syncronizing clocks 
408
   * because a time specific to a document is always kept on its home server.
409
   */
410
  private void handleGetTimeRequest(PrintWriter out, Hashtable params, 
411
                                    HttpServletResponse response)
412
  {
413
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
414
    java.util.Date localtime = new java.util.Date();
415
    String dateString = formatter.format(localtime);
416
    response.setContentType("text/xml");
417
    
418
    out.println("<timestamp>" + dateString + "</timestamp>");
419
  }
420
  
421
  /**
422
   * This method is what is run when a seperate thread is broken off to handle
423
   * inserting a document from a remote replicated server.
424
   */
425
  public void run()
426
  {
427
    try
428
    {
429
      System.out.println("thread started for docid: " + 
430
                         (String)fileLocks.elementAt(0));
431
      Thread.sleep(30000); //the lock will expire in 30 seconds
432
      System.out.println("thread for docid: " + 
433
                         (String)fileLocks.elementAt(fileLocks.size() - 1) + 
434
                         " exiting.");
435
      fileLocks.remove(fileLocks.size() - 1);
436
      //fileLocks is treated as a FIFO queue.  If there are more than one lock
437
      //in the vector, the first one inserted will be removed.
438
    }
439
    catch(Exception e)
440
    {
441
      System.out.println("error in file lock thread: " + e.getMessage());
442
    }
443
  }
444
  
445
  /**
446
   * Returns the name of a server given a serverCode
447
   * @param serverCode the serverid of the server
448
   * @return the servername or null if the specified serverCode does not
449
   *         exist.
450
   */
451
  public static String getServer(int serverCode)
452
  {
453
    //System.out.println("serverid: " + serverCode);
454
    try
455
    {
456
      Connection conn = util.openDBConnection();
457
      String sql = new String("select server from " +
458
                              "xml_replication where serverid = " + 
459
                              serverCode);
460
      PreparedStatement pstmt = conn.prepareStatement(sql);
461
      //System.out.println("getserver sql: " + sql);
462
      pstmt.execute();
463
      ResultSet rs = pstmt.getResultSet();
464
      boolean tablehasrows = rs.next();
465
      if(tablehasrows)
466
      {
467
        //System.out.println("server: " + rs.getString(1));
468
        return rs.getString(1);
469
      }
470
      conn.close();
471
    }
472
    catch(Exception e)
473
    {
474
      System.out.println("Error in MetacatReplication.getServer: " + 
475
                          e.getMessage());
476
    }
477
    return null;
478
      //return null if the server does not exist
479
  }
480
  
481
  /**
482
   * Returns a server code given a server name
483
   * @param server the name of the server
484
   * @return integer > 0 representing the code of the server, 0 if the server
485
   *  does not exist.
486
   */
487
  public static int getServerCode(String server) throws Exception
488
  {
489
    try
490
    {
491
      Connection conn = util.openDBConnection();
492
      PreparedStatement pstmt = conn.prepareStatement("select serverid from " +
493
                                         "xml_replication where server " +
494
                                         "like '" + server + "'");
495
      pstmt.execute();
496
      ResultSet rs = pstmt.getResultSet();
497
      boolean tablehasrows = rs.next();
498
      int serverCode = 0;
499
      if(tablehasrows)
500
      {
501
         return rs.getInt(1);
502
      }
503
      else
504
      {
505
        return 0;
506
      }
507
    }
508
    catch(Exception e)
509
    {
510
      throw e;
511
    }
512
  }
513
  
514
  /**
515
   * This method returns the content of a url
516
   * @param u the url to return the content from
517
   * @return a string representing the content of the url
518
   * @throws java.io.IOException
519
   */
520
  public static String getURLContent(URL u) throws java.io.IOException
521
  {
522
    //System.out.println("url: " + u.toString());
523
    char istreamChar;
524
    int istreamInt;
525
    InputStreamReader istream = new InputStreamReader(u.openStream());
526
    StringBuffer serverResponse = new StringBuffer();
527
    while((istreamInt = istream.read()) != -1)
528
    {
529
      istreamChar = (char)istreamInt;
530
      serverResponse.append(istreamChar);
531
    }
532
    
533
    return serverResponse.toString();
534
  }
535
}
(27-27/36)