Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that tracks sessions for MetaCatServlet users.
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Matt Jones
7
 *    Release: @release@
8
 *
9
 *   '$Author: sgarg $'
10
 *     '$Date: 2005-11-11 10:00:54 -0800 (Fri, 11 Nov 2005) $'
11
 * '$Revision: 2733 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27

    
28
package edu.ucsb.nceas.metacat;
29

    
30
import java.sql.PreparedStatement;
31
import java.sql.ResultSet;
32
import java.sql.SQLException;
33
import java.util.Vector;
34
import java.util.HashMap;
35
import java.lang.Comparable;
36
import edu.ucsb.nceas.metacat.MetaCatUtil;
37
import org.apache.log4j.Logger;
38

    
39
public class IndexingQueue {
40

    
41
	private Logger logMetacat = Logger.getLogger(IndexingQueue.class);
42
	//	 Map used to keep tracks of docids to be indexed
43
	private HashMap indexingMap = new HashMap();     
44
	private Vector currentThreads = new Vector();
45
	public Vector currentDocidsBeingIndexed = new Vector();
46
	
47
	private static IndexingQueue instance = null;
48

    
49
	final static int NUMBEROFINDEXINGTHREADS = 
50
		Integer.parseInt(MetaCatUtil.getOption("numberOfIndexingThreads"));
51
	private int sleepTime = 2000; 
52

    
53
    public IndexingQueue() {
54
	    for (int i = 0; i < NUMBEROFINDEXINGTHREADS; i++) {
55
	      Thread thread = new IndexingTask();
56
	      thread.start();
57
	    }
58
    }
59
	
60
	public static synchronized IndexingQueue getInstance(){
61
		if (instance == null) {
62
			instance = new IndexingQueue();
63
		}
64
		return instance;
65
	}//getInstance
66

    
67
    public void add(String docid, String rev) {
68
    	add(new IndexingQueueObject(docid, rev, 0));
69
    }
70
    
71
    protected void add(IndexingQueueObject queueObject) {
72
	    synchronized (indexingMap) {
73
	    	if(!indexingMap.containsKey(queueObject.getDocid())){
74
	    		indexingMap.put(queueObject.getDocid(), queueObject);
75
	    		indexingMap.notify();
76
	    	} else {
77
	    		IndexingQueueObject oldQueueObject = 
78
	    			(IndexingQueueObject) indexingMap.get(queueObject.getDocid());
79
	    		if(oldQueueObject.compareTo(queueObject) < 0){
80
	    	  		indexingMap.put(queueObject.getDocid(), queueObject);
81
		    		indexingMap.notify();  			
82
	    		}
83
	    	}
84
	    }
85
	  }
86
    
87
	
88
    protected IndexingQueueObject getNext() {
89
    	IndexingQueueObject returnVal = null;
90
        synchronized (indexingMap) {
91
          while (indexingMap.isEmpty()) {
92
            try {
93
            	indexingMap.wait();
94
            } catch (InterruptedException ex) {
95
              System.err.println("Interrupted");
96
            }
97
          }
98
          String docid = (String) indexingMap.keySet().iterator().next();
99
          returnVal = (IndexingQueueObject)indexingMap.get(docid);
100
          indexingMap.remove(docid);
101
        }
102
        return returnVal;
103
      }
104

    
105
}
106

    
107
class IndexingTask extends Thread {
108
  	  private Logger logMetacat = Logger.getLogger(IndexingTask.class);
109
      protected final long MAXIMUMINDEXDELAY = Integer.
110
      	parseInt(MetaCatUtil.getOption("maximumIndexDelay"));;
111

    
112
	  public void run() {
113
	    while (true) {
114
	      // blocks until job
115
	      IndexingQueueObject returnVal = 
116
	    	  IndexingQueue.getInstance().getNext();
117

    
118
    	  if(!IndexingQueue.getInstance().
119
	    			currentDocidsBeingIndexed.contains(returnVal.getDocid())){
120
    		  try {
121
    			  IndexingQueue.getInstance().
122
    			  		currentDocidsBeingIndexed.add(returnVal.getDocid());
123
    		      String docid = returnVal.getDocid() + "." + returnVal.getRev();
124
    			  checkDocumentTable(docid, "xml_documents");
125
    			  DocumentImpl doc = new DocumentImpl(docid, false);
126
    			  logMetacat.warn("Calling buildIndex for " + docid);
127
    			  doc.buildIndex();
128
    		  } catch (Exception e) {
129
    			  logMetacat.warn("Exception: " + e);
130
    			  e.printStackTrace();
131
	        
132
    			  if(returnVal.getCount() < 25){
133
    				  returnVal.setCount(returnVal.getCount()+1);
134
    				  // add the docid back to the list
135
    				  IndexingQueue.getInstance().add(returnVal);
136
    			  } else {
137
    				  logMetacat.fatal("Docid " + returnVal.getDocid() 
138
	        			+ " has been inserted to IndexingQueue "
139
	        			+ "more than 25 times. Not adding the docid to"
140
	        			+ " the queue again.");
141
    			  }
142
    		  } finally {
143
    			  IndexingQueue.getInstance().currentDocidsBeingIndexed
144
	    	  			.remove(returnVal.getDocid());	    	  
145
    		  }
146
    	  } else {
147
				returnVal.setCount(returnVal.getCount()+1);
148
				IndexingQueue.getInstance().add(returnVal);    		  
149
    	  }
150
	    }
151
	  }
152
	  
153
      private void checkDocumentTable(String docid, String tablename) throws Exception{
154
	        DBConnection dbConn = null;
155
	        int serialNumber = -1;
156

    
157
	        String revision = docid.substring(docid.lastIndexOf(".")+1,docid.length());
158
	        docid = docid.substring(0,docid.lastIndexOf("."));
159

    
160
	        logMetacat.warn("docid is " + docid 
161
	        		+ " and revision is " + revision);
162

    
163
	        try {
164
	            // Opening separate db connection for writing XML Index
165
	            dbConn = DBConnectionPool
166
	                    .getDBConnection("DBSAXHandler.checkDocumentTable");
167
	            serialNumber = dbConn.getCheckOutSerialNumber();
168

    
169
	            // the following while loop construct checks to make sure that
170
	            // the docid of the document that we are trying to index is already
171
	            // in the xml_documents table. if this is not the case, the foreign
172
	            // key relationship between xml_documents and xml_index is
173
	            // temporarily broken causing multiple problems.
174
	            boolean inxmldoc = false;
175
	            long startTime = System.currentTimeMillis();
176
	            while (!inxmldoc) {
177
	                String xmlDocumentsCheck = "select distinct docid from " + tablename
178
	                        + " where docid ='"
179
	                        + docid
180
	                        + "' and "
181
	                        + " rev ='"
182
	                        + revision + "'";
183

    
184
	                PreparedStatement xmlDocCheck = dbConn
185
	                        .prepareStatement(xmlDocumentsCheck);
186
	                // Increase usage count
187
	                dbConn.increaseUsageCount(1);
188
	                xmlDocCheck.execute();
189
	                ResultSet doccheckRS = xmlDocCheck.getResultSet();
190
	                boolean tableHasRows = doccheckRS.next();
191
	                if (tableHasRows) {
192
	                    inxmldoc = true;
193
	                }
194
	                doccheckRS.close();
195
	                xmlDocCheck.close();
196
	                // make sure the while loop will be ended in reseaonable time
197
	                long stopTime = System.currentTimeMillis();
198
	                if ((stopTime - startTime) > MAXIMUMINDEXDELAY) { 
199
	                	logMetacat.warn("Couldn't find the docid:" + docid 
200
	                			+ " for indexing in "
201
                                + "reseaonable time!");
202
	                	throw new Exception(
203
	                        "Couldn't find the docid for index build in "
204
	                                + "reseaonable time!"); 
205
	                }
206
	            }//while
207
	        } catch (SQLException e) {
208
	        	   e.printStackTrace();
209
	        } finally {
210
	            DBConnectionPool.returnDBConnection(dbConn, serialNumber);
211
	        }//finally
212

    
213
	    }
214
	}
