Project

General

Profile

« Previous | Next » 

Revision 9058

moved RDF XML subprocessor to cn-index project.

View differences:

metacat-index/src/main/java/edu/ucsb/nceas/metacat/index/annotation/RdfXmlSubprocessor.java
1
/**
2
 * This program is free software; you can redistribute it and/or modify
3
 * it under the terms of the GNU General Public License as published by
4
 * the Free Software Foundation; either version 2 of the License, or
5
 * (at your option) any later version.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
15
 */
16
package edu.ucsb.nceas.metacat.index.annotation;
17

  
18
import java.io.IOException;
19
import java.io.InputStream;
20
import java.net.MalformedURLException;
21
import java.net.URI;
22
import java.util.ArrayList;
23
import java.util.Arrays;
24
import java.util.Date;
25
import java.util.HashMap;
26
import java.util.HashSet;
27
import java.util.Iterator;
28
import java.util.List;
29
import java.util.Map;
30
import java.util.Set;
31

  
32
import javax.xml.parsers.ParserConfigurationException;
33
import javax.xml.xpath.XPathExpressionException;
34

  
35
import org.apache.commons.codec.EncoderException;
36
import org.apache.commons.logging.Log;
37
import org.apache.commons.logging.LogFactory;
38
import org.apache.solr.client.solrj.SolrServer;
39
import org.apache.solr.client.solrj.SolrServerException;
40
import org.apache.solr.client.solrj.response.QueryResponse;
41
import org.apache.solr.common.SolrDocument;
42
import org.apache.solr.common.params.SolrParams;
43
import org.apache.solr.schema.IndexSchema;
44
import org.apache.solr.servlet.SolrRequestParsers;
45
import org.dataone.cn.indexer.annotation.SparqlField;
46
import org.dataone.cn.indexer.annotation.TripleStoreService;
47
import org.dataone.cn.indexer.convert.SolrDateConverter;
48
import org.dataone.cn.indexer.parser.IDocumentSubprocessor;
49
import org.dataone.cn.indexer.parser.ISolrDataField;
50
import org.dataone.cn.indexer.solrhttp.SolrDoc;
51
import org.dataone.cn.indexer.solrhttp.SolrElementField;
52
import org.dataone.service.exceptions.NotFound;
53
import org.dataone.service.exceptions.UnsupportedType;
54
import org.dataone.service.types.v1.Permission;
55
import org.dataone.service.types.v1.Subject;
56
import org.dataone.service.types.v1.util.AccessUtil;
57
import org.dataone.service.types.v1.util.AuthUtils;
58
import org.dataone.service.util.DateTimeMarshaller;
59
import org.xml.sax.SAXException;
60

  
61
import com.hp.hpl.jena.ontology.OntModel;
62
import com.hp.hpl.jena.query.Dataset;
63
import com.hp.hpl.jena.query.Query;
64
import com.hp.hpl.jena.query.QueryExecution;
65
import com.hp.hpl.jena.query.QueryExecutionFactory;
66
import com.hp.hpl.jena.query.QueryFactory;
67
import com.hp.hpl.jena.query.QuerySolution;
68
import com.hp.hpl.jena.query.ResultSet;
69
import com.hp.hpl.jena.rdf.model.ModelFactory;
70
import com.hp.hpl.jena.tdb.TDBFactory;
71

  
72
import edu.ucsb.nceas.metacat.common.SolrServerFactory;
73
import edu.ucsb.nceas.metacat.common.query.SolrQueryServiceController;
74
import edu.ucsb.nceas.metacat.index.DistributedMapsFactory;
75

  
76

  
77
/**
78
 * A solr index parser for an RDF/XML file.
79
 * The solr doc of the RDF/XML object only has the system metadata information.
80
 * The solr docs of the science metadata doc and data file have the annotation information.
81
 */
