Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A class that gets Accession Number, check for uniqueness
4
 *             and register it into db
5
 *  Copyright: 2000 Regents of the University of California and the
6
 *             National Center for Ecological Analysis and Synthesis
7
 *    Authors: Jivka Bojilova, Matt Jones
8
 *
9
 *   '$Author: leinfelder $'
10
 *     '$Date: 2011-11-02 20:40:12 -0700 (Wed, 02 Nov 2011) $'
11
 * '$Revision: 6595 $'
12
 *
13
 * This program is free software; you can redistribute it and/or modify
14
 * it under the terms of the GNU General Public License as published by
15
 * the Free Software Foundation; either version 2 of the License, or
16
 * (at your option) any later version.
17
 *
18
 * This program is distributed in the hope that it will be useful,
19
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21
 * GNU General Public License for more details.
22
 *
23
 * You should have received a copy of the GNU General Public License
24
 * along with this program; if not, write to the Free Software
25
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26
 */
27
package edu.ucsb.nceas.metacat.index;
28

    
29
import java.io.FileInputStream;
30
import java.io.FileNotFoundException;
31
import java.io.InputStream;
32
import java.util.ArrayList;
33
import java.util.List;
34

    
35
import org.apache.commons.logging.Log;
36
import org.apache.commons.logging.LogFactory;
37
import org.dataone.configuration.Settings;
38
import org.dataone.service.types.v1.Identifier;
39
import org.dataone.service.types.v1.SystemMetadata;
40

    
41
import com.hazelcast.client.ClientConfig;
42
import com.hazelcast.client.HazelcastClient;
43
import com.hazelcast.config.Config;
44
import com.hazelcast.config.FileSystemXmlConfig;
45
import com.hazelcast.core.EntryEvent;
46
import com.hazelcast.core.EntryListener;
47
import com.hazelcast.core.IMap;
48

    
49
public class SystemMetadataEventListener implements EntryListener<Identifier, SystemMetadata> {
50
	
51
	private static Log log = LogFactory.getLog(SystemMetadataEventListener.class);
52
	
53
	private SolrIndex solrIndex = null;
54
	
55
	private HazelcastClient hzClient;
56

    
57
    private IMap<Identifier, SystemMetadata> systemMetadataMap;
58
    
59
    private IMap<Identifier, String> objectPathMap;
60
    
61
    /**
62
     * Default constructor - caller needs to initialize manually
63
     * @see setSolrIndex()
64
     * @see start()
65
     */
66
    public SystemMetadataEventListener() {
67
    }
68
    
69
    /**
70
     * Sets the SolrIndex instance and initializes the listener 
71
     * @param solrIndex
72
     */
73
    public SystemMetadataEventListener(SolrIndex solrIndex) {
74
    	this.solrIndex = solrIndex;
75
    	start();
76
    }
77
    
78
    /**
79
     * Get the SolrIndex that this listener communicates with
80
     * @return a SolrIndex instance that indexes the content being listened to
81
     */
82
    public SolrIndex getSolrIndex() {
83
		return solrIndex;
84
	}
85

    
86
    /**
87
     * Set the SolrIndex instance that the listener modifies
88
     * @param solrIndex
89
     */
90
	public void setSolrIndex(SolrIndex solrIndex) {
91
		this.solrIndex = solrIndex;
92
	}
93

    
94
	/**
95
     * Registers this instance as a system metadata map event listener
96
     */
97
    public void start() {
98
    	
99
    	// get config values
100
        String hzSystemMetadata = Settings.getConfiguration().getString(
101
                "dataone.hazelcast.storageCluster.systemMetadataMap");
102
        String hzObjectPath = Settings.getConfiguration().getString(
103
                "dataone.hazelcast.storageCluster.objectPathMap");
104
        String configFileName = Settings.getConfiguration().getString(
105
        		"dataone.hazelcast.configFilePath");;
106
    	Config hzConfig = null;
107
		try {
108
    		hzConfig = new FileSystemXmlConfig(configFileName);
109
    	} catch (FileNotFoundException e) {
110
    		log.error("could not load hazelcast configuration file from: " + configFileName, e);
111
			//e.printStackTrace();
112
    	}
113
		
114
        String hzGroupName = hzConfig.getGroupConfig().getName();
115
        String hzGroupPassword = hzConfig.getGroupConfig().getPassword();
116
        String hzAddress = hzConfig.getNetworkConfig().getInterfaces().getInterfaces().iterator().next() + ":" + hzConfig.getNetworkConfig().getPort();
117

    
118
    	log.info("starting index entry listener...");
119
    	log.info("System Metadata value: " + hzSystemMetadata);
120
    	log.info("Object path value: " + hzObjectPath);
121
    	log.info("Group Name: " + hzGroupName);
122
    	log.info("Group Password: " + "*****"); // don't show value
123
    	log.info("HZ Address: " + hzAddress);
124

    
125
    	// connect to the HZ cluster
126
        ClientConfig cc = new ClientConfig();
127
		cc.getGroupConfig().setName(hzGroupName);
128
		cc.getGroupConfig().setPassword(hzGroupPassword);
129
		cc.addAddress(hzAddress);
130
        try {
131
        	this.hzClient = HazelcastClient.newHazelcastClient(cc);
132
        } catch (Exception e) {
133
            log.error("Unable to create hazelcast client: ", e);
134
            e.printStackTrace();
135
        }
136
        
137
        // get shared structures and add listener
138
        this.systemMetadataMap = this.hzClient.getMap(hzSystemMetadata);
139
        this.objectPathMap = this.hzClient.getMap(hzObjectPath);
140
        this.systemMetadataMap.addEntryListener(this, true);
141

    
142
        log.info("System Metadata size: " + this.systemMetadataMap.size());
143
        log.info("Object path size:" + this.objectPathMap.size());
144
    }
145

    
146
    /**
147
     * Removes this instance as a system metadata map event listener
148
     */
149
    public void stop() {
150
    	log.info("stopping index entry listener...");
151
        this.systemMetadataMap.removeEntryListener(this);
152
    }
153
    
154
    /**
155
     * Get the SystemMetadata for the specified id from the distributed Map.
156
     * The null maybe is returned if there is no system metadata found.
157
     * @param id  the specified id.
158
     * @return the SystemMetadata associated with the id.
159
     */
160
    public SystemMetadata getSystemMetadata(String id) {
161
        SystemMetadata metadata = null;
162
        if(systemMetadataMap != null && id != null) {
163
            Identifier identifier = new Identifier();
164
            identifier.setValue(id);
165
            metadata = systemMetadataMap.get(identifier);
166
        }
167
        return metadata;
168
    }
169
    
170
    /**
171
     * Get the obsoletes chain of the specified id. The returned list doesn't include
172
     * the specified id itself. The newer version has the lower index number in the list.
173
     * Empty list will be returned if there is no document to be obsoleted by this id.
174
     * @param id
175
     * @return
176
     */
177
    public List<String> getObsoletes(String id) {
178
        List<String> obsoletes = new ArrayList<String>();
179
        while (id != null) {
180
            SystemMetadata metadata = getSystemMetadata(id);
181
            id = null;//set it to be null in order to stop the while loop if the id can't be assinged to a new value in the following code.
182
            if(metadata != null) {
183
                Identifier identifier = metadata.getObsoletes();
184
                if(identifier != null && identifier.getValue() != null && !identifier.getValue().trim().equals("")) {
185
                    obsoletes.add(identifier.getValue());
186
                    id = identifier.getValue();
187
                } 
188
            } 
189
        }
190
        return obsoletes;
191
    }
192
    
193
    
194
    /**
195
     * Get an InputStream as the data object for the specific pid.
196
     * @param pid
197
     * @return
198
     * @throws FileNotFoundException
199
     */
200
    public InputStream getDataObject(String pid) throws FileNotFoundException {
201
        Identifier identifier = new Identifier();
202
        identifier.setValue(pid);
203
        String objectPath = objectPathMap.get(identifier);
204
        InputStream data = null;
205
        data = new FileInputStream(objectPath);
206
        return data;
207

    
208
    }
209

    
210
	public void entryAdded(EntryEvent<Identifier, SystemMetadata> entryEvent) {
211
	    //System.out.println("===================================calling entryAdded method ");
212
	    log.info("===================================calling entryAdded method ");
213
		// use the same implementation for insert/update for now
214
		this.entryUpdated(entryEvent);
215

    
216
	}
217

    
218
	public void entryEvicted(EntryEvent<Identifier, SystemMetadata> entryEvent) {
219
		// remove from the index for now, this may be a temporary eviction
220
		this.entryRemoved(entryEvent);
221
		
222
	}
223

    
224
	public void entryRemoved(EntryEvent<Identifier, SystemMetadata> entryEvent) {
225
		// remove from the index
226
		Identifier pid = entryEvent.getKey();		
227
		try {
228
			solrIndex.remove(pid.getValue());
229
		} catch (Exception e) {
230
			// TODO: need to track errors, retry later
231
			log.error(e.getMessage(), e);
232
		}
233
		
234
	}
235

    
236
	public void entryUpdated(EntryEvent<Identifier, SystemMetadata> entryEvent) {
237
	    //System.out.println("===================================calling entryUpdated method ");
238
	    log.info("===================================calling entryUpdated method ");
239
		// add to the index
240
		Identifier pid = entryEvent.getKey();
241
		//System.out.println("===================================update the document "+pid.getValue());
242
		log.info("===================================update the document "+pid.getValue());
243
		SystemMetadata systemMetadata = entryEvent.getValue();
244
		Identifier obsoletes = systemMetadata.getObsoletes();
245
		List<String> obsoletesChain = null;
246
		if(obsoletes != null) {
247
		    obsoletesChain= getObsoletes(pid.getValue());
248
		}
249
		String objectPath = objectPathMap.get(pid);
250
		InputStream data = null;
251
		try {
252
			data = new FileInputStream(objectPath);
253
			solrIndex.update(pid.getValue(), obsoletesChain, systemMetadata, data);
254
		} catch (Exception e) {
255
			// TODO: need to track errors, retry later
256
			log.error(e.getMessage(), e);
257
		}
258
	}
259
    
260
}
(5-5/5)