ExecutorTask.java

/*
 * ExecutorTask
 *
 * $Id$
 * $URL$
 */
package gov.usgs.util;

import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A wrapper for Runnable or Callable objects for use with an ExecutorService.
 *
 * Can be used to schedule interrupt based timeouts, multiple attempts, and
 * Future style exception tracking for Runnable or Callable objects.
 *
 * @param <T> return type for callable.
 */
public class ExecutorTask<T> implements Future<T>, Runnable {

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(ExecutorTask.class.getName());

  /** Default number of milliseconds to wait before a retry. */
  public static final long DEFAULT_RETRY_DELAY = 0L;

  /** Default number of tries to run this task. */
  public static final int DEFAULT_NUM_TRIES = 1;

  /** Default timeout for this task. */
  public static final long DEFAULT_TIMEOUT = 0L;

  /** ExecutorService used to execute this task. */
  protected ExecutorService service;

  /** The callable to be called. */
  protected Callable<T> callable;

  /** Timeout for task. */
  protected long timeout = DEFAULT_TIMEOUT;

  /** Number of tries to execute this task. */
  protected int maxTries = DEFAULT_NUM_TRIES;

  /** Number of milliseconds to wait before trying again. */
  protected long retryDelay = DEFAULT_RETRY_DELAY;

  /** Timer used to schedule retries, when they have a non-zero delay. */
  protected Timer retryTimer;

  /** The future from the executor service. */
  protected T result;

  /** List of exceptions thrown, up to maxTries in length. */
  ArrayList<Exception> exceptions;

  /** Whether this task is complete. */
  protected Boolean done = false;

  /** Whether this task has been canceled. */
  protected Boolean cancelled = false;

  /** Number of tries used. */
  protected int numTries = 0;

  /** The thread where this is running, used to interrupt. */
  protected Thread runThread = null;

  /** Name for this task. */
  protected String name = null;

  /** A synchronized object */
  protected final Object syncObject = new Object();

  /**
   * Construct a new ExecutorTask
   *
   * @param service  ExecutorService that this task will be submitted to.
   * @param maxTries maximum number of tries callable can throw an exception or
   *                 timeout before giving up. &lt; 1 means never run.
   * @param timeout  number of milliseconds to allow callable to run before it is
   *                 interrupted. &lt;= 0 means never timeout.
   * @param callable the callable to call. To work well, the callable should
   *                 handle interrupts gracefully.
   * @see InterruptedException
   */
  public ExecutorTask(ExecutorService service, int maxTries, long timeout, Callable<T> callable) {
    this(service, maxTries, timeout, callable, null, DEFAULT_RETRY_DELAY);
  }

  /**
   * Wraps a runnable and result using the CallableRunnable class.
   *
   * @param service  ExecutorService that this task will be submitted to.
   * @param maxTries maximum number of tries callable can throw an exception or
   *                 timeout before giving up. &lt; 1 means never run.
   * @param timeout  number of milliseconds to allow callable to run before it is
   *                 interrupted. &lt;= 0 means never timeout.
   * @param runnable a runnable
   * @param result   the result
   *
   * @see java.util.concurrent.Executors#callable(Runnable, Object)
   */
  public ExecutorTask(ExecutorService service, int maxTries, long timeout, Runnable runnable, T result) {
    this(service, maxTries, timeout, Executors.callable(runnable, result));
  }

  /**
   * Construct a new ExecutorTask
   *
   * @param service    ExecutorService that this task will be submitted to.
   * @param maxTries   maximum number of tries callable can throw an exception or
   *                   timeout before giving up. &lt; 1 means never run.
   * @param timeout    number of milliseconds to allow callable to run before it
   *                   is interrupted. &lt;= 0 means never timeout.
   * @param callable   the callable to call. To work well, the callable should
   *                   handle interrupts gracefully.
   * @param retryTimer a timer used to schedule retries when retryDelay is
   *                   non-zero.
   * @param retryDelay the number of milliseconds to wait before retrying after an
   *                   exception.
   * @see InterruptedException
   */
  public ExecutorTask(ExecutorService service, int maxTries, long timeout, Callable<T> callable, Timer retryTimer,
      long retryDelay) {
    this.service = service;
    this.maxTries = maxTries;
    this.timeout = timeout;
    this.callable = callable;
    this.exceptions = new ArrayList<Exception>(maxTries);
    this.retryTimer = retryTimer;
    this.retryDelay = retryDelay;
  }

