Project

General

Profile

1 2732 sgarg
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that tracks sessions for MetaCatServlet users.
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Matt Jones
7
 *
8
 *   '$Author$'
9
 *     '$Date$'
10
 * '$Revision$'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26
27
package edu.ucsb.nceas.metacat;
28
29
import java.sql.PreparedStatement;
30
import java.sql.ResultSet;
31
import java.sql.SQLException;
32
import java.util.Vector;
33 2733 sgarg
import java.util.HashMap;
34
import java.lang.Comparable;
35 4080 daigle
36 5015 daigle
import edu.ucsb.nceas.metacat.database.DBConnection;
37
import edu.ucsb.nceas.metacat.database.DBConnectionPool;
38 5030 daigle
import edu.ucsb.nceas.metacat.properties.PropertyService;
39 4698 daigle
import edu.ucsb.nceas.metacat.util.MetacatUtil;
40 4080 daigle
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
41
42 2732 sgarg
import org.apache.log4j.Logger;
43
44
public class IndexingQueue {
45
46
	private Logger logMetacat = Logger.getLogger(IndexingQueue.class);
47 2733 sgarg
	//	 Map used to keep tracks of docids to be indexed
48
	private HashMap indexingMap = new HashMap();
49 2732 sgarg
	private Vector currentThreads = new Vector();
50 2733 sgarg
	public Vector currentDocidsBeingIndexed = new Vector();
51 2901 sgarg
	private boolean metacatRunning = true;
52 2732 sgarg
53
	private static IndexingQueue instance = null;
54
55 4080 daigle
	final static int NUMBEROFINDEXINGTHREADS;
56
	static {
57
		int numIndexingThreads = 0;
58
		try {
59
			numIndexingThreads =
60
				Integer.parseInt(PropertyService.getProperty("database.numberOfIndexingThreads"));
61
		} catch (PropertyNotFoundException pnfe) {
62
			System.err.println("Could not get property in static block: "
63
					+ pnfe.getMessage());
64
		}
65
		NUMBEROFINDEXINGTHREADS = numIndexingThreads;
66
	}
67
68
69 2732 sgarg
	private int sleepTime = 2000;
70
71 3199 tao
    private IndexingQueue() {
72 2732 sgarg
	    for (int i = 0; i < NUMBEROFINDEXINGTHREADS; i++) {
73
	      Thread thread = new IndexingTask();
74
	      thread.start();
75 2902 sgarg
	      currentThreads.add(thread);
76 2732 sgarg
	    }
77
    }
78
79
	public static synchronized IndexingQueue getInstance(){
80
		if (instance == null) {
81
			instance = new IndexingQueue();
82
		}
83
		return instance;
84
	}//getInstance
85
86 2733 sgarg
    public void add(String docid, String rev) {
87
    	add(new IndexingQueueObject(docid, rev, 0));
88
    }
89 2732 sgarg
90 2733 sgarg
    protected void add(IndexingQueueObject queueObject) {
91
	    synchronized (indexingMap) {
92
	    	if(!indexingMap.containsKey(queueObject.getDocid())){
93
	    		indexingMap.put(queueObject.getDocid(), queueObject);
94
	    		indexingMap.notify();
95
	    	} else {
96
	    		IndexingQueueObject oldQueueObject =
97
	    			(IndexingQueueObject) indexingMap.get(queueObject.getDocid());
98
	    		if(oldQueueObject.compareTo(queueObject) < 0){
99
	    	  		indexingMap.put(queueObject.getDocid(), queueObject);
100
		    		indexingMap.notify();
101
	    		}
102
	    	}
103 2732 sgarg
	    }
104
	  }
105
106 2901 sgarg
	public boolean getMetacatRunning(){
107
		return this.metacatRunning;
108
	}
109 2732 sgarg
110 2901 sgarg
	public void setMetacatRunning(boolean metacatRunning){
111
		this.metacatRunning = metacatRunning;
112
113 2902 sgarg
		if(!metacatRunning){
114
			for(int count=0; count<currentThreads.size(); count++){
115
				((IndexingTask)currentThreads.get(count)).metacatRunning = false;
116
				((Thread)currentThreads.get(count)).interrupt();
117
			}
118 2901 sgarg
		}
119
	}
120
121 2732 sgarg
    protected IndexingQueueObject getNext() {
122
    	IndexingQueueObject returnVal = null;
123 2733 sgarg
        synchronized (indexingMap) {
124 2901 sgarg
          while (indexingMap.isEmpty() && metacatRunning) {
125 2732 sgarg
            try {
126 2733 sgarg
            	indexingMap.wait();
127 2732 sgarg
            } catch (InterruptedException ex) {
128
              System.err.println("Interrupted");
129
            }
130
          }
131 2901 sgarg
132
          if(metacatRunning){
133
        	  String docid = (String) indexingMap.keySet().iterator().next();
134
        	  returnVal = (IndexingQueueObject)indexingMap.get(docid);
135
        	  indexingMap.remove(docid);
136
          }
137 2732 sgarg
        }
138
        return returnVal;
139
      }
140
141
}
142
143
class IndexingTask extends Thread {
144
  	  private Logger logMetacat = Logger.getLogger(IndexingTask.class);
145 4080 daigle
146
147
      protected final static long MAXIMUMINDEXDELAY;
148
      static {
149
    	  long maxIndexDelay = 0;
150
    	  try {
151
    		  maxIndexDelay =
152
    			  Integer.parseInt(PropertyService.getProperty("database.maximumIndexDelay"));
153
    	  } catch (PropertyNotFoundException pnfe) {
154
    		  System.err.println("Could not get property in static block: "
155
  					+ pnfe.getMessage());
156
    	  }
157
    	  MAXIMUMINDEXDELAY = maxIndexDelay;
158
      }
159
160 2902 sgarg
      protected boolean metacatRunning = true;
161
162 2732 sgarg
	  public void run() {
163 2902 sgarg
	    while (metacatRunning) {
164 2732 sgarg
	      // blocks until job
165
	      IndexingQueueObject returnVal =
166
	    	  IndexingQueue.getInstance().getNext();
167 2733 sgarg
168 2901 sgarg
	      if(returnVal != null){
169
	    	  if(!IndexingQueue.getInstance().
170 2733 sgarg
	    			currentDocidsBeingIndexed.contains(returnVal.getDocid())){
171
    		  try {
172
    			  IndexingQueue.getInstance().
173
    			  		currentDocidsBeingIndexed.add(returnVal.getDocid());
174
    		      String docid = returnVal.getDocid() + "." + returnVal.getRev();
175 2764 sgarg
    			  if(checkDocumentTable(docid, "xml_documents")){
176
    				  logMetacat.warn("Calling buildIndex for " + docid);
177
    				  DocumentImpl doc = new DocumentImpl(docid, false);
178
    				  doc.buildIndex();
179 3199 tao
    				  logMetacat.warn("finish building index for doicd "+docid);
180 2764 sgarg
    			  } else {
181
    				  logMetacat.warn("Couldn't find the docid:" + docid
182
	                			+ " in xml_documents table");
183
	                  sleep(MAXIMUMINDEXDELAY);
184
	                  throw(new Exception("Couldn't find the docid:" + docid
185
	                			+ " in xml_documents table"));
186
    			  }
187 2901 sgarg
    		  	} catch (Exception e) {
188 2733 sgarg
    			  logMetacat.warn("Exception: " + e);
189
    			  e.printStackTrace();
190 2732 sgarg
191 2733 sgarg
    			  if(returnVal.getCount() < 25){
192
    				  returnVal.setCount(returnVal.getCount()+1);
193
    				  // add the docid back to the list
194
    				  IndexingQueue.getInstance().add(returnVal);
195
    			  } else {
196
    				  logMetacat.fatal("Docid " + returnVal.getDocid()
197 2732 sgarg
	        			+ " has been inserted to IndexingQueue "
198 2733 sgarg
	        			+ "more than 25 times. Not adding the docid to"
199
	        			+ " the queue again.");
200
    			  }
201 2901 sgarg
    		  	} finally {
202
    			  	IndexingQueue.getInstance().currentDocidsBeingIndexed
203
    			  		.remove(returnVal.getDocid());
204
    		  	}
205
	    	  } else {
206
	    		  returnVal.setCount(returnVal.getCount()+1);
207
	    		  IndexingQueue.getInstance().add(returnVal);
208
	    	  }
209
	      }
210 2732 sgarg
	    }
211
	  }
212
213 2764 sgarg
      private boolean checkDocumentTable(String docid, String tablename) throws Exception{
214 2732 sgarg
	        DBConnection dbConn = null;
215
	        int serialNumber = -1;
216 2764 sgarg
	        boolean inxmldoc = false;
217
218 2732 sgarg
	        String revision = docid.substring(docid.lastIndexOf(".")+1,docid.length());
219 6595 leinfelder
	        int rev = Integer.parseInt(revision);
220 2732 sgarg
	        docid = docid.substring(0,docid.lastIndexOf("."));
221
222 4743 walbridge
	        logMetacat.info("Checking if document exists in xml_documents: docid is "
223 2764 sgarg
	        		+ docid + " and revision is " + revision);
224 2732 sgarg
225
	        try {
226
	            // Opening separate db connection for writing XML Index
227
	            dbConn = DBConnectionPool
228
	                    .getDBConnection("DBSAXHandler.checkDocumentTable");
229
	            serialNumber = dbConn.getCheckOutSerialNumber();
230
231 2764 sgarg
	            String xmlDocumentsCheck = "SELECT distinct docid FROM " + tablename
232 6595 leinfelder
	                        + " WHERE docid = ? "
233
	                        + " AND rev = ?";
234 2732 sgarg
235 2764 sgarg
                PreparedStatement xmlDocCheck = dbConn.prepareStatement(xmlDocumentsCheck);
236 6595 leinfelder
                xmlDocCheck.setString(1, docid);
237
                xmlDocCheck.setInt(2, rev);
238 2764 sgarg
                // Increase usage count
239
                dbConn.increaseUsageCount(1);
240
	            xmlDocCheck.execute();
241
	            ResultSet doccheckRS = xmlDocCheck.getResultSet();
242
	            boolean tableHasRows = doccheckRS.next();
243
	            if (tableHasRows) {
244
	            	inxmldoc = true;
245
	            }
246
	            doccheckRS.close();
247
	            xmlDocCheck.close();
248 2732 sgarg
	        } catch (SQLException e) {
249
	        	   e.printStackTrace();
250
	        } finally {
251
	            DBConnectionPool.returnDBConnection(dbConn, serialNumber);
252
	        }//finally
253 2764 sgarg
254
	        return inxmldoc;
255
      }
256 2732 sgarg
	}
