ExecutorListenerNotifier.java

package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.aws.JsonNotificationIndex;
import gov.usgs.earthquake.product.AbstractListener;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.ExecutorTask;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Executes notifications on separate threads.
 *
 * Each registered {@link NotificationListener} gets its own fixed-size thread
 * pool; notifications are wrapped in an {@link ExecutorTask} and submitted to
 * that pool so a slow listener does not block the others.
 */
public class ExecutorListenerNotifier extends DefaultConfigurable implements ListenerNotifier {

  private static final Logger LOGGER = Logger.getLogger(ExecutorListenerNotifier.class.getName());

  /**
   * Keys of the form "listenerName|indexName" for every listener/index pair
   * that has already been requeued by {@link #startup()}. Static so that
   * multiple receivers sharing one notification index only requeue it once per
   * listener.
   */
  private static final List<String> AUTOLOADED_INDEXES = new ArrayList<String>();

  /** Receiver whose notification index is requeued during startup. */
  private DefaultNotificationReceiver receiver;

  /**
   * Notification listeners registered to receive notifications, and an
   * ExecutorService that delivers Notifications to each in a separate thread.
   */
  protected Map<NotificationListener, ExecutorService> notificationListeners = new HashMap<NotificationListener, ExecutorService>();

  /**
   * Make sure listener will accept notification before queueing it for
   * processing.
   */
  protected boolean acceptBeforeQueuing = true;

  /**
   * Timer used to retry tasks when they fail and listeners have configured
   * retryDelay.
   */
  protected Timer retryTimer = new Timer();

  /** When queue size reaches this level, start throttling */
  protected int throttleStartThreshold = 50000;

  /** When queue size reaches this level, stop throttling */
  protected int throttleStopThreshold = 25000;

  /** When throttling, wait this many milliseconds between queue size checks. */
  protected long throttleWaitInterval = 5000L;

  /**
   * Constructor
   *
   * @param receiver DefaultNotificationReceiver
   */
  public ExecutorListenerNotifier(final DefaultNotificationReceiver receiver) {
    this.receiver = receiver;
  }

  /**
   * Add a new notification listener.
   *
   * Adding a listener that is already registered is a no-op.
   *
   * @param listener the listener to add. When notifications are received, this
   *                 listener will be notified.
   */
  @Override
  public void addNotificationListener(NotificationListener listener) throws Exception {
    if (notificationListeners.containsKey(listener)) {
      // already registered, keep the existing executor
      return;
    }

    // fixed thread pool allows us to inspect the queue length...
    int concurrentProducts = 1;
    if (listener instanceof DefaultNotificationListener) {
      concurrentProducts = ((DefaultNotificationListener) listener).getConcurrentProducts();
    }
    notificationListeners.put(listener, Executors.newFixedThreadPool(concurrentProducts));
  }

  /**
   * Remove an existing notification listener.
   *
   * Any currently queued notifications are processed before shutting down.
   * Removing a listener that is not registered is a no-op.
   *
   * @param listener the listener to remove. When notifications are received,
   *                 this listener will no longer be notified.
   */
  @Override
  public void removeNotificationListener(NotificationListener listener) throws Exception {
    // remove listener from map
    final ExecutorService listenerExecutor = notificationListeners.remove(listener);
    if (listenerExecutor == null) {
      // never registered (or already removed): nothing to shut down
      return;
    }

    // shutdown executor thread
    listenerExecutor.shutdown();

    // Could use shutdownNow() instead?
    // however, shutdown() gives all listeners a chance to
    // process all notifications, but may keep client from shutting down
    // quickly. Also, see DefaultNotificationReceiver.shutdown().
  }

  /**
   * Send a notification to all registered NotificationListeners.
   *
   * Creates a NotificationEvent, with a reference to this object and calls each
   * notificationListeners onNotification method in separate threads.
   *
   * This method usually returns before registered NotificationListeners have
   * completed processing a notification.
   *
   * @param event the notification being sent to listeners.
   * @throws Exception if error occurs
   */
  @Override
  public void notifyListeners(final NotificationEvent event) throws Exception {
    this.notifyListeners(event, this.notificationListeners.keySet());
  }

