Project

General

Profile

1
/**
2
 * This program is free software; you can redistribute it and/or modify
3
 * it under the terms of the GNU General Public License as published by
4
 * the Free Software Foundation; either version 2 of the License, or
5
 * (at your option) any later version.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
15
 */
16
package edu.ucsb.nceas.metacat.index.annotation;
17

    
18
import java.io.IOException;
19
import java.io.InputStream;
20
import java.net.MalformedURLException;
21
import java.net.URI;
22
import java.util.ArrayList;
23
import java.util.Arrays;
24
import java.util.Date;
25
import java.util.HashMap;
26
import java.util.HashSet;
27
import java.util.Iterator;
28
import java.util.List;
29
import java.util.Map;
30
import java.util.Set;
31

    
32
import javax.xml.parsers.ParserConfigurationException;
33

    
34
import org.apache.commons.logging.Log;
35
import org.apache.commons.logging.LogFactory;
36
import org.apache.solr.client.solrj.SolrServer;
37
import org.apache.solr.client.solrj.SolrServerException;
38
import org.apache.solr.client.solrj.response.QueryResponse;
39
import org.apache.solr.common.SolrDocument;
40
import org.apache.solr.common.params.SolrParams;
41
import org.apache.solr.schema.IndexSchema;
42
import org.apache.solr.servlet.SolrRequestParsers;
43
import org.dataone.cn.indexer.annotation.SparqlField;
44
import org.dataone.cn.indexer.annotation.TripleStoreService;
45
import org.dataone.cn.indexer.convert.SolrDateConverter;
46
import org.dataone.cn.indexer.parser.IDocumentSubprocessor;
47
import org.dataone.cn.indexer.parser.ISolrDataField;
48
import org.dataone.cn.indexer.solrhttp.SolrDoc;
49
import org.dataone.cn.indexer.solrhttp.SolrElementField;
50
import org.dataone.service.exceptions.NotFound;
51
import org.dataone.service.exceptions.UnsupportedType;
52
import org.dataone.service.types.v1.Permission;
53
import org.dataone.service.types.v1.Subject;
54
import org.dataone.service.types.v1.util.AccessUtil;
55
import org.dataone.service.types.v1.util.AuthUtils;
56
import org.dataone.service.util.DateTimeMarshaller;
57
import org.xml.sax.SAXException;
58

    
59
import com.hp.hpl.jena.ontology.OntModel;
60
import com.hp.hpl.jena.query.Dataset;
61
import com.hp.hpl.jena.query.Query;
62
import com.hp.hpl.jena.query.QueryExecution;
63
import com.hp.hpl.jena.query.QueryExecutionFactory;
64
import com.hp.hpl.jena.query.QueryFactory;
65
import com.hp.hpl.jena.query.QuerySolution;
66
import com.hp.hpl.jena.query.ResultSet;
67
import com.hp.hpl.jena.rdf.model.ModelFactory;
68
import com.hp.hpl.jena.tdb.TDBFactory;
69

    
70
import edu.ucsb.nceas.metacat.common.SolrServerFactory;
71
import edu.ucsb.nceas.metacat.common.query.SolrQueryServiceController;
72
import edu.ucsb.nceas.metacat.index.DistributedMapsFactory;
73

    
74

    
75
/**
76
 * A solr index parser for an RDF/XML file.
77
 * The solr doc of the RDF/XML object only has the system metadata information.
78
 * The solr docs of the science metadata doc and data file have the annotation information.
79
 */
80
public class RdfXmlSubprocessor implements IDocumentSubprocessor {
81

    
82
    /** Query-string prefix used by getSolrDoc(String) to look up a document by its id. */
    private static final String QUERY ="q=id:";
    private static final Log log = LogFactory.getLog(RdfXmlSubprocessor.class);

    /**
     * The format ids this subprocessor handles; canProcess(String) tests
     * membership in this list. Stays null until setMatchDocuments is called.
     */
    private List<String> matchDocuments = null;

    /** The fields to populate; process() only evaluates the SparqlField entries. */
    private List<ISolrDataField> fieldList = new ArrayList<ISolrDataField>();

