Project

General

Profile

1
/**
2
 *  '$RCSfile$'
3
 *    Purpose: A Class that handles scheduling workflow jobs 
4
 *  Copyright: 2009 Regents of the University of California and the
5
 *             National Center for Ecological Analysis and Synthesis
6
 *    Authors: Michael Daigle
7
 * 
8
 *   '$Author: daigle $'
9
 *     '$Date: 2009-03-25 13:41:15 -0800 (Wed, 25 Mar 2009) $'
10
 * '$Revision: 4861 $'
11
 *
12
 * This program is free software; you can redistribute it and/or modify
13
 * it under the terms of the GNU General Public License as published by
14
 * the Free Software Foundation; either version 2 of the License, or
15
 * (at your option) any later version.
16
 *
17
 * This program is distributed in the hope that it will be useful,
18
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20
 * GNU General Public License for more details.
21
 *
22
 * You should have received a copy of the GNU General Public License
23
 * along with this program; if not, write to the Free Software
24
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25
 */
26

    
27
package edu.ucsb.nceas.workflowscheduler;
28

    
29
import java.io.IOException;
30
import java.io.PrintWriter;
31
import java.io.StringReader;
32
import java.util.Calendar;
33
import java.util.Hashtable;
34
import java.util.HashMap;
35

    
36
import javax.servlet.http.HttpServletRequest;
37
import javax.servlet.http.HttpServletResponse;
38
import javax.xml.xpath.XPath;
39
import javax.xml.xpath.XPathFactory;
40

    
41
import org.apache.log4j.Logger;
42

    
43
import org.xml.sax.InputSource;
44

    
45
import org.ecoinformatics.ecogrid.client.AuthenticationServiceClient;
46
import org.ecoinformatics.ecogrid.client.AuthorizationServiceClient;
47

    
48
import edu.ucsb.nceas.metacat.AccessControlInterface;
49
import edu.ucsb.nceas.metacat.scheduler.BaseScheduler;
50
import edu.ucsb.nceas.metacat.scheduler.ScheduledJobAccess;
51
import edu.ucsb.nceas.metacat.scheduler.ScheduledJobDAO;
52
import edu.ucsb.nceas.metacat.scheduler.SchedulerService;
53
import edu.ucsb.nceas.metacat.scheduler.MetacatSchedulerException;
54
import edu.ucsb.nceas.metacat.properties.PropertyService;
55
import edu.ucsb.nceas.metacat.shared.AccessException;
56
import edu.ucsb.nceas.metacat.shared.ServiceException;
57
import edu.ucsb.nceas.metacat.util.ErrorSendingErrorException;
58
import edu.ucsb.nceas.metacat.util.ResponseUtil;
59
import edu.ucsb.nceas.utilities.DateUtil;
60
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
61
import edu.ucsb.nceas.utilities.UtilException;
62

    
63
/**
64
 * @author daigle
65
 *
66
 */
67
public class WorkflowScheduler extends BaseScheduler {
68
	
69
	private static WorkflowScheduler workflowScheduler = null;
70
	
71
	private static Logger logMetacat = Logger.getLogger(WorkflowScheduler.class);
72
	
73
	private static String WORKFLOW_JOB_GROUP = "workflow";
74
	private static String WORKFLOW_JOB_CLASS = "edu.ucsb.nceas.workflowscheduler.WorkflowJob";
75

    
76
	/**
77
	 * private constructor since this is a singleton
78
	 */
79
	private WorkflowScheduler()  {}
80
	
81
	/**
82
	 * Get the single instance of SchedulerService.
83
	 * 
84
	 * @return the single instance of SchedulerService
85
	 */
86
	public static WorkflowScheduler getInstance() {
87
		if (workflowScheduler == null) {
88
			workflowScheduler = new WorkflowScheduler();
89
		}
90
		return workflowScheduler;
91
	}
92
	
93
	/**
94
	 * Scheduling a workflow
95
	 * 
96
	 * @param request
97
	 *            the servlet request object
98
	 * @param response
99
	 *            the servlet response object
100
	 * @param params
101
	 *            the request parameters
102
	 * @param username
103
	 *            the user
104
	 * @param groups
105
	 *            the user's group
106
	 */
107
	public void scheduleJob(HttpServletRequest request, HttpServletResponse response, 
108
			Hashtable<String, String[]> params) throws MetacatSchedulerException {
109
		 
110
		String sessionIds[] = params.get("sessionid");
111
		String delays[] = params.get("delay");
112
		String startTimes[] = params.get("starttime");
113
		String endTimes[] = params.get("endtime");
114
		HashMap<String, String> jobParams = new HashMap<String, String>();
115
		Calendar startCal = null;
116
		Calendar endCal = null;
117
		
118
		if (sessionIds == null) {
119
			throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - sessionid field must be populated "
120
							+ "in scheduler parameters when scheduling job.");
121
		}
122
		
123
		String sessionStatus = validateRemoteSession(sessionIds[0]);
124
		logMetacat.debug("WorkflowScheduler.scheduleJob - session status: " + sessionStatus);
125
		
126
		if (!sessionStatus.equals("valid")) {
127
			throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - session " 
128
					+ request.getSession().getId() + " is not valid.");
129
		}