  /**
   * Run calls the callable, scheduling timeout interruption, catching exceptions,
   * and potentially resubmitting to the executor service.
   */
  @Override
  public void run() {
    // used to schedule timeout
    Timer timeoutTimer = new Timer();

    try {
      // synchronized (this) {
      if (done || cancelled || numTries >= maxTries) {
        // already done, cancelled, or out of attempts
        return;
      }

      // otherwise,
      ++numTries;
      // signal that we are running
      runThread = Thread.currentThread();
      if (timeout > 0) {
        // schedule interrupt
        final Thread currentThread = runThread;
        timeoutTimer.schedule(new TimerTask() {
          @Override
          public void run() {
            LOGGER.fine("Interrupting executor thread");
            currentThread.interrupt();
          }
        }, timeout);
      }
      // }

      // compute result, outside synchronized
      result = callable.call();

      // synchronized (this) {
      // signal that we are done running
      runThread = null;

      // computed without exceptions, done
      setDone();
      // }
    } catch (Exception e) {
      LOGGER.log(Level.INFO, "Exception executing task", e);
      // synchronized (this) {
      // signal that we are not running
      runThread = null;

      // track this exception
      exceptions.add(e);

      // try to resubmit
      if (!cancelled && numTries < maxTries) {
        LOGGER.info("Resubmitting task to executor " + numTries + "/" + maxTries + " attempts");
        SubmitTaskToExecutor retryTask = new SubmitTaskToExecutor(this);
        if (retryDelay <= 0L || retryTimer == null) {
          retryTask.run();
        } else {
          retryTimer.schedule(retryTask, retryDelay);
        }
      } else {
        // cancelled or out of tries, done
        setDone();
      }
      // }
    } finally {
      // cancel timeout based interrupt
      timeoutTimer.cancel();
    }
  }

  /**
   * Called when task is completed, either successfully, or unsuccessfully and has
   * no more tries
   */
  protected void setDone() {
    // done running, either successfully or because out of tries
    done = true;
    // notify anyone waiting for task to complete
    synchronized (syncObject) {
      syncObject.notifyAll();
    }
  }

  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (cancelled || done) {
      // already canceled or complete
      return cancelled;
    }

    cancelled = true;
    if (runThread != null && mayInterruptIfRunning) {
      // running, try to interrupt
      runThread.interrupt();
    }

    // thread may still be running, if it doesn't handle interrupts well,
    // but Future interface says we are done
    setDone();

    return cancelled;
  }

  @Override
  public boolean isCancelled() {
    return cancelled;
  }

  @Override
  public boolean isDone() {
    return done;
  }

  /**
   * Get the result returned by the callable.
   */
  @Override
  public T get() throws InterruptedException, ExecutionException {
    while (!cancelled && !done && numTries < maxTries) {
      synchronized (syncObject) {
        syncObject.wait();
      }
    }

    if (numTries == maxTries && exceptions.size() == maxTries) {
      // can't execute any more, signal using most recent exception
      throw new ExecutionException(exceptions.get(maxTries - 1));
    }

    return result;
  }

  /**
   * Get the result returned by the callable.
   */
  @Override
  public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (!cancelled && !done && numTries < maxTries) {
      synchronized (syncObject) {
        unit.timedWait(syncObject, timeout);
      }
    }

    if (!cancelled && !done) {
      // must have timed out
      throw new TimeoutException();
    }

    if (numTries == maxTries && exceptions.size() == maxTries) {
      // can't execute any more, signal using most recent exception
      throw new ExecutionException(exceptions.get(maxTries - 1));
    }

    return result;
  }

  /**
   * Number of tries used.
   *
   * @return actual number of attempts.
   */
  public int getNumTries() {
    return numTries;
  }

  /**
   * Maximum number of tries before giving up.
   *
   * @return maximum number of attempts.
   */
  public int getMaxTries() {
    return maxTries;
  }

  /**
   * Any exceptions thrown, during any execution attempt.
   *
   * @return array of thrown exceptions. should contain no more than numTries
   *         exceptions.
   */
  public ArrayList<Exception> getExceptions() {
    return exceptions;
  }

  /**
   * The callable object that is/was called.
   *
   * @return The callable object for this task. If this task was created using a
   *         runnable, this was created using Executors.callable(Runnable).
   */
  public Callable<T> getCallable() {
    return callable;
  }

  /** @return name */
  public String getName() {
    return this.name;
  }

  /** @param name to set */
  public void setName(final String name) {
    this.name = name;
  }

  /**
   * @return the retryDelay
   */
  public long getRetryDelay() {
    return retryDelay;
  }

  /**
   * @param retryDelay the retryDelay to set
   */
  public void setRetryDelay(long retryDelay) {
    this.retryDelay = retryDelay;
  }

  /**
   * @return the retryTimer
   */
  public Timer getRetryTimer() {
    return retryTimer;
  }

  /**
   * @param retryTimer the retryTimer to set
   */
  public void setRetryTimer(Timer retryTimer) {
    this.retryTimer = retryTimer;
  }

  /**
   * Submit an ExecutorTask to an ExecutorService.
   *
   * Used to defer resubmission of a task after it fails, but scheduling its
   * resubmission using a timer.
   */
  private class SubmitTaskToExecutor extends TimerTask {

    /** The task to resubmit. */
    private ExecutorTask<T> task;

    /**
     * Construct a new SubmitTaskToExecutor instance.
     *
     * @param task the task to resubmit.
     */
    public SubmitTaskToExecutor(final ExecutorTask<T> task) {
      this.task = task;
    }

    /**
     * Submits the task to the executor.
     */
    public void run() {
      service.submit(task);
    }

  }

}