  /**
   * Calls queueNotification with event and listener for each listener
   *
   * @param event     NotificationEvent
   * @param listeners Collection of NotificationListeners
   * @throws Exception if error occurs
   */
  public void notifyListeners(final NotificationEvent event, final Collection<NotificationListener> listeners)
      throws Exception {
    for (final NotificationListener listener : listeners) {
      queueNotification(listener, event);
    }
  }

  /**
   * Queue a notification for delivery to one listener on that listener's
   * executor.
   *
   * When {@link #acceptBeforeQueuing} is set and the listener is a
   * DefaultNotificationListener, the notification is dropped up front if the
   * listener does not accept its product id. Listeners that have no executor
   * (not registered with this notifier) are skipped with a warning.
   *
   * @param listener NotificationListener
   * @param event    NotificationEvent
   */
  protected void queueNotification(final NotificationListener listener, final NotificationEvent event) {
    if (acceptBeforeQueuing && listener instanceof DefaultNotificationListener) {
      DefaultNotificationListener defaultListener = (DefaultNotificationListener) listener;
      if (!defaultListener.accept(event.getNotification().getProductId())) {
        return;
      }
    }

    final ExecutorService listenerExecutor = notificationListeners.get(listener);
    if (listenerExecutor == null) {
      // listener is not (or no longer) registered; skip it instead of failing
      // with a NullPointerException that would abort delivery to other listeners
      LOGGER.warning("[" + event.getNotificationReceiver().getName() + "] listener (" + listener.getName()
          + ") is not registered, skipping notification");
      return;
    }

    // determine retry delay
    long retryDelay = 0L;
    if (listener instanceof AbstractListener) {
      retryDelay = ((AbstractListener) listener).getRetryDelay();
    }

    ExecutorTask<Void> listenerTask = new ExecutorTask<Void>(listenerExecutor, listener.getMaxTries(),
        listener.getTimeout(), new NotificationListenerCallable(listener, event), retryTimer, retryDelay);
    listenerExecutor.submit(listenerTask);

    // log how many notifications are pending
    if (listenerExecutor instanceof ThreadPoolExecutor && LOGGER.isLoggable(Level.FINE)) {
      BlockingQueue<Runnable> pending = ((ThreadPoolExecutor) listenerExecutor).getQueue();
      LOGGER.fine("[" + event.getNotificationReceiver().getName() + "] listener (" + listener.getName() + ") has "
          + pending.size() + " queued notifications");
    }
  }

  /**
   * Remove every registered listener, shutting down each listener's executor.
   *
   * NOTE(review): retryTimer is not cancelled here; a java.util.Timer created
   * with the no-arg constructor uses a non-daemon thread - confirm whether it
   * should be cancelled as part of shutdown.
   *
   * @throws Exception if a listener cannot be removed
   */
  @Override
  public void shutdown() throws Exception {
    // iterate over a copy, removeNotificationListener modifies the map
    final List<NotificationListener> registered = new ArrayList<NotificationListener>(
        notificationListeners.keySet());
    for (final NotificationListener listener : registered) {
      removeNotificationListener(listener);
    }
  }

