1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package de.fu_berlin.ties.util;
23
24 import org.apache.commons.lang.builder.ToStringBuilder;
25 import org.apache.commons.pool.BasePoolableObjectFactory;
26 import org.apache.commons.pool.ObjectPool;
27 import org.apache.commons.pool.impl.StackObjectPool;
28
29 import de.fu_berlin.ties.Closeable;
30
31 /***
32 * Asynchronously executes any number of {@link java.lang.Runnable} tasks.
33 * Internally manages a pool of worker threads that are invoked to
34 * asychronously execute tasks as requested. The number of worker threads is
35 * updated on demand so every task is executed without delay.
36 *
37 * <p>Usually it should be sufficient to invoke tasks through the static
38 * {@link #invokeDefault(Runnable, String) invokeDefault} methods; in most
39 * cases there should be no reason to create instances of this class.
40 * <em>To avoid the creation of unnecessary instances, it is highly recommended
41 * to register your interest ({@link #registerInterest()}) in the default task
42 * runner prior to using it and to deregister
43 * ({@link #deregisterInterest()}) when you no longer need it.
44 * A good idea is to do this at the begin and end of your main method.
45 * You should register in a <code>finally</code> block and you <strong>must not
46 * forget</strong> to deregister, otherwise your program might run forever
47 * (because the worker threads continue waiting for tasks even after all other
48 * threads have terminated).</em>
49 *
50 * <p>When creating your own runner, you <strong>must</strong> finally call
51 * {@link #close(int)} to release each task runner you have created, or your
52 * program might run forever.
53 *
54 * @author Christian Siefkes
55 * @version $Revision: 1.15 $, $Date: 2006/10/21 16:04:27 $, $Author: siefkes $
56 */
57 public class TaskRunner implements Closeable {
58
59 /***
60 * A helper threads that executes any tasks as requested and then sleeps
61 * until a new task is required or it is told to terminate.
62 */
63 private final class WorkerThread extends Thread {
64
65 /***
66 * Indicates whether this thread should terminate (after executing any
67 * specified task).
68 */
69 private boolean shouldTerminate = false;
70
71 /***
72 * The task to be execute by this thread.
73 */
74 private Runnable task = null;
75
76 /***
77 * A name or short description of the current task -- used for debugging
78 * purposes.
79 */
80 private String taskName = null;
81
82 /***
83 * Used to guard synchrization of this thread.
84 */
85 private final Object threadGuard = new Object();
86
87 /***
88 * Creates a new instance.
89 *
90 * @param name the name of the new thread
91 */
92 private WorkerThread(final String name) {
93 super(name);
94 }
95
96 /***
97 * The run method of this thread: executing the task provided by
98 * calling {@link #setTask(Runnable, String)} and then puts itself back
99 * into the pool and waits until a new task arrives or it
100 * {@link #shouldTerminate()}. Any throwables thrown by the executed
101 * {@link java.lang.Runnable} are catched and logged so this thread
102 * cannot be crashed by careless Runnables.
103 */
104 public void run() {
105
106 boolean doTerminate = false;
107 Runnable currentTask;
108 String currentTaskName;
109
110 while (!doTerminate) {
111
112 synchronized (threadGuard) {
113 if ((task == null) && (!shouldTerminate) && (!closed)) {
114
115 try {
116 threadGuard.wait();
117 } catch (InterruptedException ie) {
118 Util.LOG.warn(
119 "Interrupted while waiting for status change"
120 + " -- this was not supposed to happen");
121 }
122 }
123
124
125
126 doTerminate = shouldTerminate || closed;
127 currentTask = task;
128 currentTaskName = taskName;
129
130
131 task = null;
132 taskName = null;
133 }
134
135
136 if (currentTask != null) {
137 try {
138 currentTask.run();
139 } catch (Throwable t) {
140 Util.LOG.error("Error while executing "
141 + currentTaskName,
142 t);
143 }
144
145
146 if (!closed) {
147 try {
148 threadPool.returnObject(this);
149 } catch (Exception e) {
150 doTerminate = true;
151 Util.LOG.error("Could not return myself into the "
152 + "pool -- will now terminate myself",
153 e);
154 }
155 }
156 }
157 }
158 }
159
160 /***
161 * Sets the task to be executed by this thread and notifies the
162 * thread that new work has arrived.
163 *
164 * @param newTask the task to be executed by this thread
165 * @param newTaskName A name or short description of the new task --
166 * used for debugging purposes
167 * @throws IllegalStateException if there already is another task set
168 * (a thread can only execute a single given task at any time)
169 */
170 private void setTask(final Runnable newTask,
171 final String newTaskName) throws IllegalStateException {
172 synchronized (threadGuard) {
173 if (task != null) {
174 throw new IllegalStateException("Task of " + this.toString()
175 + "already set to " + taskName
176 + " -- must be null when setting a new task");
177 }
178 task = newTask;
179 taskName = newTaskName;
180
181
182 threadGuard.notify();
183 }
184 }
185
186 /***
187 * Tells this thread to terminate and notifies the thread of the status
188 * change. The thread will only terminate after executing the given
189 * task, if any is left.
190 */
191 private void shouldTerminate() {
192 synchronized (threadGuard) {
193 shouldTerminate = true;
194
195
196 threadGuard.notify();
197 }
198 }
199
200 /***
201 * Returns a string representation of this thread.
202 *
203 * @return a textual representation
204 */
205 public String toString() {
206 return new ToStringBuilder(this).
207 appendSuper(super.toString()).
208 append("task name", taskName).
209 append("should terminate", shouldTerminate).
210 toString();
211 }
212
213 }
214
215
216 /***
217 * The base name of worker threads used by the default instance.
218 */
219 public static final String DEFAULT_NAME = "worker";
220
221 /***
222 * The default instance used by the static methods. This instance should be
223 * sufficient for most purposes, so usually there should be no reason to
224 * create any other instances.
225 */
226 private static TaskRunner defaultRunner;
227
228 /***
229 * The default executor used for tasks that can be queued (executed later).
230 */
231
232 /***
233 * The number of callers that have registered their interest in the
234 * {@link #defaultRunner}. The default runner is alive while this number
235 * is greater than 0.
236 */
237 private static int interestCount = 0;
238
239 /***
240 * Deregisters interest to use the default runner. You <strong>must</strong>
241 * have called {@link #registerInterest()} prior to calling this method.
242 * You should calls this method in a <code>finally</code> block to ensure
243 * it is always executed.
244 */
245 public static synchronized void deregisterInterest() {
246 if (interestCount > 0) {
247 interestCount--;
248
249 if (interestCount == 0) {
250 if (defaultRunner != null) {
251
252 defaultRunner.close(0);
253 defaultRunner = null;
254 Util.LOG.debug("Terminated default task runner");
255 }
256 }
257 } else {
258 Util.LOG.error("TaskRunner.deregisterInterest called but interest "
259 + "count is already " + interestCount);
260 }
261 }
262
263 /***
264 * Invokes a task to be executed asynchronously, using the default task
265 * runner. This method returns immediately; the given task is executed in
266 * another thread. The task is executed with the default priority
267 * ({@link Thread#NORM_PRIORITY}).
268 *
269 * @param task the Runnable to be executed asynchronously
270 * @param taskName a name or short description of the new task --
271 * used for debugging purposes
272 */
273 public static void invokeDefault(final Runnable task,
274 final String taskName) {
275 invokeDefault(task, taskName, Thread.NORM_PRIORITY);
276 }
277
278 /***
279 * Invokes a task to be executed asynchronously using the default task
280 * runner. This method returns immediately; the given task is executed in
281 * another thread.
282 *
283 * @param task the Runnable to be executed asynchronously
284 * @param taskName a name or short description of the new task --
285 * used for debugging purposes
286 * @param priority the priority to use for the thread executing the task;
287 * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
288 * {@link java.lang.Thread#MAX_PRIORITY}
289 * @throws IllegalArgumentException if the priority is not in the range
290 * {@link java.lang.Thread#MIN_PRIORITY} to
291 * {@link java.lang.Thread#MAX_PRIORITY}
292 */
293 public static synchronized void invokeDefault(final Runnable task,
294 final String taskName, final int priority)
295 throws IllegalArgumentException {
296 registerInterest();
297
298
299 if (defaultRunner == null) {
300 defaultRunner = new TaskRunner(DEFAULT_NAME);
301 Util.LOG.debug("Initialized default task runner");
302 }
303
304 try {
305 defaultRunner.invoke(task, taskName, priority);
306 } finally {
307 deregisterInterest();
308 }
309 }
310
311 /***
312 * Registers interest to use the default runner. You <strong>must</strong>
313 * {@link #deregisterInterest()} some time after calling this method,
314 * otherwise your program will not terminate regularly.
315 */
316 public static synchronized void registerInterest() {
317 interestCount++;
318 }
319
320
321 /***
322 * The base name of worker threads. The actual names of worker threads
323 * are formed by appending {@link #nextFreeNumber} to this name.
324 */
325 private final String baseName;
326
327 /***
328 * Whether this task runner has been closed.
329 */
330 private boolean closed = false;
331
332 /***
333 * The default priority to use for threads.
334 */
335 private final int defaultPriority;
336
337 /***
338 * The next available number to use when naming worker threads. Should
339 * be synchronized on {@link #threadPool}.
340 */
341 private int nextFreeNumber = 1;
342
343 /***
344 * The pool of worker thread used to execute the requested tasks.
345 */
346 private final ObjectPool threadPool;
347
348
349 /***
350 * Creates a new instance, using the default priority
351 * ({@link java.lang.Thread#NORM_PRIORITY}) for threads.
352 *
353 * @param baseThreadName the base name of worker threads -- the actual
354 * names are formed by appending the next available number
355 */
356 public TaskRunner(final String baseThreadName) {
357 this(baseThreadName, Thread.NORM_PRIORITY);
358 }
359
360 /***
361 * Creates a new instance.
362 *
363 * @param baseThreadName the base name of worker threads -- the actual
364 * names are formed by appending the next available number
365 * @param defaultPrio the priority to use for threads;
366 * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
367 * {@link java.lang.Thread#MAX_PRIORITY}
368 * @throws IllegalArgumentException if the priority is not in the range
369 * {@link java.lang.Thread#MIN_PRIORITY} to
370 * {@link java.lang.Thread#MAX_PRIORITY}
371 */
372 public TaskRunner(final String baseThreadName, final int defaultPrio)
373 throws IllegalArgumentException {
374 super();
375 checkPriority(defaultPrio);
376 baseName = baseThreadName;
377 defaultPriority = defaultPrio;
378
379
380
381 threadPool = new StackObjectPool(new BasePoolableObjectFactory() {
382 public final void destroyObject(final Object object) {
383
384
385 ((WorkerThread) object).shouldTerminate();
386
387
388 }
389
390 public final Object makeObject() {
391 final String threadName;
392 synchronized (threadPool) {
393
394 threadName = baseName + nextFreeNumber++;
395 }
396
397 final WorkerThread worker = new WorkerThread(threadName);
398 if (worker.getPriority() != defaultPriority) {
399 worker.setPriority(defaultPriority);
400 }
401 worker.start();
402
403 return worker;
404 }
405 });
406 }
407
408 /***
409 * Helper methods that checks whether a priority is in the valid range
410 * and throws an exception otherwise.
411 *
412 * @param priority the priority to check
413 * @throws IllegalArgumentException if the priority is outside the valid
414 * range
415 */
416 private void checkPriority(final int priority)
417 throws IllegalArgumentException {
418 if ((priority < Thread.MIN_PRIORITY)
419 || (priority > Thread.MAX_PRIORITY)) {
420 throw new IllegalArgumentException("Priority " + priority
421 + "is outside the valid range " + Thread.MIN_PRIORITY
422 + " to " + Thread.MAX_PRIORITY);
423 }
424 }
425
426 /***
427 * Closes this task runner. All thread will be terminated after executing
428 * their current tasks, so no tasks will be lost. You must call this
429 * method to release each task runner you created, or your program might
430 * run forever (because the worker threads continue waiting for tasks
431 * even after all other threads have terminated). You must not call
432 * {@link #invoke(Runnable, String)} after calling this method.
433 *
434 * @param errorCount the number of errors (exceptions) that occurred during
435 * calls to this instance (0 if none); ignored by this method
436 */
437 public final void close(final int errorCount) {
438 if (!closed) {
439 closed = true;
440 try {
441
442 threadPool.clear();
443 threadPool.close();
444 } catch (RuntimeException re) {
445
446 throw re;
447 } catch (Exception e) {
448
449
450 throw new RuntimeException(
451 "Error while closing task runner", e);
452 }
453 }
454 }
455
456 /***
457 * Called by the garbage collector on an object when garbage collection
458 * determines that there are no more references to the object. Delegates
459 * to {@link #close(int)} to terminate all worker threads if not yet done.
460 * <b>You should not rely on this method but always call {@link #close(int)}
461 * yourself.</b>
462 */
463 protected final void finalize() {
464 close(0);
465 }
466
467 /***
468 * Returns the base name of worker threads. The actual names of worker
469 * threads are formed by appending the next available number to this name
470 * (baseName + 1, baseName + 2 etc.)
471 *
472 * @return the base name
473 */
474 public final String getBaseName() {
475 return baseName;
476 }
477
478 /***
479 * Returns the default priority to use for threads.
480 *
481 * @return the priority
482 */
483 public final int getDefaultPriority() {
484 return defaultPriority;
485 }
486
487 /***
488 * Invokes a task to be executed asynchronously. This method returns
489 * immediately; the given task is executed in another thread. The task
490 * is executed with the default priority of this task runner
491 * ({@link #getDefaultPriority()}).
492 *
493 * @param task the Runnable to be executed asynchronously
494 * @param taskName a name or short description of the new task --
495 * used for debugging purposes
496 */
497 public final void invoke(final Runnable task, final String taskName) {
498 invoke(task, taskName, getDefaultPriority());
499 }
500
501 /***
502 * Invokes a task to be executed asynchronously. This method returns
503 * immediately; the given task is executed in another thread.
504 *
505 * @param task the Runnable to be executed asynchronously
506 * @param taskName a name or short description of the new task --
507 * used for debugging purposes
508 * @param priority the priority to use for the thread executing the task;
509 * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
510 * {@link java.lang.Thread#MAX_PRIORITY}
511 * @throws IllegalArgumentException if the priority is not in the range
512 * {@link java.lang.Thread#MIN_PRIORITY} to
513 * {@link java.lang.Thread#MAX_PRIORITY}
514 */
515 public final void invoke(final Runnable task, final String taskName,
516 final int priority) throws IllegalArgumentException {
517 checkPriority(priority);
518 final WorkerThread worker;
519 try {
520 worker = (WorkerThread) threadPool.borrowObject();
521 } catch (RuntimeException re) {
522
523 throw re;
524 } catch (Exception e) {
525
526
527 throw new RuntimeException(
528 "Could not borrow worker thread from pool", e);
529 }
530
531
532 if (worker.getPriority() != priority) {
533 worker.setPriority(priority);
534 }
535
536
537 worker.setTask(task, taskName);
538 }
539
540 /***
541 * Returns whether this task runner has been closed.
542 *
543 * @return <code>true</code> iff {@link #close(int)} has been called on this
544 * object
545 */
546 public final boolean isClosed() {
547 return closed;
548 }
549
550 /***
551 * Returns a string representation of this object.
552 *
553 * @return a textual representation
554 */
555 public String toString() {
556 return new ToStringBuilder(this).
557 append("default priority", defaultPriority).
558 append("is closed", closed).
559 append("base name of worker thread", baseName).
560 append("next free thread number", nextFreeNumber).
561 append("thread pool", threadPool).
562 toString();
563 }
564
565 }