Project

General

Profile

« Previous | Next » 

Revision 583

Added by berkley over 23 years ago

cleaned up code, added more complete documentation of replication algorithms and datastructures.

View differences:

src/edu/ucsb/nceas/metacat/MetacatReplication.java
40 40
    //initialize db connections to handle any update requests
41 41
    MetaCatUtil util = new MetaCatUtil();
42 42
    deltaT = util.getOption("deltaT");
43
    //break off a thread to do the delta-T check
43
    //the default deltaT can be set from metacat.properties
44
    //create a thread to do the delta-T check but don't execute it yet
44 45
    replicationDaemon = new Timer(true);
45
    //replicationDaemon.scheduleAtFixedRate(new replicationHandler(), 0, 
46
    //                                      new Integer(deltaT).intValue() * 1000);
47
    //System.out.println("timer scheduled at: " + new Integer(deltaT).intValue() 
48
    //                   + " seconds");
49
    
50 46
  }
51 47
  
52 48
  public void destroy() 
......
102 98
          if(rate < 30)
103 99
          {
104 100
            out.println("Replication deltaT rate cannot be less than 30!");
101
            //deltaT<30 is a timing mess!
105 102
            rate = 1000;
106 103
          }
107 104
        }
......
124 121
      }
125 122
      else if(((String[])params.get("action"))[0].equals("update"))
126 123
      { //request an update list from the server
127
        if(params.contains("servercheckcode"))
128
        { //the servercheckcode allows this server to check for updated 
129
          //file from other servers as well as just updated files from 
130
          //this server.
131
          System.out.println("metacatreplication: servercheckcode: " + 
132
                               ((int[])params.get("servercheckcode"))[0]);
133
          handleUpdateRequest(out, params, response, 
134
                              ((int[])params.get("servercheckcode"))[0]);
135
        }
136
        else
137
        {
138
          handleUpdateRequest2(out, params, response);
139
        }
124
        handleUpdateRequest(out, params, response);
140 125
      }
141 126
      else if(((String[])params.get("action"))[0].equals("read"))
142 127
      { //request a specific document from the server
......
159 144
    }
160 145
  }
161 146
  
147
  /**
148
   * when a forcereplication request comes in, this method sends a read request
149
   * to the requesting server for the specified docid.
150
   */
162 151
  private void handleForceReplicateRequest(PrintWriter out, Hashtable params,
163 152
                                           HttpServletResponse response)
