Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that tracks sessions for MetaCatServlet users.
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Matt Jones
7
 *
8
 *   '$Author: leinfelder $'
9
 *     '$Date: 2012-11-26 14:28:02 -0800 (Mon, 26 Nov 2012) $'
10
 * '$Revision: 7434 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat;
28

    
29
import java.sql.PreparedStatement;
30
import java.sql.ResultSet;
31
import java.sql.SQLException;
32
import java.util.Vector;
33
import java.util.HashMap;
34
import java.lang.Comparable;
35

    
36
import edu.ucsb.nceas.metacat.database.DBConnection;
37
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
38
import edu.ucsb.nceas.metacat.properties.PropertyService;
39
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
40

    
41
import org.apache.log4j.Logger;
42

    
43
public class IndexingQueue {
44

    
45
	private static Logger logMetacat = Logger.getLogger(IndexingQueue.class);
46
	//	 Map used to keep tracks of docids to be indexed
47
	private HashMap<String, IndexingQueueObject> indexingMap = new HashMap<String, IndexingQueueObject>();     
48
	private Vector<IndexingTask> currentThreads = new Vector<IndexingTask>();
49
	public Vector<String> currentDocidsBeingIndexed = new Vector<String>();
50
	private boolean metacatRunning = true;
51
	
52
	private static IndexingQueue instance = null;
53

    
54
	final static int NUMBEROFINDEXINGTHREADS;
55
	static {
56
		int numIndexingThreads = 0;
57
		try {
58
			numIndexingThreads = 
59
				Integer.parseInt(PropertyService.getProperty("database.numberOfIndexingThreads"));
60
		} catch (PropertyNotFoundException pnfe) {
61
			logMetacat.error("Could not get property in static block: " 
62
					+ pnfe.getMessage());
63
		}
64
		NUMBEROFINDEXINGTHREADS = numIndexingThreads;
65
	}
66

    
67
    private IndexingQueue() {
68
	    for (int i = 0; i < NUMBEROFINDEXINGTHREADS; i++) {
69
	    	IndexingTask thread = new IndexingTask();
70
	    	thread.start();
71
	    	currentThreads.add(thread);
72
	    }
73
    }
74
	
75
	public static synchronized IndexingQueue getInstance(){
76
		if (instance == null) {
77
			instance = new IndexingQueue();
78
		}
79
		return instance;
80
	}//getInstance
81

    
82
    public void add(String docid, String rev) {
83
    	add(new IndexingQueueObject(docid, rev, 0));
84
    }
85
    
86
    protected void add(IndexingQueueObject queueObject) {
87
	    synchronized (indexingMap) {
88
	    	if(!indexingMap.containsKey(queueObject.getDocid())){
89
	    		indexingMap.put(queueObject.getDocid(), queueObject);
90
	    		indexingMap.notify();
91
	    	} else {
92
	    		IndexingQueueObject oldQueueObject = indexingMap.get(queueObject.getDocid());
93
	    		if(oldQueueObject.compareTo(queueObject) < 0){
94
	    	  		indexingMap.put(queueObject.getDocid(), queueObject);
95
		    		indexingMap.notify();  			
96
	    		}
97
	    	}
98
	    }
99
	  }
100
    
101
	public boolean getMetacatRunning(){
102
		return this.metacatRunning;
103
	}
104
	
105
	public void setMetacatRunning(boolean metacatRunning){
106
		this.metacatRunning = metacatRunning;
107
		
108
		if(!metacatRunning){
109
			for(int count=0; count<currentThreads.size(); count++){
110
				currentThreads.get(count).metacatRunning = false;
111
				currentThreads.get(count).interrupt();
112
			}
113
		}
114
	}
115
	
116
    protected IndexingQueueObject getNext() {
117
    	IndexingQueueObject returnVal = null;
118
        synchronized (indexingMap) {
119
          while (indexingMap.isEmpty() && metacatRunning) {
120
            try {
121
            	indexingMap.wait();
122
            } catch (InterruptedException ex) {
123
              logMetacat.error("Interrupted");
124
            }
125
          }
126
          
127
          if(metacatRunning){
128
        	  String docid = indexingMap.keySet().iterator().next();
129
        	  returnVal = indexingMap.get(docid);
130
        	  indexingMap.remove(docid);
131
          }
132
        }
133
        return returnVal;
134
      }
135
    
136
    /**
137
     * Removes the Indexing Task object from the queue
138
     * for the given docid. Currently, rev is ignored
139
     * This method should be used to cancel scheduled indexing on a document
140
     * (typically if it is being deleted but indexing has not completed yet)
141
     * see http://bugzilla.ecoinformatics.org/show_bug.cgi?id=5750
142
     * @param docid the docid (without revision)
143
     * @param rev the docid's rev (ignored)
144
     */
145
    public void remove(String docid, String rev) {
146
		synchronized (indexingMap) {
147
			if (indexingMap.containsKey(docid)) {
148
				logMetacat.debug("Removing indexing queue task for docid: " + docid);
149
				indexingMap.remove(docid);
150
			}
151
		}
152
	}
153

    
154
}
155

    
156
class IndexingTask extends Thread {
157
  	  private Logger logMetacat = Logger.getLogger(IndexingTask.class);
158
  	  
159
  	  
160
      protected final static long MAXIMUMINDEXDELAY;
161
      static {
162
    	  long maxIndexDelay = 0;
163
    	  try {
164
    		  maxIndexDelay = 
165
    			  Integer.parseInt(PropertyService.getProperty("database.maximumIndexDelay")); 
166
    	  } catch (PropertyNotFoundException pnfe) {
167
    		  System.err.println("Could not get property in static block: " 
168
  					+ pnfe.getMessage());
169
    	  }
170
    	  MAXIMUMINDEXDELAY = maxIndexDelay;
171
      }
172
      
173
      protected boolean metacatRunning = true;
174
      	
175
	  public void run() {
176
	    while (metacatRunning) {
177
	      // blocks until job
178
	      IndexingQueueObject indexQueueObject = 
179
	    	  IndexingQueue.getInstance().getNext();
180

    
181
	      if(indexQueueObject != null){
182
	    	  if(!IndexingQueue.getInstance().
183
	    			currentDocidsBeingIndexed.contains(indexQueueObject.getDocid())){
184
    		  try {
185
    			  IndexingQueue.getInstance().
186
    			  		currentDocidsBeingIndexed.add(indexQueueObject.getDocid());
187
    		      String docid = indexQueueObject.getDocid() + "." + indexQueueObject.getRev();
188
    			  if(checkDocumentTable(docid, "xml_documents")){
189
    				  logMetacat.warn("Calling buildIndex for " + docid);
190
    				  DocumentImpl doc = new DocumentImpl(docid, false);
191
    				  doc.buildIndex();
192
    				  logMetacat.warn("finish building index for doicd "+docid);
193
    			  } else {
194
    				  logMetacat.warn("Couldn't find the docid:" + docid 
195
	                			+ " in xml_documents table");
196
	                  sleep(MAXIMUMINDEXDELAY);
197
	                  throw(new Exception("Couldn't find the docid:" + docid 
198
	                			+ " in xml_documents table"));
199
    			  }
200
    		  	} catch (Exception e) {
201
    			  logMetacat.warn("Exception: " + e);
202
    			  e.printStackTrace();
203
	        
204
    			  if(indexQueueObject.getCount() < 25){
205
    				  indexQueueObject.setCount(indexQueueObject.getCount()+1);
206
    				  // add the docid back to the list
207
    				  IndexingQueue.getInstance().add(indexQueueObject);
208
    			  } else {
209
    				  logMetacat.fatal("Docid " + indexQueueObject.getDocid() 
210
	        			+ " has been inserted to IndexingQueue "
211
	        			+ "more than 25 times. Not adding the docid to"
212
	        			+ " the queue again.");
213
    			  }
214
    		  	} finally {
215
    			  	IndexingQueue.getInstance().currentDocidsBeingIndexed
216
    			  		.remove(indexQueueObject.getDocid());	    	  
217
    		  	}
218
	    	  } else {
219
	    		  indexQueueObject.setCount(indexQueueObject.getCount()+1);
220
	    		  IndexingQueue.getInstance().add(indexQueueObject);    		  
221
	    	  }
222
	      }
223
	    }
224
	  }
225
	  
226
      private boolean checkDocumentTable(String docid, String tablename) throws Exception{
227
	        DBConnection dbConn = null;
228
	        int serialNumber = -1;
229
	        boolean inxmldoc = false;
230
            
231
	        String revision = docid.substring(docid.lastIndexOf(".")+1,docid.length());
232
	        int rev = Integer.parseInt(revision);
233
	        docid = docid.substring(0,docid.lastIndexOf("."));
234

    
235
	        logMetacat.info("Checking if document exists in xml_documents: docid is " 
236
	        		+ docid + " and revision is " + revision);
237

    
238
	        try {
239
	            // Opening separate db connection for writing XML Index
240
	            dbConn = DBConnectionPool
241
	                    .getDBConnection("DBSAXHandler.checkDocumentTable");
242
	            serialNumber = dbConn.getCheckOutSerialNumber();
243

    
244
	            String xmlDocumentsCheck = "SELECT distinct docid FROM " + tablename
245
	                        + " WHERE docid = ? " 
246
	                        + " AND rev = ?";
247

    
248
                PreparedStatement xmlDocCheck = dbConn.prepareStatement(xmlDocumentsCheck);
249
                xmlDocCheck.setString(1, docid);
250
                xmlDocCheck.setInt(2, rev);
251
                // Increase usage count
252
                dbConn.increaseUsageCount(1);
253
	            xmlDocCheck.execute();
254
	            ResultSet doccheckRS = xmlDocCheck.getResultSet();
255
	            boolean tableHasRows = doccheckRS.next();
256
	            if (tableHasRows) {
257
	            	inxmldoc = true;
258
	            }
259
	            doccheckRS.close();
260
	            xmlDocCheck.close();
261
	        } catch (SQLException e) {
262
	        	   e.printStackTrace();
263
	        } finally {
264
	            DBConnectionPool.returnDBConnection(dbConn, serialNumber);
265
	        }//finally
266
	        
267
	        return inxmldoc;
268
      }
269
	}
