Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that implements replication for metacat
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: jones $'
10
 *     '$Date: 2000-11-27 13:55:08 -0800 (Mon, 27 Nov 2000) $'
11
 * '$Revision: 566 $'
12
 */
13

    
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.util.*;
17
import java.io.*;
18
import java.sql.*;
19
import java.net.*;
20
import java.lang.*;
21
import java.text.*;
22
import javax.servlet.*;
23
import javax.servlet.http.*;
24
import oracle.xml.parser.v2.*;
25
import org.xml.sax.*;
26

    
27
public class MetacatReplication extends HttpServlet implements Runnable
28
{  
29
  private String deltaT;
30
  Timer replicationDaemon;
31
  private static MetaCatUtil util = new MetaCatUtil();
32
  private Vector fileLocks = new Vector();
33
  private Thread lockThread = null;
34
  
35
  /**
36
   * Initialize the servlet by creating appropriate database connections
37
   */
38
  public void init(ServletConfig config) throws ServletException 
39
  {
40
    //initialize db connections to handle any update requests
41
    MetaCatUtil util = new MetaCatUtil();
42
    deltaT = util.getOption("deltaT");
43
    //break off a thread to do the delta-T check
44
    replicationDaemon = new Timer(true);
45
    //replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0, 
46
    //                                      new Integer(deltaT).intValue() * 1000);
47
    //System.out.println("timer scheduled at: " + new Integer(deltaT).intValue() 
48
    //                   + " seconds");
49
    
50
  }
51
  
52
  public void destroy() 
53
  {
54
    replicationDaemon.cancel();
55
    System.out.println("Replication daemon cancelled.");
56
  }
57
  
58
  public void doGet (HttpServletRequest request, HttpServletResponse response)
59
                     throws ServletException, IOException 
60
  {
61
    // Process the data and send back the response
62
    handleGetOrPost(request, response);
63
  }
64

    
65
  public void doPost(HttpServletRequest request, HttpServletResponse response)
66
                     throws ServletException, IOException 
67
  {
68
    // Process the data and send back the response
69
    handleGetOrPost(request, response);
70
  }
71
  
72
  private void handleGetOrPost(HttpServletRequest request, 
73
                               HttpServletResponse response) 
74
                               throws ServletException, IOException 
75
  {
76
    PrintWriter out = response.getWriter();
77
    
78
    Hashtable params = new Hashtable();
79
    Enumeration paramlist = request.getParameterNames();
80
    
81
    while (paramlist.hasMoreElements()) 
82
    {
83
      String name = (String)paramlist.nextElement();
84
      String[] value = request.getParameterValues(name);
85
      params.put(name, value);  
86
    }
87
    
88
    if(params.containsKey("action"))
89
    {
90
      if(((String[])params.get("action"))[0].equals("stop"))
91
      { //stop the replication server
92
        replicationDaemon.cancel();
93
        out.println("Replication Handler Stopped");
94
        System.out.println("Replication Handler Stopped");
95
      }
96
      else if(((String[])params.get("action"))[0].equals("start"))
97
      { //start the replication server
98
        int rate;
99
        if(params.containsKey("rate"))
100
        {
101
          rate = new Integer(
102
                 new String(((String[])params.get("rate"))[0])).intValue();
103
          if(rate < 30)
104
          {
105
            out.println("Replication deltaT rate cannot be less than 30!");
106
            rate = 1000;
107
          }
108
        }
109
        else
110
        {
111
          rate = 1000;
112
        }
113
        
114
        out.println("New rate is: " + rate + " seconds.");
115
        replicationDaemon.cancel();
116
        replicationDaemon = new Timer(true);
117
        replicationDaemon.scheduleAtFixedRate(new ReplicationHandler(out), 0, 
118
                                              rate * 1000);
119
        out.println("Replication Handler Started");
120
      }
121
      else if(((String[])params.get("action"))[0].equals("update"))
122
      { //request an update list from the server
123
        handleUpdateRequest(out, params, response);
124
      }
125
      else if(((String[])params.get("action"))[0].equals("read"))
126
      { //request a specific document from the server
127
        //note that this could be replaced by a call to metacatServlet
128
        //handleGetDocumentAction().
129
        handleGetDocumentRequest(out, params, response);
130
      }
131
      else if(((String[])params.get("action"))[0].equals("getlock"))
132
      {
133
        handleGetLockRequest(out, params, response);
134
      }
135
      else if(((String[])params.get("action"))[0].equals("getdocumentinfo"))
136
      {
137
        handleGetDocumentInfoRequest(out, params, response);
138
      }
139
    }
140
  }
141
  
142
  /**
143
   * Grants or denies a lock to a requesting host.
144
   * The servlet parameters of interrest are:
145
   * docid: the docid of the file the lock is being requested for
146
   * currentdate: the timestamp of the document on the remote server
147
   * 
148
   */
149
  private void handleGetLockRequest(PrintWriter out, Hashtable params,
150
                                    HttpServletResponse response)
151
  {
152
    java.util.Date remoteDate = new java.util.Date();
153
    java.util.Date localDate = new java.util.Date();
154
    try
155
    {
156
      Connection conn = util.openDBConnection();
157
      String docid = ((String[])params.get("docid"))[0];
158
      String remoteDateStr = ((String[])params.get("updatedate"))[0];
159
      DocumentImpl requestDoc = new DocumentImpl(conn, docid);
160
    
161
      String localDateStr = requestDoc.getUpdateDate();
162
      SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
163
      ParsePosition pos = new ParsePosition(0);
164
      remoteDate = formatter.parse(remoteDateStr, pos);
165
      localDate = formatter.parse(localDateStr, pos);
166
      if(remoteDate.compareTo(localDate) >= 0)
167
      {
168
        if(!fileLocks.contains(docid))
169
        { //grant the lock
170
          fileLocks.add(0, docid); //insert at the beginning of the queue Vector
171
          //send a message back to the the remote host authorizing the insert
172
          out.println("<lockgranted><docid>" +docid+ "</docid></lockgranted>");
173
          lockThread = new Thread(this);
174
          lockThread.setPriority(Thread.MIN_PRIORITY);
175
          lockThread.start();
176
        }
177
        else
178
        { //deny the lock
179
          out.println("<filelocked><docid>" + docid + "</docid></filelocked>");
180
        }
181
      }
182
      else
183
      {//deny the lock.
184
        out.println("<outdatedfile><docid>" + docid + "</docid></filelocked>");
185
      }
186
      conn.close();
187
    }
188
    catch(Exception e)
189
    {
190
      System.out.println("error requesting file lock: " + e.getMessage());
191
    }
192
  }
193
  
194
  /**
195
   * Sends all of the xml_documents information encoded in xml to a requestor
196
   */
197
  private void handleGetDocumentInfoRequest(PrintWriter out, Hashtable params, 
198
                                        HttpServletResponse response)
199
  {
200
    String docid = ((String[])(params.get("docid")))[0];
201
    StringBuffer sb = new StringBuffer();
202
    try
203
    {
204
      Connection conn = util.openDBConnection();
205
      DocumentImpl doc = new DocumentImpl(conn, docid);
206
      sb.append("<documentinfo><docid>").append(docid);
207
      sb.append("</docid><docname>").append(doc.getDocname());
208
      sb.append("</docname><doctype>").append(doc.getDoctype());
209
      sb.append("</doctype><doctitle>").append(doc.getDocTitle());
210
      sb.append("</doctitle><user_owner>").append(doc.getUserowner());
211
      sb.append("</user_owner><user_updated>").append(doc.getUserupdated());
212
      sb.append("</user_updated><public_access>").append(doc.getPublicaccess());
213
      sb.append("</public_access></documentinfo>");
214
      response.setContentType("text/xml");
215
      out.println(sb.toString());
216
      conn.close();
217
    }
218
    catch (Exception e)
219
    {
220
      System.out.println("error in metacatReplication.handlegetdocumentinforequest: " + 
221
      e.getMessage());
222
    }
223
    
224
  }
225
  
226
  /**
227
   * Sends a document to a remote host
228
   */
229
  private void handleGetDocumentRequest(PrintWriter out, Hashtable params, 
230
                                        HttpServletResponse response)
231
  {
232
    try
233
    {
234
      String docid = ((String[])(params.get("docid")))[0];
235
      System.out.println("incoming get request for document: " +
236
                         docid);
237
      Connection conn = util.openDBConnection();
238
      DocumentImpl di = new DocumentImpl(conn, docid);
239
      response.setContentType("text/xml");
240
      out.print(di.toString());
241
      conn.close();
242
    }
243
    catch(Exception e)
244
    {
245
      System.out.println("error getting document: " + e.getMessage());
246
    }
247
    
248
  }
249
  
250
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
251
                                   HttpServletResponse response)
