Revision 2733
Added by sgarg about 19 years ago
src/edu/ucsb/nceas/metacat/IndexingQueue.java | ||
---|---|---|
31 | 31 |
import java.sql.ResultSet; |
32 | 32 |
import java.sql.SQLException; |
33 | 33 |
import java.util.Vector; |
34 |
import java.util.HashMap; |
|
35 |
import java.lang.Comparable; |
|
34 | 36 |
import edu.ucsb.nceas.metacat.MetaCatUtil; |
35 | 37 |
import org.apache.log4j.Logger; |
36 | 38 |
|
37 | 39 |
public class IndexingQueue { |
38 | 40 |
|
39 | 41 |
private Logger logMetacat = Logger.getLogger(IndexingQueue.class); |
40 |
private Vector indexingQueue = new Vector(); |
|
42 |
// Map used to keep tracks of docids to be indexed |
|
43 |
private HashMap indexingMap = new HashMap(); |
|
41 | 44 |
private Vector currentThreads = new Vector(); |
45 |
public Vector currentDocidsBeingIndexed = new Vector(); |
|
42 | 46 |
|
43 | 47 |
private static IndexingQueue instance = null; |
44 | 48 |
|
... | ... | |
60 | 64 |
return instance; |
61 | 65 |
}//getInstance |
62 | 66 |
|
63 |
public void add(String docid) { |
|
64 |
synchronized (indexingQueue) { |
|
65 |
indexingQueue.add(new IndexingQueueObject(0, docid)); |
|
66 |
indexingQueue.notify(); |
|
67 |
} |
|
68 |
} |
|
67 |
public void add(String docid, String rev) { |
|
68 |
add(new IndexingQueueObject(docid, rev, 0)); |
|
69 |
} |
|
69 | 70 |
|
70 |
public void add(IndexingQueueObject queueObject) { |
|
71 |
synchronized (indexingQueue) { |
|
72 |
indexingQueue.add(queueObject); |
|
73 |
indexingQueue.notify(); |
|
71 |
protected void add(IndexingQueueObject queueObject) { |
|
72 |
synchronized (indexingMap) { |
|
73 |
if(!indexingMap.containsKey(queueObject.getDocid())){ |
|
74 |
indexingMap.put(queueObject.getDocid(), queueObject); |
|
75 |
indexingMap.notify(); |
|
76 |
} else { |
|
77 |
IndexingQueueObject oldQueueObject = |
|
78 |
(IndexingQueueObject) indexingMap.get(queueObject.getDocid()); |
|
79 |
if(oldQueueObject.compareTo(queueObject) < 0){ |
|
80 |
indexingMap.put(queueObject.getDocid(), queueObject); |
|
81 |
indexingMap.notify(); |
|
82 |
} |
|
83 |
} |
|
74 | 84 |
} |
75 | 85 |
} |
76 | 86 |
|
77 | 87 |
|
78 | 88 |
protected IndexingQueueObject getNext() { |
79 | 89 |
IndexingQueueObject returnVal = null; |
80 |
synchronized (indexingQueue) {
|
|
81 |
while (indexingQueue.isEmpty()) {
|
|
90 |
synchronized (indexingMap) {
|
|
91 |
while (indexingMap.isEmpty()) {
|
|
82 | 92 |
try { |
83 |
indexingQueue.wait();
|
|
93 |
indexingMap.wait();
|
|
84 | 94 |
} catch (InterruptedException ex) { |
85 | 95 |
System.err.println("Interrupted"); |
86 | 96 |
} |
87 | 97 |
} |
88 |
returnVal = (IndexingQueueObject) indexingQueue.get(0); |
|
89 |
indexingQueue.remove(0); |
|
98 |
String docid = (String) indexingMap.keySet().iterator().next(); |
|
99 |
returnVal = (IndexingQueueObject)indexingMap.get(docid); |
|
100 |
indexingMap.remove(docid); |
|
90 | 101 |
} |
91 | 102 |
return returnVal; |
92 | 103 |
} |
... | ... | |
95 | 106 |
|
96 | 107 |
class IndexingTask extends Thread { |
97 | 108 |
private Logger logMetacat = Logger.getLogger(IndexingTask.class); |
98 |
protected final long INDEXDELAY = 5000; |
|
109 |
protected final long MAXIMUMINDEXDELAY = Integer. |
|
110 |
parseInt(MetaCatUtil.getOption("maximumIndexDelay"));; |
|
99 | 111 |
|
100 | 112 |
public void run() { |
101 | 113 |
while (true) { |
102 | 114 |
// blocks until job |
103 | 115 |
IndexingQueueObject returnVal = |
104 | 116 |
IndexingQueue.getInstance().getNext(); |
105 |
String docid = returnVal.getDocid(); |
|
106 |
|
|
107 |
try { |
|
108 |
checkDocumentTable(docid); |
|
109 |
DocumentImpl doc = new DocumentImpl(docid, false); |
|
110 |
logMetacat.warn("Calling buildIndex for " + docid); |
|
111 |
doc.buildIndex(); |
|
112 |
} catch (Exception e) { |
|
113 |
logMetacat.warn("Exception: " + e); |
|
114 |
e.printStackTrace(); |
|
117 |
|
|
118 |
if(!IndexingQueue.getInstance(). |
|
119 |
currentDocidsBeingIndexed.contains(returnVal.getDocid())){ |
|
120 |
try { |
|
121 |
IndexingQueue.getInstance(). |
|
122 |
currentDocidsBeingIndexed.add(returnVal.getDocid()); |
|
123 |
String docid = returnVal.getDocid() + "." + returnVal.getRev(); |
|
124 |
checkDocumentTable(docid, "xml_documents"); |
|
125 |
DocumentImpl doc = new DocumentImpl(docid, false); |
|
126 |
logMetacat.warn("Calling buildIndex for " + docid); |
|
127 |
doc.buildIndex(); |
|
128 |
} catch (Exception e) { |
|
129 |
logMetacat.warn("Exception: " + e); |
|
130 |
e.printStackTrace(); |
|
115 | 131 |
|
116 |
if(returnVal.getCount() < 25){
|
|
117 |
returnVal.setCount(returnVal.getCount()+1);
|
|
118 |
// add the docid back to the list
|
|
119 |
IndexingQueue.getInstance().add(docid);
|
|
120 |
} else {
|
|
121 |
logMetacat.fatal("Docid " + returnVal.getDocid()
|
|
132 |
if(returnVal.getCount() < 25){
|
|
133 |
returnVal.setCount(returnVal.getCount()+1);
|
|
134 |
// add the docid back to the list
|
|
135 |
IndexingQueue.getInstance().add(returnVal);
|
|
136 |
} else {
|
|
137 |
logMetacat.fatal("Docid " + returnVal.getDocid()
|
|
122 | 138 |
+ " has been inserted to IndexingQueue " |
123 |
+ "more than 25 times."); |
|
124 |
} |
|
125 |
} |
|
139 |
+ "more than 25 times. Not adding the docid to" |
|
140 |
+ " the queue again."); |
|
141 |
} |
|
142 |
} finally { |
|
143 |
IndexingQueue.getInstance().currentDocidsBeingIndexed |
|
144 |
.remove(returnVal.getDocid()); |
|
145 |
} |
|
146 |
} else { |
|
147 |
returnVal.setCount(returnVal.getCount()+1); |
|
148 |
IndexingQueue.getInstance().add(returnVal); |
|
149 |
} |
|
126 | 150 |
} |
127 | 151 |
} |
128 | 152 |
|
129 |
private void checkDocumentTable(String docid) throws Exception{ |
|
153 |
private void checkDocumentTable(String docid, String tablename) throws Exception{
|
|
130 | 154 |
DBConnection dbConn = null; |
131 | 155 |
int serialNumber = -1; |
132 | 156 |
|
... | ... | |
150 | 174 |
boolean inxmldoc = false; |
151 | 175 |
long startTime = System.currentTimeMillis(); |
152 | 176 |
while (!inxmldoc) { |
153 |
String xmlDocumentsCheck = "select distinct docid from xml_documents"
|
|
177 |
String xmlDocumentsCheck = "select distinct docid from " + tablename
|
|
154 | 178 |
+ " where docid ='" |
155 | 179 |
+ docid |
156 | 180 |
+ "' and " |
... | ... | |
171 | 195 |
xmlDocCheck.close(); |
172 | 196 |
// make sure the while loop will be ended in reseaonable time |
173 | 197 |
long stopTime = System.currentTimeMillis(); |
174 |
if ((stopTime - startTime) > INDEXDELAY) { |
|
175 |
logMetacat.warn("Couldn't find the docid:" + docid + " for indexing in " |
|
198 |
if ((stopTime - startTime) > MAXIMUMINDEXDELAY) { |
|
199 |
logMetacat.warn("Couldn't find the docid:" + docid |
|
200 |
+ " for indexing in " |
|
176 | 201 |
+ "reseaonable time!"); |
177 | 202 |
throw new Exception( |
178 | 203 |
"Couldn't find the docid for index build in " |
... | ... | |
188 | 213 |
} |
189 | 214 |
} |
190 | 215 |
|
191 |
class IndexingQueueObject{ |
|
216 |
class IndexingQueueObject implements Comparable{
|
|
192 | 217 |
// the docid of the document to be indexed. |
193 | 218 |
private String docid; |
219 |
// the docid of the document to be indexed. |
|
220 |
private String rev; |
|
194 | 221 |
// the count of number of times the document has been in the queue |
195 | 222 |
private int count; |
196 | 223 |
|
197 |
IndexingQueueObject(int count, String docid){ |
|
224 |
IndexingQueueObject(String docid, String rev, int count){ |
|
225 |
this.docid = docid; |
|
226 |
this.rev = rev; |
|
198 | 227 |
this.count = count; |
199 |
this.docid = docid; |
|
200 | 228 |
} |
201 | 229 |
|
202 | 230 |
public int getCount(){ |
... | ... | |
207 | 235 |
return docid; |
208 | 236 |
} |
209 | 237 |
|
238 |
public String getRev(){ |
|
239 |
return rev; |
|
240 |
} |
|
241 |
|
|
210 | 242 |
public void setCount(int count){ |
211 | 243 |
this.count = count; |
212 | 244 |
} |
... | ... | |
214 | 246 |
public void setDocid(String docid){ |
215 | 247 |
this.docid = docid; |
216 | 248 |
} |
249 |
|
|
250 |
public void setRev(String rev){ |
|
251 |
this.rev = rev; |
|
252 |
} |
|
253 |
|
|
254 |
public int compareTo(Object o){ |
|
255 |
if(o instanceof IndexingQueueObject){ |
|
256 |
int revision = Integer.parseInt(rev); |
|
257 |
int oRevision = Integer.parseInt(((IndexingQueueObject)o).getRev()); |
|
258 |
|
|
259 |
if(revision == oRevision) { |
|
260 |
return 0; |
|
261 |
} else if (revision > oRevision) { |
|
262 |
return 1; |
|
263 |
} else { |
|
264 |
return -1; |
|
265 |
} |
|
266 |
} else { |
|
267 |
throw new java.lang.ClassCastException(); |
|
268 |
} |
|
269 |
} |
|
217 | 270 |
} |
Also available in: Unified diff
A version with less number of bugs.