Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that tracks sessions for MetaCatServlet users.
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Matt Jones
7
 *
8
 *   '$Author: tao $'
9
 *     '$Date: 2013-08-26 16:30:00 -0700 (Mon, 26 Aug 2013) $'
10
 * '$Revision: 8164 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.metacat;
28

    
29
import java.sql.PreparedStatement;
30
import java.sql.ResultSet;
31
import java.sql.SQLException;
32
import java.util.Vector;
33
import java.util.HashMap;
34
import java.lang.Comparable;
35

    
36
import edu.ucsb.nceas.metacat.common.query.EnabledQueryEngines;
37
import edu.ucsb.nceas.metacat.database.DBConnection;
38
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
39
import edu.ucsb.nceas.metacat.properties.PropertyService;
40
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
41

    
42
import org.apache.log4j.Logger;
43

    
44
public class IndexingQueue {
45

    
46
	private static Logger logMetacat = Logger.getLogger(IndexingQueue.class);
47
	//	 Map used to keep tracks of docids to be indexed
48
	private HashMap<String, IndexingQueueObject> indexingMap = new HashMap<String, IndexingQueueObject>();     
49
	private Vector<IndexingTask> currentThreads = new Vector<IndexingTask>();
50
	public Vector<String> currentDocidsBeingIndexed = new Vector<String>();
51
	private boolean metacatRunning = true;
52
	
53
	private static IndexingQueue instance = null;
54

    
55
	final static int NUMBEROFINDEXINGTHREADS;
56
	static {
57
		int numIndexingThreads = 0;
58
		try {
59
			numIndexingThreads = 
60
				Integer.parseInt(PropertyService.getProperty("database.numberOfIndexingThreads"));
61
		} catch (PropertyNotFoundException pnfe) {
62
			logMetacat.error("Could not get property in static block: " 
63
					+ pnfe.getMessage());
64
		}
65
		NUMBEROFINDEXINGTHREADS = numIndexingThreads;
66
	}
67

    
68
    private IndexingQueue() {
69
        if(!EnabledQueryEngines.getInstance().isEnabled(EnabledQueryEngines.PATHQUERYENGINE)) {
70
            return;
71
        }
72
	    for (int i = 0; i < NUMBEROFINDEXINGTHREADS; i++) {
73
	    	IndexingTask thread = new IndexingTask();
74
	    	thread.start();
75
	    	currentThreads.add(thread);
76
	    }
77
    }
78
	
79
	public static synchronized IndexingQueue getInstance(){
80
		if (instance == null) {
81
			instance = new IndexingQueue();
82
		}
83
		return instance;
84
	}//getInstance
85

    
86
    public void add(String docid, String rev) {
87
        if(!EnabledQueryEngines.getInstance().isEnabled(EnabledQueryEngines.PATHQUERYENGINE)) {
88
            return;
89
        }
90
    	add(new IndexingQueueObject(docid, rev, 0));
91
    }
92
    
93
    protected void add(IndexingQueueObject queueObject) {
94
	    synchronized (indexingMap) {
95
	    	if(!indexingMap.containsKey(queueObject.getDocid())){
96
	    		indexingMap.put(queueObject.getDocid(), queueObject);
97
	    		indexingMap.notify();
98
	    	} else {
99
	    		IndexingQueueObject oldQueueObject = indexingMap.get(queueObject.getDocid());
100
	    		if(oldQueueObject.compareTo(queueObject) < 0){
101
	    	  		indexingMap.put(queueObject.getDocid(), queueObject);
102
		    		indexingMap.notify();  			
103
	    		}
104
	    	}
105
	    }
106
	  }
107
    
108
	public boolean getMetacatRunning(){
109
		return this.metacatRunning;
110
	}
111
	
112
	public void setMetacatRunning(boolean metacatRunning){
113
	    if(!EnabledQueryEngines.getInstance().isEnabled(EnabledQueryEngines.PATHQUERYENGINE)) {
114
            return;
115
        }
116
		this.metacatRunning = metacatRunning;
117
		
118
		if(!metacatRunning){
119
			for(int count=0; count<currentThreads.size(); count++){
120
				currentThreads.get(count).metacatRunning = false;
121
				currentThreads.get(count).interrupt();
122
			}
123
		}
124
	}
125
	
126
    protected IndexingQueueObject getNext() {
127
    	IndexingQueueObject returnVal = null;
128
        synchronized (indexingMap) {
129
          while (indexingMap.isEmpty() && metacatRunning) {
130
            try {
131
            	indexingMap.wait();
132
            } catch (InterruptedException ex) {
133
              logMetacat.error("Interrupted");
134
            }
135
          }
136
          
137
          if(metacatRunning){
138
        	  String docid = indexingMap.keySet().iterator().next();
139
        	  returnVal = indexingMap.get(docid);
140
        	  indexingMap.remove(docid);
141
          }
142
        }
143
        return returnVal;
144
      }
145
    
146
    /**
147
     * Removes the Indexing Task object from the queue
148
     * for the given docid. Currently, rev is ignored
149
     * This method should be used to cancel scheduled indexing on a document
150
     * (typically if it is being deleted but indexing has not completed yet)
151
     * see http://bugzilla.ecoinformatics.org/show_bug.cgi?id=5750
152
     * @param docid the docid (without revision)
153
     * @param rev the docid's rev (ignored)
154
     */
155
    public void remove(String docid, String rev) {
156
        if(!EnabledQueryEngines.getInstance().isEnabled(EnabledQueryEngines.PATHQUERYENGINE)) {
157
            return;
158
        }
159
		synchronized (indexingMap) {
160
			if (indexingMap.containsKey(docid)) {
161
				logMetacat.debug("Removing indexing queue task for docid: " + docid);
162
				indexingMap.remove(docid);
163
			}
164
		}
165
	}
166

    
167
}
168

    
169
class IndexingTask extends Thread {
170
  	  private Logger logMetacat = Logger.getLogger(IndexingTask.class);
171
  	  
172
  	  
173
      protected final static long MAXIMUMINDEXDELAY;
174
      static {
175
    	  long maxIndexDelay = 0;
176
    	  try {
177
    		  maxIndexDelay = 
178
    			  Integer.parseInt(PropertyService.getProperty("database.maximumIndexDelay")); 
179
    	  } catch (PropertyNotFoundException pnfe) {
180
    		  System.err.println("Could not get property in static block: " 
181
  					+ pnfe.getMessage());
182
    	  }
183
    	  MAXIMUMINDEXDELAY = maxIndexDelay;
184
      }
185
      
186
      protected boolean metacatRunning = true;
187
      	
188
	  public void run() {
189
	    while (metacatRunning) {
190
	      // blocks until job
191
	      IndexingQueueObject indexQueueObject = 
192
	    	  IndexingQueue.getInstance().getNext();
193

    
194
	      if(indexQueueObject != null){
195
	    	  if(!IndexingQueue.getInstance().
196
	    			currentDocidsBeingIndexed.contains(indexQueueObject.getDocid())){
197
    		  try {
198
    			  IndexingQueue.getInstance().
199
    			  		currentDocidsBeingIndexed.add(indexQueueObject.getDocid());
200
    		      String docid = indexQueueObject.getDocid() + "." + indexQueueObject.getRev();
201
    			  if(checkDocumentTable(docid, "xml_documents")){
202
    				  logMetacat.warn("Calling buildIndex for " + docid);
203
    				  DocumentImpl doc = new DocumentImpl(docid, false);
204
    				  doc.buildIndex();
205
    				  logMetacat.warn("finish building index for doicd "+docid);
206
    			  } else {
207
    				  logMetacat.warn("Couldn't find the docid:" + docid 
208
	                			+ " in xml_documents table");
209
	                  sleep(MAXIMUMINDEXDELAY);
210
	                  throw(new Exception("Couldn't find the docid:" + docid 
211
	                			+ " in xml_documents table"));
212
    			  }
213
    		  	} catch (Exception e) {
214
    			  logMetacat.warn("Exception: " + e);
215
    			  e.printStackTrace();
216
	        
217
    			  if(indexQueueObject.getCount() < 25){
218
    				  indexQueueObject.setCount(indexQueueObject.getCount()+1);
219
    				  // add the docid back to the list
220
    				  IndexingQueue.getInstance().add(indexQueueObject);
221
    			  } else {
222
    				  logMetacat.fatal("Docid " + indexQueueObject.getDocid() 
223
	        			+ " has been inserted to IndexingQueue "
224
	        			+ "more than 25 times. Not adding the docid to"
225
	        			+ " the queue again.");
226
    			  }
227
    		  	} finally {
228
    			  	IndexingQueue.getInstance().currentDocidsBeingIndexed
229
    			  		.remove(indexQueueObject.getDocid());	    	  
230
    		  	}
231
	    	  } else {
232
	    		  indexQueueObject.setCount(indexQueueObject.getCount()+1);
233
	    		  IndexingQueue.getInstance().add(indexQueueObject);    		  
234
	    	  }
235
	      }
236
	    }
237
	  }
238
	  
239
      private boolean checkDocumentTable(String docid, String tablename) throws Exception{
240
	        DBConnection dbConn = null;
241
	        int serialNumber = -1;
242
	        boolean inxmldoc = false;
243
            
244
	        String revision = docid.substring(docid.lastIndexOf(".")+1,docid.length());
245
	        int rev = Integer.parseInt(revision);
246
	        docid = docid.substring(0,docid.lastIndexOf("."));
247

    
248
	        logMetacat.info("Checking if document exists in xml_documents: docid is " 
249
	        		+ docid + " and revision is " + revision);
250

    
251
	        try {
252
	            // Opening separate db connection for writing XML Index
253
	            dbConn = DBConnectionPool
254
	                    .getDBConnection("DBSAXHandler.checkDocumentTable");
255
	            serialNumber = dbConn.getCheckOutSerialNumber();
256

    
257
	            String xmlDocumentsCheck = "SELECT distinct docid FROM " + tablename
258
	                        + " WHERE docid = ? " 
259
	                        + " AND rev = ?";
260

    
261
                PreparedStatement xmlDocCheck = dbConn.prepareStatement(xmlDocumentsCheck);
262
                xmlDocCheck.setString(1, docid);
263
                xmlDocCheck.setInt(2, rev);
264
                // Increase usage count
265
                dbConn.increaseUsageCount(1);
266
	            xmlDocCheck.execute();
267
	            ResultSet doccheckRS = xmlDocCheck.getResultSet();
268
	            boolean tableHasRows = doccheckRS.next();
269
	            if (tableHasRows) {
270
	            	inxmldoc = true;
271
	            }
272
	            doccheckRS.close();
273
	            xmlDocCheck.close();
274
	        } catch (SQLException e) {
275
	        	   e.printStackTrace();
276
	        } finally {
277
	            DBConnectionPool.returnDBConnection(dbConn, serialNumber);
278
	        }//finally
279
	        
280
	        return inxmldoc;
281
      }
282
	}
