/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
package edu.ucsb.nceas.metacat.index.annotation;

import edu.ucsb.nceas.metacat.index.resourcemap.ResourceMapSubprocessor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.xml.xpath.XPathExpressionException;

import org.apache.commons.codec.EncoderException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dataone.cn.index.util.PerformanceLogger;
import org.dataone.cn.indexer.annotation.SparqlField;
import org.dataone.cn.indexer.annotation.TripleStoreService;
import org.dataone.cn.indexer.parser.IDocumentSubprocessor;
import org.dataone.cn.indexer.parser.ISolrDataField;
import org.dataone.cn.indexer.parser.SubprocessorUtility;
import org.dataone.cn.indexer.solrhttp.HTTPService;
import org.dataone.cn.indexer.solrhttp.SolrDoc;
import org.dataone.cn.indexer.solrhttp.SolrElementField;
import org.springframework.beans.factory.annotation.Autowired;

import com.hp.hpl.jena.ontology.OntModel;
import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.Query;
import com.hp.hpl.jena.query.QueryExecution;
import com.hp.hpl.jena.query.QueryExecutionFactory;
import com.hp.hpl.jena.query.QueryFactory;
import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.tdb.TDBFactory;
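
// This subprocessor is normally instantiated and configured by the indexer's Spring
// application context through the setters below rather than constructed directly.
// A minimal programmatic sketch of the equivalent wiring (the format id and field
// name here are illustrative assumptions, not taken from this file):
//
//   MetacatRdfXmlSubprocessor subprocessor = new MetacatRdfXmlSubprocessor();
//   subprocessor.setMatchDocuments(Arrays.asList("http://www.w3.org/TR/rdf-syntax-grammar"));
//   subprocessor.setFieldsToMerge(Arrays.asList("sem_annotation"));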

/**
 * A Solr index parser for an RDF/XML file.
 * The Solr doc of the RDF/XML object itself carries only the system metadata information;
 * the Solr docs of the science metadata doc and data file carry the annotation information.
 */
public class MetacatRdfXmlSubprocessor implements IDocumentSubprocessor {

    private static Log log = LogFactory.getLog(MetacatRdfXmlSubprocessor.class);
    private static PerformanceLogger perfLog = PerformanceLogger.getInstance();
    /**
     * The format ids this subprocessor handles; processDocument is executed only for
     * documents whose format id matches one of these (see canProcess below).
     */
    private List<String> matchDocuments = null;

    private List<ISolrDataField> fieldList = new ArrayList<ISolrDataField>();

    private List<String> fieldsToMerge = new ArrayList<String>();

    @Autowired
    private HTTPService httpService = null;

    @Autowired
    private String solrQueryUri = null;

    @Autowired
    private SubprocessorUtility processorUtility;

    /**
     * Returns true if this subprocessor should be run against the object
     * 
     * @param formatId the format id of the document to be processed
     * @return true if this processor can parse the given format id
     */
    public boolean canProcess(String formatId) {
        return matchDocuments.contains(formatId);
    }

    public List<String> getMatchDocuments() {
        return matchDocuments;
    }

    public void setMatchDocuments(List<String> matchDocuments) {
        this.matchDocuments = matchDocuments;
    }

    public List<ISolrDataField> getFieldList() {
        return fieldList;
    }

    public void setFieldList(List<ISolrDataField> fieldList) {
        this.fieldList = fieldList;
    }

    @Override
    public Map<String, SolrDoc> processDocument(String identifier, Map<String, SolrDoc> docs,
            InputStream is) throws Exception {

        if (log.isTraceEnabled()) {
            log.trace("INCOMING DOCS to processDocument(): ");
            serializeDocuments(docs);
        }

        SolrDoc resourceMapDoc = docs.get(identifier);
        List<SolrDoc> processedDocs = process(resourceMapDoc, is);
        Map<String, SolrDoc> processedDocsMap = new HashMap<String, SolrDoc>();
        for (SolrDoc processedDoc : processedDocs) {
            processedDocsMap.put(processedDoc.getIdentifier(), processedDoc);
        }

        if (log.isTraceEnabled()) {
            log.trace("PREMERGED DOCS from processDocument(): ");
            serializeDocuments(processedDocsMap);
        }

        // Merge previously processed (but yet to be indexed) documents
        Map<String, SolrDoc> mergedDocs = mergeDocs(docs, processedDocsMap);

        if (log.isTraceEnabled()) {
            log.trace("OUTGOING DOCS from processDocument(): ");
            serializeDocuments(mergedDocs);
        }

        return mergedDocs;
    }
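
    // Note on the contract above (hypothetical identifiers): given identifier "ann.1.1"
    // for the RDF/XML object and docs = {"ann.1.1" -> its solr doc}, process() may emit
    // docs for the annotated objects as well (e.g. "sci.1.1"), so the returned map can
    // contain entries for both "ann.1.1" and "sci.1.1".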

    /**
     * Serialize documents to be indexed for debugging
     * 
     * @param docs the documents to serialize to the trace log
     */
    private void serializeDocuments(Map<String, SolrDoc> docs) {
        StringBuilder documents = new StringBuilder();
        documents.append("<docs>");

        for (SolrDoc doc : docs.values()) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try {
                doc.serialize(baos, "UTF-8");
            } catch (IOException e) {
                log.trace("Couldn't serialize documents: " + e.getMessage());
            }

            try {
                documents.append(baos.toString());
            } finally {
                IOUtils.closeQuietly(baos);
            }
        }
        documents.append("</docs>");
        log.trace(documents.toString());
    }

    private List<SolrDoc> process(SolrDoc indexDocument, InputStream is) throws Exception {

        // get the triplestore dataset
        long start = System.currentTimeMillis();
        Dataset dataset = TripleStoreService.getInstance().getDataset();
        perfLog.log("RdfXmlSubprocess.process gets a dataset from triple store service ", System.currentTimeMillis() - start);

        // read the annotation
        String indexDocId = indexDocument.getIdentifier();
        String name = indexDocId;

        // Check if the identifier is a valid URI and, if not, make it one by prepending the resolve service endpoint
        URI nameURI;
        String scheme = null;
        try {
            nameURI = new URI(indexDocId);
            scheme = nameURI.getScheme();
        } catch (URISyntaxException use) {
            // The identifier can't be parsed due to offending characters. It's not a URL
            name = "https://cn.dataone.org/cn/v1/resolve/" + indexDocId;
        }

        // The identifier had no scheme prefix, so it's not a URL
        if ((scheme == null) || (scheme.isEmpty())) {
            name = "https://cn.dataone.org/cn/v1/resolve/" + indexDocId;
        }
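
        // For example (illustrative identifiers, not from this file): a bare pid such as
        // "knb.92077.1" parses as a URI with no scheme, so the graph name becomes
        // "https://cn.dataone.org/cn/v1/resolve/knb.92077.1"; an identifier that already
        // carries a scheme, such as "doi:10.5063/F1234", is used unchanged.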

        long startOntModel = System.currentTimeMillis();
        boolean loaded = dataset.containsNamedModel(name);
        if (!loaded) {
            OntModel ontModel = ModelFactory.createOntologyModel();
            ontModel.read(is, name);
            dataset.addNamedModel(name, ontModel);
        }
        perfLog.log("RdfXmlSubprocess.process adds ont-model ", System.currentTimeMillis() - startOntModel);
        //dataset.getDefaultModel().add(ontModel);
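
        // Each SparqlField carries a SELECT query that is run against the named graph
        // loaded above; $GRAPH_NAME is substituted with the graph name. A minimal sketch
        // of such a query (the predicates and the sem_annotation field name are
        // illustrative assumptions, not taken from this file). Note that the loop below
        // reads ?pid via solution.getLiteral("pid"), so the query must bind ?pid to a
        // literal value:
        //
        //   SELECT ?sem_annotation ?pid
        //   FROM <$GRAPH_NAME>
        //   WHERE {
        //       ?anno <http://example.org/target> ?pid .
        //       ?anno <http://example.org/concept> ?sem_annotation .
        //   }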

        // process each field query
        Map<String, SolrDoc> documentsToIndex = new HashMap<String, SolrDoc>();
        long startField = System.currentTimeMillis();
        for (ISolrDataField field : this.fieldList) {
            long fieldStart = System.currentTimeMillis();
            String q = null;
            if (field instanceof SparqlField) {
                q = ((SparqlField) field).getQuery();
                q = q.replaceAll("\\$GRAPH_NAME", name);
                Query query = QueryFactory.create(q);
                log.trace("Executing SPARQL query:\n" + query.toString());
                QueryExecution qexec = QueryExecutionFactory.create(query, dataset);
                ResultSet results = qexec.execSelect();
                while (results.hasNext()) {
                    SolrDoc solrDoc = null;
                    QuerySolution solution = results.next();
                    log.trace(solution.toString());

                    // find the index document we are trying to augment with the annotation
                    if (solution.contains("pid")) {
                        String id = solution.getLiteral("pid").getString();

                        // TODO: check if anyone with permissions on the annotation document has write permission on the document we are annotating
                        boolean statementAuthorized = true;
                        if (!statementAuthorized) {
                            continue;
                        }

                        // otherwise carry on with the indexing
                        solrDoc = documentsToIndex.get(id);
                        if (solrDoc == null) {
                            solrDoc = new SolrDoc();
                            solrDoc.addField(new SolrElementField(SolrElementField.FIELD_ID, id));
                            documentsToIndex.put(id, solrDoc);
                        }
                    }

                    // add the field to the index document (skip solutions that had no pid binding)
                    if (solrDoc != null && solution.contains(field.getName())) {
                        String value = solution.get(field.getName()).toString();
                        SolrElementField f = new SolrElementField(field.getName(), value);
                        if (!solrDoc.hasFieldWithValue(f.getName(), f.getValue())) {
                            solrDoc.addField(f);
                        }
                    }
                }
            }
            perfLog.log("RdfXmlSubprocess.process process the field " + field.getName(), System.currentTimeMillis() - fieldStart);
        }
        perfLog.log("RdfXmlSubprocess.process process the fields total ", System.currentTimeMillis() - startField);
        // clean up the triple store
        TDBFactory.release(dataset);

        // merge the existing index with the new[er] values
        long getStart = System.currentTimeMillis();
        Map<String, SolrDoc> existingDocuments = getSolrDocs(documentsToIndex.keySet());
        perfLog.log("RdfXmlSubprocess.process get existing solr docs ", System.currentTimeMillis() - getStart);
        Map<String, SolrDoc> mergedDocuments = mergeDocs(documentsToIndex, existingDocuments);
        mergedDocuments.put(indexDocument.getIdentifier(), indexDocument);

        perfLog.log("RdfXmlSubprocess.process() total time ", System.currentTimeMillis() - start);
        return new ArrayList<SolrDoc>(mergedDocuments.values());
    }

    private Map<String, SolrDoc> getSolrDocs(Set<String> ids) throws Exception {
        Map<String, SolrDoc> docsById = new HashMap<String, SolrDoc>();
        if (ids != null) {
            for (String id : ids) {
                //SolrDoc doc = httpService.retrieveDocumentFromSolrServer(id, solrQueryUri);
                SolrDoc doc = ResourceMapSubprocessor.getSolrDoc(id);
                if (doc != null) {
                    docsById.put(id, doc);
                }
            }
        }
        return docsById;
    }

    /*
     * Merge existing documents from the Solr index with pending documents
     */
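    // For example (illustrative values, not from this file): if the pending doc for
    // pid "X" carries sem_annotation=A while the already indexed doc for "X" carries
    // sem_annotation=B, the merged doc keeps both values; the duplicate id field and
    // any repeated field values are skipped via hasField/hasFieldWithValue below.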
    private Map<String, SolrDoc> mergeDocs(Map<String, SolrDoc> pending,
            Map<String, SolrDoc> existing) throws Exception {
        long start = System.currentTimeMillis();
        Map<String, SolrDoc> merged = new HashMap<String, SolrDoc>();

        Iterator<String> pendingIter = pending.keySet().iterator();
        while (pendingIter.hasNext()) {
            String id = pendingIter.next();
            SolrDoc pendingDoc = pending.get(id);
            SolrDoc existingDoc = existing.get(id);
            SolrDoc mergedDoc = new SolrDoc();
            if (existingDoc != null) {
                // merge the existing fields
                for (SolrElementField field : existingDoc.getFieldList()) {
                    mergedDoc.addField(field);
                }
            }
            // add the pending fields
            for (SolrElementField field : pendingDoc.getFieldList()) {
                if (field.getName().equals(SolrElementField.FIELD_ID)
                        && mergedDoc.hasField(SolrElementField.FIELD_ID)) {
                    continue;
                }

                // only add if we don't already have it
                if (!mergedDoc.hasFieldWithValue(field.getName(), field.getValue())) {
                    mergedDoc.addField(field);
                }
            }

            // include in results
            merged.put(id, mergedDoc);
        }

        // add existing docs not yet merged (needed when the existing map contains ids the pending map does not)
        Iterator<String> existingIter = existing.keySet().iterator();
        while (existingIter.hasNext()) {
            String existingId = existingIter.next();
            if (!merged.containsKey(existingId)) {
                merged.put(existingId, existing.get(existingId));
            }
        }

        if (log.isTraceEnabled()) {
            log.trace("MERGED DOCS with existing from the Solr index: ");
            serializeDocuments(merged);
        }
        perfLog.log("RdfXmlSubprocess.merge total ", System.currentTimeMillis() - start);
        return merged;
    }

    @Override
    public SolrDoc mergeWithIndexedDocument(SolrDoc indexDocument) throws IOException,
            EncoderException, XPathExpressionException {
        return processorUtility.mergeWithIndexedDocument(indexDocument, fieldsToMerge);
    }

    public List<String> getFieldsToMerge() {
        return fieldsToMerge;
    }

    public void setFieldsToMerge(List<String> fieldsToMerge) {
        this.fieldsToMerge = fieldsToMerge;
    }
}