Project

General

Profile

« Previous | Next » 

Revision 8590

Run syncAll in a single thread so admin config UI doesn't freeze

View differences:

src/edu/ucsb/nceas/metacat/dataone/SyncAccessPolicy.java
36 36
import java.util.List;
37 37
import java.util.Map;
38 38
import java.util.Set;
39
import java.util.concurrent.ExecutorService;
40
import java.util.concurrent.Executors;
39 41

  
40 42
import org.apache.log4j.Logger;
41 43
import org.dataone.client.CNode;
......
61 63
import edu.ucsb.nceas.metacat.IdentifierManager;
62 64
import edu.ucsb.nceas.metacat.McdbDocNotFoundException;
63 65
import edu.ucsb.nceas.metacat.accesscontrol.AccessControlException;
66
import edu.ucsb.nceas.metacat.admin.AdminException;
64 67
import edu.ucsb.nceas.metacat.properties.PropertyService;
65 68
import edu.ucsb.nceas.metacat.shared.ServiceException;
66 69
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
......
69 72
public class SyncAccessPolicy {
70 73

  
71 74
	private static Logger logMetacat = Logger.getLogger(SyncAccessPolicy.class);
72
	
75

  
73 76
	/**
74 77
	 * Synchronize access policy (from system metadata) of d1 member node with
75 78
	 * the corresponding controlling node.
......
92 95
	private List<Identifier> sync(ObjectList objList) throws ServiceFailure,
93 96
			InvalidToken, NotAuthorized, NotFound, NotImplemented,
94 97
			McdbDocNotFoundException, InvalidRequest, VersionMismatch,
95
			NumberFormatException, AccessionNumberException, SQLException, Exception {
98
			NumberFormatException, AccessionNumberException, SQLException,
99
			Exception {
96 100

  
97 101
		AccessPolicy cnAccessPolicy = null;
98 102
		AccessPolicy mnAccessPolicy = null;
......
104 108
		SystemMetadata mnSysMeta = null;
105 109

  
106 110
		CNode cn = null;
107
		
111

  
108 112
		try {
109 113
			cn = D1Client.getCN();
114
			logMetacat.debug("Will sync access policies to CN id: " + cn.getNodeId() + " with info: " + cn.toString());
110 115
		} catch (ServiceFailure sf) {
111
			logMetacat.error("Unable to get Coordinating node name for this MN");
112
			throw new AccessControlException ("Unable to get Coordinating node name for this MN");
116
			logMetacat
117
					.error("Unable to get Coordinating node name for this MN");
118
			throw new AccessControlException(
119
					"Unable to get Coordinating node name for this MN");
113 120
		}
114 121

  
115 122
		for (int i = objList.getStart(); i < objList.getCount(); i++) {
......
117 124
			objInfo = objList.getObjectInfo(i);
118 125
			pid = objInfo.getIdentifier();
119 126

  
120
			logMetacat.debug("Getting SM for pid: " + pid.getValue() + " i: " + i);
127
			logMetacat.debug("Getting SM for pid: " + pid.getValue() + " i: "
128
					+ i);
121 129
			try {
122 130
				// Get sm, access policy for requested localId
123 131
				mnSysMeta = IdentifierManager.getInstance().getSystemMetadata(
......
132 140
				continue;
133 141
			}
134 142

  
135
			logMetacat.debug("Getting access policy for pid: " + pid.getValue());
143
			logMetacat
144
					.debug("Getting access policy for pid: " + pid.getValue());
136 145

  
137 146
			mnAccessPolicy = mnSysMeta.getAccessPolicy();
138
			
147

  
139 148
			// Get sm, access policy for requested pid from the CN
140 149
			try {
141 150
				cnSysMeta = cn.getSystemMetadata(pid);
......
154 163
			if (!isEqual(mnAccessPolicy, cnAccessPolicy)) {
155 164
				try {
156 165
					BigInteger serialVersion = cnSysMeta.getSerialVersion();
157
					logMetacat.debug("Requesting CN to set access policy for pid: "
158
							+ pid.getValue() + ", serial version: "
159
							+ serialVersion.toString());
166
					logMetacat
167
							.debug("Requesting CN to set access policy for pid: "
168
									+ pid.getValue()
169
									+ ", serial version: "
170
									+ serialVersion.toString());
160 171
					cn.setAccessPolicy(session, pid, mnAccessPolicy,
161 172
							serialVersion.longValue());
162 173
					logMetacat.debug("Successfully set access policy");
......
169 180
									+ pid.getValue()
170 181
									+ " user not authorized: "
171 182
									+ na.getMessage());
172
					//throw na;
183
					// throw na;
173 184
					continue;
174 185
				} catch (ServiceFailure sf) {
175 186
					logMetacat
......
177 188
									+ pid.getValue()
178 189
									+ " Service failure: "
179 190
									+ sf.getMessage());
180
					//throw sf;
191
					// throw sf;
181 192
					continue;
182 193
				} catch (Exception e) {
183 194
					logMetacat
184 195
							.error("Error syncing CN with access policy of pid: "
185 196
									+ pid.getValue() + e.getMessage());
186
					//throw e;
197
					// throw e;
187 198
					continue;
188 199
				}
189 200
			} else {
190 201
				logMetacat.warn("Skipping pid: " + pid.getValue());
191 202
			}
192
			logMetacat.debug("Done syncing access policy for pid: " + pid.getValue());
203
			logMetacat.debug("Done syncing access policy for pid: "
204
					+ pid.getValue());
193 205
		}
194 206

  
195 207
		return syncedIds;
......
223 235
		SystemMetadata sm = new SystemMetadata();
224 236

  
225 237
		int start = 0;
226
		int count = 0; //guidsToSync.size();
238
		int count = 0; // guidsToSync.size();
227 239

  
228 240
		objList.setStart(start);
229 241

  
......
251 263
			oi.setSize(sm.getSize());
252 264
			objList.addObjectInfo(oi);
253 265
		}
254
		
266

  
255 267
		int total = count;
256 268
		objList.setCount(count);
257 269
		objList.setTotal(total);
......
260 272
		return syncedPids;
261 273
	}
262 274

  
263
	public List<Identifier> syncAll() throws ServiceFailure, InvalidToken,
264
			NotAuthorized, NotFound, NotImplemented, McdbDocNotFoundException,
265
			InvalidRequest, VersionMismatch, NumberFormatException,
266
			AccessionNumberException, SQLException, PropertyNotFoundException,
267
			ServiceException, Exception {
275
	/**
276
	 * For all guids for which current MN is authoritative, check that access
277
	 * policy is synced with CN.
278
	 * 
279
	 * @return void
280
	 */
281
	public void syncAll() throws ServiceFailure, InvalidToken, NotAuthorized,
282
			NotFound, NotImplemented, McdbDocNotFoundException, InvalidRequest,
283
			VersionMismatch, NumberFormatException, AccessionNumberException,
284
			SQLException, PropertyNotFoundException, ServiceException,
285
			Exception {
268 286

  
269
		// For the following query parameters - null indicates that the query
270
		// will not be
271
		// constrained by the parameter.
272
		Date startTime = null;
273
		Date endTime = null;
274
		ObjectFormatIdentifier objectFormatId = null;
275
		Boolean replicaStatus = false; // return only pids for which this mn is
276
										// authoritative
277
		Integer start = 0;
278
		Integer count = Integer.valueOf(PropertyService.getProperty("database.webResultsetSize"));
287
		SyncTask st = new SyncTask();
288
		// Create a single thread to run the sync of all guids in
289
		ExecutorService executor = Executors.newSingleThreadExecutor();
290
		logMetacat.debug("syncAll starting thread");
291
		executor.execute(st);
292
		// Only one task will run on this thread
293
		executor.shutdown();
279 294

  
280
		ObjectList objsToSync = IdentifierManager.getInstance()
281
				.querySystemMetadata(startTime, endTime, objectFormatId,
282
						replicaStatus, start, count);
295
		// return syncedIds;
296
	}
283 297

  
284
		List<Identifier> syncedIds = sync(objsToSync);
298
	/**
299
	 * Perform syncAll in a single thread.
300
	 * 
301
	 * @return void
302
	 */
303
	private class SyncTask implements Runnable {
285 304

  
286
		return syncedIds;
305
		@Override
306
		public void run() {
307
			// For the following query parameters - null indicates that the
308
			// query
309
			// will not be
310
			// constrained by the parameter.
311
			Date startTime = null;
312
			Date endTime = null;
313
			ObjectFormatIdentifier objectFormatId = null;
314
			Boolean replicaStatus = false; // return only pids for which this mn
315
											// is
316

  
317
			ObjectList objsToSync = null;
318
			Integer count = 0;
319
			Integer start = 0;
320
			Integer total = 0;
321
			List<Identifier> tmpIds = null;
322
			List<Identifier> syncedIds = new ArrayList<Identifier>();
323

  
324
			try {
325
				count = Integer.valueOf(PropertyService
326
						.getProperty("database.webResultsetSize"));
327
			} catch (NumberFormatException e1) {
328
				logMetacat
329
						.error("Error in  propery file for format of database.webResultsetSize, will use 1000");
330
				e1.printStackTrace();
331
				count = 1000;
332
			} catch (PropertyNotFoundException e1) {
333
				logMetacat
334
						.error("Error reading propery file for database.webResultsetSize, will use 1000");
335
				e1.printStackTrace();
336
				count = 1000;
337
			}
338

  
339
			// Get the total count of guids before we start syncing
340
			try {
341
				objsToSync = IdentifierManager.getInstance()
342
						.querySystemMetadata(startTime, endTime,
343
								objectFormatId, replicaStatus, start, count);
344

  
345
				logMetacat.debug("syncTask total # of guids: "
346
						+ objsToSync.getTotal() + ", count for this page: " + objsToSync.getCount());
347
			} catch (Exception e) {
348
				logMetacat.error("Error syncing ids");
349
			}
350
			
351
			total = objsToSync.getTotal();
352

  
353
			// The first loop might have fewer results than the requested count value from the properties file,
354
			// so in this case use count returned from IdentiferManger for the loop count/increment (loop will only execute once).
355
			if (objsToSync.getCount() < count) count = objsToSync.getCount();
356
			
357
			for (int i = 0; (i + count -1) < total; i += count) {
358
				try {
359
					logMetacat.debug("syncTask # requested: " + count
360
							+ ", start: " + start + ", total: " + total
361
							+ ", count: " + objsToSync.getCount());
362
					tmpIds = sync(objsToSync);
363
					syncedIds.addAll(tmpIds);
364
					
365
					// Set start for the next db retrieval, loop interation
366
					start += objsToSync.getCount();
367
					if (start >= total) break;
368
					objsToSync = IdentifierManager
369
							.getInstance()
370
							.querySystemMetadata(startTime, endTime,
371
									objectFormatId, replicaStatus, start, count);
372
				} catch (Exception e) {
373
					logMetacat.error("Error syncing ids");
374
					break;
375
				}
376
			}
377
			logMetacat
378
					.debug("syncTask thread completed. Number of guids synced: "
379
							+ syncedIds.size());
380
		}
287 381
	}
288 382

  
289 383
	/**
......
293 387
	 *            - first access policy in the comparison
294 388
	 * @param ap2
295 389
	 *            - second access policy in the comparison
296
	 * @return	boolean - true if access policies are equivalent
390
	 * @return boolean - true if access policies are equivalent
297 391
	 */
298 392
	private boolean isEqual(AccessPolicy ap1, AccessPolicy ap2) {
299 393

  
......
349 443
		}
350 444

  
351 445
		// Check if the number of access rules is the same for mn and cn. If not
352
		// then consider them not equal, without performing diff of each access rule.
446
		// then consider them not equal, without performing diff of each access
447
		// rule.
353 448
		if (userPerms1.entrySet().size() != userPerms2.entrySet().size())
354 449
			return false;
355
		
356
		// Now perform the comparison of each access rule of access policy 1 to ap 2.
357
		// This test assumes that the mn perms are more complete than the cn perms.
450

  
451
		// Now perform the comparison of each access rule of access policy 1 to
452
		// ap 2.
453
		// This test assumes that the mn perms are more complete than the cn
454
		// perms.
358 455
		logMetacat.debug("Performing comparison of access policies");
359 456
		for (Map.Entry<Subject, Set<Permission>> entry : userPerms1.entrySet()) {
360 457
			// User name
361 458
			Subject s1 = entry.getKey();
362 459
			// Perms that the user holds
363 460
			Set<Permission> p1 = entry.getValue();
364
			logMetacat.debug("Checking access policy of user: " + s1.getValue());
461
			logMetacat
462
					.debug("Checking access policy of user: " + s1.getValue());
365 463

  
366 464
			// Does this user exist in both access policies?
367 465
			if (userPerms2.containsKey(s1)) {
......
382 480
	}
383 481

  
384 482
	/**
385
	 * Run pid synch script on the given pids
386
	 * Each argument is an individual pid 
387
	 * because pids cannot contain whitespace. 
483
	 * Run pid synch script on the given pids Each argument is an individual pid
484
	 * because pids cannot contain whitespace.
485
	 * 
388 486
	 * @param args
389 487
	 * @throws Exception
390 488
	 */
......
392 490

  
393 491
		// set up the properties based on the test/deployed configuration of the
394 492
		// workspace
395
		SortedProperties testProperties = new SortedProperties("test/test.properties");
493
		SortedProperties testProperties = new SortedProperties(
494
				"test/test.properties");
396 495
		testProperties.load();
397
		String metacatContextDir = testProperties.getProperty("metacat.contextDir");
496
		String metacatContextDir = testProperties
497
				.getProperty("metacat.contextDir");
398 498
		PropertyService.getInstance(metacatContextDir + "/WEB-INF");
399
		
499

  
400 500
		ArrayList<String> guids = null;
401 501
		SyncAccessPolicy syncAP = new SyncAccessPolicy();
402 502

  
403 503
		if (args.length > 0) {
404 504
			try {
405 505
				guids = new ArrayList<String>(Arrays.asList(args));
406
				logMetacat.warn("Trying to syncing access policy for " + args.length + " pids");
506
				logMetacat.warn("Trying to syncing access policy for "
507
						+ args.length + " pids");
407 508
				List<Identifier> synchedPids = syncAP.sync(guids);
408
				logMetacat.warn("Sunk access policies for " + synchedPids.size() + " pids");
509
				logMetacat.warn("Sunk access policies for "
510
						+ synchedPids.size() + " pids");
409 511
			} catch (Exception e) {
410
				logMetacat.error("Error syncing pids, message: " + e.getMessage(), e);
512
				logMetacat.error(
513
						"Error syncing pids, message: " + e.getMessage(), e);
411 514
				System.exit(1);
412 515
			}
413 516
		}
src/edu/ucsb/nceas/metacat/replication/ReplicationService.java
439 439
						out.write("<html><body>Error syncing access policies</body></html>");
440 440
					}
441 441
				} else {
442
					logMetacat.debug("Syncing access policies for all docids for this node");
442
					logMetacat.debug("Request to sync all access policies has been submitted.");
443 443
					try {
444 444
						syncAP.syncAll();
445
						out.write("<html><body>Syncing access policies for all docids has completed.</body></html>");
445
						out.write("<html><body>Request to sync all access policies has been submitted.</body></html>");
446 446
					} catch (Exception e) {
447 447
						logMetacat.error("Error syncing access policies: "
448 448
								+ e.getMessage());

Also available in: Unified diff