Project

General

Profile

« Previous | Next » 

Revision 9203

Added by rnahf over 9 years ago

fixes #7092: added finally clause to ReplicationHandler.update method to unconditionally close the inputStreams it keeps in a Vector. Needs testing.

View differences:

src/edu/ucsb/nceas/metacat/replication/ReplicationHandler.java
40 40
import java.text.ParseException;
41 41
import java.util.Date;
42 42
import java.util.Hashtable;
43
import java.util.Iterator;
43 44
import java.util.TimerTask;
44 45
import java.util.Vector;
45 46

  
......
144 145
   */
145 146
  private void update()
146 147
  {
147
	  
148
	  _xmlDocQueryCount = 0;
149
	  _xmlRevQueryCount = 0;
150
	  _xmlDocQueryTime = 0;
151
	  _xmlRevQueryTime = 0;
152
    /*
148
    Vector<InputStream> responses = new Vector<InputStream>();
149
    try {
150

  
151
        _xmlDocQueryCount = 0;
152
        _xmlRevQueryCount = 0;
153
        _xmlDocQueryTime = 0;
154
        _xmlRevQueryTime = 0;
155
        /*
153 156
     Pseudo-algorithm
154 157
     - request a doc list from each server in xml_replication
155 158
     - check the rev number of each of those documents agains the
......
161 164
     - update last_checked to keep track of the last time it was checked.
162 165
       (this info is theoretically not needed using this system but probably
163 166
       should be kept anyway)
164
    */
167
         */
165 168

  
166
    ReplicationServer replServer = null; // Variable to store the
167
                                        // ReplicationServer got from
168
                                        // Server list
169
    String server = null; // Variable to store server name
170
//    String update;
171
    Vector<InputStream> responses = new Vector<InputStream>();
172
    URL u;
173
    long replicationStartTime = System.currentTimeMillis();
174
    long timeToGetServerList = 0;
175
    
176
    //Check for every server in server list to get updated list and put
177
    // them in to response
178
    long startTimeToGetServers = System.currentTimeMillis();
179
    for (int i=0; i<serverList.size(); i++)
180
    {
181
        // Get ReplicationServer object from server list
182
        replServer = serverList.serverAt(i);
183
        // Get server name from ReplicationServer object
184
        server = replServer.getServerName().trim();
185
        InputStream result = null;
186
        logReplication.info("ReplicationHandler.update - full update started to: " + server);
187
        // Send command to that server to get updated docid information
188
        try
169
        ReplicationServer replServer = null; // Variable to store the
170
        // ReplicationServer got from
171
        // Server list
172
        String server = null; // Variable to store server name
173
        //    String update;
174

  
175
        URL u;
176
        long replicationStartTime = System.currentTimeMillis();
177
        long timeToGetServerList = 0;
178

  
179
        //Check for every server in server list to get updated list and put
180
        // them in to response
181
        long startTimeToGetServers = System.currentTimeMillis();
182
        for (int i=0; i<serverList.size(); i++)
189 183
        {
190
          u = new URL("https://" + server + "?server="
191
          +MetacatUtil.getLocalReplicationServerName()+"&action=update");
192
          logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
193
          result = ReplicationService.getURLStream(u);
184
            // Get ReplicationServer object from server list
185
            replServer = serverList.serverAt(i);
186
            // Get server name from ReplicationServer object
187
            server = replServer.getServerName().trim();
188
            InputStream result = null;
189
            logReplication.info("ReplicationHandler.update - full update started to: " + server);
190
            // Send command to that server to get updated docid information
191
            try
192
            {
193
                u = new URL("https://" + server + "?server="
194
                        +MetacatUtil.getLocalReplicationServerName()+"&action=update");
195
                logReplication.info("ReplicationHandler.update - Sending infomation " +u.toString());
196
                result = ReplicationService.getURLStream(u);
197
            }
198
            catch (Exception e)
199
            {
200
                logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
201
                logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
202
                        "for server " + server + " because "+e.getMessage());
203
                continue;
204
            }
205

  
206
            //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
207
            //check if result have error or not, if has skip it.
208
            // TODO: check for error in stream
209
            //if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) {
210
            if (result == null) {
211
                logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
212
                logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
213
                        "for server " + server + " because "+result);
214
                continue;
215
            }
216
            //Add result to vector
217
            responses.add(result);
194 218
        }
195
        catch (Exception e)
219
        timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
220

  
221
        //make sure that there is updated file list
222
        //If response is null, metacat don't need do anything
223
        if (responses==null || responses.isEmpty())
196 224
        {
197
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
198
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
199
                       "for server " + server + " because "+e.getMessage());
200
          continue;
225
            logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
226
            logReplication.info( "ReplicationHandler.update - No updated doc list for "+
227
                    "every server and failed to replicate");
228
            return;
201 229
        }
202 230

  
203
        //logReplication.info("ReplicationHandler.update - docid: "+server+" "+result);
204
        //check if result have error or not, if has skip it.
205
        // TODO: check for error in stream