257
258 2733 sgarg
class IndexingQueueObject implements Comparable{
259 2732 sgarg
	// the docid of the document to be indexed.
260
	private String docid;
261 2733 sgarg
	// the docid of the document to be indexed.
262
	private String rev;
263 2732 sgarg
	// the count of number of times the document has been in the queue
264
	private int count;
265
266 2733 sgarg
	IndexingQueueObject(String docid, String rev, int count){
267
		this.docid = docid;
268
		this.rev = rev;
269 2732 sgarg
		this.count = count;
270
	}
271
272
	public int getCount(){
273
		return count;
274
	}
275
276
	public String getDocid(){
277
		return docid;
278
	}
279
280 2733 sgarg
	public String getRev(){
281
		return rev;
282
	}
283
284 2732 sgarg
	public void setCount(int count){
285
		this.count = count;
286
	}
287
288
	public void setDocid(String docid){
289
		this.docid = docid;
290
	}
291 2733 sgarg
292
	public void setRev(String rev){
293
		this.rev = rev;
294
	}
295
296
	public int compareTo(Object o){
297
		if(o instanceof IndexingQueueObject){
298
			int revision = Integer.parseInt(rev);
299
			int oRevision = Integer.parseInt(((IndexingQueueObject)o).getRev());
300
301
			if(revision == oRevision) {
302
				return 0;
303
			} else if (revision > oRevision) {
304
				return 1;
305
			} else {
306
				return -1;
307
			}
308
		} else {
309
			throw new java.lang.ClassCastException();
310
		}
311
	}
312 3077 jones
}