82
public class RdfXmlSubprocessor implements IDocumentSubprocessor {
83

  
84
    private static final String QUERY ="q=id:";
85
    private static Log log = LogFactory.getLog(RdfXmlSubprocessor.class);
86
    
87
    /**
88
     * If xpath returns true execute the processDocument Method
89
     */
90
    private List<String> matchDocuments = null;
91
    private List<ISolrDataField> fieldList = new ArrayList<ISolrDataField>();
92
    
93
    private static SolrServer solrServer =  null;
94
    static {
95
        try {
96
            solrServer = SolrServerFactory.createSolrServer();
97
        } catch (Exception e) {
98
            log.error("RdfXmlSubprocessor - can't generate the SolrServer since - "+e.getMessage());
99
        }
100
    }
101
    
102
    /**
103
     * Returns true if subprocessor should be run against object
104
     * 
105
     * @param formatId the the document to be processed
106
     * @return true if this processor can parse the formatId
107
     */
108
    public boolean canProcess(String formatId) {
109
        return matchDocuments.contains(formatId);
110
    } 
111
    
112
    public List<String> getMatchDocuments() {
113
        return matchDocuments;
114
    }
115

  
116
    public void setMatchDocuments(List<String> matchDocuments) {
117
        this.matchDocuments = matchDocuments;
118
    }
119
    public List<ISolrDataField> getFieldList() {
120
		return fieldList;
121
	}
122

  
123
	public void setFieldList(List<ISolrDataField> fieldList) {
124
		this.fieldList = fieldList;
125
	}
126

  
127
	@Override
128
    public Map<String, SolrDoc> processDocument(String identifier, Map<String, SolrDoc> docs, InputStream is) throws Exception {
129
        SolrDoc resourceMapDoc = docs.get(identifier);
130
        List<SolrDoc> processedDocs = process(resourceMapDoc, is);
131
        Map<String, SolrDoc> processedDocsMap = new HashMap<String, SolrDoc>();
132
        for (SolrDoc processedDoc : processedDocs) {
133
            processedDocsMap.put(processedDoc.getIdentifier(), processedDoc);
134
        }
135
        // make sure to merge any docs that are currently being processed
136
        Map<String, SolrDoc> mergedDocuments = mergeDocs(docs, processedDocsMap);
137
        return mergedDocuments;
138
    }
139
    
140
    private List<SolrDoc> process(SolrDoc indexDocument, InputStream is) throws Exception {
141
    	
142
    	// get the triplestore dataset
143
		Dataset dataset = TripleStoreService.getInstance().getDataset();
144
		
145
    	// read the annotation
146
    	String indexDocId = indexDocument.getIdentifier();
147
    	String name = indexDocId;
148
    			
149
    	//Check if the identifier is a valid URI and if not, make it one by prepending "http://"
150
    	URI nameURI = new URI(indexDocId);
151
    	String scheme = nameURI.getScheme();
152
    	if((scheme == null) || (scheme.isEmpty())){
153
    		name = "http://" + indexDocId.toLowerCase();
154
    	}
155
    	
156
    	boolean loaded = dataset.containsNamedModel(name);
157
		if (!loaded) {
158
			OntModel ontModel = ModelFactory.createOntologyModel();
159
			ontModel.read(is, name);
160
			dataset.addNamedModel(name, ontModel);
161
		}
162
		//dataset.getDefaultModel().add(ontModel);
163
		
164
		// process each field query
165
        Map<String, SolrDoc> documentsToIndex = new HashMap<String, SolrDoc>();
166
		for (ISolrDataField field: this.fieldList) {
167
			String q = null;
168
			if (field instanceof SparqlField) {
169
				q = ((SparqlField) field).getQuery();
170
				q = q.replaceAll("\\$GRAPH_NAME", name);
171
				Query query = QueryFactory.create(q);
172
				QueryExecution qexec = QueryExecutionFactory.create(query, dataset);
173
				ResultSet results = qexec.execSelect();
174
				
175
				while (results.hasNext()) {
176
					SolrDoc solrDoc = null;
177
					QuerySolution solution = results.next();
178
					System.out.println(solution.toString());
179
					
180
					// find the index document we are trying to augment with the annotation
181
					if (solution.contains("pid")) {
182
						String id = solution.getLiteral("pid").getString();
183
						
184
						// check if anyone with permissions on the annotation document has write permission on the document we are annotating
185
						boolean statementAuthorized = false;
186
						try {
187
							HashMap<Subject, Set<Permission>> annotationPermissionMap = AccessUtil.getPermissionMap(DistributedMapsFactory.getSystemMetadata(indexDocId).getAccessPolicy());
188
							annotationPermissionMap.put(DistributedMapsFactory.getSystemMetadata(indexDocId).getRightsHolder(), new HashSet<Permission>(Arrays.asList(Permission.CHANGE_PERMISSION)));
189
							statementAuthorized = AuthUtils.isAuthorized(annotationPermissionMap.keySet(), Permission.WRITE, DistributedMapsFactory.getSystemMetadata(id));
190
						} catch (Exception e) {
191
							log.warn("Could not check for assertion permission on original pid: " + id, e);
192
						}
193
						if (!statementAuthorized) {	
194
							continue;
195
						}
196
						
197
						// otherwise carry on with the indexing
198
						solrDoc = documentsToIndex.get(id);
199
						if (solrDoc == null) {
200
							solrDoc = new SolrDoc();
201
							solrDoc.addField(new SolrElementField(SolrElementField.FIELD_ID, id));
202
							documentsToIndex.put(id, solrDoc);
203
						}
204
					}
205

  
206
					// add the field to the index document
207
					if (solution.contains(field.getName())) {
208
						String value = solution.get(field.getName()).toString();
209
						SolrElementField f = new SolrElementField(field.getName(), value);
210
						if (!solrDoc.hasFieldWithValue(f.getName(), f.getValue())) {
211
							solrDoc.addField(f);
212
						}
213
					}
214
				}
215
			}
216
		}
217
		
218
		// clean up the triple store
219
		TDBFactory.release(dataset);
220

  
221
		// merge the existing index with the new[er] values
222
        Map<String, SolrDoc> existingDocuments = getSolrDocs(documentsToIndex.keySet());
223
        Map<String, SolrDoc> mergedDocuments = mergeDocs(documentsToIndex, existingDocuments);
224
        mergedDocuments.put(indexDocument.getIdentifier(), indexDocument);
225
        
226
        return new ArrayList<SolrDoc>(mergedDocuments.values());
227
    }
228
    
229
    private Map<String, SolrDoc> getSolrDocs(Set<String> ids) throws Exception {
230
        Map<String, SolrDoc> list = new HashMap<String, SolrDoc>();
231
        if (ids != null) {
232
            for (String id : ids) {
233
            	SolrDoc doc = getSolrDoc(id);
234
                if (doc != null) {
235
                    list.put(id, doc);
236
                }
237
            }
238
        }
239
        return list;
240
    }
241
    
242
    private Map<String, SolrDoc> mergeDocs(Map<String, SolrDoc> pending, Map<String, SolrDoc> existing) throws Exception {
243
    	IndexSchema indexSchema = SolrQueryServiceController.getInstance().getSchema();
244

  
245
    	Map<String, SolrDoc> merged = new HashMap<String, SolrDoc>();
246
    	Iterator<String> pendingIter = pending.keySet().iterator();
247
    	while (pendingIter.hasNext()) {
248
    		String id = pendingIter.next();
249
    		SolrDoc pendingDoc = pending.get(id);
250
    		SolrDoc existingDoc = existing.get(id);
251
    		SolrDoc mergedDoc = new SolrDoc();
252
    		if (existingDoc != null) {
253
    			// merge the existing fields
254
    			for (SolrElementField field: existingDoc.getFieldList()) {
255
    				mergedDoc.addField(field);
256
    				
257
    			}
258
    		}
259
    		// add the pending
260
    		for (SolrElementField field: pendingDoc.getFieldList()) {
261
    			if (field.getName().equals(SolrElementField.FIELD_ID) && mergedDoc.hasField(SolrElementField.FIELD_ID)) {
262
    				continue;
263
    			}
264
    			// don't transfer the copyTo fields, otherwise there are errors
265
				if (indexSchema.isCopyFieldTarget(indexSchema.getField(field.getName()))) {
266
					continue;
267
				}
268
				// only add if we don't already have it
269
				if (!mergedDoc.hasFieldWithValue(field.getName(), field.getValue())) {
270
					mergedDoc.addField(field);
271
				}	
272
			}
273
    		
274
    		// include in results
275
			merged.put(id, mergedDoc);
276
    	}
277
    	return merged;
278
    }
279
	/*
280
	 * Get the SolrDoc for the specified id
281
	 */
282
	public static SolrDoc getSolrDoc(String id) throws SolrServerException, MalformedURLException, UnsupportedType, NotFound, ParserConfigurationException, IOException, SAXException {
283
		SolrDoc doc = null;
284

  
285
		if (solrServer != null) {
286
			String query = QUERY + "\"" + id + "\"";
287
			SolrParams solrParams = SolrRequestParsers.parseQueryString(query);
288
			QueryResponse qr = solrServer.query(solrParams);
289
			if (!qr.getResults().isEmpty()) {
290
				doc = new SolrDoc();
291
				SolrDocument orig = qr.getResults().get(0);
292
				IndexSchema indexSchema = SolrQueryServiceController.getInstance().getSchema();
293
				for (String fieldName : orig.getFieldNames()) {
294
					// don't transfer the copyTo fields, otherwise there are errors
295
					if (indexSchema.isCopyFieldTarget(indexSchema.getField(fieldName))) {
296
						continue;
297
					}
298
					for (Object value : orig.getFieldValues(fieldName)) {
299
						String stringValue = value.toString();
300
						// special handling for dates in ISO 8601
301
						if (value instanceof Date) {
302
							stringValue = DateTimeMarshaller.serializeDateToUTC((Date) value);
303
							SolrDateConverter converter = new SolrDateConverter();
304
							stringValue = converter.convert(stringValue);
305
						}
306
						SolrElementField field = new SolrElementField(fieldName, stringValue);
307
						log.debug("Adding field: " + fieldName);
308
						doc.addField(field);
309
					}
310
				}
311
			}
312

  
313
		}
314
		return doc;
315
	}
316

  
317
	@Override
318
	public SolrDoc mergeWithIndexedDocument(SolrDoc indexDocument)
319
			throws IOException, EncoderException, XPathExpressionException {
320
		// TODO: actually perform merging 
321
		return indexDocument;
322
	}
323

  
324

  
325
}
326 0

  

Also available in: Unified diff