130
		
131
		try {
132
			SchedulerService schedulerService = SchedulerService.getInstance();
133

    
134
			// get start time or delay.  Start time takes precedence if both exist.
135
			if (startTimes != null && startTimes.length > 0) {
136
				startCal = DateUtil.humanReadableToCalendar(startTimes[0], "MM/dd/yyyy HH:mm:ss");
137
			} else if (delays != null && delays.length > 0) {
138
				startCal = schedulerService.getStartDateFromDelay(delays[0]);
139
			} else {
140
				// if delay and starttime were not provided, set date to now.
141
				startCal = Calendar.getInstance();
142
			}
143
			
144
			// get end time.  null is fine.
145
			if (endTimes != null && endTimes.length > 0) {
146
				endCal = DateUtil.humanReadableToCalendar(endTimes[0], "MM/dd/yyyy HH:mm:ss");
147
			} 
148

    
149
			// interval value must exist
150
			String intervalValues[] = params.get("intervalvalue");
151
			if (intervalValues == null || intervalValues.length == 0) {
152
				throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - intervalvalue field must be populated "
153
								+ "in scheduler parameters when scheduling job.");
154
			}
155
			String intervalStrValue = intervalValues[0];
156
			int intervalValue = Integer.parseInt(intervalStrValue);
157
			
158
			// interval unit must exist
159
			String intervalUnits[] = params.get("intervalunit");
160
			if (intervalUnits == null || intervalUnits.length == 0) {
161
				throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - intervalunit field must be populated "
162
								+ "in scheduler parameters when scheduling job.");
163
			}
164
			String intervalUnit = intervalUnits[0];
165

    
166
			// workflow id must exist.  Add to job params
167
			String workflowids[] = params.get("workflowid");
168
			if (workflowids == null || workflowids.length == 0) {
169
				throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - workflowid field must be populated "
170
								+ "in scheduler parameters when scheduling job.");
171
			}
172
			jobParams.put("workflowid", workflowids[0]);
173
			
174
			String workflowAuthorizeStatus = authorizeRemoteSession(sessionIds[0], workflowids[0], AccessControlInterface.WRITESTRING);
175
			if (!workflowAuthorizeStatus.equals("true")) {
176
				throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - session " 
177
						+ request.getSession().getId() + " is not authorized to write workflow " + workflowids[0]  + ".");
178
			}
179
			
180
			// kar id must exist.  Add to job params
181
			String karids[] = params.get("karid");
182
			if (karids == null || karids.length == 0) {
183
				throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - karid field must be populated "
184
								+ "in scheduler parameters when scheduling job.");
185
			}
186
			jobParams.put("karid", karids[0]);
187
			
188
			String karAuthorizeStatus = authorizeRemoteSession(sessionIds[0], karids[0], AccessControlInterface.READSTRING);
189
			if (!karAuthorizeStatus.equals("true")) {
190
				throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - session " 
191
						+ request.getSession().getId() + " is not authorized to read kar " + karids[0]  + ".");
192
			}
193
			
194
			// workflow name unit must exist.  Add to job params
195
			String workflownames[] = params.get("workflowname");
196
			if (workflownames == null || workflownames.length == 0) {
197
				throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - workflowname field must be populated "
198
								+ "in scheduler parameters when scheduling job.");
199
			}
200
			jobParams.put("workflowname", workflownames[0]);
201

    
202
			String jobName = WORKFLOW_JOB_GROUP
