Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that implements replication for metacat
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-11-28 08:30:59 -0800 (Tue, 28 Nov 2000) $'
11
 * '$Revision: 568 $'
12
 */
13

    
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.util.*;
17
import java.io.*;
18
import java.sql.*;
19
import java.net.*;
20
import java.lang.*;
21
import java.text.*;
22
import javax.servlet.*;
23
import javax.servlet.http.*;
24
import oracle.xml.parser.v2.*;
25
import org.xml.sax.*;
26

    
27
public class MetacatReplication extends HttpServlet implements Runnable
28
{  
29
  private String deltaT;
30
  Timer replicationDaemon;
31
  private static MetaCatUtil util = new MetaCatUtil();
32
  private Vector fileLocks = new Vector();
33
  private Thread lockThread = null;
34
  
35
  /**
36
   * Initialize the servlet by creating appropriate database connections
37
   */
38
  public void init(ServletConfig config) throws ServletException 
39
  {
40
    //initialize db connections to handle any update requests
41
    MetaCatUtil util = new MetaCatUtil();
42
    deltaT = util.getOption("deltaT");
43
    //break off a thread to do the delta-T check
44
    replicationDaemon = new Timer(true);
45
    //replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0, 
46
    //                                      new Integer(deltaT).intValue() * 1000);
47
    //System.out.println("timer scheduled at: " + new Integer(deltaT).intValue() 
48
    //                   + " seconds");
49
    
50
  }
51
  
52
  public void destroy() 
53
  {
54
    replicationDaemon.cancel();
55
    System.out.println("Replication daemon cancelled.");
56
  }
57
  
58
  public void doGet (HttpServletRequest request, HttpServletResponse response)
59
                     throws ServletException, IOException 
60
  {
61
    // Process the data and send back the response
62
    handleGetOrPost(request, response);
63
  }
64

    
65
  public void doPost(HttpServletRequest request, HttpServletResponse response)
66
                     throws ServletException, IOException 
67
  {
68
    // Process the data and send back the response
69
    handleGetOrPost(request, response);
70
  }
71
  
72
  private void handleGetOrPost(HttpServletRequest request, 
73
                               HttpServletResponse response) 
74
                               throws ServletException, IOException 
75
  {
76
    PrintWriter out = response.getWriter();
77
    
78
    Hashtable params = new Hashtable();
79
    Enumeration paramlist = request.getParameterNames();
80
    
81
    while (paramlist.hasMoreElements()) 
82
    {
83
      String name = (String)paramlist.nextElement();
84
      String[] value = request.getParameterValues(name);
85
      params.put(name, value);  
86
    }
87
    
88
    if(params.containsKey("action"))
89
    {
90
      if(((String[])params.get("action"))[0].equals("stop"))
91
      { //stop the replication server
92
        replicationDaemon.cancel();
93
        out.println("Replication Handler Stopped");
94
        System.out.println("Replication Handler Stopped");
95
      }
96
      else if(((String[])params.get("action"))[0].equals("start"))
97
      { //start the replication server
98
        int rate;
99
        if(params.containsKey("rate"))
100
        {
101
          rate = new Integer(
102
                 new String(((String[])params.get("rate"))[0])).intValue();
103
          if(rate < 30)
104
          {
105
            out.println("Replication deltaT rate cannot be less than 30!");
106
            rate = 1000;
107
          }
108
        }
109
        else
110
        {
111
          rate = 1000;
112
        }
113
        
114
        out.println("New rate is: " + rate + " seconds.");
115
        replicationDaemon.cancel();
116
        replicationDaemon = new Timer(true);
117
        replicationDaemon.scheduleAtFixedRate(new ReplicationHandler(out), 0, 
118
                                              rate * 1000);
119
        out.println("Replication Handler Started");
120
      }
121
      else if(((String[])params.get("action"))[0].equals("update"))
122
      { //request an update list from the server
123
        handleUpdateRequest(out, params, response);
124
      }
125
      else if(((String[])params.get("action"))[0].equals("read"))
126
      { //request a specific document from the server
127
        //note that this could be replaced by a call to metacatServlet
128
        //handleGetDocumentAction().
129
        handleGetDocumentRequest(out, params, response);
130
      }
131
      else if(((String[])params.get("action"))[0].equals("getlock"))
132
      {
133
        handleGetLockRequest(out, params, response);
134
      }
135
      else if(((String[])params.get("action"))[0].equals("getdocumentinfo"))
136
      {
137
        handleGetDocumentInfoRequest(out, params, response);
138
      }
139
      else if(((String[])params.get("action"))[0].equals("gettime"))
140
      {
141
        handleGetTimeRequest(out, params, response);
142
      }
143
    }
144
  }
145
  
146
  /**
147
   * Grants or denies a lock to a requesting host.
148
   * The servlet parameters of interrest are:
149
   * docid: the docid of the file the lock is being requested for
150
   * currentdate: the timestamp of the document on the remote server
151
   * 
152
   */
153
  private void handleGetLockRequest(PrintWriter out, Hashtable params,
154
                                    HttpServletResponse response)
155
  {
156
    java.util.Date remoteDate = new java.util.Date();
157
    java.util.Date localDate = new java.util.Date();
158
    try
159
    {
160
      Connection conn = util.openDBConnection();
161
      String docid = ((String[])params.get("docid"))[0];
162
System.out.println("docid: " + docid);
163
      String remoteDateStr = ((String[])params.get("updatedate"))[0];
164
      DocumentImpl requestDoc = new DocumentImpl(conn, docid);
165

    
166
      String localDateStr = requestDoc.getUpdateDate();
167
      SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
168
      ParsePosition pos = new ParsePosition(0);
169
      remoteDate = formatter.parse(remoteDateStr, pos);
170
      pos = new ParsePosition(0);
171
      localDate = formatter.parse(localDateStr, pos);
172
System.out.println("remotedate: " + remoteDate.toString());
173
System.out.println("localdate: " + localDate.toString());
174
      if(remoteDate.compareTo(localDate) >= 0)
175
      {
176
        if(!fileLocks.contains(docid))
177
        { //grant the lock
178
          fileLocks.add(0, docid); //insert at the beginning of the queue Vector
179
          //send a message back to the the remote host authorizing the insert
180
          out.println("<lockgranted><docid>" +docid+ "</docid></lockgranted>");
181
          lockThread = new Thread(this);
182
          lockThread.setPriority(Thread.MIN_PRIORITY);
183
          lockThread.start();
184
        }
185
        else
186
        { //deny the lock
187
          out.println("<filelocked><docid>" + docid + "</docid></filelocked>");
188
        }
189
      }
190
      else
191
      {//deny the lock.
192
        out.println("<outdatedfile><docid>" + docid + "</docid></filelocked>");
193
      }
194
      conn.close();
195
    }
196
    catch(Exception e)
197
    {
198
      System.out.println("error requesting file lock: " + e.getMessage());
199
      e.printStackTrace(System.out);
200
    }
201
  }
202
  
203
  /**
204
   * Sends all of the xml_documents information encoded in xml to a requestor
205
   */
206
  private void handleGetDocumentInfoRequest(PrintWriter out, Hashtable params, 
207
                                        HttpServletResponse response)
208
  {
209
    String docid = ((String[])(params.get("docid")))[0];
210
    StringBuffer sb = new StringBuffer();
211
    try
212
    {
213
      Connection conn = util.openDBConnection();
214
      DocumentImpl doc = new DocumentImpl(conn, docid);
215
      sb.append("<documentinfo><docid>").append(docid);
216
      sb.append("</docid><docname>").append(doc.getDocname());
217
      sb.append("</docname><doctype>").append(doc.getDoctype());
218
      sb.append("</doctype><doctitle>").append(doc.getDocTitle());
219
      sb.append("</doctitle><user_owner>").append(doc.getUserowner());
220
      sb.append("</user_owner><user_updated>").append(doc.getUserupdated());
221
      sb.append("</user_updated><public_access>").append(doc.getPublicaccess());
222
      sb.append("</public_access></documentinfo>");
223
      response.setContentType("text/xml");
224
      out.println(sb.toString());
225
      conn.close();
226
    }
227
    catch (Exception e)
228
    {
229
      System.out.println("error in metacatReplication.handlegetdocumentinforequest: " + 
230
      e.getMessage());
231
    }
232
    
233
  }
234
  
235
  /**
236
   * Sends a document to a remote host
237
   */
238
  private void handleGetDocumentRequest(PrintWriter out, Hashtable params, 
239
                                        HttpServletResponse response)
240
  {
241
    try
242
    {
243
      String docid = ((String[])(params.get("docid")))[0];
244
      System.out.println("incoming get request for document: " +
245
                         docid);
246
      Connection conn = util.openDBConnection();
247
      DocumentImpl di = new DocumentImpl(conn, docid);
248
      response.setContentType("text/xml");
249
      out.print(di.toString());
250
      conn.close();
251
    }
252
    catch(Exception e)
253
    {
254
      System.out.println("error getting document: " + e.getMessage());
255
    }
256
    
257
  }
258
  
259
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
260
                                   HttpServletResponse response)
