Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class to asyncronously do delta-T replication checking
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Chad Berkley
7
 *    Release: @release@
8
 *
9
 *   '$Author: berkley $'
10
 *     '$Date: 2000-11-09 13:41:35 -0800 (Thu, 09 Nov 2000) $'
11
 * '$Revision: 522 $'
12
 */
13
 
14
package edu.ucsb.nceas.metacat;
15

    
16
import java.sql.*;
17
import java.util.*;
18
import java.lang.Thread; 
19
import java.io.*;
20
import java.net.*;
21
import org.xml.sax.AttributeList;
22
import org.xml.sax.ContentHandler;
23
import org.xml.sax.DTDHandler;
24
import org.xml.sax.EntityResolver;
25
import org.xml.sax.ErrorHandler;
26
import org.xml.sax.InputSource;
27
import org.xml.sax.XMLReader;
28
import org.xml.sax.SAXException;
29
import org.xml.sax.SAXParseException;
30
import org.xml.sax.helpers.XMLReaderFactory;
31

    
32
/**
33
 * This class handles deltaT replication checking.  Whenever this TimerTask
34
 * is fired it checks each server in xml_replication for updates and updates
35
 * the local db as needed.
36
 */
37
public class ReplicationHandler extends TimerTask
38
{
39
  MetaCatUtil util = new MetaCatUtil();
40
  Hashtable serverList = new Hashtable(); 
41
  Connection conn;
42
  PrintWriter out;
43
  
44
  public ReplicationHandler(PrintWriter o)
45
  {
46
    this.out = o;
47
  }
48
  
49
  /**
50
   * Method that implements TimerTask.run().  It runs whenever the timer is 
51
   * fired.
52
   */
53
  public void run()
54
  {
55
    System.out.println("replicationHandler is running");
56
    //find out the last_checked time of each server in the server list and
57
    //send a query to each server to see if there are any documents in 
58
    //xml_documents with an update_date > last_checked
59
    try
60
    {
61
      conn = util.openDBConnection();
62
      serverList = buildServerList(conn);
63
      System.out.println("Server list: " + serverList.toString());
64
      update(serverList, conn);
65
    }
66
    catch (Exception e)
67
    {
68
      System.out.println("Error in replicationHandler.run(): " + e.getMessage());
69
    }
70
  }
71
  
72
  /**
73
   * Method to check each server in the servetList for updates
74
   */
75
  private void update(Hashtable serverList, Connection conn)
76
  {
77
    /*
78
     Pseudo-algorithm:
79
     -build a list of servers that need to be checked
80
     -check each server to see if any documents were modified after the 
81
      server's update_date
82
     -if there are documents that need to be updated pull them here
83
      and update the one currently in the database or add a new one
84
      if it is not already here.
85
     -update each servers' update_date to the current time
86
    */
87
    PreparedStatement pstmt;
88
    Enumeration keys;
89
    int istreamInt;
90
    char istreamChar;
91
    StringBuffer serverResponse = new StringBuffer();
92
    String server;
93
    String update;
94
    Vector responses = new Vector();
95
    ReplMessageHandler message = new ReplMessageHandler();
96
    
97
    try
98
    {
99
      //build a list of servers with updated documents.  Choose the newest
100
      //one out of the list, update this server, update last_checked
101
      
102
      keys = serverList.keys();
103
      while(keys.hasMoreElements())
104
      { //update from one server at a time
105
        server = (String)(keys.nextElement());
106
        update = (String)(serverList.get(server));
107
        //send the server a date and it will send back any docid that has 
108
        //been modified after that date
109
        
110
        update = update.replace(' ', '+');
111
        
112
        URL u = new URL("http://" + server + "?update=" + update);
113
        InputStreamReader istream = new InputStreamReader(u.openStream());
114
        while((istreamInt = istream.read()) != -1)
115
        {
116
          istreamChar = (char)istreamInt;
117
          serverResponse.append(istreamChar);
118
        }
119
        responses.add(serverResponse.toString()); //list of updates
120
      }
121
      //System.out.println("responses: " + responses.toString());
122

    
123
      //initialize the parser
124
      XMLReader parser = initParser(message);
125
      for(int i=0; i<responses.size(); i++)
126
      { //parse the xml and get the result
127
        parser.parse(new InputSource(
128
                     new StringReader(
129
                     (String)(responses.elementAt(i)))));
130
        Vector v = new Vector(message.getResultVect());
131
        for(int j=0; j<v.size(); j++)
132
        {
133
          Vector w = new Vector((Vector)(v.elementAt(j)));
134
          System.out.print("param " + j + ": " + w.toString());
135
          //so now we have a list of the documents that need to be updated, so 
136
          //now we need to request them. from the server and update them here  
137
        }
138
        System.out.println("");
139
      }
140
      
141
      
142
    }
143
    catch(Exception e)
144
    {
145
      System.out.println("Error in replicationHandler.update(): " + 
146
                         e.getMessage());
147
    }
148
  }
149
  
150
  /**
151
   * Method to initialize the message parser
152
   */
153
  private static XMLReader initParser(ReplMessageHandler rmh)
154
          throws Exception
155
  {
156
    XMLReader parser = null;
157

    
158
    try {
159
      ContentHandler chandler = rmh;
160

    
161
      // Get an instance of the parser
162
      MetaCatUtil util = new MetaCatUtil();
163
      String parserName = util.getOption("saxparser");
164
      parser = XMLReaderFactory.createXMLReader(parserName);
165

    
166
      // Turn off validation
167
      parser.setFeature("http://xml.org/sax/features/validation", false);
168
      
169
      parser.setContentHandler((ContentHandler)chandler);
170
      parser.setErrorHandler((ErrorHandler)chandler);
171

    
172
    } catch (Exception e) {
173
      throw e;
174
    }
175

    
176
    return parser;
177
  }
178
  
179
  /**
180
   * Method to query xml_replication and build a hashtable of each server
181
   * and it's last update time.
182
   * @param conn a connection to the database
183
   */
184
  private Hashtable buildServerList(Connection conn)
185
  {
186
    Hashtable sl = new Hashtable();
187
    PreparedStatement pstmt;   
188
    try
189
    {
190
      pstmt = conn.prepareStatement("select server, last_checked from " +
191
                                    "xml_replication");
192
      pstmt.execute();
193
      ResultSet rs = pstmt.getResultSet();
194
      boolean tableHasRows = rs.next();
195
      while(tableHasRows)
196
      {
197
        sl.put(rs.getString(1), rs.getString(2));
198
        tableHasRows = rs.next();
199
      }
200
    }
201
    catch(Exception e)
202
    {
203
      System.out.println("error in replicationHandler.buildServerList(): " +
204
                         e.getMessage());
205
    }
206
    return sl;
207
  }
208
}
(33-33/37)