270

    
271
class IndexingQueueObject implements Comparable{
272
	// the docid of the document to be indexed. 
273
	private String docid;
274
	// the docid of the document to be indexed. 
275
	private String rev;
276
	// the count of number of times the document has been in the queue
277
	private int count;
278
	
279
	IndexingQueueObject(String docid, String rev, int count){
280
		this.docid = docid;
281
		this.rev = rev;
282
		this.count = count;
283
	}
284
	
285
	public int getCount(){
286
		return count;
287
	}
288

    
289
	public String getDocid(){
290
		return docid;
291
	}
292

    
293
	public String getRev(){
294
		return rev;
295
	}
296

    
297
	public void setCount(int count){
298
		this.count = count;
299
	}
300
	
301
	public void setDocid(String docid){
302
		this.docid = docid;
303
	}
304

    
305
	public void setRev(String rev){
306
		this.rev = rev;
307
	}
308
	
309
	public int compareTo(Object o){
310
		if(o instanceof IndexingQueueObject){
311
			int revision = Integer.parseInt(rev);
312
			int oRevision = Integer.parseInt(((IndexingQueueObject)o).getRev());
313
			
314
			if(revision == oRevision) {
315
				return 0;
316
			} else if (revision > oRevision) {
317
				return 1;
318
			} else {
319
				return -1;
320
			}
321
		} else {
322
			throw new java.lang.ClassCastException();
323
		}
324
	}
325
}
(37-37/63)