Project

General

Profile

/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
16
package edu.ucsb.nceas.metacat.index.annotation;
17

    
18
import edu.ucsb.nceas.metacat.index.resourcemap.ResourceMapSubprocessor;
19

    
20
import java.io.ByteArrayOutputStream;
21
import java.io.IOException;
22
import java.io.InputStream;
23
import java.net.URI;
24
import java.net.URISyntaxException;
25
import java.util.ArrayList;
26
import java.util.HashMap;
27
import java.util.Iterator;
28
import java.util.List;
29
import java.util.Map;
30
import java.util.Set;
31

    
32
import javax.xml.xpath.XPathExpressionException;
33

    
34
import org.apache.commons.codec.EncoderException;
35
import org.apache.commons.io.IOUtils;
36
import org.apache.commons.logging.Log;
37
import org.apache.commons.logging.LogFactory;
38
import org.dataone.cn.index.util.PerformanceLogger;
39
import org.dataone.cn.indexer.annotation.SparqlField;
40
import org.dataone.cn.indexer.annotation.TripleStoreService;
41
import org.dataone.cn.indexer.parser.IDocumentSubprocessor;
42
import org.dataone.cn.indexer.parser.ISolrDataField;
43
import org.dataone.cn.indexer.parser.SubprocessorUtility;
44
import org.dataone.cn.indexer.solrhttp.HTTPService;
45
import org.dataone.cn.indexer.solrhttp.SolrDoc;
46
import org.dataone.cn.indexer.solrhttp.SolrElementField;
47
import org.springframework.beans.factory.annotation.Autowired;
48

    
49
import com.hp.hpl.jena.ontology.OntModel;
50
import com.hp.hpl.jena.query.Dataset;
51
import com.hp.hpl.jena.query.Query;
52
import com.hp.hpl.jena.query.QueryExecution;
53
import com.hp.hpl.jena.query.QueryExecutionFactory;
54
import com.hp.hpl.jena.query.QueryFactory;
55
import com.hp.hpl.jena.query.QuerySolution;
56
import com.hp.hpl.jena.query.ResultSet;
57
import com.hp.hpl.jena.rdf.model.ModelFactory;
58
import com.hp.hpl.jena.tdb.TDBFactory;
59

    
60
/**
61
 * A solr index parser for an RDF/XML file.
62
 * The solr doc of the RDF/XML object only has the system metadata information.
63
 * The solr docs of the science metadata doc and data file have the annotation information.
64
 */