    /** Shared server used by getSolrDoc(String); remains null when creation fails. */
    private static SolrServer solrServer =  null;
    static {
        try {
            solrServer = SolrServerFactory.createSolrServer();
        } catch (Exception e) {
            // keep the class loadable; pass the exception along so the stack trace is not lost
            log.error("RdfXmlSubprocessor - can't generate the SolrServer since - "+e.getMessage(), e);
        }
    }
99
    
100
    /**
101
     * Returns true if subprocessor should be run against object
102
     * 
103
     * @param formatId the the document to be processed
104
     * @return true if this processor can parse the formatId
105
     */
106
    public boolean canProcess(String formatId) {
107
        return matchDocuments.contains(formatId);
108
    } 
109
    
110
    public List<String> getMatchDocuments() {
111
        return matchDocuments;
112
    }
113

    
114
    public void setMatchDocuments(List<String> matchDocuments) {
115
        this.matchDocuments = matchDocuments;
116
    }
117
    public List<ISolrDataField> getFieldList() {
118
		return fieldList;
119
	}
120

    
121
	public void setFieldList(List<ISolrDataField> fieldList) {
122
		this.fieldList = fieldList;
123
	}
124

    
125
	@Override
126
    public Map<String, SolrDoc> processDocument(String identifier, Map<String, SolrDoc> docs, InputStream is) throws Exception {
127
        SolrDoc resourceMapDoc = docs.get(identifier);
128
        List<SolrDoc> processedDocs = process(resourceMapDoc, is);
129
        Map<String, SolrDoc> processedDocsMap = new HashMap<String, SolrDoc>();
130
        for (SolrDoc processedDoc : processedDocs) {
131
            processedDocsMap.put(processedDoc.getIdentifier(), processedDoc);
132
        }
133
        // make sure to merge any docs that are currently being processed
134
        Map<String, SolrDoc> mergedDocuments = mergeDocs(docs, processedDocsMap);
135
        return mergedDocuments;
136
    }
137
    
138
    private List<SolrDoc> process(SolrDoc indexDocument, InputStream is) throws Exception {
139
    	
140
    	// get the triplestore dataset
141
		Dataset dataset = TripleStoreService.getInstance().getDataset();
142
		
143
    	// read the annotation
144
    	String indexDocId = indexDocument.getIdentifier();
145
    	String name = indexDocId;
146
    			
147
    	//Check if the identifier is a valid URI and if not, make it one by prepending "http://"
148
    	URI nameURI = new URI(indexDocId);
149
    	String scheme = nameURI.getScheme();
150
    	if((scheme == null) || (scheme.isEmpty())){
151
    		name = "http://" + indexDocId.toLowerCase();
152
    	}
153
    	
154
    	boolean loaded = dataset.containsNamedModel(name);
155
		if (!loaded) {
156
			OntModel ontModel = ModelFactory.createOntologyModel();
157
			ontModel.read(is, name);
158
			dataset.addNamedModel(name, ontModel);
159
		}
160
		//dataset.getDefaultModel().add(ontModel);
161
		
162
		// process each field query
163
        Map<String, SolrDoc> documentsToIndex = new HashMap<String, SolrDoc>();
164
		for (ISolrDataField field: this.fieldList) {
165
			String q = null;
166
			if (field instanceof SparqlField) {
167
				q = ((SparqlField) field).getQuery();
168
				q = q.replaceAll("\\$GRAPH_NAME", name);
169
				Query query = QueryFactory.create(q);
170
				QueryExecution qexec = QueryExecutionFactory.create(query, dataset);
171
				ResultSet results = qexec.execSelect();
172
				
173
				while (results.hasNext()) {
174
					SolrDoc solrDoc = null;
175
					QuerySolution solution = results.next();
176
					System.out.println(solution.toString());
177
					
178
					// find the index document we are trying to augment with the annotation
179
					if (solution.contains("pid")) {
180
						String id = solution.getLiteral("pid").getString();
181
						
182
						// check if anyone with permissions on the annotation document has write permission on the document we are annotating
183
						boolean statementAuthorized = false;
184
						try {
185
							HashMap<Subject, Set<Permission>> annotationPermissionMap = AccessUtil.getPermissionMap(DistributedMapsFactory.getSystemMetadata(indexDocId).getAccessPolicy());
186
							annotationPermissionMap.put(DistributedMapsFactory.getSystemMetadata(indexDocId).getRightsHolder(), new HashSet<Permission>(Arrays.asList(Permission.CHANGE_PERMISSION)));
187
							statementAuthorized = AuthUtils.isAuthorized(annotationPermissionMap.keySet(), Permission.WRITE, DistributedMapsFactory.getSystemMetadata(id));
188
						} catch (Exception e) {
189
							log.warn("Could not check for assertion permission on original pid: " + id, e);
190
						}
191
						if (!statementAuthorized) {	
192
							continue;
193
						}
194
						
195
						// otherwise carry on with the indexing
196
						solrDoc = documentsToIndex.get(id);
197
						if (solrDoc == null) {
198
							solrDoc = new SolrDoc();
199
							solrDoc.addField(new SolrElementField(SolrElementField.FIELD_ID, id));
200
							documentsToIndex.put(id, solrDoc);
201
						}
202
					}
203

    
204
					// add the field to the index document
205
					if (solution.contains(field.getName())) {
206
						String value = solution.get(field.getName()).toString();
207
						SolrElementField f = new SolrElementField(field.getName(), value);
208
						if (!solrDoc.hasFieldWithValue(f.getName(), f.getValue())) {
209
							solrDoc.addField(f);
210
						}
211
					}
212
				}
213
			}
214
		}
215
		
216
		// clean up the triple store
217
		TDBFactory.release(dataset);
218

    
219
		// merge the existing index with the new[er] values
220
        Map<String, SolrDoc> existingDocuments = getSolrDocs(documentsToIndex.keySet());
221
        Map<String, SolrDoc> mergedDocuments = mergeDocs(documentsToIndex, existingDocuments);
222
        mergedDocuments.put(indexDocument.getIdentifier(), indexDocument);
223
        
224
        return new ArrayList<SolrDoc>(mergedDocuments.values());
225
    }
226
    
227
    private Map<String, SolrDoc> getSolrDocs(Set<String> ids) throws Exception {
228
        Map<String, SolrDoc> list = new HashMap<String, SolrDoc>();
229
        if (ids != null) {
230
            for (String id : ids) {
231
            	SolrDoc doc = getSolrDoc(id);
232
                if (doc != null) {
233
                    list.put(id, doc);
234
                }
235
            }
236
        }
237
        return list;
238
    }
239
    
240
    private Map<String, SolrDoc> mergeDocs(Map<String, SolrDoc> pending, Map<String, SolrDoc> existing) throws Exception {
241
    	IndexSchema indexSchema = SolrQueryServiceController.getInstance().getSchema();
242

    
243
    	Map<String, SolrDoc> merged = new HashMap<String, SolrDoc>();
244
    	Iterator<String> pendingIter = pending.keySet().iterator();
245
    	while (pendingIter.hasNext()) {
246
    		String id = pendingIter.next();
247
    		SolrDoc pendingDoc = pending.get(id);
248
    		SolrDoc existingDoc = existing.get(id);
249
    		SolrDoc mergedDoc = new SolrDoc();
250
    		if (existingDoc != null) {
251
    			// merge the existing fields
252
    			for (SolrElementField field: existingDoc.getFieldList()) {
253
    				mergedDoc.addField(field);
254
    				
255
    			}
256
    		}
257
    		// add the pending
258
    		for (SolrElementField field: pendingDoc.getFieldList()) {
259
    			if (field.getName().equals(SolrElementField.FIELD_ID) && mergedDoc.hasField(SolrElementField.FIELD_ID)) {
260
    				continue;
261
    			}
262
    			// don't transfer the copyTo fields, otherwise there are errors
263
				if (indexSchema.isCopyFieldTarget(indexSchema.getField(field.getName()))) {
264
					continue;
265
				}
266
				// only add if we don't already have it
267
				if (!mergedDoc.hasFieldWithValue(field.getName(), field.getValue())) {
268
					mergedDoc.addField(field);
269
				}	
270
			}
271
    		
272
    		// include in results
273
			merged.put(id, mergedDoc);
274
    	}
275
    	return merged;
276
    }
277
	/*
278
	 * Get the SolrDoc for the specified id
279
	 */
280
	public static SolrDoc getSolrDoc(String id) throws SolrServerException, MalformedURLException, UnsupportedType, NotFound, ParserConfigurationException, IOException, SAXException {
281
		SolrDoc doc = null;
282

    
283
		if (solrServer != null) {
284
			String query = QUERY + "\"" + id + "\"";
285
			SolrParams solrParams = SolrRequestParsers.parseQueryString(query);
286
			QueryResponse qr = solrServer.query(solrParams);
287
			if (!qr.getResults().isEmpty()) {
288
				doc = new SolrDoc();
289
				SolrDocument orig = qr.getResults().get(0);
290
				IndexSchema indexSchema = SolrQueryServiceController.getInstance().getSchema();
291
				for (String fieldName : orig.getFieldNames()) {
292
					// don't transfer the copyTo fields, otherwise there are errors
293
					if (indexSchema.isCopyFieldTarget(indexSchema.getField(fieldName))) {
294
						continue;
295
					}
296
					for (Object value : orig.getFieldValues(fieldName)) {
297
						String stringValue = value.toString();
298
						// special handling for dates in ISO 8601
299
						if (value instanceof Date) {
300
							stringValue = DateTimeMarshaller.serializeDateToUTC((Date) value);
301
							SolrDateConverter converter = new SolrDateConverter();
302
							stringValue = converter.convert(stringValue);
303
						}
304
						SolrElementField field = new SolrElementField(fieldName, stringValue);
305
						log.debug("Adding field: " + fieldName);
306
						doc.addField(field);
307
					}
308
				}
309
			}
310

    
311
		}
312
		return doc;
313
	}
314

    
315

    
316
}
    (1-1/1)