1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package de.fu_berlin.ties.util;
23
24 import org.apache.commons.lang.builder.ToStringBuilder;
25 import org.apache.commons.pool.BasePoolableObjectFactory;
26 import org.apache.commons.pool.ObjectPool;
27 import org.apache.commons.pool.impl.StackObjectPool;
28
29 import de.fu_berlin.ties.Closeable;
30
31
32 /***
 * Asynchronously executes any number of {@link java.lang.Runnable} tasks.
 * Internally manages a pool of worker threads that are invoked to
 * asynchronously execute tasks as requested. The number of worker threads is
 * updated on demand so every task is executed without delay.
37 *
38 * <p>Usually it should be sufficient to invoke tasks through the static
39 * {@link #invokeDefault(Runnable, String) invokeDefault} methods; in most
40 * cases there should be no reason to create instances of this class.
 * <em>To avoid the creation of unnecessary instances, it is highly recommended
 * to register your interest ({@link #registerInterest()}) in the default task
 * runner prior to using it and to deregister
 * ({@link #deregisterInterest()}) when you no longer need it.
 * A good idea is to do this at the beginning and end of your main method.
 * You should deregister in a <code>finally</code> block and you <strong>must
 * not forget</strong> to deregister, otherwise your program might run forever
 * (because the worker threads continue waiting for tasks even after all other
 * threads have terminated).</em>
50 *
51 * <p>When creating your own runner, you <strong>must</strong> finally call
52 * {@link #close(int)} to release each task runner you have created, or your
53 * program might run forever.
54 *
55 * @author Christian Siefkes
56 * @version $Revision: 1.11 $, $Date: 2004/10/11 17:08:24 $, $Author: siefkes $
57 */
58 public class TaskRunner implements Closeable {
59
60 /***
61 * A helper threads that executes any tasks as requested and then sleeps
62 * until a new task is required or it is told to terminate.
63 */
64 private final class WorkerThread extends Thread {
65
66 /***
67 * Indicates whether this thread should terminate (after executing any
68 * specified task).
69 */
70 private boolean shouldTerminate = false;
71
72 /***
73 * The task to be execute by this thread.
74 */
75 private Runnable task = null;
76
77 /***
78 * A name or short description of the current task -- used for debugging
79 * purposes.
80 */
81 private String taskName = null;
82
83 /***
84 * Used to guard synchrization of this thread.
85 */
86 private final Object threadGuard = new Object();
87
88 /***
89 * Creates a new instance.
90 *
91 * @param name the name of the new thread
92 */
93 private WorkerThread(final String name) {
94 super(name);
95 }
96
97 /***
98 * The run method of this thread: executing the task provided by
99 * calling {@link #setTask(Runnable, String)} and then puts itself back
100 * into the pool and waits until a new task arrives or it
101 * {@link #shouldTerminate()}. Any throwables thrown by the executed
102 * {@link java.lang.Runnable} are catched and logged so this thread
103 * cannot be crashed by careless Runnables.
104 */
105 public void run() {
106
107 boolean doTerminate = false;
108 Runnable currentTask;
109 String currentTaskName;
110
111 while (!doTerminate) {
112
113 synchronized (threadGuard) {
114 if ((task == null) && (!shouldTerminate) && (!closed)) {
115
116 try {
117 threadGuard.wait();
118 } catch (InterruptedException ie) {
119 Util.LOG.warn(
120 "Interrupted while waiting for status change"
121 + " -- this was not supposed to happen");
122 }
123 }
124
125
126
127 doTerminate = shouldTerminate || closed;
128 currentTask = task;
129 currentTaskName = taskName;
130
131
132 task = null;
133 taskName = null;
134 }
135
136
137 if (currentTask != null) {
138 try {
139 currentTask.run();
140 } catch (Throwable t) {
141 Util.LOG.error("Error while executing "
142 + currentTaskName,
143 t);
144 }
145
146
147 if (!closed) {
148 try {
149 threadPool.returnObject(this);
150 } catch (Exception e) {
151 doTerminate = true;
152 Util.LOG.error("Could not return myself into the "
153 + "pool -- will now terminate myself",
154 e);
155 }
156 }
157 }
158 }
159 }
160
161 /***
162 * Sets the task to be executed by this thread and notifies the
163 * thread that new work has arrived.
164 *
165 * @param newTask the task to be executed by this thread
166 * @param newTaskName A name or short description of the new task --
167 * used for debugging purposes
168 * @throws IllegalStateException if there already is another task set
169 * (a thread can only execute a single given task at any time)
170 */
171 private void setTask(final Runnable newTask,
172 final String newTaskName) throws IllegalStateException {
173 synchronized (threadGuard) {
174 if (task != null) {
175 throw new IllegalStateException("Task of " + this.toString()
176 + "already set to " + taskName
177 + " -- must be null when setting a new task");
178 }
179 task = newTask;
180 taskName = newTaskName;
181
182
183 threadGuard.notify();
184 }
185 }
186
187 /***
188 * Tells this thread to terminate and notifies the thread of the status
189 * change. The thread will only terminate after executing the given
190 * task, if any is left.
191 */
192 private void shouldTerminate() {
193 synchronized (threadGuard) {
194 shouldTerminate = true;
195
196
197 threadGuard.notify();
198 }
199 }
200
201 /***
202 * Returns a string representation of this thread.
203 *
204 * @return a textual representation
205 */
206 public String toString() {
207 return new ToStringBuilder(this).
208 appendSuper(super.toString()).
209 append("task name", taskName).
210 append("should terminate", shouldTerminate).
211 toString();
212 }
213
214 }
215
216
/**
 * The base name of worker threads used by the default instance.
 */
public static final String DEFAULT_NAME = "worker";

/**
 * The default instance used by the static methods. This instance should be
 * sufficient for most purposes, so usually there should be no reason to
 * create any other instances. It is created on demand by
 * {@link #invokeDefault(Runnable, String, int)} and reset to
 * <code>null</code> by {@link #deregisterInterest()} when the interest
 * count drops to 0; both methods are static and synchronized.
 */
private static TaskRunner defaultRunner;

/*
 * NOTE(review): the following comment is orphaned -- no "default executor"
 * field is declared in this part of the file. Confirm whether it can go.
 *
 * The default executor used for tasks that can be queued (executed later).
 */

/**
 * The number of callers that have registered their interest in the
 * {@link #defaultRunner}. The default runner is alive while this number
 * is greater than 0.
 */
private static int interestCount = 0;
239
240 /***
241 * Deregisters interest to use the default runner. You <strong>must</strong>
242 * have called {@link #registerInterest()} prior to calling this method.
243 * You should calls this method in a <code>finally</code> block to ensure
244 * it is always executed.
245 */
246 public static synchronized void deregisterInterest() {
247 if (interestCount > 0) {
248 interestCount--;
249
250 if (interestCount == 0) {
251 if (defaultRunner != null) {
252
253 defaultRunner.close(0);
254 defaultRunner = null;
255 Util.LOG.debug("Terminated default task runner");
256 }
257 }
258 } else {
259 Util.LOG.error("TaskRunner.deregisterInterest called but interest "
260 + "count is already " + interestCount);
261 }
262 }
263
264 /***
265 * Invokes a task to be executed asynchronously, using the default task
266 * runner. This method returns immediately; the given task is executed in
267 * another thread. The task is executed with the default priority
268 * ({@link Thread#NORM_PRIORITY}).
269 *
270 * @param task the Runnable to be executed asynchronously
271 * @param taskName a name or short description of the new task --
272 * used for debugging purposes
273 */
274 public static void invokeDefault(final Runnable task,
275 final String taskName) {
276 invokeDefault(task, taskName, Thread.NORM_PRIORITY);
277 }
278
279 /***
280 * Invokes a task to be executed asynchronously using the default task
281 * runner. This method returns immediately; the given task is executed in
282 * another thread.
283 *
284 * @param task the Runnable to be executed asynchronously
285 * @param taskName a name or short description of the new task --
286 * used for debugging purposes
287 * @param priority the priority to use for the thread executing the task;
288 * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
289 * {@link java.lang.Thread#MAX_PRIORITY}
290 * @throws IllegalArgumentException if the priority is not in the range
291 * {@link java.lang.Thread#MIN_PRIORITY} to
292 * {@link java.lang.Thread#MAX_PRIORITY}
293 */
294 public static synchronized void invokeDefault(final Runnable task,
295 final String taskName, final int priority)
296 throws IllegalArgumentException {
297 registerInterest();
298
299
300 if (defaultRunner == null) {
301 defaultRunner = new TaskRunner(DEFAULT_NAME);
302 Util.LOG.debug("Initialized default task runner");
303 }
304
305 try {
306 defaultRunner.invoke(task, taskName, priority);
307 } finally {
308 deregisterInterest();
309 }
310 }
311
312 /***
313 * Registers interest to use the default runner. You <strong>must</strong>
314 * {@link #deregisterInterest()} some time after calling this method,
315 * otherwise your program will not terminate regularly.
316 */
317 public static synchronized void registerInterest() {
318 interestCount++;
319 }
320
321
322 /***
323 * The base name of worker threads. The actual names of worker threads
324 * are formed by appending {@link #nextFreeNumber} to this name.
325 */
326 private final String baseName;
327
328 /***
329 * Whether this task runner has been closed.
330 */
331 private boolean closed = false;
332
333 /***
334 * The default priority to use for threads.
335 */
336 private final int defaultPriority;
337
338 /***
339 * The next available number to use when naming worker threads. Should
340 * be synchronized on {@link #threadPool}.
341 */
342 private int nextFreeNumber = 1;
343
344 /***
345 * The pool of worker thread used to execute the requested tasks.
346 */
347 private final ObjectPool threadPool;
348
349
/**
 * Creates a new task runner whose worker threads run with the normal
 * priority ({@link java.lang.Thread#NORM_PRIORITY}) by default.
 *
 * @param baseThreadName the base name of worker threads -- the actual
 * names are formed by appending the next available number
 */
public TaskRunner(final String baseThreadName) {
    // Delegates to the two-argument constructor
    this(baseThreadName, Thread.NORM_PRIORITY);
}
360
/**
 * Creates a new task runner backed by a stack-based pool of worker
 * threads. Worker threads are created and started by the pool's factory
 * when needed; destroying a pooled worker tells it to terminate.
 *
 * @param baseThreadName the base name of worker threads -- the actual
 * names are formed by appending the next available number
 * @param defaultPrio the priority to use for threads;
 * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
 * {@link java.lang.Thread#MAX_PRIORITY}
 * @throws IllegalArgumentException if the priority is not in the range
 * {@link java.lang.Thread#MIN_PRIORITY} to
 * {@link java.lang.Thread#MAX_PRIORITY}
 */
public TaskRunner(final String baseThreadName, final int defaultPrio)
        throws IllegalArgumentException {
    checkPriority(defaultPrio);
    baseName = baseThreadName;
    defaultPriority = defaultPrio;

    // Factory that creates/destroys the pooled worker threads
    final BasePoolableObjectFactory workerFactory =
        new BasePoolableObjectFactory() {

            public final Object makeObject() {
                // Reserve a unique number for the name of the new thread
                final int threadNumber;
                synchronized (threadPool) {
                    threadNumber = nextFreeNumber;
                    nextFreeNumber++;
                }

                final WorkerThread newWorker =
                    new WorkerThread(baseName + threadNumber);
                if (newWorker.getPriority() != defaultPriority) {
                    newWorker.setPriority(defaultPriority);
                }

                // The thread starts right away and waits for its first task
                newWorker.start();
                return newWorker;
            }

            public final void destroyObject(final Object object) {
                // A destroyed worker is merely asked to end its run loop
                final WorkerThread obsoleteWorker = (WorkerThread) object;
                obsoleteWorker.shouldTerminate();
            }
        };

    threadPool = new StackObjectPool(workerFactory);
}
408
409 /***
410 * Helper methods that checks whether a priority is in the valid range
411 * and throws an exception otherwise.
412 *
413 * @param priority the priority to check
414 * @throws IllegalArgumentException if the priority is outside the valid
415 * range
416 */
417 private void checkPriority(final int priority)
418 throws IllegalArgumentException {
419 if ((priority < Thread.MIN_PRIORITY)
420 || (priority > Thread.MAX_PRIORITY)) {
421 throw new IllegalArgumentException("Priority " + priority
422 + "is outside the valid range " + Thread.MIN_PRIORITY
423 + " to " + Thread.MAX_PRIORITY);
424 }
425 }
426
427 /***
428 * Closes this task runner. All thread will be terminated after executing
429 * their current tasks, so no tasks will be lost. You must call this
430 * method to release each task runner you created, or your program might
431 * run forever (because the worker threads continue waiting for tasks
432 * even after all other threads have terminated). You must not call
433 * {@link #invoke(Runnable, String)} after calling this method.
434 *
435 * @param errorCount the number of errors (exceptions) that occurred during
436 * calls to this instance (0 if none); ignored by this method
437 */
438 public final void close(final int errorCount) {
439 if (!closed) {
440 closed = true;
441 try {
442
443 threadPool.clear();
444 threadPool.close();
445 } catch (RuntimeException re) {
446
447 throw re;
448 } catch (Exception e) {
449
450
451 throw new RuntimeException(
452 "Error while closing task runner", e);
453 }
454 }
455 }
456
457 /***
458 * Called by the garbage collector on an object when garbage collection
459 * determines that there are no more references to the object. Delegates
460 * to {@link #close(int)} to terminate all worker threads if not yet done.
461 * <b>You should not rely on this method but always call {@link #close(int)}
462 * yourself.</b>
463 */
464 protected final void finalize() {
465 close(0);
466 }
467
468 /***
469 * Returns the base name of worker threads. The actual names of worker
470 * threads are formed by appending the next available number to this name
471 * (baseName + 1, baseName + 2 etc.)
472 *
473 * @return the base name
474 */
475 public final String getBaseName() {
476 return baseName;
477 }
478
479 /***
480 * Returns the default priority to use for threads.
481 *
482 * @return the priority
483 */
484 public final int getDefaultPriority() {
485 return defaultPriority;
486 }
487
488 /***
489 * Invokes a task to be executed asynchronously. This method returns
490 * immediately; the given task is executed in another thread. The task
491 * is executed with the default priority of this task runner
492 * ({@link #getDefaultPriority()}).
493 *
494 * @param task the Runnable to be executed asynchronously
495 * @param taskName a name or short description of the new task --
496 * used for debugging purposes
497 */
498 public final void invoke(final Runnable task, final String taskName) {
499 invoke(task, taskName, getDefaultPriority());
500 }
501
502 /***
503 * Invokes a task to be executed asynchronously. This method returns
504 * immediately; the given task is executed in another thread.
505 *
506 * @param task the Runnable to be executed asynchronously
507 * @param taskName a name or short description of the new task --
508 * used for debugging purposes
509 * @param priority the priority to use for the thread executing the task;
510 * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
511 * {@link java.lang.Thread#MAX_PRIORITY}
512 * @throws IllegalArgumentException if the priority is not in the range
513 * {@link java.lang.Thread#MIN_PRIORITY} to
514 * {@link java.lang.Thread#MAX_PRIORITY}
515 */
516 public final void invoke(final Runnable task, final String taskName,
517 final int priority) throws IllegalArgumentException {
518 checkPriority(priority);
519 final WorkerThread worker;
520 try {
521 worker = (WorkerThread) threadPool.borrowObject();
522 } catch (RuntimeException re) {
523
524 throw re;
525 } catch (Exception e) {
526
527
528 throw new RuntimeException(
529 "Could not borrow worker thread from pool", e);
530 }
531
532
533 if (worker.getPriority() != priority) {
534 worker.setPriority(priority);
535 }
536
537
538 worker.setTask(task, taskName);
539 }
540
541 /***
542 * Returns whether this task runner has been closed.
543 *
544 * @return <code>true</code> iff {@link #close(int)} has been called on this
545 * object
546 */
547 public final boolean isClosed() {
548 return closed;
549 }
550
551 /***
552 * Returns a string representation of this object.
553 *
554 * @return a textual representation
555 */
556 public String toString() {
557 return new ToStringBuilder(this).
558 append("default priority", defaultPriority).
559 append("is closed", closed).
560 append("base name of worker thread", baseName).
561 append("next free thread number", nextFreeNumber).
562 append("thread pool", threadPool).
563 toString();
564 }
565
566 }