206
        //if (result.indexOf("<error>") != -1 && result.indexOf("</error>") != -1) {
207
        if (result == null) {
208
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
209
          logReplication.error( "ReplicationHandler.update - Failed to get updated doc list "+
210
                       "for server " + server + " because "+result);
211
          continue;
212
        }
213
        //Add result to vector
214
        responses.add(result);
215
    }
216
    timeToGetServerList = System.currentTimeMillis() - startTimeToGetServers;
217 231

  
218
    //make sure that there is updated file list
219
    //If response is null, metacat don't need do anything
220
    if (responses==null || responses.isEmpty())
221
    {
222
    	logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
223
        logReplication.info( "ReplicationHandler.update - No updated doc list for "+
224
                           "every server and failed to replicate");
225
        return;
226
    }
232
        //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
233
        //               "document information: "+ responses.toString());
227 234

  
235
        long totalServerListParseTime = 0;
236
        // go through response vector(it contains updated vector and delete vector
237
        for(int i=0; i<responses.size(); i++)
238
        {
239
            long startServerListParseTime = System.currentTimeMillis();
240
            XMLReader parser;
241
            ReplMessageHandler message = new ReplMessageHandler();
242
            try
243
            {
244
                parser = initParser(message);
245
            }
246
            catch (Exception e)
247
            {
248
                logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
249
                logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
250
                        " initParser for message and " +e.getMessage());
251
                // stop replication
252
                return;
253
            }
228 254

  
229
    //logReplication.info("ReplicationHandler.update - Responses from remote metacat about updated "+
230
    //               "document information: "+ responses.toString());
231
    
232
    long totalServerListParseTime = 0;
233
    // go through response vector(it contains updated vector and delete vector
234
    for(int i=0; i<responses.size(); i++)
235
    {
236
    	long startServerListParseTime = System.currentTimeMillis();
237
    	XMLReader parser;
238
    	ReplMessageHandler message = new ReplMessageHandler();
239
    	try
255
            try
256
            {
257
                parser.parse(new InputSource(responses.elementAt(i)));
258
            }
259
            catch(Exception e)
260
            {
261
                logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
262
                logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
263
                        "because "+ e.getMessage());
264
                continue;
265
            }
266
            finally 
267
            {
268
                IOUtils.closeQuietly(responses.elementAt(i));
269
            }
270
            //v is the list of updated documents
271
            Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
272
            logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
273
            //d is the list of deleted documents
274
            Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
275
            logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
276
            logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
277
            logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
278
            // go though every element in updated document vector
279
            handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
280
            //handle deleted docs
281
            for(int k=0; k<deleteList.size(); k++)
282
            { //delete the deleted documents;
283
                Vector<String> w = new Vector<String>(deleteList.elementAt(k));
284
                String docId = (String)w.elementAt(0);
285
                try
286
                {
287
                    handleDeleteSingleDocument(docId, server);
288
                }
289
                catch (Exception ee)
290
                {
291
                    continue;
292
                }
293
            }//for delete docs
294

  
295
            // handle replicate doc in xml_revision
296
            Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
297
            logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
298
            handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
299
            DOCINSERTNUMBER = 1;
300
            DOCERRORNUMBER  = 1;
301
            REVINSERTNUMBER = 1;
302
            REVERRORNUMBER  = 1;
303

  
304
            // handle system metadata
305
            Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
306
            for(int k = 0; k < systemMetadataList.size(); k++) { 
307
                Vector<String> w = systemMetadataList.elementAt(k);
308
                String guid = (String) w.elementAt(0);
309
                String remoteserver = (String) w.elementAt(1);
310
                try {
311
                    handleSystemMetadata(remoteserver, guid);
312
                }
313
                catch (Exception ee) {
314
                    logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
315
                    continue;
316
                }
317
            }
318

  
319
            totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
320
        }//for response
321

  
322
        //updated last_checked
323
        for (int i=0;i<serverList.size(); i++)
240 324
        {
241
          parser = initParser(message);
242
        }
243
        catch (Exception e)
244
        {
245
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
246
          logReplication.error("ReplicationHandler.update - Failed to replicate becaue couldn't " +
247
                                " initParser for message and " +e.getMessage());
248
           // stop replication
249
           return;
250
        }
251
    	
252
        try
253
        {
254
          parser.parse(new InputSource(responses.elementAt(i)));
255
        }
256
        catch(Exception e)
257
        {
258
          logMetacat.error("ReplicationHandler.update - " + ReplicationService.METACAT_REPL_ERROR_MSG);
259
          logReplication.error("ReplicationHandler.update - Couldn't parse one responses "+
260
                                   "because "+ e.getMessage());
261
          continue;
262
        }
263
        finally 
264
        {
265
            IOUtils.closeQuietly(responses.elementAt(i));
266
        }
267
        //v is the list of updated documents
268
        Vector<Vector<String>> updateList = new Vector<Vector<String>>(message.getUpdatesVect());
269
        logReplication.info("ReplicationHandler.update - The document list size is "+updateList.size()+ " from "+message.getServerName());
