View Javadoc

1   /*
2    * Copyright (C) 2003-2006 Christian Siefkes <christian@siefkes.net>.
3    * Development of this software is supported by the German Research Society,
4    * Berlin-Brandenburg Graduate School in Distributed Information Systems
5    * (DFG grant no. GRK 316).
6    *
7    * This program is free software; you can redistribute it and/or modify
8    * it under the terms of the GNU General Public License as published by
9    * the Free Software Foundation; either version 2 of the License, or
10   * (at your option) any later version.
11   *
12   * This program is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15   * GNU General Public License for more details.
16   *
17   * You should have received a copy of the GNU General Public License
18   * along with this program; if not, visit
19   * http://www.gnu.org/licenses/gpl.html or write to the Free Software
20   * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
21   */
22  package de.fu_berlin.ties.util;
23  
24  import org.apache.commons.lang.builder.ToStringBuilder;
25  import org.apache.commons.pool.BasePoolableObjectFactory;
26  import org.apache.commons.pool.ObjectPool;
27  import org.apache.commons.pool.impl.StackObjectPool;
28  
29  import de.fu_berlin.ties.Closeable;
30  
31  /***
32   * Asynchronously executes any number of {@link java.lang.Runnable} tasks.
33   * Internally manages a pool of worker threads that are invoked to
34   * asychronously execute tasks as requested. The number of worker threads is
35   * updated on demand so every task is executed without delay.
36   *
37   * <p>Usually it should be sufficient to invoke tasks through the static
38   * {@link #invokeDefault(Runnable, String) invokeDefault} methods; in most
39   * cases there should be no reason to create instances of this class.
40   * <em>To avoid the creation of unnecessary instances, it is highly recommended
41   * to register your interest ({@link #registerInterest()}) in the default task
42   * runner prior to using it and to deregister
43   * ({@link #deregisterInterest()}) when you no longer need it.
44   * A good idea is to do this at the begin and end of your main method.
45   * You should register in a <code>finally</code> block and you <strong>must not
46   * forget</strong> to deregister, otherwise your program might run forever
47   * (because the worker threads continue waiting for tasks even after all other
48   * threads have terminated).</em>
49   *
50   * <p>When creating your own runner, you <strong>must</strong> finally call
51   * {@link #close(int)} to release each task runner you have created, or your
52   * program might run forever.
53   *
54   * @author Christian Siefkes
55   * @version $Revision: 1.15 $, $Date: 2006/10/21 16:04:27 $, $Author: siefkes $
56   */
57  public class TaskRunner implements Closeable {
58  
59      /***
60       * A helper threads that executes any tasks as requested and then sleeps
61       * until a new task is required or it is told to terminate.
62       */
63      private final class WorkerThread extends Thread {
64  
65          /***
66           * Indicates whether this thread should terminate (after executing any
67           * specified task).
68           */
69          private boolean shouldTerminate = false;
70  
71          /***
72           * The task to be execute by this thread.
73           */
74          private Runnable task = null;
75  
76          /***
77           * A name or short description of the current task -- used for debugging
78           * purposes.
79           */
80          private String taskName = null;
81  
82          /***
83           * Used to guard synchrization of this thread.
84           */
85          private final Object threadGuard = new Object();
86  
87          /***
88           * Creates a new instance.
89           *
90           * @param name the name of the new thread
91           */
92          private WorkerThread(final String name) {
93              super(name);
94          }
95  
96          /***
97           * The run method of this thread: executing the task provided by
98           * calling {@link #setTask(Runnable, String)} and then puts itself back
99           * into the pool and waits until a new task arrives or it
100          * {@link #shouldTerminate()}. Any throwables thrown by the executed
101          * {@link java.lang.Runnable} are catched and logged so this thread
102          * cannot be crashed by careless Runnables.
103          */
104         public void run() {
105             // local copies that not require synchronization
106             boolean doTerminate = false;
107             Runnable currentTask;
108             String currentTaskName;
109 
110             while (!doTerminate) {
111                 // get task, if any
112                 synchronized (threadGuard) {
113                     if ((task == null) && (!shouldTerminate) && (!closed)) {
114                         // nothing to do--wait until notified of status change
115                         try {
116                             threadGuard.wait();
117                         } catch (InterruptedException ie) {
118                             Util.LOG.warn(
119                                 "Interrupted while waiting for status change"
120                                 + "  -- this was not supposed to happen");
121                         }
122                     }
123 
124                     // Terminate when requested or when the outer class has
125                     // been closed.
126                     doTerminate = shouldTerminate || closed;
127                     currentTask = task;
128                     currentTaskName = taskName;
129 
130                     // set member fields to null so a new task can arrive
131                     task = null;
132                     taskName = null;
133                 }
134 
135                 // execute task, if any
136                 if (currentTask != null) {
137                     try {
138                         currentTask.run();
139                     } catch (Throwable t) {
140                         Util.LOG.error("Error while executing "
141                             + currentTaskName,
142                             t);
143                     }
144 
145                     // now put myself back into the thread pool (if still open)
146                     if (!closed) {
147                         try {
148                             threadPool.returnObject(this);
149                         } catch (Exception e) {
150                             doTerminate = true;
151                             Util.LOG.error("Could not return myself into the "
152                                 + "pool -- will now terminate myself",
153                                 e);
154                         }
155                     }
156                 }
157             }
158         }
159 
160         /***
161          * Sets the task to be executed by this thread and notifies the
162          * thread that new work has arrived.
163          *
164          * @param newTask the task to be executed by this thread
165          * @param newTaskName A name or short description of the new task --
166          * used for debugging purposes
167          * @throws IllegalStateException if there already is another task set
168          * (a thread can only execute a single given task at any time)
169          */
170         private void setTask(final Runnable newTask,
171             final String newTaskName) throws IllegalStateException {
172             synchronized (threadGuard) {
173                 if (task != null) {
174                     throw new IllegalStateException("Task of " + this.toString()
175                         + "already set to " + taskName
176                         + " -- must be null when setting a new task");
177                 }
178                 task = newTask;
179                 taskName = newTaskName;
180 
181                 // wake the thread if it's waiting
182                 threadGuard.notify();
183             }
184         }
185 
186         /***
187          * Tells this thread to terminate and notifies the thread of the status
188          * change. The thread will only terminate after executing the given
189          * task, if any is left.
190          */
191         private void shouldTerminate() {
192             synchronized (threadGuard) {
193                 shouldTerminate = true;
194 
195                 // wake the thread if it's waiting
196                 threadGuard.notify();
197             }
198         }
199 
200         /***
201          * Returns a string representation of this thread.
202          *
203          * @return a textual representation
204          */
205         public String toString() {
206             return new ToStringBuilder(this).
207                 appendSuper(super.toString()).
208                 append("task name", taskName).
209                 append("should terminate", shouldTerminate).
210                 toString();
211         }
212 
213     }
214 
215 
216     /***
217      * The base name of worker threads used by the default instance.
218      */
219     public static final String DEFAULT_NAME = "worker";
220 
221     /***
222      * The default instance used by the static methods. This instance should be
223      * sufficient for most purposes, so usually there should be no reason to
224      * create any other instances.
225      */
226     private static TaskRunner defaultRunner;
227 
228     /***
229      * The default executor used for tasks that can be queued (executed later).
230      */
231 
232     /***
233      * The number of callers that have registered their interest in the
234      * {@link #defaultRunner}. The default runner is alive while this number
235      * is greater than 0.
236      */
237     private static int interestCount = 0;
238 
239     /***
240      * Deregisters interest to use the default runner. You <strong>must</strong>
241      * have called {@link #registerInterest()} prior to calling this method.
242      * You should calls this method in a <code>finally</code> block to ensure
243      * it is always executed.
244      */
245     public static synchronized void deregisterInterest() {
246         if (interestCount > 0) {
247             interestCount--;
248 
249             if (interestCount == 0) {
250                 if (defaultRunner != null) {
251                     // no more interest: destroy the runner
252                     defaultRunner.close(0);
253                     defaultRunner = null;
254                     Util.LOG.debug("Terminated default task runner");
255                 }
256             }
257         } else {
258             Util.LOG.error("TaskRunner.deregisterInterest called but interest "
259                 + "count is already " + interestCount);
260         }
261     }
262 
263     /***
264      * Invokes a task to be executed asynchronously, using the default task
265      * runner. This method returns immediately; the given task is executed in
266      * another thread. The task is executed with the default priority
267      * ({@link Thread#NORM_PRIORITY}).
268      *
269      * @param task the Runnable to be executed asynchronously
270      * @param taskName a name or short description of the new task --
271      * used for debugging purposes
272      */
273     public static void invokeDefault(final Runnable task,
274             final String taskName) {
275         invokeDefault(task, taskName, Thread.NORM_PRIORITY);
276     }
277 
278     /***
279      * Invokes a task to be executed asynchronously using the default task
280      * runner. This method returns immediately; the given task is executed in
281      * another thread.
282      *
283      * @param task the Runnable to be executed asynchronously
284      * @param taskName a name or short description of the new task --
285      * used for debugging purposes
286      * @param priority the priority to use for the thread executing the task;
287      * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
288      * {@link java.lang.Thread#MAX_PRIORITY}
289      * @throws IllegalArgumentException if the priority is not in the range
290      * {@link java.lang.Thread#MIN_PRIORITY} to
291      * {@link java.lang.Thread#MAX_PRIORITY}
292      */
293     public static synchronized void invokeDefault(final Runnable task,
294             final String taskName, final int priority)
295             throws IllegalArgumentException {
296         registerInterest();
297 
298         // initialize runner if not yet done
299         if (defaultRunner == null) {
300             defaultRunner = new TaskRunner(DEFAULT_NAME);
301             Util.LOG.debug("Initialized default task runner");
302         }
303 
304         try {
305             defaultRunner.invoke(task, taskName, priority);
306         } finally {
307             deregisterInterest();
308         }
309     }
310 
311     /***
312      * Registers interest to use the default runner. You <strong>must</strong>
313      * {@link #deregisterInterest()} some time after calling this method,
314      * otherwise your program will not terminate regularly.
315      */
316     public static synchronized void registerInterest() {
317         interestCount++;
318     }
319 
320 
321     /***
322      * The base name of worker threads. The actual names of worker threads
323      * are formed by appending {@link #nextFreeNumber} to this name.
324      */
325     private final String baseName;
326 
327     /***
328      * Whether this task runner has been closed.
329      */
330     private boolean closed = false;
331 
332     /***
333      * The default priority to use for threads.
334      */
335     private final int defaultPriority;
336 
337     /***
338      * The next available number to use when naming worker threads. Should
339      * be synchronized on {@link #threadPool}.
340      */
341     private int nextFreeNumber = 1;
342 
343     /***
344      * The pool of worker thread used to execute the requested tasks.
345      */
346     private final ObjectPool threadPool;
347 
348 
349     /***
350      * Creates a new instance, using the default priority
351      * ({@link java.lang.Thread#NORM_PRIORITY}) for threads.
352      *
353      * @param baseThreadName the base name of worker threads -- the actual
354      * names are formed by appending the next available number
355      */
356     public TaskRunner(final String baseThreadName) {
357         this(baseThreadName, Thread.NORM_PRIORITY);
358     }
359 
360     /***
361      * Creates a new instance.
362      *
363      * @param baseThreadName the base name of worker threads -- the actual
364      * names are formed by appending the next available number
365      * @param defaultPrio the priority to use for threads;
366      * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
367      * {@link java.lang.Thread#MAX_PRIORITY}
368      * @throws IllegalArgumentException if the priority is not in the range
369      * {@link java.lang.Thread#MIN_PRIORITY} to
370      * {@link java.lang.Thread#MAX_PRIORITY}
371      */
372     public TaskRunner(final String baseThreadName, final int defaultPrio)
373             throws IllegalArgumentException {
374         super();
375         checkPriority(defaultPrio);
376         baseName = baseThreadName;
377         defaultPriority = defaultPrio;
378 
379         // define a factory that returns WorkerThreads and tells them to stop
380         // when they are dropped from the pool
381         threadPool = new StackObjectPool(new BasePoolableObjectFactory() {
382             public final void destroyObject(final Object object) {
383                 //Log.TIES.debug("Got request to destroy worker thread object");
384                 // must be a worker thread -- tell it to stop
385                 ((WorkerThread) object).shouldTerminate();
386                 //Log.TIES.debug("Ordered worker thread " + object.toString()
387                 //    + " to terminate");
388             }
389 
390             public final Object makeObject() {
391                 final String threadName;
392                 synchronized (threadPool) {
393                     // determine thread name and increase then next free number
394                     threadName = baseName + nextFreeNumber++;
395                 }
396                 //Log.TIES.debug("Creating worker thread " + threadName);
397                 final WorkerThread worker = new WorkerThread(threadName);
398                 if (worker.getPriority() != defaultPriority) {
399                     worker.setPriority(defaultPriority);
400                 }
401                 worker.start();
402                 //Log.TIES.debug("Started worker thread " + threadName);
403                 return worker;
404             }
405         });
406     }
407 
408     /***
409      * Helper methods that checks whether a priority is in the valid range
410      * and throws an exception otherwise.
411      *
412      * @param priority the priority to check
413      * @throws IllegalArgumentException if the priority is outside the valid
414      * range
415      */
416     private void checkPriority(final int priority)
417             throws IllegalArgumentException {
418         if ((priority < Thread.MIN_PRIORITY)
419                 || (priority > Thread.MAX_PRIORITY)) {
420             throw new IllegalArgumentException("Priority " + priority
421                 + "is outside the valid range " + Thread.MIN_PRIORITY
422                 + " to " + Thread.MAX_PRIORITY);
423         }
424     }
425 
426     /***
427      * Closes this task runner. All thread will be terminated after executing
428      * their current tasks, so no tasks will be lost. You must call this
429      * method to release each task runner you created, or your program might
430      * run forever (because the worker threads continue waiting for tasks
431      * even after all other threads have terminated). You must not call
432      * {@link #invoke(Runnable, String)} after calling this method.
433      *
434      * @param errorCount the number of errors (exceptions) that occurred during
435      * calls to this instance (0 if none); ignored by this method
436      */
437     public final void close(final int errorCount) {
438         if (!closed) { // otherwise there is nothing to do
439             closed = true;
440             try {
441                 // delegate to thread pool
442                 threadPool.clear();
443                 threadPool.close();
444             } catch (RuntimeException re) {
445                 // rethrow "as is"
446                 throw re;
447             } catch (Exception e) {
448                 // normally everything should work fine so we just wrap the
449                 // exception in a RuntimeException
450                 throw new RuntimeException(
451                     "Error while closing task runner", e);
452             }
453         }
454     }
455 
456     /***
457      * Called by the garbage collector on an object when garbage collection
458      * determines that there are no more references to the object. Delegates
459      * to {@link #close(int)} to terminate all worker threads if not yet done.
460      * <b>You should not rely on this method but always call {@link #close(int)}
461      * yourself.</b>
462      */
463     protected final void finalize() {
464         close(0);
465     }
466 
467     /***
468      * Returns the base name of worker threads. The actual names of worker
469      * threads are formed by appending the next available number to this name
470      * (baseName + 1, baseName + 2 etc.)
471      *
472      * @return the base name
473      */
474     public final String getBaseName() {
475         return baseName;
476     }
477 
478     /***
479      * Returns the default priority to use for threads.
480      *
481      * @return the priority
482      */
483     public final int getDefaultPriority() {
484         return defaultPriority;
485     }
486 
487     /***
488      * Invokes a task to be executed asynchronously. This method returns
489      * immediately; the given task is executed in another thread. The task
490      * is executed with the default priority of this task runner
491      * ({@link #getDefaultPriority()}).
492      *
493      * @param task the Runnable to be executed asynchronously
494      * @param taskName a name or short description of the new task --
495      * used for debugging purposes
496      */
497     public final void invoke(final Runnable task, final String taskName) {
498         invoke(task, taskName, getDefaultPriority());
499     }
500 
501     /***
502      * Invokes a task to be executed asynchronously. This method returns
503      * immediately; the given task is executed in another thread.
504      *
505      * @param task the Runnable to be executed asynchronously
506      * @param taskName a name or short description of the new task --
507      * used for debugging purposes
508      * @param priority the priority to use for the thread executing the task;
509      * should be in the range of {@link java.lang.Thread#MIN_PRIORITY} to
510      * {@link java.lang.Thread#MAX_PRIORITY}
511      * @throws IllegalArgumentException if the priority is not in the range
512      * {@link java.lang.Thread#MIN_PRIORITY} to
513      * {@link java.lang.Thread#MAX_PRIORITY}
514      */
515     public final void invoke(final Runnable task, final String taskName,
516             final int priority) throws IllegalArgumentException {
517         checkPriority(priority);
518         final WorkerThread worker;
519         try {
520             worker = (WorkerThread) threadPool.borrowObject();
521         } catch (RuntimeException re) {
522             // rethrow "as is"
523             throw re;
524         } catch (Exception e) {
525             // normally everything should work fine so we just wrap the
526             // exception in a RuntimeException
527             throw new RuntimeException(
528                 "Could not borrow worker thread from pool", e);
529         }
530 
531         // adjust priority
532         if (worker.getPriority() != priority) {
533             worker.setPriority(priority);
534         }
535 
536         // call worker thread to execute the task
537         worker.setTask(task, taskName);
538     }
539 
540     /***
541      * Returns whether this task runner has been closed.
542      *
543      * @return <code>true</code> iff {@link #close(int)} has been called on this
544      * object
545      */
546     public final boolean isClosed() {
547         return closed;
548     }
549 
550     /***
551      * Returns a string representation of this object.
552      *
553      * @return a textual representation
554      */
555     public String toString() {
556         return new ToStringBuilder(this).
557             append("default priority", defaultPriority).
558             append("is closed", closed).
559             append("base name of worker thread", baseName).
560             append("next free thread number", nextFreeNumber).
561             append("thread pool", threadPool).
562             toString();
563     }
564 
565 }