252
  {
253
    System.out.println("incoming update request for dt/time " + 
254
                       ((String[])params.get("date"))[0] +
255
                       " from external metacat");
256
    response.setContentType("text/xml");
257
    StringBuffer returnXML = new StringBuffer();
258
    returnXML.append("<?xml version=\"1.0\"?><replication>");
259
    
260
    StringBuffer sql = new StringBuffer();
261
    String updateStr = ((String[])params.get("date"))[0];
262
    updateStr = updateStr.replace('+', ' ');
263
    //pseudo algorithm:
264
    ///////////////////////////////////////////////////////////////////////
265
    //get the date/time from the requestor, query the db for any documents
266
    //that have an update date later than the requested date and send
267
    //those docids back to the requestor.  If there are no newer documents
268
    //then send back a null message.
269
    ///////////////////////////////////////////////////////////////////////
270
    
271
    //Timestamp update = Timestamp.valueOf(updateStr);
272
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
273
    java.util.Date update = new java.util.Date();
274
    ParsePosition pos = new ParsePosition(0);
275
    update = formatter.parse(updateStr, pos);
276
    String dateString = formatter.format(update);
277
    sql.append("select docid, date_updated, server_location from ");
278
    sql.append("xml_documents where date_updated > ");
279
    sql.append("to_date('").append(dateString);
280
    sql.append("','YY-MM-DD HH24:MI:SS')");
281
    //System.out.println("sql: " + sql.toString());
282
    
283
    //get any recently deleted documents
284
    StringBuffer delsql = new StringBuffer();
285
    delsql.append("select docid, date_updated, server_location from ");
286
    delsql.append("xml_revisions where docid not in (select docid from ");
287
    delsql.append("xml_documents) and date_updated > to_date('");
288
    delsql.append(dateString).append("','YY-MM-DD HH24:MI:SS')");
289

    
290
    try
291
    {
292
      Connection conn = util.openDBConnection();
293
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
294
      pstmt.execute();
295
      ResultSet rs = pstmt.getResultSet();
296
      boolean tablehasrows = rs.next();
297
      
298
      //if a '1' should not represent localhost, add code here to query
299
      //xml_replication for the proper serverid number
300
      
301
      returnXML.append("<server>").append(util.getOption("server"));
302
      returnXML.append(util.getOption("replicationpath"));
303
      returnXML.append("</server><updates>");
304
      while(tablehasrows)
305
      {
306
        String docid = rs.getString(1);
307
        String dateUpdated = rs.getString(2);
308
        int serverCode = rs.getInt(3);
309
        if(serverCode == 1)
310
        { //check that this document is from this server.
311
          //servers only replicate their own documents!
312
          returnXML.append("<updatedDocument><docid>").append(docid);
313
          returnXML.append("</docid><date_updated>").append(dateUpdated);
314
          returnXML.append("</date_updated></updatedDocument>");
315
        }
316
        tablehasrows = rs.next();
317
      }
318
      
319
      pstmt = conn.prepareStatement(delsql.toString());
320
      pstmt.execute();
321
      rs = pstmt.getResultSet();
322
      tablehasrows = rs.next();
323
      while(tablehasrows)
324
      { //handle the deleted documents
325
        String docid = rs.getString(1);
326
        String dateUpdated = rs.getString(2);
327
        int serverCode = rs.getInt(3);
328
        if(serverCode == 1)
329
        {
330
          returnXML.append("<deletedDocument><docid>").append(docid);
331
          returnXML.append("</docid><date_updated>").append(dateUpdated);
332
          returnXML.append("</date_updated></deletedDocument>");
333
        }
334
        tablehasrows = rs.next();
335
      }
336
      
337
      returnXML.append("</updates></replication>");
338
      conn.close();
339
      //System.out.println(returnXML.toString());
340
      out.print(returnXML.toString());
341
    }
342
    catch(Exception e)
343
    {
344
      System.out.println("Exception in metacatReplication: " + e.getMessage());
345
    }
346
  }