215

    
216
class IndexingQueueObject implements Comparable{
217
	// the docid of the document to be indexed. 
218
	private String docid;
219
	// the docid of the document to be indexed. 
220
	private String rev;
221
	// the count of number of times the document has been in the queue
222
	private int count;
223
	
224
	IndexingQueueObject(String docid, String rev, int count){
225
		this.docid = docid;
226
		this.rev = rev;
227
		this.count = count;
228
	}
229
	
230
	public int getCount(){
231
		return count;
232
	}
233

    
234
	public String getDocid(){
235
		return docid;
236
	}
237

    
238
	public String getRev(){
239
		return rev;
240
	}
241

    
242
	public void setCount(int count){
243
		this.count = count;
244
	}
245
	
246
	public void setDocid(String docid){
247
		this.docid = docid;
248
	}
249

    
250
	public void setRev(String rev){
251
		this.rev = rev;
252
	}
253
	
254
	public int compareTo(Object o){
255
		if(o instanceof IndexingQueueObject){
256
			int revision = Integer.parseInt(rev);
257
			int oRevision = Integer.parseInt(((IndexingQueueObject)o).getRev());
258
			
259
			if(revision == oRevision) {
260
				return 0;
261
			} else if (revision > oRevision) {
262
				return 1;
263
			} else {
264
				return -1;
265
			}
266
		} else {
267
			throw new java.lang.ClassCastException();
268
		}
269
	}
270
}
(40-40/65)