  /**
   * Requeue notifications already stored in the receiver's notification index.
   *
   * Only DefaultNotificationListeners that have their own notification index
   * are requeued, and each listener/index pair is only requeued once per JVM.
   * Expired notifications are skipped. Queue sizes are throttled while
   * requeueing.
   *
   * @throws Exception if notifications cannot be loaded or queued
   */
  @Override
  public void startup() throws Exception {
    super.startup();

    NotificationIndex index = receiver.getNotificationIndex();

    // filter down to listeners who can handle requeueing gracefully
    ArrayList<NotificationListener> gracefulListeners = new ArrayList<NotificationListener>();
    for (final NotificationListener listener : this.notificationListeners.keySet()) {
      // make sure each index only notifies each listener once
      String key = listener.getName() + '|' + index.getName();
      if (AUTOLOADED_INDEXES.contains(key)) {
        // already loaded this notification index for this listener
        // another receiver is sharing this notification index
        continue;
      }
      if (listener instanceof DefaultNotificationListener
          && ((DefaultNotificationListener) listener).getNotificationIndex() != null) {
        gracefulListeners.add(listener);
        AUTOLOADED_INDEXES.add(key);
      }
    }

    if (gracefulListeners.isEmpty()) {
      // don't bother searching if nobody is listening
      return;
    }

    LOGGER.info("[" + receiver.getName() + "] requeueing notification index '" + index.getName() + "'");
    // find all existing notifications
    List<Notification> allNotifications = null;

    // for json index, push intersection into database if only one listener
    if (index instanceof JsonNotificationIndex && gracefulListeners.size() == 1) {
      NotificationIndex listenerIndex = ((DefaultNotificationListener) gracefulListeners.get(0)).getNotificationIndex();
      if (listenerIndex instanceof JsonNotificationIndex
          && !((JsonNotificationIndex) listenerIndex).getDriver().contains("sqlite")) {
        // get intersection when potentially sharing database
        allNotifications = this.getMissingNotifications(index, listenerIndex);
      }
    }

    if (allNotifications == null) {
      // fallback to previous behavior
      allNotifications = index.findNotifications((List<String>) null, (List<String>) null, (List<String>) null);
    }
    LOGGER.info("Done finding existing notifications");

    // queue them for processing in case they were previously missed
    final Date now = new Date();
    final int total = allNotifications.size();
    int count = 0;
    for (final Notification notification : allNotifications) {
      NotificationEvent event = new NotificationEvent(receiver, notification);
      count += 1;
      if (event.getNotification().getExpirationDate().after(now)) {
        // still valid
        this.notifyListeners(event, gracefulListeners);
      }

      // try to keep queue size manageable during restart
      throttleQueues(total - count);
    }
    LOGGER.info("All notifications queued");

    // listener/index keys were recorded in AUTOLOADED_INDEXES above; the bare
    // index name is intentionally not added because it never matches a key
  }

  /** @return default notification receiver */
  public DefaultNotificationReceiver getReceiver() {
    return receiver;
  }

  /** @param receiver of the default notification variety */
  public void setReceiver(DefaultNotificationReceiver receiver) {
    this.receiver = receiver;
  }

  /**
   * Report the number of queued notifications for each listener.
   *
   * @return map from "receiverName - listenerName" to queue size; listeners
   *         whose executor is not a ThreadPoolExecutor are omitted
   */
  public Map<String, Integer> getStatus() {
    HashMap<String, Integer> status = new HashMap<String, Integer>();

    for (final Map.Entry<NotificationListener, ExecutorService> entry : notificationListeners.entrySet()) {
      final ExecutorService listenerExecutor = entry.getValue();
      if (listenerExecutor instanceof ThreadPoolExecutor) {
        // check how many notifications are pending
        int size = ((ThreadPoolExecutor) listenerExecutor).getQueue().size();
        status.put(receiver.getName() + " - " + entry.getKey().getName(), size);
      }
    }

    return status;
  }

