Project

General

Profile

1 2732 sgarg
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that tracks sessions for MetaCatServlet users.
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Matt Jones
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27
28
package edu.ucsb.nceas.metacat;
29
30
import java.sql.PreparedStatement;
31
import java.sql.ResultSet;
32
import java.sql.SQLException;
33
import java.util.Vector;
34 2733 sgarg
import java.util.HashMap;
35
import java.lang.Comparable;
36 2732 sgarg
import edu.ucsb.nceas.metacat.MetaCatUtil;
37
import org.apache.log4j.Logger;
38
39
public class IndexingQueue {
40
41
	private Logger logMetacat = Logger.getLogger(IndexingQueue.class);
42 2733 sgarg
	//	 Map used to keep tracks of docids to be indexed
43
	private HashMap indexingMap = new HashMap();
44 2732 sgarg
	private Vector currentThreads = new Vector();
45 2733 sgarg
	public Vector currentDocidsBeingIndexed = new Vector();
46 2901 sgarg
	private boolean metacatRunning = true;
47 2732 sgarg
48
	private static IndexingQueue instance = null;
49
50
	final static int NUMBEROFINDEXINGTHREADS =
51
		Integer.parseInt(MetaCatUtil.getOption("numberOfIndexingThreads"));
52
	private int sleepTime = 2000;
53
54
    public IndexingQueue() {
55
	    for (int i = 0; i < NUMBEROFINDEXINGTHREADS; i++) {
56
	      Thread thread = new IndexingTask();
57
	      thread.start();
58 2902 sgarg
	      currentThreads.add(thread);
59 2732 sgarg
	    }
60
    }
61
62
	public static synchronized IndexingQueue getInstance(){
63
		if (instance == null) {
64
			instance = new IndexingQueue();
65
		}
66
		return instance;
67
	}//getInstance
68
69 2733 sgarg
    public void add(String docid, String rev) {
70
    	add(new IndexingQueueObject(docid, rev, 0));
71
    }
72 2732 sgarg
73 2733 sgarg
    protected void add(IndexingQueueObject queueObject) {
74
	    synchronized (indexingMap) {
75
	    	if(!indexingMap.containsKey(queueObject.getDocid())){
76
	    		indexingMap.put(queueObject.getDocid(), queueObject);
77
	    		indexingMap.notify();
78
	    	} else {
79
	    		IndexingQueueObject oldQueueObject =
80
	    			(IndexingQueueObject) indexingMap.get(queueObject.getDocid());
81
	    		if(oldQueueObject.compareTo(queueObject) < 0){
82
	    	  		indexingMap.put(queueObject.getDocid(), queueObject);
83
		    		indexingMap.notify();
84
	    		}
85
	    	}
86 2732 sgarg
	    }
87
	  }
88
89 2901 sgarg
	public boolean getMetacatRunning(){
90
		return this.metacatRunning;
91
	}
92 2732 sgarg
93 2901 sgarg
	public void setMetacatRunning(boolean metacatRunning){
94
		this.metacatRunning = metacatRunning;
95
96 2902 sgarg
		if(!metacatRunning){
97
			for(int count=0; count<currentThreads.size(); count++){
98
				((IndexingTask)currentThreads.get(count)).metacatRunning = false;
99
				((Thread)currentThreads.get(count)).interrupt();
100
			}
101 2901 sgarg
		}
102
	}
103
104 2732 sgarg
    protected IndexingQueueObject getNext() {
105
    	IndexingQueueObject returnVal = null;
106 2733 sgarg
        synchronized (indexingMap) {
107 2901 sgarg
          while (indexingMap.isEmpty() && metacatRunning) {
108 2732 sgarg
            try {
109 2733 sgarg
            	indexingMap.wait();
110 2732 sgarg
            } catch (InterruptedException ex) {
111
              System.err.println("Interrupted");
112
            }
113
          }
114 2901 sgarg
115
          if(metacatRunning){
116
        	  String docid = (String) indexingMap.keySet().iterator().next();
117
        	  returnVal = (IndexingQueueObject)indexingMap.get(docid);
118
        	  indexingMap.remove(docid);
119
          }
120 2732 sgarg
        }
121
        return returnVal;
122
      }
123
124
}
125
126
class IndexingTask extends Thread {
127
  	  private Logger logMetacat = Logger.getLogger(IndexingTask.class);
128 2733 sgarg
      protected final long MAXIMUMINDEXDELAY = Integer.
129
      	parseInt(MetaCatUtil.getOption("maximumIndexDelay"));;
130 2902 sgarg
      protected boolean metacatRunning = true;
131
132 2732 sgarg
	  public void run() {
133 2902 sgarg
	    while (metacatRunning) {
134 2732 sgarg
	      // blocks until job
135
	      IndexingQueueObject returnVal =
136
	    	  IndexingQueue.getInstance().getNext();
137 2733 sgarg
138 2901 sgarg
	      if(returnVal != null){
139
	    	  if(!IndexingQueue.getInstance().
140 2733 sgarg
	    			currentDocidsBeingIndexed.contains(returnVal.getDocid())){
141
    		  try {
142
    			  IndexingQueue.getInstance().
143
    			  		currentDocidsBeingIndexed.add(returnVal.getDocid());
144
    		      String docid = returnVal.getDocid() + "." + returnVal.getRev();
145 2764 sgarg
    			  if(checkDocumentTable(docid, "xml_documents")){
146
    				  logMetacat.warn("Calling buildIndex for " + docid);
147
    				  DocumentImpl doc = new DocumentImpl(docid, false);
148
    				  doc.buildIndex();
149
    			  } else {
150
    				  logMetacat.warn("Couldn't find the docid:" + docid
151
	                			+ " in xml_documents table");
152
	                  sleep(MAXIMUMINDEXDELAY);
153
	                  throw(new Exception("Couldn't find the docid:" + docid
154
	                			+ " in xml_documents table"));
155
    			  }
156 2901 sgarg
    		  	} catch (Exception e) {
157 2733 sgarg
    			  logMetacat.warn("Exception: " + e);
158
    			  e.printStackTrace();
159 2732 sgarg
160 2733 sgarg
    			  if(returnVal.getCount() < 25){
161
    				  returnVal.setCount(returnVal.getCount()+1);
162
    				  // add the docid back to the list
163
    				  IndexingQueue.getInstance().add(returnVal);
164
    			  } else {
165
    				  logMetacat.fatal("Docid " + returnVal.getDocid()
166 2732 sgarg
	        			+ " has been inserted to IndexingQueue "
167 2733 sgarg
	        			+ "more than 25 times. Not adding the docid to"
168
	        			+ " the queue again.");
169
    			  }
170 2901 sgarg
    		  	} finally {
171
    			  	IndexingQueue.getInstance().currentDocidsBeingIndexed
172
    			  		.remove(returnVal.getDocid());
173
    		  	}
174
	    	  } else {
175
	    		  returnVal.setCount(returnVal.getCount()+1);
176
	    		  IndexingQueue.getInstance().add(returnVal);
177
	    	  }
178
	      }
179 2732 sgarg
	    }
180
	  }
181
182 2764 sgarg
      private boolean checkDocumentTable(String docid, String tablename) throws Exception{
183 2732 sgarg
	        DBConnection dbConn = null;
184
	        int serialNumber = -1;
185 2764 sgarg
	        boolean inxmldoc = false;
186
187 2732 sgarg
	        String revision = docid.substring(docid.lastIndexOf(".")+1,docid.length());
188
	        docid = docid.substring(0,docid.lastIndexOf("."));
189
190 2764 sgarg
	        logMetacat.info("Checking if document exsists in xml_documents: docid is "
191
	        		+ docid + " and revision is " + revision);
192 2732 sgarg
193
	        try {
194
	            // Opening separate db connection for writing XML Index
195
	            dbConn = DBConnectionPool
196
	                    .getDBConnection("DBSAXHandler.checkDocumentTable");
197
	            serialNumber = dbConn.getCheckOutSerialNumber();
198
199 2764 sgarg
	            String xmlDocumentsCheck = "SELECT distinct docid FROM " + tablename
200
	                        + " WHERE docid ='" + docid
201
	                        + "' AND rev ='" + revision + "'";
202 2732 sgarg
203 2764 sgarg
                PreparedStatement xmlDocCheck = dbConn.prepareStatement(xmlDocumentsCheck);
204
                // Increase usage count
205
206
                dbConn.increaseUsageCount(1);
207
	            xmlDocCheck.execute();
208
	            ResultSet doccheckRS = xmlDocCheck.getResultSet();
209
	            boolean tableHasRows = doccheckRS.next();
210
	            if (tableHasRows) {
211
	            	inxmldoc = true;
212
	            }
213
	            doccheckRS.close();
214
	            xmlDocCheck.close();
215 2732 sgarg
	        } catch (SQLException e) {
216
	        	   e.printStackTrace();
217
	        } finally {
218
	            DBConnectionPool.returnDBConnection(dbConn, serialNumber);
219
	        }//finally
220 2764 sgarg
221
	        return inxmldoc;
222
      }
223 2732 sgarg
	}
