Project

General

Profile

« Previous | Next » 

Revision 577

Added by berkley about 24 years ago

added replication_on_insert handling. Changed replication from date_updated base replication to revision number replication.

View differences:

src/edu/ucsb/nceas/metacat/MetacatReplication.java
125 125
      else if(((String[])params.get("action"))[0].equals("update"))
126 126
      { //request an update list from the server
127 127
        if(params.contains("servercheckcode"))
128
        {
128
        { //the servercheckcode allows this server to check for updated 
129
          //file from other servers as well as just updated files from 
130
          //this server.
129 131
          System.out.println("metacatreplication: servercheckcode: " + 
130 132
                               ((int[])params.get("servercheckcode"))[0]);
131 133
          handleUpdateRequest(out, params, response, 
......
133 135
        }
134 136
        else
135 137
        {
136
          handleUpdateRequest(out, params, response);
138
          handleUpdateRequest2(out, params, response);
137 139
        }
138 140
      }
139 141
      else if(((String[])params.get("action"))[0].equals("read"))
......
163 165
    System.out.println("in handleforcereplicaterequest");
164 166
    String server = ((String[])params.get("server"))[0];
165 167
    String docid = ((String[])params.get("docid"))[0];
168
    String dbaction = "UPDATE";
169
    boolean override = false;
170
    int serverCode = 1;
171
    
166 172
    try
167 173
    {
174
      if(params.containsKey("dbaction"))
175
      {
176
        dbaction = ((String[])params.get("dbaction"))[0];
177
        serverCode = MetacatReplication.getServerCode(server);
178
        override = true;
179
      }
180
      System.out.println("action in forcereplicate is: " + dbaction);
181
      System.out.println("serverCode in forcereplicate is: " + serverCode);
182
      
168 183
      int serverCheckCode = MetacatReplication.getServerCode(server);
169 184
      URL u = new URL("http://" + server + "?action=read&docid=" + docid);
170 185
      System.out.println("sending message: " + u.toString());
......
181 196
      String user = (String)docinfoHash.get("user_owner");
182 197
      String group = new String(user);
183 198
      Connection conn = util.openDBConnection();
184
      DocumentImpl.write(conn, new StringReader(xmldoc), "UPDATE", docid, user,
185
                         group, 1);
199
      DocumentImpl.write(conn, new StringReader(xmldoc), null, dbaction, docid, 
200
                         user, group, serverCode, override);
186 201
      conn.close();
187 202
    }
188 203
    catch(Exception e)
......
426 441
  }
427 442
  
428 443
  /**
444
   * Sends an update list based on rev numbers instead of dates.
445
   */
446
  private void handleUpdateRequest2(PrintWriter out, Hashtable params, 
447
                                    HttpServletResponse response)
448
  {
449
    System.out.println("in handleUpdateRequest2");
450
    try
451
    {
452
      System.out.println("received update request");
453
      StringBuffer docsql = new StringBuffer();
454
      StringBuffer doclist = new StringBuffer();
455
      
456
      //get all docs that reside on this server
457
      doclist.append("<?xml version=\"1.0\"?><replication>");
458
      doclist.append("<server>").append(util.getOption("server"));
459
      doclist.append(util.getOption("replicationpath"));
460
      doclist.append("</server><updates>");
461
      
462
      docsql.append("select docid, rev from xml_documents where "); 
463
      docsql.append("server_location = 1");
464
      
465
      //get any deleted documents
466
      StringBuffer delsql = new StringBuffer();
467
      delsql.append("select distinct docid from ");
468
      delsql.append("xml_revisions where docid not in (select docid from ");
469
      delsql.append("xml_documents) and server_location = 1");
470
      
471
      Connection conn = util.openDBConnection();
472
      PreparedStatement pstmt = conn.prepareStatement(docsql.toString());
473
      pstmt.execute();
474
      ResultSet rs = pstmt.getResultSet();
475
      boolean tablehasrows = rs.next();
476
      while(tablehasrows)
477
      {
478
        doclist.append("<updatedDocument>");
479
        doclist.append("<docid>").append(rs.getString(1));
480
        doclist.append("</docid><rev>").append(rs.getInt(2));
481
        doclist.append("</rev>");
482
        doclist.append("</updatedDocument>");
483
        tablehasrows = rs.next();
484
      }
485
      
486
      pstmt = conn.prepareStatement(delsql.toString());
487
      pstmt.execute();
488
      rs = pstmt.getResultSet();
489
      tablehasrows = rs.next();
490
      while(tablehasrows)
491
      { //handle the deleted documents
492
        doclist.append("<deletedDocument><docid>").append(rs.getString(1));
493
        doclist.append("</docid><rev></rev></deletedDocument>");
494
        tablehasrows = rs.next();
495
      }
496
      
497
      doclist.append("</updates></replication>");
498
      //System.out.println("doclist: " + doclist.toString());
499
      conn.close();
500
      response.setContentType("text/xml");
501
      out.println(doclist.toString());
502
    }
503
    catch(Exception e)
504
    {
505
      System.out.println("error in handleupdaterequest2: " + e.getMessage());
506
      e.printStackTrace(System.out);
507
    }
508
    
509
  }
510
  
511
  /**
429 512
   * Sends the current system date to the remote server.  Using this action
430 513
   * for replication gets rid of any problems with syncronizing clocks 
431 514
   * because a time specific to a document is always kept on its home server.
src/edu/ucsb/nceas/metacat/DocumentImpl.java
26 26
import java.util.Iterator;
27 27
import java.util.Stack;
28 28
import java.util.TreeSet;
29
import java.util.Enumeration;
29 30

  
30 31
import org.xml.sax.AttributeList;
31 32
import org.xml.sax.ContentHandler;
......
728 729
                              String group, int serverCode )
729 730
                              throws Exception
730 731
  {
731
    return write(conn, xml, null,
732
                              action, docid, user,
733
                              group, serverCode); 
732
    return write(conn, xml, null, action, docid, user, group, serverCode); 
734 733
  }
735 734
  
735
  public static String write( Connection conn, Reader xml, Reader acl,
736
                              String action, String docid, String user,
737
                              String group, int serverCode) throws Exception
738
  {
739
    return write(conn, xml, acl, action, docid, user, group, serverCode, false);
740
  }
741
  
736 742
  /**
737 743
   * Write an XML file to the database, given a Reader
738 744
   *
......
744 750

  
745 751
  public static String write( Connection conn, Reader xml, Reader acl,
746 752
                              String action, String docid, String user,
747
                              String group, int serverCode )
748
                throws Exception 
753
                              String group, int serverCode, boolean override)
754
                              throws Exception 
749 755
  {
750 756
    System.out.println("in write");
751 757
    MetaCatUtil util = new MetaCatUtil();
......
754 760
    String newdocid = ac.generate(docid, action);
755 761
    
756 762
    System.out.println("action: " + action + " servercode: " + 
757
                        serverCode);
758
    if(serverCode != 1 && action.equals("UPDATE"))
763
                        serverCode + " override: " + override);
764
    if((serverCode != 1 && action.equals("UPDATE")) && !override)
759 765
    { //if this document being written is not a resident of this server then
760 766
      //we need to try to get a lock from it's resident server.  If the
761 767
      //resident server will not give a lock then we send the user a message
......
841 847
                            "merge your changes and try again.");
842 848
      }
843 849
    }
844
    System.out.println("======THIS GOT TOO FAR=======");
845 850
    
846 851
    if ( action.equals("UPDATE") ) {
847 852
      // check for 'write' permission for 'user' to update this document
......
859 864
      conn.commit();
860 865
      if ( acl != null ) 
861 866
      {
862
        if ( action.equals("UPDATE") ) 
867
        if ( action.equals("UPDATE") )  
863 868
        {
864 869
          Statement stmt = conn.createStatement();
865 870
          stmt.execute("DELETE FROM xml_access WHERE docid='"+newdocid +"'");
......
877 882
      conn.setAutoCommit(true);
878 883
      throw e;
879 884
    }
885
    
886
    //force replicate out the new document to each server in our server list.
887
    if(serverCode == 1)
888
    {
889
      Enumeration keys = (ReplicationHandler.buildServerList(conn)).keys();
890
      while(keys.hasMoreElements())
891
      {
892
        String server = (String)(keys.nextElement());
893
        URL comeAndGetIt = new URL("http://" + server + 
894
                                   "?action=forcereplicate&server=" + 
895
                                   util.getOption("server") + 
896
                                   util.getOption("replicationpath") +
897
                                   "&docid=" + newdocid + "&dbaction=" +
898
                                   action);
899
        System.out.println("sending message: " + comeAndGetIt.toString());
900
        String message = MetacatReplication.getURLContent(comeAndGetIt);
901
      }
902
    }
880 903
      
881 904
    if ( (docid != null) && !(newdocid.equals(docid)) ) 
882 905
    {
src/edu/ucsb/nceas/metacat/ReplMessageHandler.java
96 96
      indivDelete.add(new String(ch, start, length));
97 97
    }
98 98
    
99
    if(currentTag.equals("rev") && update)
100
    {
101
      indivUpdate.add(new String(ch, start, length));
102
      indivUpdate.add(server);
103
    }
104
    else if(currentTag.equals("rev") && delete)
105
    {
106
      indivDelete.add(new String(ch, start, length));
107
      indivDelete.add(server);
108
    }
109
    
99 110
    if(currentTag.equals("date_updated") && update)
100 111
    {
101 112
      indivUpdate.add(new String(ch, start, length));
......
122 133
  {
123 134
    return deletes;
124 135
  }
125
  
126 136
}
src/edu/ucsb/nceas/metacat/ReplicationHandler.java
70 70
    {
71 71
      conn = util.openDBConnection();
72 72
      serverList = buildServerList(conn);
73
      update(serverList, conn);
73
      update2(serverList, conn);
74 74
      conn.close();
75 75
    }
76 76
    catch (Exception e)
......
80 80
  }
81 81
  
82 82
  /**
83
   * Method that uses revision taging for replication instead of update_date.
84
   */
85
  private void update2(Hashtable serverList, Connection conn)
86
  {
87
    /*
88
     Pseudo-algorithm
89
     - request a doc list from each server in xml_replication
90
     - check the rev number of each of those documents agains the 
91
       documents in the local database
92
     - pull any documents that have a lesser rev number on the local server
93
       from the remote server
94
     - delete any documents that still exist in the local xml_documents but
95
       are in the deletedDocuments tag of the remote host response.
96
     - update last_checked to keep track of the last time it was checked.
97
       (this info is theoretically not needed using this system but probably 
98
       should be kept anyway)
99
    */
100
    Enumeration keys;
101
    String server;
102
    String update;
103
    ReplMessageHandler message = new ReplMessageHandler();
104
    Vector responses = new Vector();
105
    PreparedStatement pstmt;
106
    ResultSet rs;
107
    boolean tablehasrows;
108
    boolean flag=false;
109
    String action = new String();
110
    XMLReader parser;
111
    URL u;
112
    
113
    //System.out.println("in update2");
114
    
115
    try
116
    {
117
      //System.out.println("init parser");
118
      parser = initParser(message);
119
      keys = serverList.keys();
120
      while(keys.hasMoreElements())
121
      {
122
        //System.out.println("get responses");
123
        server = (String)(keys.nextElement());
124
        u = new URL("http://" + server + "?action=update");
125
        String result = MetacatReplication.getURLContent(u);
126
        //System.out.println(result);
127
        responses.add(result);
128
      }
129
      
130
      //System.out.println("responses: " + responses.toString());
131
      
132
      for(int i=0; i<responses.size(); i++)
133
      { //check each server for updated files
134
        //System.out.println("parsing responses");
135
        parser.parse(new InputSource(
136
                     new StringReader(
137
                     (String)(responses.elementAt(i)))));
138
        Vector v = new Vector(message.getUpdatesVect());
139
        //System.out.println("v: " + v.toString());
140
        Vector d = new Vector(message.getDeletesVect());
141
        //check the revs in u to see if there are any newer ones than
142
        //in the local DB.
143
        for(int j=0; j<v.size(); j++)
144
        {
145
          Vector w = new Vector((Vector)(v.elementAt(j)));
146
          //System.out.println("w: " + w.toString());
147
          String docid = (String)w.elementAt(0);
148
          //System.out.println("docid: " + docid);
149
          int rev = Integer.parseInt((String)w.elementAt(1));
150
          //System.out.println("rev: " + rev);
151
          String docServer = (String)w.elementAt(2);
152
          //System.out.println("docServer: " + docServer);
153
    
154
          pstmt = conn.prepareStatement("select rev from xml_documents where "+
155
                                        "docid like '" + docid + "'");
156
          pstmt.execute();
157
          rs = pstmt.getResultSet();
158
          tablehasrows = rs.next();
159
          if(tablehasrows)
160
          { //check the revs for an update because this document is in the
161
            //local DB, it just might be out of date.
162
            int localrev = rs.getInt(1);
163
            if(localrev == rev)
164
            {
165
              flag = false;
166
            }
167
            else if(localrev < rev)
168
            {//this document needs to be updated so send an read request
169
              action = "UPDATE";
170
              flag = true;
171
            }
172
          }
173
          else
174
          { //insert this document as new because it is not in the local DB
175
            action = "INSERT";
176
            flag = true;
177
          }
178
          
179
          if(flag)
180
          { //if the document needs to be updated or inserted, this is executed
181
            u = new URL("http://" + docServer + "?action=read&docid=" +
182
                          docid);
183
            String newxmldoc = MetacatReplication.getURLContent(u);
184
            DocInfoHandler dih = new DocInfoHandler();
185
            XMLReader docinfoParser = initParser(dih);
186
            URL docinfoUrl = new URL("http://" + docServer + 
187
                                   "?action=getdocumentinfo&docid=" +
188
                                   docid);
189
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
190
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
191
            Hashtable docinfoHash = dih.getDocInfo();
192
            int serverCode = MetacatReplication.getServerCode(docServer);
193
            System.out.println("updating doc: " + docid + " action: "+ action);
194
            String newDocid = DocumentImpl.write(conn, 
195
                              new StringReader(newxmldoc),
196
                              null,
197
                              action, 
198
                              docid, 
199
                              (String)docinfoHash.get("user_owner"),
200
                              (String)docinfoHash.get("user_owner"), 
201
                              serverCode, 
202
                              true);
203
          }
204
        }
205
        
206
        for(int k=0; k<d.size(); k++)
207
        { //delete the deleted documents;
208
          Vector w = new Vector((Vector)d.elementAt(k));
209
          String docid = (String)w.elementAt(0);
210
          
211
          DocumentImpl.delete(conn, docid, null, null);
212
          System.out.println("Document " + docid + " deleted.");
213
        }
214
        
215
        keys = serverList.keys();
216
        while(keys.hasMoreElements())
217
        {
218
          server = (String)(keys.nextElement()); 
219
          URL dateurl = new URL("http://" + server + "?action=gettime");
220
          String datexml = MetacatReplication.getURLContent(dateurl);
221
          String datestr = datexml.substring(11, datexml.indexOf('<', 11));
222
          StringBuffer sql = new StringBuffer();
223
          sql.append("update xml_replication set last_checked = to_date('");
224
          sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
225
          sql.append("server like '").append(server).append("'");
226
          //System.out.println("sql: " + sql.toString());
227
          pstmt = conn.prepareStatement(sql.toString());
228
          pstmt.executeUpdate();
229
          //conn.commit();
230
          System.out.println("last_checked updated to " + datestr + " on " +
231
                            server);
232
        }
233
      }
234
    }
235
    catch(Exception e)
236
    {
237
      System.out.println("error in update2: " + e.getMessage());
238
      e.printStackTrace(System.out);
239
    }
240
  }
241
  
242
  /**
83 243
   * Method to check each server in the servetList for updates
84 244
   */
85 245
  private void update(Hashtable serverList, Connection conn)
......
265 425
   * and it's last update time.
266 426
   * @param conn a connection to the database
267 427
   */
268
  private Hashtable buildServerList(Connection conn)
428
  public static Hashtable buildServerList(Connection conn)
269 429
  {
270 430
    Hashtable sl = new Hashtable();
271 431
    PreparedStatement pstmt;   

Also available in: Unified diff