  /**
   * Query for notifications that are in the receiver index but missing from
   * the listener index, retrying up to three times.
   *
   * Both arguments must be JsonNotificationIndex instances.
   *
   * @param notificationIndex     index that is queried for notifications (the
   *                              receiver index when called from startup)
   * @param executorListenerIndex listener index, whose table is compared
   *                              against
   * @return missing notifications
   * @throws Exception if missing notifications cannot be queried after all
   *                   attempts
   */
  public List<Notification> getMissingNotifications(NotificationIndex notificationIndex,
      NotificationIndex executorListenerIndex) throws Exception {
    final int maxAttempts = 3;
    int count = 0;
    while (true) {
      try {
        List<Notification> allNotifications = ((JsonNotificationIndex) notificationIndex)
            .getMissingNotifications(((JsonNotificationIndex) executorListenerIndex).getTable());
        LOGGER.info(
            "[" + this.getName() + "] Found missing notifications for requeuing (" + allNotifications.size() + ")");
        return allNotifications;
      } catch (Exception e) {
        count++;
        if (count >= maxAttempts) {
          // last attempt, throw exception
          LOGGER.log(Level.SEVERE, "[" + this.getName() + "] Failed to load missing notifications", e);
          throw e;
        }
        LOGGER.log(Level.WARNING, "[" + this.getName() + "] Exception loading missing notifications, attempt ("
            + count + " out of " + maxAttempts + ")", e);
      }
    }
  }

  /**
   * Check queue status and return length of longest queue.
   *
   * @return length of longest queue, or null if no queue lengths.
   */
  public Integer getMaxQueueSize() {
    Integer maxSize = null;
    for (final ExecutorService listenerExecutor : notificationListeners.values()) {
      if (listenerExecutor instanceof ThreadPoolExecutor) {
        // check how many notifications are pending
        int size = ((ThreadPoolExecutor) listenerExecutor).getQueue().size();
        if (maxSize == null || size > maxSize) {
          maxSize = size;
        }
      }
    }
    return maxSize;
  }

  /**
   * If longest queue has more than throttleStartThreshold notifications, wait
   * until longest queue has at most throttleStopThreshold notifications before
   * returning.
   *
   * @throws InterruptedException if interrupted while waiting
   */
  public void throttleQueues() throws InterruptedException {
    throttleQueues(null);
  }

  /**
   * If longest queue has more than throttleStartThreshold notifications, wait
   * until longest queue has at most throttleStopThreshold notifications before
   * returning. Queue size is rechecked every throttleWaitInterval
   * milliseconds.
   *
   * @param remaining number of notifications still to be queued; only used in
   *                  log output, may be null when unknown
   * @throws InterruptedException if interrupted while waiting
   */
  public void throttleQueues(Integer remaining) throws InterruptedException {
    // try to keep queue size manageable during restart
    int limit = throttleStartThreshold;
    // track whether any throttles occurred
    boolean throttled = false;

    while (true) {
      final Integer size = getMaxQueueSize();
      if (size == null || size <= limit) {
        // within limit
        if (throttled) {
          LOGGER.info("[" + getName() + "] done throttling (size = " + size + ")");
        }
        break;
      }

      throttled = true;
      LOGGER.info("[" + getName() + "]" + " queueing throttled until below " + throttleStopThreshold + " (" + "size="
          + size + ", remaining=" + (remaining == null ? "?" : remaining) + ")");
      // too many messages queued
      // once throttling starts, keep waiting until the lower stop threshold
      limit = throttleStopThreshold;
      // wait for listener to do some processing
      Thread.sleep(throttleWaitInterval);
    }
  }

  /**
   * NOTE: messing with the executors map is not a good idea.
   *
   * @return the map of listeners and their executors.
   */
  public Map<NotificationListener, ExecutorService> getExecutors() {
    return notificationListeners;
  }

  /** @return int throttle start threshold */
  public int getThrottleStartThreshold() {
    return this.throttleStartThreshold;
  }

  /** @param n int throttle start threshold */
  public void setThrottleStartThreshold(final int n) {
    this.throttleStartThreshold = n;
  }

  /** @return int throttle stop threshold */
  public int getThrottleStopThreshold() {
    return this.throttleStopThreshold;
  }

  /** @param n int throttle stop threshold */
  public void setThrottleStopThreshold(final int n) {
    this.throttleStopThreshold = n;
  }

  /** @return long throttle wait interval in ms */
  public long getThrottleWaitInterval() {
    return this.throttleWaitInterval;
  }

  /** @param ms long throttle wait interval in ms */
  public void setThrottleWaitInterval(final long ms) {
    this.throttleWaitInterval = ms;
  }

}