Project

General

Profile

« Previous | Next » 

Revision 2733

Added by sgarg over 18 years ago

A version with less number of bugs.

View differences:

src/edu/ucsb/nceas/metacat/IndexingQueue.java
31 31
import java.sql.ResultSet;
32 32
import java.sql.SQLException;
33 33
import java.util.Vector;
34
import java.util.HashMap;
35
import java.lang.Comparable;
34 36
import edu.ucsb.nceas.metacat.MetaCatUtil;
35 37
import org.apache.log4j.Logger;
36 38

  
37 39
public class IndexingQueue {
38 40

  
39 41
	private Logger logMetacat = Logger.getLogger(IndexingQueue.class);
40
	private Vector indexingQueue = new Vector();
42
	//	 Map used to keep tracks of docids to be indexed
43
	private HashMap indexingMap = new HashMap();     
41 44
	private Vector currentThreads = new Vector();
45
	public Vector currentDocidsBeingIndexed = new Vector();
42 46
	
43 47
	private static IndexingQueue instance = null;
44 48

  
......
60 64
		return instance;
61 65
	}//getInstance
62 66

  
63
    public void add(String docid) {
64
	    synchronized (indexingQueue) {
65
	    	indexingQueue.add(new IndexingQueueObject(0, docid));
66
	    	indexingQueue.notify();
67
	    }
68
	  }
67
    public void add(String docid, String rev) {
68
    	add(new IndexingQueueObject(docid, rev, 0));
69
    }
69 70
    
