Project

General

Profile

« Previous | Next » 

Revision 2732

Added by sgarg over 18 years ago

New code for run an indexing queue and indexing thread which runs every 24 hours

View differences:

src/edu/ucsb/nceas/metacat/IndexingQueue.java
1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that tracks sessions for MetaCatServlet users.
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Matt Jones
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27

  
28
package edu.ucsb.nceas.metacat;
29

  
30
import java.sql.PreparedStatement;
31
import java.sql.ResultSet;
32
import java.sql.SQLException;
33
import java.util.Vector;
34
import edu.ucsb.nceas.metacat.MetaCatUtil;
35
import org.apache.log4j.Logger;
36

  
37
public class IndexingQueue {
38

  
39
	private Logger logMetacat = Logger.getLogger(IndexingQueue.class);
40
	private Vector indexingQueue = new Vector();
41
	private Vector currentThreads = new Vector();
42
	
43
	private static IndexingQueue instance = null;
44

  
45
	final static int NUMBEROFINDEXINGTHREADS = 
46
		Integer.parseInt(MetaCatUtil.getOption("numberOfIndexingThreads"));
47
	private int sleepTime = 2000; 
48

  
49
    public IndexingQueue() {
50
	    for (int i = 0; i < NUMBEROFINDEXINGTHREADS; i++) {
51
	      Thread thread = new IndexingTask();
52
	      thread.start();
53
	    }
54
    }
55
	
56
	public static synchronized IndexingQueue getInstance(){
57
		if (instance == null) {
58
			instance = new IndexingQueue();
59
		}
60
		return instance;
61
	}//getInstance
62

  
63
    public void add(String docid) {
64
	    synchronized (indexingQueue) {
65
	    	indexingQueue.add(new IndexingQueueObject(0, docid));
66
	    	indexingQueue.notify();
67
	    }
68
	  }
69
    
70
    public void add(IndexingQueueObject queueObject) {
71
	    synchronized (indexingQueue) {
72
	    	indexingQueue.add(queueObject);
73
	    	indexingQueue.notify();
74
	    }
75
	  }
76
    
77
	
78
    protected IndexingQueueObject getNext() {
79
    	IndexingQueueObject returnVal = null;
80
        synchronized (indexingQueue) {
81
          while (indexingQueue.isEmpty()) {
82
            try {
83
            	indexingQueue.wait();
84
            } catch (InterruptedException ex) {
85
              System.err.println("Interrupted");
86
            }
87
          }
88
          returnVal = (IndexingQueueObject) indexingQueue.get(0);
89
          indexingQueue.remove(0);
90
        }
91
        return returnVal;
92
      }
93

  
94
}
95

  
96
class IndexingTask extends Thread {
97
  	  private Logger logMetacat = Logger.getLogger(IndexingTask.class);
98
      protected final long INDEXDELAY = 5000;
99

  
100
	  public void run() {
101
	    while (true) {
102
	      // blocks until job
103
	      IndexingQueueObject returnVal = 
104
	    	  IndexingQueue.getInstance().getNext();
105
	      String docid = returnVal.getDocid();
106
	      
107
	      try {
108
	        checkDocumentTable(docid);
109
	    	DocumentImpl doc = new DocumentImpl(docid, false);
110
	    	logMetacat.warn("Calling buildIndex for " + docid);
111
	    	doc.buildIndex();
112
	      } catch (Exception e) {
113
	        logMetacat.warn("Exception: " + e);
114
	        e.printStackTrace();
115
	        
116
	        if(returnVal.getCount() < 25){
117
	        	returnVal.setCount(returnVal.getCount()+1);
118
	        	// add the docid back to the list
119
	        	IndexingQueue.getInstance().add(docid);
120
	        } else {
121
	        	logMetacat.fatal("Docid " + returnVal.getDocid() 
122
	        			+ " has been inserted to IndexingQueue "
123
	        			+ "more than 25 times.");
124
	        }
125
	      }
126
	    }
127
	  }
128
	  
129
      private void checkDocumentTable(String docid) throws Exception{
130
	        DBConnection dbConn = null;
131
	        int serialNumber = -1;
132

  
133
	        String revision = docid.substring(docid.lastIndexOf(".")+1,docid.length());
134
	        docid = docid.substring(0,docid.lastIndexOf("."));
135

  
136
	        logMetacat.warn("docid is " + docid 
137
	        		+ " and revision is " + revision);
138

  
139
	        try {
140
	            // Opening separate db connection for writing XML Index
141
	            dbConn = DBConnectionPool
142
	                    .getDBConnection("DBSAXHandler.checkDocumentTable");
143
	            serialNumber = dbConn.getCheckOutSerialNumber();
144

  
145
	            // the following while loop construct checks to make sure that
146
	            // the docid of the document that we are trying to index is already
147
	            // in the xml_documents table. if this is not the case, the foreign
148
	            // key relationship between xml_documents and xml_index is
149
	            // temporarily broken causing multiple problems.
150
	            boolean inxmldoc = false;
151
	            long startTime = System.currentTimeMillis();
152
	            while (!inxmldoc) {
153
	                String xmlDocumentsCheck = "select distinct docid from xml_documents"
154
	                        + " where docid ='"
155
	                        + docid
156
	                        + "' and "
157
	                        + " rev ='"
158
	                        + revision + "'";
159

  
160
	                PreparedStatement xmlDocCheck = dbConn
161
	                        .prepareStatement(xmlDocumentsCheck);
162
	                // Increase usage count
163
	                dbConn.increaseUsageCount(1);
164
	                xmlDocCheck.execute();
165
	                ResultSet doccheckRS = xmlDocCheck.getResultSet();
166
	                boolean tableHasRows = doccheckRS.next();
167
	                if (tableHasRows) {
168
	                    inxmldoc = true;
169
	                }
170
	                doccheckRS.close();
171
	                xmlDocCheck.close();
172
	                // make sure the while loop will be ended in reseaonable time
173
	                long stopTime = System.currentTimeMillis();
174
	                if ((stopTime - startTime) > INDEXDELAY) { 
175
	                	logMetacat.warn("Couldn't find the docid:" + docid + " for indexing in "
176
                                + "reseaonable time!");
177
	                	throw new Exception(
178
	                        "Couldn't find the docid for index build in "
179
	                                + "reseaonable time!"); 
180
	                }
181
	            }//while
182
	        } catch (SQLException e) {
183
	        	   e.printStackTrace();
184
	        } finally {
185
	            DBConnectionPool.returnDBConnection(dbConn, serialNumber);
186
	        }//finally
187

  
188
	    }
189
	}
