View Javadoc

1   /*
2    * Copyright (C) 2003-2004 Christian Siefkes <christian@siefkes.net>.
3    * Development of this software is supported by the German Research Society,
4    * Berlin-Brandenburg Graduate School in Distributed Information Systems
5    * (DFG grant no. GRK 316).
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 2.1 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, visit
19   * http://www.gnu.org/licenses/lgpl.html or write to the Free Software
20   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21   */
22  package de.fu_berlin.ties.util;
23  
24  import org.apache.commons.lang.builder.ToStringBuilder;
25  import org.apache.commons.pool.BasePoolableObjectFactory;
26  import org.apache.commons.pool.ObjectPool;
27  import org.apache.commons.pool.impl.StackObjectPool;
28  
29  import de.fu_berlin.ties.Closeable;
30  
31  
32  /***
33   * Asynchronously executes any number of {@link java.lang.Runnable} tasks.
34   * Internally manages a pool of worker threads that are invoked to
35   * asychronously execute tasks as requested. The number of worker threads is
36   * updated on demand so every task is executed without delay.
37   *
38   * <p>Usually it should be sufficient to invoke tasks through the static
39   * {@link #invokeDefault(Runnable, String) invokeDefault} methods; in most
40   * cases there should be no reason to create instances of this class.
41   * <em>To avoid the creation of unnecessary instances, it is highly recommended
42   * to register your interest ({@link #registerInterest()}) in the default task
43   * runner prior to using it and to deregister
44   * ({@link #deregisterInterest()}) when you no longer need it.
45   * A good idea is to do this at the begin and end of your main method.
46   * You should register in a <code>finally</code> block and you <strong>must not
47   * forget</strong> to deregister, otherwise your program might run forever
48   * (because the worker threads continue waiting for tasks even after all other
49   * threads have terminated).</em>
50   *
51   * <p>When creating your own runner, you <strong>must</strong> finally call
52   * {@link #close(int)} to release each task runner you have created, or your
53   * program might run forever.
54   *
55   * @author Christian Siefkes
56   * @version $Revision: 1.11 $, $Date: 2004/10/11 17:08:24 $, $Author: siefkes $
57   */
58  public class TaskRunner implements Closeable {
59  
60      /***
61       * A helper threads that executes any tasks as requested and then sleeps
62       * until a new task is required or it is told to terminate.
63       */
64      private final class WorkerThread extends Thread {
65  
66          /***
67           * Indicates whether this thread should terminate (after executing any
68           * specified task).
69           */
70          private boolean shouldTerminate = false;
71  
72          /***
73           * The task to be execute by this thread.
74           */
75          private Runnable task = null;
76  
77          /***
78           * A name or short description of the current task -- used for debugging
79           * purposes.
80           */
81          private String taskName = null;
82  
83          /***
84           * Used to guard synchrization of this thread.
85           */
86          private final Object threadGuard = new Object();
87  
88          /***
89           * Creates a new instance.
90           *
91           * @param name the name of the new thread
92           */
93          private WorkerThread(final String name) {
94              super(name);
95          }
96  
97          /***
98           * The run method of this thread: executing the task provided by
99           * calling {@link #setTask(Runnable, String)} and then puts itself back
100          * into the pool and waits until a new task arrives or it
101          * {@link #shouldTerminate()}. Any throwables thrown by the executed
102          * {@link java.lang.Runnable} are catched and logged so this thread
103          * cannot be crashed by careless Runnables.
104          */
105         public void run() {
106             // local copies that not require synchronization
107             boolean doTerminate = false;
108             Runnable currentTask;
109             String currentTaskName;
110 
111             while (!doTerminate) {
112                 // get task, if any
113                 synchronized (threadGuard) {
114                     if ((task == null) && (!shouldTerminate) && (!closed)) {
115                         // nothing to do--wait until notified of status change
116                         try {
117                             threadGuard.wait();
118                         } catch (InterruptedException ie) {
119                             Util.LOG.warn(
120                                 "Interrupted while waiting for status change"
121                                 + "  -- this was not supposed to happen");
122                         }
123                     }
124 
125                     // Terminate when requested or when the outer class has
126                     // been closed.
127                     doTerminate = shouldTerminate || closed;
128                     currentTask = task;
129                     currentTaskName = taskName;
130 
131                     // set member fields to null so a new task can arrive
132                     task = null;
133                     taskName = null;
134                 }
135 
136                 // execute task, if any
137                 if (currentTask != null) {
138                     try {
139                         currentTask.run();
140                     } catch (Throwable t) {
141                         Util.LOG.error("Error while executing "
142                             + currentTaskName,
143                             t);
144                     }
145 
146                     // now put myself back into the thread pool (if still open)
147                     if (!closed) {
148                         try {
149                             threadPool.returnObject(this);
150                         } catch (Exception e) {
151                             doTerminate = true;
152                             Util.LOG.error("Could not return myself into the "
153                                 + "pool -- will now terminate myself",
154                                 e);
155                         }                        
156                     }
157                 }
158             }
159         }
160 
161         /***
162          * Sets the task to be executed by this thread and notifies the
163          * thread that new work has arrived.
164          *
165          * @param newTask the task to be executed by this thread
166          * @param newTaskName A name or short description of the new task --
167          * used for debugging purposes
168          * @throws IllegalStateException if there already is another task set
169          * (a thread can only execute a single given task at any time)
170          */
171         private void setTask(final Runnable newTask,
172             final String newTaskName) throws IllegalStateException {
173             synchronized (threadGuard) {
174                 if (task != null) {
175                     throw new IllegalStateException("Task of " + this.toString()
176                         + "already set to " + taskName
177                         + " -- must be null when setting a new task");
178                 }
179                 task = newTask;
180                 taskName = newTaskName;
181 
182                 // wake the thread if it's waiting
183                 threadGuard.notify();
184             }
185         }
186 
187         /***
188          * Tells this thread to terminate and notifies the thread of the status
189          * change. The thread will only terminate after executing the given
190          * task, if any is left.
191          */
192         private void shouldTerminate() {
193             synchronized (threadGuard) {
194                 shouldTerminate = true;
195 
196                 // wake the thread if it's waiting
197                 threadGuard.notify();
198             }
199         }
200 
201         /***
202          * Returns a string representation of this thread.
203          *
204          * @return a textual representation
205          */
206         public String toString() {
207             return new ToStringBuilder(this).
208                 appendSuper(super.toString()).
209                 append("task name", taskName).
210                 append("should terminate", shouldTerminate).
211                 toString();
212         }
213 
214     }
215 
216 
217     /***
218      * The base name of worker threads used by the default instance.
219      */
220     public static final String DEFAULT_NAME = "worker";
221 
222     /***
223      * The default instance used by the static methods. This instance should be
224      * sufficient for most purposes, so usually there should be no reason to
225      * create any other instances.
226      */
227     private static TaskRunner defaultRunner;
228 
229     /***
230      * The default executor used for tasks that can be queued (executed later).
231      */
232 
233     /***
234      * The number of callers that have registered their interest in the
235      * {@link #defaultRunner}. The default runner is alive while this number
236      * is greater than 0.
237      */
238     private static int interestCount = 0;
239 
240     /***
241      * Deregisters interest to use the default runner. You <strong>must</strong>
242      * have called {@link #registerInterest()} prior to calling this method.
243      * You should calls this method in a <code>finally</code> block to ensure
244      * it is always executed.
245      */
246     public static synchronized void deregisterInterest() {
247         if (interestCount > 0) {
248             interestCount--;
249 
250             if (interestCount == 0) {
251                 if (defaultRunner != null) {
252                     // no more interest: destroy the runner
253                     defaultRunner.close(0);
254                     defaultRunner = null;
255                     Util.LOG.debug("Terminated default task runner");
256                 }
257             }
258         } else {
259             Util.LOG.error("TaskRunner.deregisterInterest called but interest "
260                 + "count is already " + interestCount);
261         }
262     }
263 
264     /***
265      * Invokes a task to be executed asynchronously, using the default task
266      * runner. This method returns immediately; the given task is executed in
267      * another thread. The task is executed with the default priority
268      * ({@link Thread#NORM_PRIORITY}).
269      *
270      * @param task the Runnable to be executed asynchronously
271      * @param taskName a name or short description of the new task --
272      * used for debugging purposes
273      */
274     public static void invokeDefault(final Runnable task,
275             final String taskName) {
276         invokeDefault(task, taskName, Thread.NORM_PRIORITY);
277     }
278 
279     /***
280      * Invokes a task to be executed asynchronously using the default task
281      * runner. This method returns immediately; the given task is executed in
282      * another thread.
283      *
284      * @param task the Runnable to be executed asynchronously
285      * @param taskName a name or short description of the new task --
286      * used for debugging purposes
287      * @param priority the priority to use for the thread executing the task;
288      * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
289      * {@link java.lang.Thread#MAX_PRIORITY}
290      * @throws IllegalArgumentException if the priority is not in the range
291      * {@link java.lang.Thread#MIN_PRIORITY} to
292      * {@link java.lang.Thread#MAX_PRIORITY}
293      */
294     public static synchronized void invokeDefault(final Runnable task,
295             final String taskName, final int priority)
296             throws IllegalArgumentException {
297         registerInterest();
298 
299         // initialize runner if not yet done
300         if (defaultRunner == null) {
301             defaultRunner = new TaskRunner(DEFAULT_NAME);
302             Util.LOG.debug("Initialized default task runner");
303         }
304 
305         try {
306             defaultRunner.invoke(task, taskName, priority);
307         } finally {
308             deregisterInterest();
309         }
310     }
311 
312     /***
313      * Registers interest to use the default runner. You <strong>must</strong>
314      * {@link #deregisterInterest()} some time after calling this method,
315      * otherwise your program will not terminate regularly.
316      */
317     public static synchronized void registerInterest() {
318         interestCount++;
319     }
320 
321 
322     /***
323      * The base name of worker threads. The actual names of worker threads
324      * are formed by appending {@link #nextFreeNumber} to this name.
325      */
326     private final String baseName;
327 
328     /***
329      * Whether this task runner has been closed.
330      */
331     private boolean closed = false;
332 
333     /***
334      * The default priority to use for threads.
335      */
336     private final int defaultPriority;
337 
338     /***
339      * The next available number to use when naming worker threads. Should
340      * be synchronized on {@link #threadPool}.
341      */
342     private int nextFreeNumber = 1;
343 
344     /***
345      * The pool of worker thread used to execute the requested tasks.
346      */
347     private final ObjectPool threadPool;
348 
349 
350     /***
351      * Creates a new instance, using the default priority
352      * ({@link java.lang.Thread#NORM_PRIORITY}) for threads.
353      *
354      * @param baseThreadName the base name of worker threads -- the actual
355      * names are formed by appending the next available number
356      */
357     public TaskRunner(final String baseThreadName) {
358         this(baseThreadName, Thread.NORM_PRIORITY);
359     }
360 
361     /***
362      * Creates a new instance.
363      *
364      * @param baseThreadName the base name of worker threads -- the actual
365      * names are formed by appending the next available number
366      * @param defaultPrio the priority to use for threads;
367      * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
368      * {@link java.lang.Thread#MAX_PRIORITY}
369      * @throws IllegalArgumentException if the priority is not in the range
370      * {@link java.lang.Thread#MIN_PRIORITY} to
371      * {@link java.lang.Thread#MAX_PRIORITY}
372      */
373     public TaskRunner(final String baseThreadName, final int defaultPrio)
374             throws IllegalArgumentException {
375         super();
376         checkPriority(defaultPrio);
377         baseName = baseThreadName;
378         defaultPriority = defaultPrio;
379 
380         // define a factory that returns WorkerThreads and tells them to stop
381         // when they are dropped from the pool
382         threadPool = new StackObjectPool(new BasePoolableObjectFactory() {
383             public final void destroyObject(final Object object) {
384                 //Log.TIES.debug("Got request to destroy worker thread object");
385                 // must be a worker thread -- tell it to stop
386                 ((WorkerThread) object).shouldTerminate();
387                 //Log.TIES.debug("Ordered worker thread " + object.toString()
388                 //    + " to terminate");
389             }
390 
391             public final Object makeObject() {
392                 final String threadName;
393                 synchronized (threadPool) {
394                     // determine thread name and increase then next free number
395                     threadName = baseName + nextFreeNumber++;
396                 }
397                 //Log.TIES.debug("Creating worker thread " + threadName);
398                 final WorkerThread worker = new WorkerThread(threadName);
399                 if (worker.getPriority() != defaultPriority) {
400                     worker.setPriority(defaultPriority);
401                 }
402                 worker.start();
403                 //Log.TIES.debug("Started worker thread " + threadName);
404                 return worker;
405             }
406         });
407     }
408 
409     /***
410      * Helper methods that checks whether a priority is in the valid range
411      * and throws an exception otherwise.
412      *
413      * @param priority the priority to check
414      * @throws IllegalArgumentException if the priority is outside the valid
415      * range
416      */
417     private void checkPriority(final int priority)
418             throws IllegalArgumentException {
419         if ((priority < Thread.MIN_PRIORITY)
420                 || (priority > Thread.MAX_PRIORITY)) {
421             throw new IllegalArgumentException("Priority " + priority
422                 + "is outside the valid range " + Thread.MIN_PRIORITY
423                 + " to " + Thread.MAX_PRIORITY);
424         }
425     }
426 
427     /***
428      * Closes this task runner. All thread will be terminated after executing
429      * their current tasks, so no tasks will be lost. You must call this
430      * method to release each task runner you created, or your program might
431      * run forever (because the worker threads continue waiting for tasks
432      * even after all other threads have terminated). You must not call
433      * {@link #invoke(Runnable, String)} after calling this method.
434      *
435      * @param errorCount the number of errors (exceptions) that occurred during
436      * calls to this instance (0 if none); ignored by this method
437      */
438     public final void close(final int errorCount) {
439         if (!closed) { // otherwise there is nothing to do
440             closed = true;
441             try {
442                 // delegate to thread pool
443                 threadPool.clear();
444                 threadPool.close();
445             } catch (RuntimeException re) {
446                 // rethrow "as is"
447                 throw re;
448             } catch (Exception e) {
449                 // normally everything should work fine so we just wrap the
450                 // exception in a RuntimeException
451                 throw new RuntimeException(
452                     "Error while closing task runner", e);
453             }
454         }
455     }
456 
457     /***
458      * Called by the garbage collector on an object when garbage collection
459      * determines that there are no more references to the object. Delegates
460      * to {@link #close(int)} to terminate all worker threads if not yet done.
461      * <b>You should not rely on this method but always call {@link #close(int)}
462      * yourself.</b>
463      */
464     protected final void finalize() {
465         close(0);
466     }
467 
468     /***
469      * Returns the base name of worker threads. The actual names of worker
470      * threads are formed by appending the next available number to this name
471      * (baseName + 1, baseName + 2 etc.)
472      *
473      * @return the base name
474      */
475     public final String getBaseName() {
476         return baseName;
477     }
478 
479     /***
480      * Returns the default priority to use for threads.
481      *
482      * @return the priority
483      */
484     public final int getDefaultPriority() {
485         return defaultPriority;
486     }
487 
488     /***
489      * Invokes a task to be executed asynchronously. This method returns
490      * immediately; the given task is executed in another thread. The task
491      * is executed with the default priority of this task runner
492      * ({@link #getDefaultPriority()}).
493      *
494      * @param task the Runnable to be executed asynchronously
495      * @param taskName a name or short description of the new task --
496      * used for debugging purposes
497      */
498     public final void invoke(final Runnable task, final String taskName) {
499         invoke(task, taskName, getDefaultPriority());
500     }
501 
502     /***
503      * Invokes a task to be executed asynchronously. This method returns
504      * immediately; the given task is executed in another thread.
505      *
506      * @param task the Runnable to be executed asynchronously
507      * @param taskName a name or short description of the new task --
508      * used for debugging purposes
509      * @param priority the priority to use for the thread executing the task;
510      * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
511      * {@link java.lang.Thread#MAX_PRIORITY}
512      * @throws IllegalArgumentException if the priority is not in the range
513      * {@link java.lang.Thread#MIN_PRIORITY} to
514      * {@link java.lang.Thread#MAX_PRIORITY}
515      */
516     public final void invoke(final Runnable task, final String taskName,
517             final int priority) throws IllegalArgumentException {
518         checkPriority(priority);
519         final WorkerThread worker;
520         try {
521             worker = (WorkerThread) threadPool.borrowObject();
522         } catch (RuntimeException re) {
523             // rethrow "as is"
524             throw re;
525         } catch (Exception e) {
526             // normally everything should work fine so we just wrap the
527             // exception in a RuntimeException
528             throw new RuntimeException(
529                 "Could not borrow worker thread from pool", e);
530         }
531 
532         // adjust priority
533         if (worker.getPriority() != priority) {
534             worker.setPriority(priority);
535         }
536 
537         // call worker thread to execute the task
538         worker.setTask(task, taskName);
539     }
540 
541     /***
542      * Returns whether this task runner has been closed.
543      *
544      * @return <code>true</code> iff {@link #close(int)} has been called on this
545      * object
546      */
547     public final boolean isClosed() {
548         return closed;
549     }
550 
551     /***
552      * Returns a string representation of this object.
553      *
554      * @return a textual representation
555      */
556     public String toString() {
557         return new ToStringBuilder(this).
558             append("default priority", defaultPriority).
559             append("is closed", closed).
560             append("base name of worker thread", baseName).
561             append("next free thread number", nextFreeNumber).
562             append("thread pool", threadPool).
563             toString();
564     }
565 
566 }