203
					+ Calendar.getInstance().getTimeInMillis();
204

    
205
			// Schedule the job
206
			String xmlResult = schedulerService.scheduleJob(jobName, startCal, endCal, intervalValue, intervalUnit,
207
					WORKFLOW_JOB_CLASS, WORKFLOW_JOB_GROUP, jobParams);
208
			
209
			ResponseUtil.sendSuccessXML(response, xmlResult);
210
			
211
		} catch (UtilException ue) {
212
			throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - " 
213
					+ "Utility issue when scheduling job: " + ue.getMessage());
214
		} catch (ServiceException se) {
215
			throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - " 
216
					+ "Service issue when scheduling job", se);
217
		} catch (ErrorSendingErrorException esee) {
218
			throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - " 
219
					+ "Issue sending error when scheduling job: " + esee.getMessage());			
220
		}
221
	}
222
	
223
	/**
224
	 * Unschedule a job
225
	 * 
226
	 * @param request
227
	 *            the servlet request object
228
	 * @param response
229
	 *            the servlet response object
230
	 * @param params
231
	 *            the request parameters
232
	 * @param username
233
	 *            the user
234
	 * @param groups
235
	 *            the user's group
236
	 */
237
	public void unscheduleJob(HttpServletRequest request, HttpServletResponse response,
238
			Hashtable<String, String[]> params, String username, String[] groups)
239
			throws MetacatSchedulerException {
240
		try {
241
			String sessionIds[] = params.get("sessionid");
242
			
243
			// interval value must exist
244
			if (sessionIds == null || sessionIds.length == 0) {
245
				throw new MetacatSchedulerException("WorkflowScheduler.unscheduleJob - sessionid field must be populated "
246
								+ "in scheduler parameters when unscheduling job.");
247
			}
248
			
249
			String sessionStatus = validateRemoteSession(sessionIds[0]);
250
			logMetacat.debug("WorkflowScheduler.unscheduleJob - session status: " + sessionStatus);
251
			
252
			if (!sessionStatus.equals("valid")) {
253
				throw new MetacatSchedulerException("WorkflowScheduler.unscheduleJob - session " 
254
						+ sessionIds[0] + " is not valid.");
255
			}
256
			
257
			// workflow job id must exist
258
			String jobNames[] = params.get("workflowjobname");
259
			if (jobNames == null || jobNames.length == 0) {
260
				throw new MetacatSchedulerException("SchedulerService.unScheduleJob - workflowjobname " 
261
						+ "field must be populated in scheduler parameters when unscheduling job.");
262
			}
263
			String jobName = jobNames[0];
264

    
265
			// unschedule the job
266
			SchedulerService schedulerService = SchedulerService.getInstance();
267
			String xmlResult = schedulerService.unscheduleJob(jobName, username, groups);
268
			
269
			ResponseUtil.sendSuccessXML(response, xmlResult);
270

    
271
		} catch (ServiceException se) {
272
			throw new MetacatSchedulerException("WorkflowScheduler.unScheduleJob - " 
273
					+ "Service issue unscheduling job", se);
274
		} catch (Exception e) {
275
			throw new MetacatSchedulerException("WorkflowScheduler.unScheduleJob - " 
276
					+ "Generic issue unscheduling job: " + e.getMessage());
277
		}
278
	}
279
	
280
	/**
281
	 * reschedule job
282
	 * 
283
	 * @param request
284
	 *            the servlet request object
285
	 * @param response
286
	 *            the servlet response object
287
	 * @param params
288
	 *            the request parameters
289
	 * @param username
290
	 *            the user
291
	 * @param groups
292
	 *            the user's group
293
	 */