224
225 2733 sgarg
class IndexingQueueObject implements Comparable{
226 2732 sgarg
	// the docid of the document to be indexed.
227
	private String docid;
228 2733 sgarg
	// the docid of the document to be indexed.
229
	private String rev;
230 2732 sgarg
	// the count of number of times the document has been in the queue
231
	private int count;
232
233 2733 sgarg
	IndexingQueueObject(String docid, String rev, int count){
234
		this.docid = docid;
235
		this.rev = rev;
236 2732 sgarg
		this.count = count;
237
	}
238
239
	public int getCount(){
240
		return count;
241
	}
242
243
	public String getDocid(){
244
		return docid;
245
	}
246
247 2733 sgarg
	public String getRev(){
248
		return rev;
249
	}
250
251 2732 sgarg
	public void setCount(int count){
252
		this.count = count;
253
	}
254
255
	public void setDocid(String docid){
256
		this.docid = docid;
257
	}
258 2733 sgarg
259
	public void setRev(String rev){
260
		this.rev = rev;
261
	}
262
263
	public int compareTo(Object o){
264
		if(o instanceof IndexingQueueObject){
265
			int revision = Integer.parseInt(rev);
266
			int oRevision = Integer.parseInt(((IndexingQueueObject)o).getRev());
267
268
			if(revision == oRevision) {
269
				return 0;
270
			} else if (revision > oRevision) {
271
				return 1;
272
			} else {
273
				return -1;
274
			}
275
		} else {
276
			throw new java.lang.ClassCastException();
277
		}
278
	}
279 2732 sgarg
}