/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
package edu.ucsb.nceas.metacat.index.annotation;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.Result;
import javax.xml.transform.Source;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.TransformerFactoryConfigurationError;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.servlet.SolrRequestParsers;
import org.dataone.cn.indexer.convert.SolrDateConverter;
import org.dataone.cn.indexer.parser.AbstractDocumentSubprocessor;
import org.dataone.cn.indexer.parser.IDocumentSubprocessor;
import org.dataone.cn.indexer.parser.ISolrField;
import org.dataone.cn.indexer.solrhttp.SolrDoc;
import org.dataone.cn.indexer.solrhttp.SolrElementField;
import org.dataone.service.exceptions.NotFound;
import org.dataone.service.exceptions.UnsupportedType;
import org.dataone.service.types.v1.Permission;
import org.dataone.service.types.v1.Subject;
import org.dataone.service.types.v1.util.AccessUtil;
import org.dataone.service.types.v1.util.AuthUtils;
import org.dataone.service.util.DateTimeMarshaller;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;

import com.hp.hpl.jena.ontology.OntModel;
import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.Query;
import com.hp.hpl.jena.query.QueryExecution;
import com.hp.hpl.jena.query.QueryExecutionFactory;
import com.hp.hpl.jena.query.QueryFactory;
import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.tdb.TDBFactory;

import edu.ucsb.nceas.metacat.common.SolrServerFactory;
import edu.ucsb.nceas.metacat.common.query.SolrQueryServiceController;
import edu.ucsb.nceas.metacat.index.DistributedMapsFactory;
/**
 * A solr index parser for an RDF/XML file.
 * The solr doc of the RDF/XML object only has the system metadata information.
 * The solr docs of the science metadata doc and data file have the annotation information.
 */