270
        //d is the list of deleted documents
271
        Vector<Vector<String>> deleteList = new Vector<Vector<String>>(message.getDeletesVect());
272
        logReplication.info("ReplicationHandler.update - Update vector size: "+ updateList.size()+" from "+message.getServerName());
273
        logReplication.info("ReplicationHandler.update - Delete vector size: "+ deleteList.size()+" from "+message.getServerName());
274
        logReplication.info("ReplicationHandler.update - The delete document list size is "+deleteList.size()+" from "+message.getServerName());
275
        // go though every element in updated document vector
276
        handleDocList(updateList, DocumentImpl.DOCUMENTTABLE);
277
        //handle deleted docs
278
        for(int k=0; k<deleteList.size(); k++)
279
        { //delete the deleted documents;
280
          Vector<String> w = new Vector<String>(deleteList.elementAt(k));
281
          String docId = (String)w.elementAt(0);
282
          try
283
          {
284
            handleDeleteSingleDocument(docId, server);
285
          }
286
          catch (Exception ee)
287
          {
288
            continue;
289
          }
290
        }//for delete docs
291
        
292
        // handle replicate doc in xml_revision
293
        Vector<Vector<String>> revisionList = new Vector<Vector<String>>(message.getRevisionsVect());
294
        logReplication.info("ReplicationHandler.update - The revision document list size is "+revisionList.size()+ " from "+message.getServerName());
295
        handleDocList(revisionList, DocumentImpl.REVISIONTABLE);
296
        DOCINSERTNUMBER = 1;
297
        DOCERRORNUMBER  = 1;
298
        REVINSERTNUMBER = 1;
299
        REVERRORNUMBER  = 1;
300
        
301
        // handle system metadata
302
        Vector<Vector<String>> systemMetadataList = message.getSystemMetadataVect();
303
        for(int k = 0; k < systemMetadataList.size(); k++) { 
304
        	Vector<String> w = systemMetadataList.elementAt(k);
305
        	String guid = (String) w.elementAt(0);
306
        	String remoteserver = (String) w.elementAt(1);
307
        	try {
308
        		handleSystemMetadata(remoteserver, guid);
309
        	}
310
        	catch (Exception ee) {
311
        		logMetacat.error("Error replicating system metedata for guid: " + guid, ee);
312
        		continue;
313
        	}
314
        }
315
        
316
        totalServerListParseTime += (System.currentTimeMillis() - startServerListParseTime);
317
    }//for response
325
            // Get ReplicationServer object from server list
326
            replServer = serverList.serverAt(i);
327
            try
328
            {
329
                updateLastCheckTimeForSingleServer(replServer);
330
            }
331
            catch(Exception e)
332
            {
333
                continue;
334
            }
335
        }//for
318 336

  
319
    //updated last_checked
320
    for (int i=0;i<serverList.size(); i++)
321
    {
322
       // Get ReplicationServer object from server list
323
       replServer = serverList.serverAt(i);
324
       try
325
       {
326
         updateLastCheckTimeForSingleServer(replServer);
327
       }
328
       catch(Exception e)
329
       {
330
         continue;
331
       }
332
    }//for
333
    
334
    long replicationEndTime = System.currentTimeMillis();
335
    logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
336
    		(replicationEndTime - replicationStartTime));
337
    logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
338
    		timeToGetServerList);
339
    logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
340
    		totalServerListParseTime);
341
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
342
    		_xmlDocQueryCount);
343
    logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
344
    		_xmlDocQueryTime + " ms");
345
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
346
    		_xmlRevQueryCount);
347
    logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
348
    		_xmlRevQueryTime + " ms");;
337
        long replicationEndTime = System.currentTimeMillis();
338
        logMetacat.debug("ReplicationHandler.update - Total replication time: " + 
339
                (replicationEndTime - replicationStartTime));
340
        logMetacat.debug("ReplicationHandler.update - time to get server list: " + 
341
                timeToGetServerList);
342
        logMetacat.debug("ReplicationHandler.update - server list parse time: " + 
343
                totalServerListParseTime);
344
        logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query count: " + 
345
                _xmlDocQueryCount);
346
        logMetacat.debug("ReplicationHandler.update - 'in xml_documents' total query time: " + 
347
                _xmlDocQueryTime + " ms");
348
        logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query count: " + 
349
                _xmlRevQueryCount);
350
        logMetacat.debug("ReplicationHandler.update - 'in xml_revisions' total query time: " + 
351
                _xmlRevQueryTime + " ms");;
349 352

  
350
  }//update
353
    } finally { // need to close all inputstreams unconditionally
354
        Iterator<InputStream> isit = responses.iterator();
355
        while (isit.hasNext()) {
356
            IOUtils.closeQuietly(isit.next());
357
        }
358
    }
359
    }//update
351 360

  
352 361
  /* Handle replicate single xml document*/
353 362
  private void handleSingleXMLDocument(String remoteserver, String actions,

Also available in: Unified diff