347
  
348
  /**
349
   * This method is what is run when a seperate thread is broken off to handle
350
   * inserting a document from a remote replicated server.
351
   */
352
  public void run()
353
  {
354
    try
355
    {
356
      Thread.sleep(30000); //the lock will expire in 30 seconds
357
      fileLocks.remove(fileLocks.size() - 1);
358
      //the vector is treated as a FIFO queue.  If there are more than one lock
359
      //in the vector, the first one inserted will be removed.
360
    }
361
    catch(Exception e)
362
    {
363
      System.out.println("error in file lock thread: " + e.getMessage());
364
    }
365
  }
366
  
367
  /**
368
   * Returns the name of a server given a serverCode
369
   * @param serverCode the serverid of the server
370
   * @return the servername or null if the specified serverCode does not
371
   *         exist.
372
   */
373
  public static String getServer(int serverCode)
374
  {
375
    try
376
    {
377
      Connection conn = util.openDBConnection();
378
      PreparedStatement pstmt = conn.prepareStatement("select server from " +
379
                              "xml_replication where serverid = " + 
380
                              serverCode);
381
      pstmt.execute();
382
      ResultSet rs = pstmt.getResultSet();
383
      boolean tablehasrows = rs.next();
384
      if(tablehasrows)
385
      {
386
        conn.close();
387
        return rs.getString(1);
388
      }
389
      conn.close();
390
    }
391
    catch(Exception e)
392
    {
393
      System.out.println("Error in MetacatReplication.getServer: " + 
394
                          e.getMessage());
395
    }
396
    return null;
397
      //return null if the server does not exist
398
  }
399
}
(27-27/36)