RoundRobinListenerNotifier.java

package gov.usgs.earthquake.distribution.roundrobinnotifier;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import gov.usgs.earthquake.distribution.DefaultNotificationListener;
import gov.usgs.earthquake.distribution.ListenerNotifier;
import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.distribution.NotificationEvent;
import gov.usgs.earthquake.distribution.NotificationIndex;
import gov.usgs.earthquake.distribution.NotificationListener;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.util.DefaultConfigurable;

/**
 * Use round-robin queues to notify listeners.
 *
 * This attempts to prevent any one product source+type from blocking processing
 * of notifications from other product source+type.
 */
public class RoundRobinListenerNotifier extends DefaultConfigurable implements ListenerNotifier, Runnable {

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(RoundRobinListenerNotifier.class.getName());

  /** List of indexes that have already been requeued. */
  private static final ArrayList<String> AUTOLOADED_INDEXES = new ArrayList<String>();

  /** The receiver using this notifier. */
  private final DefaultNotificationReceiver receiver;
  /** Registered notification listeners. */
  private final HashMap<NotificationListener, ListenerNotifierThread> listeners;
  /** Status/requeue thread. */
  private Thread thread;
  /** How often to print status and check for notifications to requeue. */
  private long statusInterval = 5000L;

  /**
   * Create new RoundRobinListenerNotifier.
   *
   * @param receiver the receiver using this notifier.
   */
  public RoundRobinListenerNotifier(final DefaultNotificationReceiver receiver) {
    this.receiver = receiver;
    this.listeners = new HashMap<NotificationListener, ListenerNotifierThread>();
    this.thread = null;
  }

  /**
   * Start the status/requeue thread.
   */
  public void startup() throws Exception {
    if (thread == null) {
      thread = new Thread(this);
      thread.start();

      requeue();
    }
  }

  /**
   * Stop the status/requeue thread.
   */
  public void shutdown() {
    if (thread != null) {
      thread.interrupt();
      thread = null;
    }
  }

  /**
   * Add a notification listener.
   */
  @Override
  public void addNotificationListener(NotificationListener listener) throws Exception {
    if (!listeners.containsKey(listener)) {
      ListenerNotifierThread notifier = new ListenerNotifierThread(listener);
      listeners.put(listener, notifier);
      notifier.start();
    }
  }

  /**
   * Remove a notification listener.
   */
  @Override
  public void removeNotificationListener(NotificationListener listener) throws Exception {
    if (listeners.containsKey(listener)) {
      ListenerNotifierThread notifier = listeners.remove(listener);
      notifier.stop();
    }
  }

  /**
   * Notify listeners.
   */
  @Override
  public void notifyListeners(NotificationEvent event) throws Exception {
    notifyListeners(event, listeners.values());
  }

  /**
   * Notify a specific list of listeners.
   *
   * Used during renotification to only notify listeners that have an index.
   *
   * @param event    notification.
   * @param toNotify list of listeners to notify.
   * @throws Exception if error occurs
   */
  protected void notifyListeners(NotificationEvent event, final Collection<ListenerNotifierThread> toNotify)
      throws Exception {
    Iterator<ListenerNotifierThread> iter = toNotify.iterator();
    while (iter.hasNext()) {
      iter.next().notify(event);
    }
  }

  /**
   * Run status/requeue tasks.
   */
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // run every 5 seconds
        Thread.sleep(statusInterval);

        Iterator<ListenerNotifierThread> iter = listeners.values().iterator();
        while (iter.hasNext()) {
          ListenerNotifierThread notifier = iter.next();
          // requeue errors
          notifier.requeueErrors();
          int queued = notifier.getQueue().size();
          int errors = notifier.getErrorQueue().size();
          // print status
          LOGGER.fine("[" + receiver.getName() + "-notifier] listener " + notifier.getListener().getName() + " "
              + queued + " queued, " + errors + " to retry");
        }
      } catch (InterruptedException ie) {
        // stopping
      } catch (Exception e) {
        LOGGER.log(Level.WARNING, "exception running notifier status", e);
      }
    }
  }

  /**
   * Requeue existing notifications at startup.
   *
   * @throws Exception if error occurs
   */
  protected void requeue() throws Exception {
    NotificationIndex index = receiver.getNotificationIndex();

    ArrayList<ListenerNotifierThread> toRenotify = new ArrayList<ListenerNotifierThread>();
    Iterator<ListenerNotifierThread> iter = listeners.values().iterator();
    while (iter.hasNext()) {
      ListenerNotifierThread notifier = iter.next();
      NotificationListener listener = notifier.getListener();
      if (listener instanceof DefaultNotificationListener
          && ((DefaultNotificationListener) listener).getNotificationIndex() != null) {
        // listener that has notification index
        String key = index.getName() + "|" + listener.getName();
        if (!AUTOLOADED_INDEXES.contains(key)) {
          // not already renotified
          toRenotify.add(notifier);
        }
      }
    }
    if (toRenotify.size() == 0) {
      // no listeners to renotify
      return;
    }

    LOGGER.fine("[" + receiver.getName() + "-notifier] requeuing notifications");
    Iterator<Notification> notifications = index
        .findNotifications((List<String>) null, (List<String>) null, (List<String>) null).iterator();
    while (notifications.hasNext()) {
      Notification notification = notifications.next();
      notifyListeners(new NotificationEvent(receiver, notification), toRenotify);
    }
    LOGGER.fine("[" + receiver.getName() + "-notifier] done requeuing notifications");
  }

}