RoundRobinBlockingQueue.java

package gov.usgs.earthquake.util;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Round Robin Blocking Queue.
 *
 * Adds blocking behavior to {@link RoundRobinQueue}: every method defined in
 * this class acquires a single {@link ReentrantLock} before touching the
 * underlying queue, and consumers wait on a condition that is signalled each
 * time an item is added through this class.
 *
 * {@link #put(Object)} and {@link #take()} are the recommended entry points.
 * Methods inherited from {@link RoundRobinQueue} that are not overridden here
 * do not acquire the lock in this class.
 *
 * @param <T> queue item type.
 */
public class RoundRobinBlockingQueue<T> extends RoundRobinQueue<T> implements BlockingQueue<T> {

  /** Guards all queue access performed by methods of this class. */
  private final ReentrantLock changeLock;
  /** Signalled (under {@link #changeLock}) after an item has been added. */
  private final Condition notEmptyCondition;

  /** Constructor */
  public RoundRobinBlockingQueue() {
    changeLock = new ReentrantLock();
    notEmptyCondition = changeLock.newCondition();
  }

  /**
   * Acquire the change lock for methods that cannot declare
   * {@link InterruptedException}.
   *
   * @throws RuntimeException wrapping the InterruptedException if the thread
   *         is interrupted while waiting; the interrupt status is restored
   *         first so callers further up the stack can still observe it.
   */
  private void lockChangeLockUnchecked() {
    try {
      changeLock.lockInterruptibly();
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(ie);
    }
  }

  /**
   * Add an item to the underlying queue and wake one waiting consumer.
   *
   * Caller must hold {@link #changeLock}.
   *
   * @param e item to add.
   */
  private void addAndSignal(T e) {
    super.add(e);
    notEmptyCondition.signal();
  }

  /**
   * Add an item to the queue.
   *
   * @param e item to add.
   * @return always true.
   * @throws RuntimeException if interrupted while acquiring the lock
   *         (cause is the InterruptedException; interrupt status is restored).
   */
  @Override
  public boolean add(T e) {
    lockChangeLockUnchecked();
    try {
      addAndSignal(e);
    } finally {
      changeLock.unlock();
    }
    return true;
  }

  /**
   * Check if queue contains an item.
   *
   * @param e item to look for.
   * @return result of the superclass contains check, made while holding the lock.
   * @throws RuntimeException if interrupted while acquiring the lock
   *         (cause is the InterruptedException; interrupt status is restored).
   */
  @Override
  public boolean contains(Object e) {
    lockChangeLockUnchecked();
    try {
      return super.contains(e);
    } finally {
      changeLock.unlock();
    }
  }

  /**
   * Offer an item to the queue.
   *
   * Calls {@link #add(Object)}, but returns false if any exceptions thrown.
   *
   * @param e item to add.
   * @return true if added, false if {@link #add(Object)} threw an exception.
   */
  @Override
  public boolean offer(T e) {
    try {
      return add(e);
    } catch (Exception ex) {
      return false;
    }
  }

  /**
   * Offer an item to the queue, waiting up to the specified time for the lock.
   *
   * This is an unbounded queue, so the only thing waited for is the lock.
   *
   * @param e item to add.
   * @param timeout how long to wait for the lock.
   * @param unit unit of timeout.
   * @return true if the item was added, false if the lock could not be
   *         acquired before the timeout elapsed.
   * @throws InterruptedException if interrupted while waiting for the lock.
   */
  @Override
  public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
    // tryLock reports failure via its return value; proceeding without the
    // lock would mutate unguarded and make unlock() throw.
    if (!changeLock.tryLock(timeout, unit)) {
      return false;
    }
    try {
      addAndSignal(e);
    } finally {
      changeLock.unlock();
    }
    return true;
  }

  /**
   * Retrieves and removes the head of this queue, waiting up to the specified
   * wait time if necessary for an element to become available.
   *
   * @param timeout how long to wait for an element.
   * @param unit unit of timeout.
   * @return the removed element, or null if the queue is still empty when the
   *         timeout elapses.
   * @throws InterruptedException if interrupted while waiting.
   */
  @Override
  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanosRemaining = unit.toNanos(timeout);
    changeLock.lockInterruptibly();
    try {
      // loop: a wakeup may be spurious, or another consumer may have already
      // taken the item that triggered the signal
      while (isEmpty()) {
        if (nanosRemaining <= 0L) {
          return null;
        }
        nanosRemaining = notEmptyCondition.awaitNanos(nanosRemaining);
      }
      return super.remove();
    } finally {
      changeLock.unlock();
    }
  }

  /**
   * Put an item in the queue.
   *
   * @param e item to add.
   * @throws InterruptedException if interrupted while acquiring lock, or if
   *         the underlying add throws a RuntimeException whose cause is an
   *         InterruptedException.
   * @throws RuntimeException any other RuntimeException from the underlying
   *         add is propagated rather than silently dropped.
   */
  @Override
  public void put(T e) throws InterruptedException {
    changeLock.lockInterruptibly();
    try {
      addAndSignal(e);
    } catch (RuntimeException re) {
      // unwrap an interrupt that was wrapped by the underlying add
      if (re.getCause() instanceof InterruptedException) {
        throw (InterruptedException) re.getCause();
      }
      // anything else means the item was not added; do not hide that
      throw re;
    } finally {
      changeLock.unlock();
    }
  }

  /**
   * Unbounded queues return Integer.MAX_VALUE.
   *
   * @return Integer.MAX_VALUE;
   */
  @Override
  public int remainingCapacity() {
    return Integer.MAX_VALUE;
  }

  /**
   * Remove an object from the queue.
   *
   * @param e item to remove.
   * @return result of the superclass remove, made while holding the lock.
   * @throws RuntimeException if interrupted while acquiring the lock
   *         (cause is the InterruptedException; interrupt status is restored).
   */
  @Override
  public boolean remove(Object e) {
    lockChangeLockUnchecked();
    try {
      return super.remove(e);
    } finally {
      changeLock.unlock();
    }
  }

  /**
   * Retrieves and removes the head of this queue, waiting as long as
   * necessary for an element to become available.
   *
   * @return the removed element.
   * @throws InterruptedException if interrupted while acquiring the lock or
   *         while waiting for an element.
   */
  @Override
  public T take() throws InterruptedException {
    changeLock.lockInterruptibly();
    try {
      while (isEmpty()) {
        notEmptyCondition.await();
      }
      return super.remove();
    } finally {
      changeLock.unlock();
    }
  }

  /**
   * Empty queue into a collection.
   *
   * @param c collection that receives the drained elements.
   * @return number of elements drained.
   * @throws IllegalArgumentException if c is this queue.
   */
  @Override
  public int drainTo(Collection<? super T> c) {
    return drainTo(c, -1);
  }

  /**
   * Empty queue into a collection, stopping after max elements.
   *
   * @param c collection that receives the drained elements.
   * @param max maximum number of elements to drain; a negative value means
   *        no limit.
   * @return number of elements drained; 0 if interrupted while acquiring the
   *         lock (the interrupt status is restored in that case).
   * @throws IllegalArgumentException if c is this queue.
   */
  @Override
  public int drainTo(Collection<? super T> c, int max) {
    if (c == this) {
      // draining into itself would re-add every removed element and, with no
      // limit, never terminate
      throw new IllegalArgumentException("cannot drain a queue into itself");
    }
    try {
      changeLock.lockInterruptibly();
    } catch (InterruptedException e) {
      // none drained; keep the interrupt visible to the caller
      Thread.currentThread().interrupt();
      return 0;
    }
    try {
      int count = 0;
      while (!isEmpty() && (max < 0 || count < max)) {
        c.add(remove());
        count++;
      }
      return count;
    } finally {
      changeLock.unlock();
    }
  }

}