283

    
284
class IndexingQueueObject implements Comparable{
285
	// the docid of the document to be indexed. 
286
	private String docid;
287
	// the docid of the document to be indexed. 
288
	private String rev;
289
	// the count of number of times the document has been in the queue
290
	private int count;
291
	
292
	IndexingQueueObject(String docid, String rev, int count){
293
		this.docid = docid;
294
		this.rev = rev;
295
		this.count = count;
296
	}
297
	
298
	public int getCount(){
299
		return count;
300
	}
301

    
302
	public String getDocid(){
303
		return docid;
304
	}
305

    
306
	public String getRev(){
307
		return rev;
308
	}
309

    
310
	public void setCount(int count){
311
		this.count = count;
312
	}
313
	
314
	public void setDocid(String docid){
315
		this.docid = docid;
316
	}
317

    
318
	public void setRev(String rev){
319
		this.rev = rev;
320
	}
321
	
322
	public int compareTo(Object o){
323
		if(o instanceof IndexingQueueObject){
324
			int revision = Integer.parseInt(rev);
325
			int oRevision = Integer.parseInt(((IndexingQueueObject)o).getRev());
326
			
327
			if(revision == oRevision) {
328
				return 0;
329
			} else if (revision > oRevision) {
330
				return 1;
331
			} else {
332
				return -1;
333
			}
334
		} else {
335
			throw new java.lang.ClassCastException();
336
		}
337
	}
338
}
(37-37/64)