public class RdfXmlSubprocessor extends AbstractDocumentSubprocessor implements IDocumentSubprocessor {
90

    
91
    private static final String QUERY ="q=id:";
92
    private static Log log = LogFactory.getLog(RdfXmlSubprocessor.class);
93
    private static SolrServer solrServer =  null;
94
    static {
95
        try {
96
            solrServer = SolrServerFactory.createSolrServer();
97
        } catch (Exception e) {
98
            log.error("RdfXmlSubprocessor - can't generate the SolrServer since - "+e.getMessage());
99
        }
100
    }
101
          
102
    @Override
103
    public Map<String, SolrDoc> processDocument(String identifier, Map<String, SolrDoc> docs, Document doc) throws Exception {
104
        SolrDoc resourceMapDoc = docs.get(identifier);
105
        List<SolrDoc> processedDocs = process(resourceMapDoc, doc);
106
        Map<String, SolrDoc> processedDocsMap = new HashMap<String, SolrDoc>();
107
        for (SolrDoc processedDoc : processedDocs) {
108
            processedDocsMap.put(processedDoc.getIdentifier(), processedDoc);
109
        }
110
        // make sure to merge any docs that are currently being processed
111
        Map<String, SolrDoc> mergedDocuments = mergeDocs(docs, processedDocsMap);
112
        return mergedDocuments;
113
    }
114

    
115
    private InputStream toInputStream(Document doc) throws TransformerConfigurationException, TransformerException, TransformerFactoryConfigurationError {
116
    	ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
117
    	Source xmlSource = new DOMSource(doc);
118
    	Result outputTarget = new StreamResult(outputStream);
119
    	TransformerFactory.newInstance().newTransformer().transform(xmlSource, outputTarget);
120
    	InputStream is = new ByteArrayInputStream(outputStream.toByteArray());
121
    	return is;
122
    }
123
    
124
    private List<SolrDoc> process(SolrDoc indexDocument, Document rdfXmlDocument) throws Exception {
125
    	
126
    	// get the triplestore dataset
127
		Dataset dataset = TripleStoreService.getInstance().getDataset();
128
		
129
    	// read the annotation
130
		InputStream source = toInputStream(rdfXmlDocument);
131
    	String name = indexDocument.getIdentifier();
132
    	boolean loaded = dataset.containsNamedModel(name);
133
		if (!loaded) {
134
			OntModel ontModel = ModelFactory.createOntologyModel();
135
			ontModel.read(source, name);
136
			dataset.addNamedModel(name, ontModel);
137
		}
138
		//dataset.getDefaultModel().add(ontModel);
139
		
140
		// process each field query
141
        Map<String, SolrDoc> documentsToIndex = new HashMap<String, SolrDoc>();
142
		for (ISolrField field: this.getFieldList()) {
143
			String q = null;
144
			if (field instanceof SparqlField) {
145
				q = ((SparqlField) field).getQuery();
146
				q = q.replaceAll("\\$GRAPH_NAME", name);
147
				Query query = QueryFactory.create(q);
148
				QueryExecution qexec = QueryExecutionFactory.create(query, dataset);
149
				ResultSet results = qexec.execSelect();
150
				
151
				while (results.hasNext()) {
152
					SolrDoc solrDoc = null;
153
					QuerySolution solution = results.next();
154
					System.out.println(solution.toString());
155
					
156
					// find the index document we are trying to augment with the annotation
157
					if (solution.contains("pid")) {
158
						String id = solution.getLiteral("pid").getString();
159
						
160
						// check if anyone with permissions on the annotation document has write permission on the document we are annotating
161
						boolean statementAuthorized = false;
162
						try {
163
							HashMap<Subject, Set<Permission>> annotationPermissionMap = AccessUtil.getPermissionMap(DistributedMapsFactory.getSystemMetadata(name).getAccessPolicy());
164
							annotationPermissionMap.put(DistributedMapsFactory.getSystemMetadata(name).getRightsHolder(), new HashSet<Permission>(Arrays.asList(Permission.CHANGE_PERMISSION)));
165
							statementAuthorized = AuthUtils.isAuthorized(annotationPermissionMap.keySet(), Permission.WRITE, DistributedMapsFactory.getSystemMetadata(id));
166
						} catch (Exception e) {
167
							log.warn("Could not check for assertion permission on original pid: " + id, e);
168
						}
169
						if (!statementAuthorized) {	
170
							continue;
171
						}
172
						
173
						// otherwise carry on with the indexing
174
						solrDoc = documentsToIndex.get(id);
175
						if (solrDoc == null) {
176
							solrDoc = new SolrDoc();
177
							solrDoc.addField(new SolrElementField(SolrElementField.FIELD_ID, id));
178
							documentsToIndex.put(id, solrDoc);
179
						}
180
					}
181

    
182
					// add the field to the index document
183
					if (solution.contains(field.getName())) {
184
						String value = solution.get(field.getName()).toString();
185
						SolrElementField f = new SolrElementField(field.getName(), value);
186
						if (!solrDoc.hasFieldWithValue(f.getName(), f.getValue())) {
187
							solrDoc.addField(f);
188
						}
189
					}
190
				}
191
			}
192
		}
193
		
194
		// clean up the triple store
195
		TDBFactory.release(dataset);
196

    
197
		// merge the existing index with the new[er] values
198
        Map<String, SolrDoc> existingDocuments = getSolrDocs(documentsToIndex.keySet());
199
        Map<String, SolrDoc> mergedDocuments = mergeDocs(documentsToIndex, existingDocuments);
200
        mergedDocuments.put(indexDocument.getIdentifier(), indexDocument);
201
        
202
        return new ArrayList<SolrDoc>(mergedDocuments.values());
203
    }
204
    
205
    private Map<String, SolrDoc> getSolrDocs(Set<String> ids) throws Exception {
206
        Map<String, SolrDoc> list = new HashMap<String, SolrDoc>();
207
        if (ids != null) {
208
            for (String id : ids) {
209
            	SolrDoc doc = getSolrDoc(id);
210
                if (doc != null) {
211
                    list.put(id, doc);
212
                }
213
            }
214
        }
215
        return list;
216
    }
217
    
218
    private Map<String, SolrDoc> mergeDocs(Map<String, SolrDoc> pending, Map<String, SolrDoc> existing) throws Exception {
219
    	IndexSchema indexSchema = SolrQueryServiceController.getInstance().getSchema();
220

    
221
    	Map<String, SolrDoc> merged = new HashMap<String, SolrDoc>();
222
    	Iterator<String> pendingIter = pending.keySet().iterator();
223
    	while (pendingIter.hasNext()) {
224
    		String id = pendingIter.next();
225
    		SolrDoc pendingDoc = pending.get(id);
226
    		SolrDoc existingDoc = existing.get(id);
227
    		SolrDoc mergedDoc = new SolrDoc();
228
    		if (existingDoc != null) {
229
    			// merge the existing fields
230
    			for (SolrElementField field: existingDoc.getFieldList()) {
231
    				mergedDoc.addField(field);
232
    				
233
    			}
234
    		}
235
    		// add the pending
236
    		for (SolrElementField field: pendingDoc.getFieldList()) {
237
    			if (field.getName().equals(SolrElementField.FIELD_ID) && mergedDoc.hasField(SolrElementField.FIELD_ID)) {
238
    				continue;
239
    			}
240
    			// don't transfer the copyTo fields, otherwise there are errors
241
				if (indexSchema.isCopyFieldTarget(indexSchema.getField(field.getName()))) {
242
					continue;
243
				}
244
				// only add if we don't already have it
245
				if (!mergedDoc.hasFieldWithValue(field.getName(), field.getValue())) {
246
					mergedDoc.addField(field);
247
				}	
248
			}
249
    		
250
    		// include in results
251
			merged.put(id, mergedDoc);
252
    	}
253
    	return merged;
254
    }
255
	/*
256
	 * Get the SolrDoc for the specified id
257
	 */
258
	public static SolrDoc getSolrDoc(String id) throws SolrServerException, MalformedURLException, UnsupportedType, NotFound, ParserConfigurationException, IOException, SAXException {
259
		SolrDoc doc = null;
260

    
261
		if (solrServer != null) {
262
			String query = QUERY + "\"" + id + "\"";
263
			SolrParams solrParams = SolrRequestParsers.parseQueryString(query);
264
			QueryResponse qr = solrServer.query(solrParams);
265
			if (!qr.getResults().isEmpty()) {
266
				doc = new SolrDoc();
267
				SolrDocument orig = qr.getResults().get(0);
268
				IndexSchema indexSchema = SolrQueryServiceController.getInstance().getSchema();
269
				for (String fieldName : orig.getFieldNames()) {
270
					// don't transfer the copyTo fields, otherwise there are errors
271
					if (indexSchema.isCopyFieldTarget(indexSchema.getField(fieldName))) {
272
						continue;
273
					}
274
					for (Object value : orig.getFieldValues(fieldName)) {
275
						String stringValue = value.toString();
276
						// special handling for dates in ISO 8601
277
						if (value instanceof Date) {
278
							stringValue = DateTimeMarshaller.serializeDateToUTC((Date) value);
279
							SolrDateConverter converter = new SolrDateConverter();
280
							stringValue = converter.convert(stringValue);
281
						}
282
						SolrElementField field = new SolrElementField(fieldName, stringValue);
283
						log.debug("Adding field: " + fieldName);
284
						doc.addField(field);
285
					}
286
				}
287
			}
288

    
289
		}
290
		return doc;
291
	}
292

    
293

    
294
}