70
    public void add(IndexingQueueObject queueObject) {
71
	    synchronized (indexingQueue) {
72
	    	indexingQueue.add(queueObject);
73
	    	indexingQueue.notify();
71
    protected void add(IndexingQueueObject queueObject) {
72
	    synchronized (indexingMap) {
73
	    	if(!indexingMap.containsKey(queueObject.getDocid())){
74
	    		indexingMap.put(queueObject.getDocid(), queueObject);
75
	    		indexingMap.notify();
76
	    	} else {
77
	    		IndexingQueueObject oldQueueObject = 
78
	    			(IndexingQueueObject) indexingMap.get(queueObject.getDocid());
79
	    		if(oldQueueObject.compareTo(queueObject) < 0){
80
	    	  		indexingMap.put(queueObject.getDocid(), queueObject);
81
		    		indexingMap.notify();  			
82
	    		}
83
	    	}
74 84
	    }
75 85
	  }
76 86
    
77 87
	
78 88
    protected IndexingQueueObject getNext() {
79 89
    	IndexingQueueObject returnVal = null;
80
        synchronized (indexingQueue) {
81
          while (indexingQueue.isEmpty()) {
90
        synchronized (indexingMap) {
91
          while (indexingMap.isEmpty()) {
82 92
            try {
83
            	indexingQueue.wait();
93
            	indexingMap.wait();
84 94
            } catch (InterruptedException ex) {
85 95
              System.err.println("Interrupted");
86 96
            }
87 97
          }
88
          returnVal = (IndexingQueueObject) indexingQueue.get(0);
89
          indexingQueue.remove(0);
98
          String docid = (String) indexingMap.keySet().iterator().next();
99
          returnVal = (IndexingQueueObject)indexingMap.get(docid);
100
          indexingMap.remove(docid);
90 101
        }
91 102
        return returnVal;
92 103
      }
......
95 106

  
96 107
class IndexingTask extends Thread {
97 108
  	  private Logger logMetacat = Logger.getLogger(IndexingTask.class);
98
      protected final long INDEXDELAY = 5000;
109
      protected final long MAXIMUMINDEXDELAY = Integer.
110
      	parseInt(MetaCatUtil.getOption("maximumIndexDelay"));;
99 111

  
100 112
	  public void run() {
101 113
	    while (true) {
102 114
	      // blocks until job
103 115
	      IndexingQueueObject returnVal = 
104 116
	    	  IndexingQueue.getInstance().getNext();
105
	      String docid = returnVal.getDocid();
106
	      
107
	      try {
108
	        checkDocumentTable(docid);
109
	    	DocumentImpl doc = new DocumentImpl(docid, false);
110
	    	logMetacat.warn("Calling buildIndex for " + docid);
111
	    	doc.buildIndex();
112
	      } catch (Exception e) {
113
	        logMetacat.warn("Exception: " + e);
114
	        e.printStackTrace();
117

  
118
    	  if(!IndexingQueue.getInstance().
119
	    			currentDocidsBeingIndexed.contains(returnVal.getDocid())){
120
    		  try {
121
    			  IndexingQueue.getInstance().
122
    			  		currentDocidsBeingIndexed.add(returnVal.getDocid());
123
    		      String docid = returnVal.getDocid() + "." + returnVal.getRev();
124
    			  checkDocumentTable(docid, "xml_documents");
125
    			  DocumentImpl doc = new DocumentImpl(docid, false);
126
    			  logMetacat.warn("Calling buildIndex for " + docid);
127
    			  doc.buildIndex();
128
    		  } catch (Exception e) {
129
    			  logMetacat.warn("Exception: " + e);
130
    			  e.printStackTrace();
115 131
	        
116
	        if(returnVal.getCount() < 25){
117
	        	returnVal.setCount(returnVal.getCount()+1);
118
	        	// add the docid back to the list
119
	        	IndexingQueue.getInstance().add(docid);
120
	        } else {
121
	        	logMetacat.fatal("Docid " + returnVal.getDocid() 
132
    			  if(returnVal.getCount() < 25){
133
    				  returnVal.setCount(returnVal.getCount()+1);
134
    				  // add the docid back to the list
135
    				  IndexingQueue.getInstance().add(returnVal);
136
    			  } else {
137
    				  logMetacat.fatal("Docid " + returnVal.getDocid() 
122 138
	        			+ " has been inserted to IndexingQueue "
123
	        			+ "more than 25 times.");
124
	        }
125
	      }
139
	        			+ "more than 25 times. Not adding the docid to"
140
	        			+ " the queue again.");
141
    			  }
142
    		  } finally {
143
    			  IndexingQueue.getInstance().currentDocidsBeingIndexed
144
	    	  			.remove(returnVal.getDocid());	    	  
145
    		  }
146
    	  } else {
147
				returnVal.setCount(returnVal.getCount()+1);
148
				IndexingQueue.getInstance().add(returnVal);    		  
149
    	  }
126 150
	    }
127 151
	  }
128 152
	  
129
      private void checkDocumentTable(String docid) throws Exception{
153
      private void checkDocumentTable(String docid, String tablename) throws Exception{
130 154
	        DBConnection dbConn = null;
131 155
	        int serialNumber = -1;
132 156

  
......
150 174
	            boolean inxmldoc = false;
151 175
	            long startTime = System.currentTimeMillis();
152 176
	            while (!inxmldoc) {
153
	                String xmlDocumentsCheck = "select distinct docid from xml_documents"
177
	                String xmlDocumentsCheck = "select distinct docid from " + tablename
154 178
	                        + " where docid ='"
155 179
	                        + docid
156 180
	                        + "' and "
......
171 195
	                xmlDocCheck.close();
172 196
	                // make sure the while loop will be ended in reseaonable time
173 197
	                long stopTime = System.currentTimeMillis();
174
	                if ((stopTime - startTime) > INDEXDELAY) { 
175
	                	logMetacat.warn("Couldn't find the docid:" + docid + " for indexing in "
198
	                if ((stopTime - startTime) > MAXIMUMINDEXDELAY) { 
199
	                	logMetacat.warn("Couldn't find the docid:" + docid 
200
	                			+ " for indexing in "
176 201
                                + "reseaonable time!");
177 202
	                	throw new Exception(
178 203
	                        "Couldn't find the docid for index build in "
......
188 213
	    }
189 214
	}
190 215

  
191
class IndexingQueueObject{
216
class IndexingQueueObject implements Comparable{
192 217
	// the docid of the document to be indexed. 
193 218
	private String docid;
219
	// the docid of the document to be indexed. 
220
	private String rev;
194 221
	// the count of number of times the document has been in the queue
195 222
	private int count;
196 223
	
197
	IndexingQueueObject(int count, String docid){
224
	IndexingQueueObject(String docid, String rev, int count){
225
		this.docid = docid;
226
		this.rev = rev;
198 227
		this.count = count;
199
		this.docid = docid;
200 228
	}
201 229
	
202 230
	public int getCount(){
......
207 235
		return docid;
208 236
	}
209 237

  
238
	public String getRev(){
239
		return rev;
240
	}
241

  
210 242
	public void setCount(int count){
211 243
		this.count = count;
212 244
	}
......
214 246
	public void setDocid(String docid){
215 247
		this.docid = docid;
216 248
	}
249

  
250
	public void setRev(String rev){
251
		this.rev = rev;
252
	}
253
	
254
	public int compareTo(Object o){
255
		if(o instanceof IndexingQueueObject){
256
			int revision = Integer.parseInt(rev);
257
			int oRevision = Integer.parseInt(((IndexingQueueObject)o).getRev());
258
			
259
			if(revision == oRevision) {
260
				return 0;
261
			} else if (revision > oRevision) {
262
				return 1;
263
			} else {
264
				return -1;
265
			}
266
		} else {
267
			throw new java.lang.ClassCastException();
268
		}
269
	}
217 270
}

Also available in: Unified diff