190

  
191
class IndexingQueueObject{
192
	// the docid of the document to be indexed. 
193
	private String docid;
194
	// the count of number of times the document has been in the queue
195
	private int count;
196
	
197
	IndexingQueueObject(int count, String docid){
198
		this.count = count;
199
		this.docid = docid;
200
	}
201
	
202
	public int getCount(){
203
		return count;
204
	}
205

  
206
	public String getDocid(){
207
		return docid;
208
	}
209

  
210
	public void setCount(int count){
211
		this.count = count;
212
	}
213
	
214
	public void setDocid(String docid){
215
		this.docid = docid;
216
	}
217
}
0 218

  
src/edu/ucsb/nceas/metacat/IndexingTimerTask.java
1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that tracks sessions for MetaCatServlet users.
4
 *  Copyright: 2000 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Matt Jones
7
 *    Release: @release@
8
 *
9
 *   '$Author$'
10
 *     '$Date$'
11
 * '$Revision$'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27

  
28
package edu.ucsb.nceas.metacat;
29

  
30
import java.sql.PreparedStatement;
31
import java.sql.ResultSet;
32
import java.sql.SQLException;
33
import java.util.TimerTask;
34

  
35
import edu.ucsb.nceas.metacat.MetaCatUtil;
36
import org.apache.log4j.Logger;
37

  
38
public class IndexingTimerTask extends TimerTask{
39

  
40
	 private Logger logMetacat = Logger.getLogger(IndexingTimerTask.class);
41

  
42
	 int count = 0;
43
	 
44
	  /*
45
	     * Run a separate thread to build the XML index for this document.  This
46
	     * thread is run asynchronously in order to more quickly return control to
47
	     * the submitting user.  The run method checks to see if the document has
48
	     * been fully inserted before trying to update the xml_index table.
49
	     */
50
	    public void run()
51
	    {
52
	    	DBConnection dbConn = null;
53
	    	int serialNumber = 0;
54
	    	try{
55
	    		logMetacat.warn("Running indexing timer task");
56
	    		
57
	    		dbConn = DBConnectionPool.getDBConnection("IndexingThread.run");
58
	    		serialNumber = dbConn.getCheckOutSerialNumber();
59
	    		String xmlDocumentsCheck = 
60
	    			MetaCatUtil.dbAdapter.getLeftJoinQuery("a.docid, a.rev", "xml_documents", 
61
	    					"xml_index", "a.docid = b.docid", "b.docid is NULL AND "
62
	    					+ "(a.doctype like 'eml://ecoinformatics.org/eml-2.0.0' "
63
	    					+ "or a.doctype like 'eml://ecoinformatics.org/eml-2.0.1')");
64
	    		
65
	    		PreparedStatement xmlDocCheck = dbConn
66
	    		.prepareStatement(xmlDocumentsCheck);
67
	    		
68
	    		// Increase usage count
69
	    		dbConn.increaseUsageCount(1);
70
	    		xmlDocCheck.execute();
71
	    		ResultSet rs = xmlDocCheck.getResultSet();
72
	    		
73
	    		boolean tableHasRows = rs.next();
74
	    		while (tableHasRows) {
75
	    			String docid = rs.getString(1);
76
	    			String rev = rs.getString(2);
77
	    			
78
	    			IndexingQueue.getInstance().add(docid + "." + rev);
79

  
80
	    			tableHasRows = rs.next();
81
	            }
82
	                	
83
	    		rs.close();
84
	    		xmlDocCheck.close();
85
	    		
86
	       	} catch (SQLException se){
87
	       				se.printStackTrace();
88
		        		
89
		    } catch (Exception e){
90
	        		e.printStackTrace();
91
	        		
92
	        }finally {
93
	                DBConnectionPool.returnDBConnection(dbConn, serialNumber);
94
	        }
95
		      	
96
			logMetacat.warn("Indexing timer task returning");		
97
			count++;
98
	    }
99
}
0 100

  

Also available in: Unified diff