261
  {
262
    System.out.println("incoming update request for dt/time " + 
263
                       ((String[])params.get("date"))[0] +
264
                       " from external metacat");
265
    response.setContentType("text/xml");
266
    StringBuffer returnXML = new StringBuffer();
267
    returnXML.append("<?xml version=\"1.0\"?><replication>");
268
    
269
    StringBuffer sql = new StringBuffer();
270
    String updateStr = ((String[])params.get("date"))[0];
271
    updateStr = updateStr.replace('+', ' ');
272
    //pseudo algorithm:
273
    ///////////////////////////////////////////////////////////////////////
274
    //get the date/time from the requestor, query the db for any documents
275
    //that have an update date later than the requested date and send
276
    //those docids back to the requestor.  If there are no newer documents
277
    //then send back a null message.
278
    ///////////////////////////////////////////////////////////////////////
279
    
280
    //Timestamp update = Timestamp.valueOf(updateStr);
281
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
282
    java.util.Date update = new java.util.Date();
283
    ParsePosition pos = new ParsePosition(0);
284
    update = formatter.parse(updateStr, pos);
285
    String dateString = formatter.format(update);
286
    sql.append("select docid, date_updated, server_location from ");
287
    sql.append("xml_documents where date_updated > ");
288
    sql.append("to_date('").append(dateString);
289
    sql.append("','YY-MM-DD HH24:MI:SS')");
290
    //System.out.println("sql: " + sql.toString());
291
    
292
    //get any recently deleted documents
293
    StringBuffer delsql = new StringBuffer();
294
    delsql.append("select docid, date_updated, server_location from ");
295
    delsql.append("xml_revisions where docid not in (select docid from ");
296
    delsql.append("xml_documents) and date_updated > to_date('");
297
    delsql.append(dateString).append("','YY-MM-DD HH24:MI:SS')");
298

    
299
    try
300
    {
301
      Connection conn = util.openDBConnection();
302
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
303
      pstmt.execute();
304
      ResultSet rs = pstmt.getResultSet();
305
      boolean tablehasrows = rs.next();
306
      
307
      //if a '1' should not represent localhost, add code here to query
308
      //xml_replication for the proper serverid number
309
      
310
      returnXML.append("<server>").append(util.getOption("server"));
311
      returnXML.append(util.getOption("replicationpath"));
312
      returnXML.append("</server><updates>");
313
      while(tablehasrows)
314
      {
315
        String docid = rs.getString(1);
316
        String dateUpdated = rs.getString(2);
317
        int serverCode = rs.getInt(3);
318
        if(serverCode == 1)
319
        { //check that this document is from this server.
320
          //servers only replicate their own documents!
321
          returnXML.append("<updatedDocument><docid>").append(docid);
322
          returnXML.append("</docid><date_updated>").append(dateUpdated);
323
          returnXML.append("</date_updated></updatedDocument>");
324
        }
325
        tablehasrows = rs.next();
326
      }
327
      
328
      pstmt = conn.prepareStatement(delsql.toString());
329
      pstmt.execute();
330
      rs = pstmt.getResultSet();
331
      tablehasrows = rs.next();
332
      while(tablehasrows)
333
      { //handle the deleted documents
334
        String docid = rs.getString(1);
335
        String dateUpdated = rs.getString(2);
336
        int serverCode = rs.getInt(3);
337
        if(serverCode == 1)
338
        {
339
          returnXML.append("<deletedDocument><docid>").append(docid);
340
          returnXML.append("</docid><date_updated>").append(dateUpdated);
341
          returnXML.append("</date_updated></deletedDocument>");
342
        }
343
        tablehasrows = rs.next();
344
      }
345
      
346
      returnXML.append("</updates></replication>");
347
      conn.close();
348
      //System.out.println(returnXML.toString());
349
      out.print(returnXML.toString());
350
    }
351
    catch(Exception e)
352
    {
353
      System.out.println("Exception in metacatReplication: " + e.getMessage());
354
    }
355
  }