294
	public void rescheduleJob(HttpServletRequest request, HttpServletResponse response, 
295
			Hashtable<String, String[]> params, String username,
296
			String[] groups) throws MetacatSchedulerException {
297
		 		
298
		try {
299
			String sessionIds[] = params.get("sessionid");
300
			
301
			// interval value must exist
302
			if (sessionIds == null || sessionIds.length == 0) {
303
				throw new MetacatSchedulerException("WorkflowScheduler.rescheduleJob - sessionid field must be populated "
304
								+ "in scheduler parameters when rescheduling job.");
305
			}
306
			
307
			String sessionStatus = validateRemoteSession(sessionIds[0]);
308
			
309
			logMetacat.debug("WorkflowScheduler.rescheduleJob - session status: " + sessionStatus);
310
			
311
			if (!sessionStatus.equals("valid")) {
312
				throw new MetacatSchedulerException("WorkflowScheduler.scheduleJob - session " 
313
						+ sessionIds[0] + " is not valid.");
314
			}
315
			
316
			// workflow job id must exist
317
			String jobNames[] = params.get("workflowjobname");	
318
			if (jobNames == null || jobNames.length == 0) {
319
				throw new MetacatSchedulerException("WorkflowScheduler.rescheduleJob - workflowjobname field must be populated "
320
						+ "in scheduler parameters when rescheduling job.");
321
			}			
322
			String jobName = jobNames[0];
323
	
324
			ScheduledJobAccess jobAccess = new ScheduledJobAccess();
325
			ScheduledJobDAO jobDAO = jobAccess.getJobByName(jobName);
326
			
327
			// reschedule the job
328
			SchedulerService schedulerService = SchedulerService.getInstance();
329
			String result = schedulerService.rescheduleJob(jobDAO, username, groups);
330

    
331
			ResponseUtil.sendSuccessXML(response, result);
332
			
333
		} catch (AccessException ae) {
334
			throw new MetacatSchedulerException("WorkflowScheduler.rescheduleJob - " 
335
					+ "DB access issue when rescheduling job  : ", ae);
336
		} catch (ServiceException se) {
337
			throw new MetacatSchedulerException("WorkflowScheduler.rescheduleJob - " 
338
					+ "Service issue rescheduling job", se);
339
		} catch (ErrorSendingErrorException esee) {
340
			throw new MetacatSchedulerException("WorkflowScheduler.rescheduleJob - " 
341
					+ "Issue sending erro when rescheduling job: " + esee.getMessage());			
342
		}
343
	}
344
	
345
	/**
346
	 * delete job - to be implemented
347
	 */
348
	public void deleteJob(HttpServletRequest request, HttpServletResponse response, 
349
			Hashtable<String, String[]> params, String username, String[] groups) throws MetacatSchedulerException {
350
		 try {
351
			String sessionIds[] = params.get("sessionid");
352
				
353
			// interval value must exist
354
			if (sessionIds == null || sessionIds.length == 0) {
355
				throw new MetacatSchedulerException("WorkflowScheduler.deleteJob - sessionid field must be populated "
356
								+ "in scheduler parameters when deleting job.");
357
			}
358
				
359
			String sessionStatus = validateRemoteSession(sessionIds[0]);
360
			logMetacat.debug("WorkflowScheduler.rescheduleJob - session status: " + sessionStatus);
361
				
362
			if (!sessionStatus.equals("valid")) {
363
				throw new MetacatSchedulerException("WorkflowScheduler.deleteJob - session " 
364
						+ sessionIds[0] + " is not valid.");
365
			}
366
				
367
			// workflow job id must exist
368
			String jobNames[] = params.get("workflowjobname");
369
			if (jobNames == null || jobNames.length == 0) {
370
				throw new MetacatSchedulerException(
371
						"WorkflowScheduler.deleteJob - workflowname field must be populated "
372
								+ "in scheduler parameters when deleting job.");
373
			}
374
			String jobName = jobNames[0];
375

    
376
			ScheduledJobAccess jobAccess = new ScheduledJobAccess();
377
			ScheduledJobDAO jobDAO = jobAccess.getJobByName(jobName);
378

    
379
			// delete the job
380
			SchedulerService schedulerService = SchedulerService.getInstance();
381
			String result = schedulerService.deleteJob(jobDAO);
382

    
383
			ResponseUtil.sendSuccessXML(response, result);
384

    
385
		} catch (AccessException ae) {
386
			throw new MetacatSchedulerException("WorkflowScheduler.deleteJob - "
387
					+ "DB access issue when deleting job  : ", ae);
388
		} catch (ServiceException se) {
389
			throw new MetacatSchedulerException("WorkflowScheduler.deleteJob - "
390
					+ "Service issue deleting job", se);
391
		} catch (ErrorSendingErrorException esee) {
392
			throw new MetacatSchedulerException("WorkflowScheduler.deleteJob - "
393
					+ "Issue sending erro when deleting job: " + esee.getMessage());
394
		}
395
	}
396
	