164 153
  {
165
    System.out.println("in handleforcereplicaterequest");
154
    //System.out.println("in handleforcereplicaterequest");
166 155
    String server = ((String[])params.get("server"))[0];
156
    //the server that the request came from
167 157
    String docid = ((String[])params.get("docid"))[0];
158
    //the docid of the document to get
168 159
    String dbaction = "UPDATE";
160
    //default action is update
169 161
    boolean override = false;
170 162
    int serverCode = 1;
171 163
    
172 164
    try
173 165
    {
174 166
      if(params.containsKey("dbaction"))
175
      {
167
      { //if the url contains a dbaction then the default action is overridden
176 168
        dbaction = ((String[])params.get("dbaction"))[0];
177 169
        serverCode = MetacatReplication.getServerCode(server);
178
        override = true;
170
        override = true; //we are now overriding the default action
179 171
      }
180
      System.out.println("action in forcereplicate is: " + dbaction);
181
      System.out.println("serverCode in forcereplicate is: " + serverCode);
172
      //System.out.println("action in forcereplicate is: " + dbaction);
173
      //System.out.println("serverCode in forcereplicate is: " + serverCode);
182 174
      
183 175
      int serverCheckCode = MetacatReplication.getServerCode(server);
184 176
      URL u = new URL("http://" + server + "?action=read&docid=" + docid);
185
      System.out.println("sending message: " + u.toString());
177
      //System.out.println("sending message: " + u.toString());
186 178
      String xmldoc = MetacatReplication.getURLContent(u);
179
      //get the document to write
187 180
      URL docinfourl = new URL("http://" + server + 
188 181
                               "?action=getdocumentinfo&docid=" +
189 182
                               docid);
190
      System.out.println("sending message: " + docinfourl.toString());
183
      //we need to get the document's info so we can set the correct user
184
      //and group once we get the document and write it to our DB
185
      //System.out.println("sending message: " + docinfourl.toString());
191 186
      String docInfoStr = MetacatReplication.getURLContent(docinfourl);
192 187
      DocInfoHandler dih = new DocInfoHandler();
188
      //dih is the parser for the docinfo xml format
193 189
      XMLReader docinfoParser = ReplicationHandler.initParser(dih);
194 190
      docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
195 191
      Hashtable docinfoHash = dih.getDocInfo();
196 192
      String user = (String)docinfoHash.get("user_owner");
197 193
      String group = new String(user);
194
      //right now the user and group are the same.
198 195
      Connection conn = util.openDBConnection();
199 196
      DocumentImpl.write(conn, new StringReader(xmldoc), null, dbaction, docid, 
200 197
                         user, group, serverCode, override);
......
258 255
  
259 256
  /**
260 257
   * Sends all of the xml_documents information encoded in xml to a requestor
258
   * the format is:
259
   * <!ELEMENT documentinfo (docid, docname, doctype, doctitle, user_owner,
260
   *                         user_updated, public_access, rev)
261
   * all of the subelements of document info are #PCDATA
261 262
   */
262 263
  private void handleGetDocumentInfoRequest(PrintWriter out, Hashtable params, 
263 264
                                        HttpServletResponse response)
......
275 276
      sb.append("</doctitle><user_owner>").append(doc.getUserowner());
276 277
      sb.append("</user_owner><user_updated>").append(doc.getUserupdated());
277 278
      sb.append("</user_updated><public_access>").append(doc.getPublicaccess());
278
      sb.append("</public_access></documentinfo>");
279
      sb.append("</public_access><rev>").append(doc.getRev());
280
      sb.append("</rev></documentinfo>");
279 281
      response.setContentType("text/xml");
280 282
      out.println(sb.toString());
281 283
      conn.close();
......
313 315
  }
314 316
  
315 317
  /**
316
   * Does default handling for update requests
318
   * Sends a list of all of the documents on this sever along with their
319
   * revision numbers.  
320
   * The format is:
321
   * <!ELEMENT replication (server, updates)>
322
   * <!ELEMENT server (#PCDATA)>
323
   * <!ELEMENT updates ((updatedDocument | deleteDocument)*)>
324
   * <!ELEMENT updatedDocument (docid, rev)>
325
   * <!ELEMENT deletedDocument (docid, rev)>
326
   * <!ELEMENT docid (#PCDATA)>
327
   * <!ELEMENT rev (#PCDATA)>
328
   * note that the rev in deletedDocument is always empty.  I just left
329
   * it in there to make the parser implementation easier.
317 330
   */
318 331
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
319
                                   HttpServletResponse response)
320
  {
321
    handleUpdateRequest(out, params, response, 1);
322
  }
323

  
324
  /**
325
   * Initiates an update of all server in the xml_replication table.
326
   * the remote host sends an update date.  The local host (this method)
327
   * queries the db for any document that was updated after the update date
328
   * and returns a list of those documents to the remote host.  It also 
329
   * sends back a list of the files that were locally deleted.
330
   
331
   * serverCheckCode allows the requestor to request documents from this 
332
   * server that reside on a different server.  Normally, this server
333
   * should only replicate files that it owns but in the case of updating 
334
   * a file that does not belong to this server, it is needed.  See 
335
   * DocumentImpl.write().
336
   */
337
  private void handleUpdateRequest(PrintWriter out, Hashtable params, 
338
                                   HttpServletResponse response, 
339
                                   int serverCheckCode)
340
  { 
341
    System.out.println("incoming update request for dt/time " + 
342
                       ((String[])params.get("date"))[0] +
343
                       " from external metacat");
344
    response.setContentType("text/xml");
345
    StringBuffer returnXML = new StringBuffer();
346
    returnXML.append("<?xml version=\"1.0\"?><replication>");
347
    
348
    StringBuffer sql = new StringBuffer();
349
    String updateStr = ((String[])params.get("date"))[0];
350
    updateStr = updateStr.replace('+', ' ');
351
    //pseudo algorithm:
352
    ///////////////////////////////////////////////////////////////////////
353
    //get the date/time from the requestor, query the db for any documents
354
    //that have an update date later than the requested date and send
355
    //those docids back to the requestor.  If there are no newer documents
356
    //then send back a null message.
357
    ///////////////////////////////////////////////////////////////////////
358
    
359
    //Timestamp update = Timestamp.valueOf(updateStr);
360
    SimpleDateFormat formatter = new SimpleDateFormat ("yy-MM-dd HH:mm:ss");
361
    java.util.Date update = new java.util.Date();
362
    ParsePosition pos = new ParsePosition(0);
363
    update = formatter.parse(updateStr, pos);
364
    String dateString = formatter.format(update);
365
    sql.append("select docid, date_updated, server_location from ");
366
    sql.append("xml_documents where date_updated > ");
367
    sql.append("to_date('").append(dateString);
368
    sql.append("','YY-MM-DD HH24:MI:SS')");
369
    //System.out.println("sql: " + sql.toString());
370
    
371
    //get any recently deleted documents
372
    StringBuffer delsql = new StringBuffer();
373
    delsql.append("select docid, date_updated, server_location from ");
374
    delsql.append("xml_revisions where docid not in (select docid from ");
375
    delsql.append("xml_documents) and date_updated > to_date('");
376
    delsql.append(dateString).append("','YY-MM-DD HH24:MI:SS')");
377

  
378
    try
379
    {
380
      Connection conn = util.openDBConnection();
381
      PreparedStatement pstmt = conn.prepareStatement(sql.toString());
382
      pstmt.execute();
383
      ResultSet rs = pstmt.getResultSet();
384
      boolean tablehasrows = rs.next();
385
      
386
      //if a '1' should not represent localhost, add code here to query
387
      //xml_replication for the proper serverid number
388
      
389
      returnXML.append("<server>").append(util.getOption("server"));
390
      returnXML.append(util.getOption("replicationpath"));
391
      returnXML.append("</server><updates>");
392
      while(tablehasrows)
393
      {
394
        String docid = rs.getString(1);
395
        String dateUpdated = rs.getString(2);
396
        int serverCode = rs.getInt(3);
397
        if(serverCode == serverCheckCode)
398
        { //check that this document is from this server.
399
          //servers only replicate their own documents!
400
          returnXML.append("<updatedDocument><docid>").append(docid);
401
          returnXML.append("</docid><date_updated>").append(dateUpdated);
402
          returnXML.append("</date_updated></updatedDocument>");
403
        }
404
        tablehasrows = rs.next();
405
      }
406
      
407
      pstmt = conn.prepareStatement(delsql.toString());
408
      pstmt.execute();
409
      rs = pstmt.getResultSet();
410
      tablehasrows = rs.next();
411
      while(tablehasrows)
412
      { //handle the deleted documents
413
        String docid = rs.getString(1);
414
        String dateUpdated = rs.getString(2);
415
        int serverCode = rs.getInt(3);
416
        if(serverCode == 1)
417
        {
418
          returnXML.append("<deletedDocument><docid>").append(docid);
419
          returnXML.append("</docid><date_updated>").append(dateUpdated);
420
          returnXML.append("</date_updated></deletedDocument>");
421
        }
422
        tablehasrows = rs.next();
423
      }
424
      
425
      returnXML.append("</updates></replication>");
426
      conn.close();
427
      //System.out.println(returnXML.toString());
428
      out.print(returnXML.toString());
429
    }
430
    catch(Exception e)
431
    {
432
      System.out.println("Exception in metacatReplication: " + e.getMessage());
433
    }
434
  }
435
  
436
  /**
437
   * Sends an update list based on rev numbers instead of dates.
438
   */
439
  private void handleUpdateRequest2(PrintWriter out, Hashtable params, 
440 332
                                    HttpServletResponse response)
441 333
  {
442 334
    System.out.println("in handleUpdateRequest2");
......
484 376
      { //handle the deleted documents
485 377
        doclist.append("<deletedDocument><docid>").append(rs.getString(1));
486 378
        doclist.append("</docid><rev></rev></deletedDocument>");
379
        //note that rev is always empty for deleted docs
487 380
        tablehasrows = rs.next();
488 381
      }
489 382
      
......
518 411
  }
519 412
  
520 413
  /**
521
   * This method is what is run when a seperate thread is broken off to handle
522
   * inserting a document from a remote replicated server.
414
   * this method handles the timeout for a file lock.  when a lock is 
415
   * granted it is granted for 30 seconds.  When this thread runs out
416
   * it deletes the docid from the queue, thus eliminating the lock.
523 417
   */
524 418
  public void run()
525 419
  {
526 420
    try
527 421
    {
528
      System.out.println("thread started for docid: " + 
529
                         (String)fileLocks.elementAt(0));
422
      //System.out.println("thread started for docid: " + 
423
      //                   (String)fileLocks.elementAt(0));
530 424
      Thread.sleep(30000); //the lock will expire in 30 seconds
531
      System.out.println("thread for docid: " + 
532
                         (String)fileLocks.elementAt(fileLocks.size() - 1) + 
533
                         " exiting.");
425
      //System.out.println("thread for docid: " + 
426
      //                   (String)fileLocks.elementAt(fileLocks.size() - 1) + 
427
      //                   " exiting.");
534 428
      fileLocks.remove(fileLocks.size() - 1);
535 429
      //fileLocks is treated as a FIFO queue.  If there are more than one lock
536 430
      //in the vector, the first one inserted will be removed.
src/edu/ucsb/nceas/metacat/ReplicationHandler.java
70 70
    {
71 71
      conn = util.openDBConnection();
72 72
      serverList = buildServerList(conn);
73
      update2(serverList, conn);
73
      update(serverList, conn);
74 74
      conn.close();
75 75
    }
76 76
    catch (Exception e)
......
82 82
  /**
83 83
   * Method that uses revision taging for replication instead of update_date.
84 84
   */
85
  private void update2(Hashtable serverList, Connection conn)
85
  private void update(Hashtable serverList, Connection conn)
86 86
  {
87 87
    /*
88 88
     Pseudo-algorithm
......
312 312
  }
313 313
  
314 314
  /**
315
   * Method to check each server in the servetList for updates
316
   */
317
  private void update(Hashtable serverList, Connection conn)
318
  {
319
    /*
320
     Pseudo-algorithm:
321
     -build a list of servers that need to be checked
322
     -check each server to see if any documents were modified after the 
323
      server's update_date
324
     -if there are documents that need to be updated pull them here
325
      and update the one currently in the database or add a new one
326
      if it is not already here.
327
     -update each servers' update_date to the current time
328
    */
329
    PreparedStatement pstmt;
330
    Enumeration keys;
331
    int istreamInt;
332
    char istreamChar;
333
    StringBuffer serverResponse = new StringBuffer();
334
    String server;
335
    String update;
336
    Vector responses = new Vector();
337
    ReplMessageHandler message = new ReplMessageHandler();
338
    Hashtable updateDocs = new Hashtable();
339
    URL u;
340
    InputStreamReader istream;
341
    
342
    try
343
    {
344
      //build a list of servers with updated documents.  Choose the newest
345
      //one out of the list, update this server, update last_checked
346
      
347
      keys = serverList.keys();
348
      while(keys.hasMoreElements())
349
      { //update from one server at a time
350
        server = (String)(keys.nextElement());
351
        update = (String)(serverList.get(server));
352
        //send the server a date and it will send back any docid that has 
353
        //been modified after that date
354
        
355
        update = update.replace(' ', '+'); 
356
        
357
        if(serverCheckCode != 1)
358
        {
359
          u = new URL("http://" + server + "?action=update&date=" + update +
360
                      "&servercheckcode=" + serverCheckCode);
361
        }
362
        else
363
        {
364
          u = new URL("http://" + server + "?action=update&date=" + update);
365
        }
366
        responses.add(MetacatReplication.getURLContent(u)); //list of updates
367
      }
368

  
369
      //initialize the parser
370
      XMLReader parser = initParser(message);
371
      //System.out.println("response: " + responses.toString());
372
      //System.out.println("<--end response-->");
373
      for(int i=0; i<responses.size(); i++)
374
      { //parse the xml and get the result
375
        parser.parse(new InputSource(
376
                     new StringReader(
377
                     (String)(responses.elementAt(i)))));
378
        Vector v = new Vector(message.getUpdatesVect());
379
        Vector d = new Vector(message.getDeletesVect());
380
        for(int j=0; j<v.size(); j++)
381
        { //go through each update vector and update or insert
382
          //a new document
383
          Vector w = new Vector((Vector)(v.elementAt(j)));
384
          String docid = (String)w.elementAt(0);
385
          String docServer = (String)w.elementAt(2);
386

  
387
          //send a message to the server requesting each document
388
          URL getDocURL = new URL("http://" + docServer + 
389
                                  "?action=read&docid="+ docid);
390
          String srvrResponseStr = MetacatReplication.getURLContent(getDocURL);
391
          //System.out.println("<<<<<<document>>>>>");
392
          //System.out.println(serverResponse.toString());
393
          //System.out.println("<<<<<<end document>>>>>");
394
          
395
          int serverCode = MetacatReplication.getServerCode(docServer);
396
          
397
          if(serverCode != 0)
398
          {
399
            //update the document into the DB
400
            String action = getAction(docid);
401
            //System.out.println("action: " + action + " docid: " + docid);
402

  
403
            DocInfoHandler dih = new DocInfoHandler();
404
            XMLReader docinfoParser = initParser(dih);
405
            URL docinfoUrl = new URL("http://" + docServer + 
406
                                     "?action=getdocumentinfo&docid=" +
407
                                     docid);
408
            String docInfoStr = MetacatReplication.getURLContent(docinfoUrl);
409
            docinfoParser.parse(new InputSource(new StringReader(docInfoStr)));
410
            Hashtable docinfoHash = dih.getDocInfo();
411
            String newDocid = DocumentImpl.write(conn, 
412
                               new StringReader(srvrResponseStr),
413
                               action, 
414
                               docid, (String)docinfoHash.get("user_owner"), 
415
                               (String)docinfoHash.get("user_owner"), 
416
                               serverCode);
417
            System.out.println("newDocid: " + newDocid + " " + action + "ED");
418
          }
419
        }
420
        
421
        for(int k=0; k<d.size(); k++)
422
        {
423
          Vector w = new Vector((Vector)d.elementAt(k));
424
          String docid = (String)w.elementAt(0);
425
          
426
          DocumentImpl.delete(conn, docid, null, null);
427
          System.out.println("Document " + docid + " deleted.");
428
        }
429
      }
430

  
431
      keys = null;
432
      keys = serverList.keys();
433
      while(keys.hasMoreElements())
434
      {
435
        server = (String)(keys.nextElement()); 
436
        URL dateurl = new URL("http://" + server + "?action=gettime");
437
        String datexml = MetacatReplication.getURLContent(dateurl);
438
        String datestr = datexml.substring(11, datexml.indexOf('<', 11));
439
        StringBuffer sql = new StringBuffer();
440
        sql.append("update xml_replication set last_checked = to_date('");
441
        sql.append(datestr).append("', 'YY-MM-DD HH24:MI:SS') where ");
442
        sql.append("server like '").append(server).append("'");
443
        //System.out.println("sql: " + sql.toString());
444
        pstmt = conn.prepareStatement(sql.toString());
445
        pstmt.executeUpdate();
446
        //conn.commit();
447
        System.out.println("last_checked updated to " + datestr + " on " +
448
                            server);
449
      }
450
  
451
    }
452
    catch(Exception e)
453
    {
454
      System.out.println("Error in replicationHandler.update(): " + 
455
                         e.getMessage());
456
      e.printStackTrace(System.out);
457
    }
458
  }
459
  
460
  /**
461 315
   * Checks to see if a document is already in the DB.  Returns
462 316
   * "UPDATE" if it is, "INSERT" if it isn't
463 317
   */

Also available in: Unified diff