356
  
357
  /**
358
   * Sends the current system date to the remote server.  Using this action
359
   * for replication gets rid of any problems with syncronizing clocks 
360
   * because a time specific to a document is always kept on its home server.
361
   */
362
  private void handleGetTimeRequest(PrintWriter out, Hashtable params, 
363
                                    HttpServletResponse response)
364
  {
365
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
366
    java.util.Date localtime = new java.util.Date();
367
    String dateString = formatter.format(localtime);
368
    response.setContentType("text/xml");
369
    
370
    out.println("<timestamp>" + dateString + "</timestamp>");
371
  }
372
  
373
  /**
374
   * This method is what is run when a seperate thread is broken off to handle
375
   * inserting a document from a remote replicated server.
376
   */
377
  public void run()
378
  {
379
    try
380
    {
381
      System.out.println("thread started for docid: " + 
382
                         (String)fileLocks.elementAt(0));
383
      Thread.sleep(30000); //the lock will expire in 30 seconds
384
      System.out.println("thread for docid: " + 
385
                         (String)fileLocks.elementAt(fileLocks.size() - 1) + 
386
                         " exiting.");
387
      fileLocks.remove(fileLocks.size() - 1);
388
      //fileLocks is treated as a FIFO queue.  If there are more than one lock
389
      //in the vector, the first one inserted will be removed.
390
    }
391
    catch(Exception e)
392
    {
393
      System.out.println("error in file lock thread: " + e.getMessage());
394
    }
395
  }
396
  
397
  /**
398
   * Returns the name of a server given a serverCode
399
   * @param serverCode the serverid of the server
400
   * @return the servername or null if the specified serverCode does not
401
   *         exist.
402
   */
403
  public static String getServer(int serverCode)
404
  {
405
    try
406
    {
407
      Connection conn = util.openDBConnection();
408
      PreparedStatement pstmt = conn.prepareStatement("select server from " +
409
                              "xml_replication where serverid = " + 
410
                              serverCode);
411
      pstmt.execute();
412
      ResultSet rs = pstmt.getResultSet();
413
      boolean tablehasrows = rs.next();
414
      if(tablehasrows)
415
      {
416
        conn.close();
417
        return rs.getString(1);
418
      }
419
      conn.close();
420
    }
421
    catch(Exception e)
422
    {
423
      System.out.println("Error in MetacatReplication.getServer: " + 
424
                          e.getMessage());
425
    }
426
    return null;
427
      //return null if the server does not exist
428
  }
429
}
(27-27/36)