65
public class MetacatRdfXmlSubprocessor implements IDocumentSubprocessor {
66

    
67
    private static Log log = LogFactory.getLog(MetacatRdfXmlSubprocessor.class);
68
    private static PerformanceLogger perfLog = PerformanceLogger.getInstance();
69
    /**
70
     * If xpath returns true execute the processDocument Method
71
     */
72
    private List<String> matchDocuments = null;
73

    
74
    private List<ISolrDataField> fieldList = new ArrayList<ISolrDataField>();
75

    
76
    private List<String> fieldsToMerge = new ArrayList<String>();
77

    
78
    @Autowired
79
    private HTTPService httpService = null;
80

    
81
    @Autowired
82
    private String solrQueryUri = null;
83

    
84
    @Autowired
85
    private SubprocessorUtility processorUtility;
86

    
87
    /**
88
     * Returns true if subprocessor should be run against object
89
     * 
90
     * @param formatId the the document to be processed
91
     * @return true if this processor can parse the formatId
92
     */
93
    public boolean canProcess(String formatId) {
94
        return matchDocuments.contains(formatId);
95
    }
96

    
97
    public List<String> getMatchDocuments() {
98
        return matchDocuments;
99
    }
100

    
101
    public void setMatchDocuments(List<String> matchDocuments) {
102
        this.matchDocuments = matchDocuments;
103
    }
104

    
105
    public List<ISolrDataField> getFieldList() {
106
        return fieldList;
107
    }
108

    
109
    public void setFieldList(List<ISolrDataField> fieldList) {
110
        this.fieldList = fieldList;
111
    }
112

    
113
    @Override
114
    public Map<String, SolrDoc> processDocument(String identifier, Map<String, SolrDoc> docs,
115
            InputStream is) throws Exception {
116

    
117
        if (log.isTraceEnabled()) {
118
            log.trace("INCOMING DOCS to processDocument(): ");
119
            serializeDocuments(docs);
120
        }
121

    
122
        SolrDoc resourceMapDoc = docs.get(identifier);
123
        List<SolrDoc> processedDocs = process(resourceMapDoc, is);
124
        Map<String, SolrDoc> processedDocsMap = new HashMap<String, SolrDoc>();
125
        for (SolrDoc processedDoc : processedDocs) {
126
            processedDocsMap.put(processedDoc.getIdentifier(), processedDoc);
127
        }
128

    
129
        if (log.isTraceEnabled()) {
130
            log.trace("PREMERGED DOCS from processDocument(): ");
131
            serializeDocuments(processedDocsMap);
132
        }
133

    
134
        // Merge previously processed (but yet to be indexed) documents
135
        Map<String, SolrDoc> mergedDocs = mergeDocs(docs, processedDocsMap);
136

    
137
        if (log.isTraceEnabled()) {
138
            log.trace("OUTGOING DOCS from processDocument(): ");
139
            serializeDocuments(mergedDocs);
140
        }
141

    
142
        return mergedDocs;
143
    }
144

    
145
    /**
146
     * Serialize documents to be indexed for debugging
147
     * 
148
     * @param docs
149
     * @throws IOException
150
     */
151
    private void serializeDocuments(Map<String, SolrDoc> docs) {
152
        StringBuilder documents = new StringBuilder();
153
        documents.append("<docs>");
154

    
155
        for (SolrDoc doc : docs.values()) {
156
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
157
            try {
158
                doc.serialize(baos, "UTF-8");
159

    
160
            } catch (IOException e) {
161
                log.trace("Couldn't serialize documents: " + e.getMessage());
162
            }
163
            
164
            try {
165
                documents.append(baos.toString());
166
            } finally {
167
                IOUtils.closeQuietly(baos);
168
            }
169
        }
170
        documents.append("</docs>");
171
        log.trace(documents.toString());
172
    }
173

    
174
    private List<SolrDoc> process(SolrDoc indexDocument, InputStream is) throws Exception {
175
        
176
        // get the triplestore dataset
177
        long start = System.currentTimeMillis();
178
        Map<String, SolrDoc> mergedDocuments;
179
        Dataset dataset = TripleStoreService.getInstance().getDataset();
180
        try {
181
            perfLog.log("RdfXmlSubprocess.process gets a dataset from tripe store service ", System.currentTimeMillis() - start);
182
            
183
            // read the annotation
184
            String indexDocId = indexDocument.getIdentifier();
185
            String name = indexDocId;
186
    
187
            //Check if the identifier is a valid URI and if not, make it one by prepending "http://"
188
            URI nameURI;
189
            String scheme = null;
190
            try {
191
                nameURI = new URI(indexDocId);
192
                scheme = nameURI.getScheme();
193
                
194
            } catch (URISyntaxException use) {
195
                // The identifier can't be parsed due to offending characters. It's not a URL
196
                
197
                name = "https://cn.dataone.org/cn/v1/resolve/"+indexDocId;
198
            }
199
            
200
            // The had no scheme prefix. It's not a URL
201
            if ((scheme == null) || (scheme.isEmpty())) {
202
                name = "https://cn.dataone.org/cn/v1/resolve/"+indexDocId;
203
                
204
            }
205
            
206
            long startOntModel = System.currentTimeMillis();
207
            boolean loaded = dataset.containsNamedModel(name);
208
            if (!loaded) {
209
                OntModel ontModel = ModelFactory.createOntologyModel();
210
                ontModel.read(is, name);
211
                dataset.addNamedModel(name, ontModel);
212
            }
213
            perfLog.log("RdfXmlSubprocess.process adds ont-model ", System.currentTimeMillis() - startOntModel);
214
            //dataset.getDefaultModel().add(ontModel);
215
    
216
            // process each field query
217
            Map<String, SolrDoc> documentsToIndex = new HashMap<String, SolrDoc>();
218
            long startField = System.currentTimeMillis();
219
            for (ISolrDataField field : this.fieldList) {
220
                long filed = System.currentTimeMillis();
221
                String q = null;
222
                if (field instanceof SparqlField) {
223
                    q = ((SparqlField) field).getQuery();
224
                    q = q.replaceAll("\\$GRAPH_NAME", name);
225
                    Query query = QueryFactory.create(q);
226
                    log.trace("Executing SPARQL query:\n" + query.toString());
227
                    QueryExecution qexec = QueryExecutionFactory.create(query, dataset);
228
                    ResultSet results = qexec.execSelect();
229
                    while (results.hasNext()) {
230
                        SolrDoc solrDoc = null;
231
                        QuerySolution solution = results.next();
232
                        log.trace(solution.toString());
233
    
234
                        // find the index document we are trying to augment with the annotation
235
                        if (solution.contains("pid")) {
236
                            String id = solution.getLiteral("pid").getString();
237
    
238
                            // TODO: check if anyone with permissions on the annotation document has write permission on the document we are annotating
239
                            boolean statementAuthorized = true;
240
                            if (!statementAuthorized) {
241
                                continue;
242
                            }
243
    
244
                            // otherwise carry on with the indexing
245
                            solrDoc = documentsToIndex.get(id);
246
                            if (solrDoc == null) {
247
                                solrDoc = new SolrDoc();
248
                                solrDoc.addField(new SolrElementField(SolrElementField.FIELD_ID, id));
249
                                documentsToIndex.put(id, solrDoc);
250
                            }
251
                        }
252
    
253
                        // add the field to the index document
254
                        if (solution.contains(field.getName())) {
255
                            String value = solution.get(field.getName()).toString();
256
                            SolrElementField f = new SolrElementField(field.getName(), value);
257
                            if (!solrDoc.hasFieldWithValue(f.getName(), f.getValue())) {
258
                                solrDoc.addField(f);
259
                            }
260
                        }
261
                    }
262
                }
263
                perfLog.log("RdfXmlSubprocess.process process the field "+field.getName(), System.currentTimeMillis() - filed);
264
            }
265
            perfLog.log("RdfXmlSubprocess.process process the fields total ", System.currentTimeMillis() - startField);
266
            // clean up the triple store
267
            //TDBFactory.release(dataset);
268
    
269
            // merge the existing index with the new[er] values
270
            long getStart = System.currentTimeMillis();
271
            Map<String, SolrDoc> existingDocuments = getSolrDocs(documentsToIndex.keySet());
272
            perfLog.log("RdfXmlSubprocess.process get existing solr docs ", System.currentTimeMillis() - getStart);
273
            mergedDocuments = mergeDocs(documentsToIndex, existingDocuments);
274
            mergedDocuments.put(indexDocument.getIdentifier(), indexDocument);
275
    
276
            perfLog.log("RdfXmlSubprocess.process() total take ", System.currentTimeMillis() - start);
277
        } finally {
278
            try {
279
                TripleStoreService.getInstance().destoryDataset(dataset);
280
            } catch (Exception e) {
281
                log.warn("A tdb directory can't be removed since "+e.getMessage(), e);
282
            }
283
        }
284
        return new ArrayList<SolrDoc>(mergedDocuments.values());
285
    }
286

    
287
    private Map<String, SolrDoc> getSolrDocs(Set<String> ids) throws Exception {
288
        Map<String, SolrDoc> list = new HashMap<String, SolrDoc>();
289
        if (ids != null) {
290
            for (String id : ids) {
291
                //SolrDoc doc = httpService.retrieveDocumentFromSolrServer(id, solrQueryUri);
292
                SolrDoc doc = ResourceMapSubprocessor.getSolrDoc(id);;
293
                if (doc != null) {
294
                    list.put(id, doc);
295
                }
296
            }
297
        }
298
        return list;
299
    }
300

    
301
    /*
302
     * Merge existing documents from the Solr index with pending documents
303
     */
304
    private Map<String, SolrDoc> mergeDocs(Map<String, SolrDoc> pending,
305
            Map<String, SolrDoc> existing) throws Exception {
306
        long start = System.currentTimeMillis();
307
        Map<String, SolrDoc> merged = new HashMap<String, SolrDoc>();
308

    
309
        Iterator<String> pendingIter = pending.keySet().iterator();
310
        while (pendingIter.hasNext()) {
311
            String id = pendingIter.next();
312
            SolrDoc pendingDoc = pending.get(id);
313
            SolrDoc existingDoc = existing.get(id);
314
            SolrDoc mergedDoc = new SolrDoc();
315
            if (existingDoc != null) {
316
                // merge the existing fields
317
                for (SolrElementField field : existingDoc.getFieldList()) {
318
                    mergedDoc.addField(field);
319

    
320
                }
321
            }
322
            // add the pending
323
            for (SolrElementField field : pendingDoc.getFieldList()) {
324
                if (field.getName().equals(SolrElementField.FIELD_ID)
325
                        && mergedDoc.hasField(SolrElementField.FIELD_ID)) {
326
                    continue;
327
                }
328

    
329
                // only add if we don't already have it
330
                if (!mergedDoc.hasFieldWithValue(field.getName(), field.getValue())) {
331
                    mergedDoc.addField(field);
332
                }
333
            }
334

    
335
            // include in results
336
            merged.put(id, mergedDoc);
337
        }
338

    
339
        // add existing if not yet merged (needed if existing map size > pending map size)
340
        Iterator<String> existingIter = existing.keySet().iterator();
341

    
342
        while (existingIter.hasNext()) {
343
            String existingId = existingIter.next();
344

    
345
            if (!merged.containsKey(existingId)) {
346
                merged.put(existingId, existing.get(existingId));
347

    
348
            }
349
        }
350

    
351
        if (log.isTraceEnabled()) {
352
            log.trace("MERGED DOCS with existing from the Solr index: ");
353
            serializeDocuments(merged);
354
        }
355
        perfLog.log("RdfXmlSubprocess.merge total ", System.currentTimeMillis() - start);
356
        return merged;
357
    }
358

    
359
    @Override
360
    public SolrDoc mergeWithIndexedDocument(SolrDoc indexDocument) throws IOException,
361
            EncoderException, XPathExpressionException {
362
        return processorUtility.mergeWithIndexedDocument(indexDocument, fieldsToMerge);
363
    }
364

    
365
    public List<String> getFieldsToMerge() {
366
        return fieldsToMerge;
367
    }
368

    
369
    public void setFieldsToMerge(List<String> fieldsToMerge) {
370
        this.fieldsToMerge = fieldsToMerge;
371
    }
372
}
(2-2/2)