397
	/**
398
	 * get job information for a given workflow in xml format
399
	 * 
400
	 * @param request
401
	 *            the servlet request object
402
	 * @param response
403
	 *            the servlet response object
404
	 * @param params
405
	 *            the request parameters
406
	 * @param username
407
	 *            the user
408
	 * @param groups
409
	 *            the user's group
410
	 */
411
	public void getJobs(HttpServletRequest request, HttpServletResponse response, 
412
			Hashtable<String, String[]> params, String username, String[] groups) throws MetacatSchedulerException {
413
		
414
		String workflowId = null;
415
		
416
		String workflowIds[] = params.get("workflowid");
417
		if (workflowIds != null && workflowIds.length != 0) {
418
			workflowId = workflowIds[0];
419
		}
420
		
421
		logMetacat.debug("WorkflowScheduler.getJobs - getting jobs for workflow:" + workflowId);
422
		
423
        PrintWriter out = null;
424
		try {
425
			// get the job info in xml format
426
			out = response.getWriter();
427
			SchedulerService.getInstance().getJobsInfoXML(WORKFLOW_JOB_GROUP,
428
					"workflowid", workflowId, out);
429
		} catch (ServiceException se) {
430
			throw new MetacatSchedulerException("WorkflowScheduler.getJobs - " 
431
					+ "Service error when transforming XML for workflow id: " 
432
					+ workflowId + " : " + se.getMessage());
433
		} catch (IOException ioe) {
434
			throw new MetacatSchedulerException("WorkflowScheduler.getJobs - " 
435
					+ "I/O error when transforming XML for workflow id: " 
436
					+ workflowId + " : " + ioe.getMessage());
437
		} finally {
438
			if (out != null) {
439
				out.close();
440
			}
441
		}
442
	}
443
	
444
	private String validateRemoteSession(String sessionId) throws MetacatSchedulerException {
445
		String sessionStatus = "unknown";
446
	    XPath xpath = XPathFactory.newInstance().newXPath();
447
		
448
		try {
449
			String ecogridUrl = PropertyService.getProperty("workflowScheduler.authServiceUrl");
450
		
451
			AuthenticationServiceClient authServiceClient = 
452
					new AuthenticationServiceClient(ecogridUrl);
453
			
454
			String sessionStatusXML = authServiceClient.validate_session_action(sessionId);
455
			
456
			sessionStatus = 
457
				xpath.evaluate("/validateSession/status",  new InputSource(new StringReader(sessionStatusXML)));
458
			
459
			
460
		} catch (PropertyNotFoundException pnfe) {
461
			throw new MetacatSchedulerException("WorkflowScheduler.validateSession - Could not " 
462
					+ "find property: " + pnfe.getMessage());
463
		} catch (Exception e) {
464
			throw new MetacatSchedulerException("WorkflowScheduler.validateSession - " 
465
					+ "general error when validating Session: " + e.getMessage());
466
		}
467
		
468
		
469
		return sessionStatus;
470
	}
471
	
472
	private String authorizeRemoteSession(String sessionId, String resourceLsid, String permission) 
473
		throws MetacatSchedulerException {
474
		
475
		String authStatus = "unknown";
476
	    XPath xpath = XPathFactory.newInstance().newXPath();
477
		
478
		try {
479
			String ecogridUrl = PropertyService.getProperty("workflowScheduler.authorizationServiceUrl");
480
		
481
			AuthorizationServiceClient authorizationServiceClient = 
482
					new AuthorizationServiceClient(ecogridUrl);
483
			
484
			String authStatusXML = authorizationServiceClient.is_authorized_action(sessionId, resourceLsid, permission);
485
			
486
			authStatus = 
487
				xpath.evaluate("/resourceAuthorization/isAuthorized",  new InputSource(new StringReader(authStatusXML)));
488
			
489
			
490
		} catch (PropertyNotFoundException pnfe) {
491
			throw new MetacatSchedulerException("WorkflowScheduler.authorizeRemoteSession - Could not " 
492
					+ "find property: " + pnfe.getMessage());
493
		} catch (Exception e) {
494
			throw new MetacatSchedulerException("WorkflowScheduler.authorizeRemoteSession - " 
495
					+ "general error when authorizing Session: " + e.getMessage());
496
		}
497
		
498
		
499
		return authStatus;
500
	}
501
}
(2-2/3)