// DefaultNotificationReceiver.java

/*
 * DefaultNotificationReceiver
 */
package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.aws.JsonNotification;
import gov.usgs.earthquake.distribution.roundrobinnotifier.RoundRobinListenerNotifier;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.IOUtil;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.earthquake.product.io.ObjectProductSource;
import gov.usgs.earthquake.product.io.ProductSource;
import gov.usgs.earthquake.util.SizeLimitInputStream;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.ObjectLock;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URL;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Logger;

import javax.json.Json;

import java.util.logging.Level;

/**
 * The core of product distribution.
 *
 * A DefaultNotificationReceiver receives notifications and notifies listeners
 * of received notifications. NotificationListeners use the NotificationReceiver
 * to retrieve products referenced by notifications.
 *
 * The NotificationReceiver uses a NotificationIndex to track received
 * notifications, and a ProductStorage to store retrieved products.
 *
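 * The DefaultNotificationReceiver implements the Configurable interface and
 * uses the following configuration parameters:
 *
 * <dl>
 * <dt>index</dt>
 * <dd>Name of a configured NotificationIndex. Required unless indexFile is
 * set.</dd>
 * <dt>indexFile</dt>
 * <dd>Shortcut used when index is not set: file for a
 * JDBCNotificationIndex.</dd>
 * <dt>storage</dt>
 * <dd>Name of a configured ProductStorage. Required unless storageDirectory is
 * set.</dd>
 * <dt>storageDirectory</dt>
 * <dd>Shortcut used when storage is not set: directory for a
 * FileProductStorage.</dd>
 * <dt>storageAge</dt>
 * <dd>How long to keep retrieved products, in milliseconds. Default 3600000
 * (1 hour). The all lower-case spelling is also accepted.</dd>
 * <dt>cleanupInterval</dt>
 * <dd>Milliseconds between checks for expired notifications/products. Default
 * 900000 (15 minutes). Cleanup is only scheduled when this is greater than
 * zero.</dd>
 * <dt>connectTimeout</dt>
 * <dd>Connect timeout in milliseconds. Default 15000.</dd>
 * <dt>readTimeout</dt>
 * <dd>Read timeout in milliseconds. Default 15000.</dd>
 * <dt>listenerNotifier</dt>
 * <dd>One of "executor", "future", or "roundrobin". When omitted, the executor
 * notifier created by the constructor is used.</dd>
 * </dl>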
 *
 * Each listener has a separate queue of notifications. Each listener is
 * allocated one thread to process notifications from this queue.
 */
public class DefaultNotificationReceiver extends DefaultConfigurable
    implements NotificationReceiver, NotificationIndexCleanup.Listener {

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(DefaultNotificationReceiver.class.getName());

  /** Property referencing a notification index config section. */
  public static final String NOTIFICATION_INDEX_PROPERTY = "index";

  /**
   * Shortcut to create a SQLite JDBCNotificationIndex. Only consulted when the
   * "index" property is not set.
   */
  public static final String INDEX_FILE_PROPERTY = "indexFile";

  /** Property referencing a product storage config section. */
  public static final String PRODUCT_STORAGE_PROPERTY = "storage";

  /**
   * Shortcut to create a FileProductStorage. Only consulted when the "storage"
   * property is not set.
   */
  public static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";

  /** Property referencing how long to store products in milliseconds. */
  public static final String PRODUCT_STORAGE_MAX_AGE_PROPERTY = "storageAge";

  /** Default max age to store products, 3600000 milliseconds = 1 hour. */
  public static final String DEFAULT_PRODUCT_STORAGE_MAX_AGE = "3600000";

  /**
   * Property referencing how long to wait until checking for expired
   * notifications/products.
   */
  public static final String RECEIVER_CLEANUP_PROPERTY = "cleanupInterval";

  /**
   * Default time between checking for expired notifications/products, 900000
   * milliseconds = 15 minutes.
   */
  public static final String DEFAULT_RECEIVER_CLEANUP = "900000";

  /** Property for the connect timeout, in milliseconds. */
  public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
  /** Default connect timeout, 15000 milliseconds = 15 seconds. */
  public static final String DEFAULT_CONNECT_TIMEOUT = "15000";
  /** Property for the read timeout, in milliseconds. */
  public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
  /** Default read timeout, 15000 milliseconds = 15 seconds. */
  public static final String DEFAULT_READ_TIMEOUT = "15000";

  /** Property selecting which ListenerNotifier implementation to use. */
  public static final String LISTENER_NOTIFIER_PROPERTY = "listenerNotifier";
  /** Listener notifier property value that selects ExecutorListenerNotifier. */
  public static final String EXECUTOR_LISTENER_NOTIFIER = "executor";
  /** Listener notifier property value that selects FutureListenerNotifier. */
  public static final String FUTURE_LISTENER_NOTIFIER = "future";
  /** Listener notifier property value that selects RoundRobinListenerNotifier. */
  public static final String ROUNDROBIN_LISTENER_NOTIFIER = "roundrobin";

  /** The notification index where received notifications are stored. */
  private NotificationIndex notificationIndex;

  /** The product storage where retrieved products are stored. */
  private ProductStorage productStorage;

  /** How long to store retrieved product, in milliseconds. */
  private Long productStorageMaxAge = 0L;

  /** How long to wait until checking for expired notifications/products. */
  private Long receiverCleanupInterval = 0L;

  /** Timer that schedules receiver cleanup task. */
  private Timer receiverCleanupTimer = new Timer();

  /** Notification cleanup; created on the first cleanup run. */
  private NotificationIndexCleanup notificationCleanup = null;

  /** Connect timeout in milliseconds used when opening product URLs. */
  private int connectTimeout = Integer.parseInt(DEFAULT_CONNECT_TIMEOUT);
  /** Read timeout in milliseconds used when opening product URLs. */
  private int readTimeout = Integer.parseInt(DEFAULT_READ_TIMEOUT);

  /** Delivers notification events to registered listeners. */
  private ListenerNotifier notifier;

  /** A lock that is acquired when a product is being retrieved. */
  private ObjectLock<ProductId> retrieveLocks = new ObjectLock<ProductId>();

  /**
   * Construct a receiver whose initial listener notifier is an
   * ExecutorListenerNotifier bound to this receiver.
   */
  public DefaultNotificationReceiver() {
    this.notifier = new ExecutorListenerNotifier(this);
  }

  /**
   * Register a notification listener with the current listener notifier.
   *
   * @param listener the listener to register; it is handed to the notifier,
   *                 which delivers future notifications to it.
   * @throws Exception if the notifier fails to add the listener.
   */
  public void addNotificationListener(NotificationListener listener) throws Exception {
    this.notifier.addNotificationListener(listener);
  }

  /**
   * Unregister a notification listener from the current listener notifier.
   *
   * This method only delegates; how notifications already queued for the
   * listener are handled is decided by the notifier implementation.
   *
   * @param listener the listener to unregister.
   * @throws Exception if the notifier fails to remove the listener.
   */
  public void removeNotificationListener(NotificationListener listener) throws Exception {
    this.notifier.removeNotificationListener(listener);
  }

  /**
   * Accept an incoming notification.
   *
   * Notifications whose expiration date is already in the past are logged and
   * dropped. All others are added to the notification index first, and then
   * passed to {@link #notifyListeners(Notification)}.
   *
   * @param notification the notification being received.
   * @throws Exception if the notification index or the notifier throws.
   */
  public void receiveNotification(Notification notification) throws Exception {
    final String idText = notification.getProductId().toString();
    LOGGER.fine("[" + getName() + "] Notification Received " + idText);

    final Date expires = notification.getExpirationDate();
    if (expires.before(new Date())) {
      // nothing to index or deliver for an already expired notification
      LOGGER.finer("[" + getName() + "] skipping already expired notification for product id="
          + idText + ", expiration=" + expires.toString());
      return;
    }

    // index before notifying listeners
    notificationIndex.addNotification(notification);

    if (notification instanceof JsonNotification) {
      LOGGER.finer("[" + getName() + "] json notification " + idText);
    } else if (notification instanceof URLNotification) {
      final URL location = ((URLNotification) notification).getProductURL();
      LOGGER.finer("[" + getName() + "] notification URL=" + location.toString());
    }

    notifyListeners(notification);
  }

  /**
   * Hand a notification to the listener notifier.
   *
   * Wraps the notification in a NotificationEvent whose source is this
   * receiver and passes it to the current ListenerNotifier. Threading and
   * delivery timing are determined by the notifier implementation.
   *
   * @param notification the notification being sent to listeners.
   * @throws Exception if the notifier throws while accepting the event.
   */
  protected void notifyListeners(final Notification notification) throws Exception {
    final String idText = notification.getProductId().toString();
    LOGGER.finest("[" + getName() + "] notifying listeners for product id=" + idText);

    // the notifier takes over delivery from here
    notifier.notifyListeners(new NotificationEvent(this, notification));
  }

  /**
   * Describe the listener queue status.
   *
   * @return the fixed text "Using notifier"; no per-queue detail is reported
   *         by this method.
   */
  public String getListenerQueueStatus() {
    final String status = "Using notifier";
    return status;
  }

  /**
   * Trigger a cleanup pass over the notification index.
   *
   * The work is delegated to a NotificationIndexCleanup. The first call
   * creates and starts it, with this receiver as its listener; later calls
   * wake the existing instance. Expired notifications are reported back through
   * {@link #onExpiredNotification(Notification)}.
   *
   * @throws Exception if NotificationIndexCleanup throws an Exception.
   */
  public void removeExpiredNotifications() throws Exception {
    LOGGER.fine("[" + getName() + "] running receiver cleanup");

    if (this.notificationCleanup != null) {
      // cleanup already exists, just nudge it
      this.notificationCleanup.wakeUp();
      return;
    }

    // first run: create the cleanup helper and start it
    this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this);
    this.notificationCleanup.startup();
  }

  /**
   * Callback from the NotificationIndexCleanup thread.
   *
   * A notification that is not a URLNotification refers to a product held in
   * this receiver's storage, so that product is removed as well. URL
   * notifications need no storage cleanup.
   *
   * @param notification expired notification about to be removed.
   * @throws Exception if an error occurs while removing the product from
   *                   storage.
   * @see ProductStorage
   */
  public void onExpiredNotification(final Notification notification) throws Exception {
    if (notification instanceof URLNotification) {
      // nothing stored locally for URL notifications
      return;
    }

    productStorage.removeProduct(notification.getProductId());
    if (LOGGER.isLoggable(Level.FINEST)) {
      final String idText = notification.getProductId().toString();
      LOGGER.finest("[" + getName() + "] removed expired product from receiver cache " + idText);
    }
  }

  /**
   * Retrieve a product by id.
   *
   * If this product is already in storage, load and return the product.
   * Otherwise, search the notification index for URL notifications about this
   * product and try each in turn until one can be downloaded (or decoded, for
   * "data" URLs) into storage.
   *
   * A per-product lock is held for the whole call, so concurrent requests for
   * the same id do not download the product twice.
   *
   * @param id the product to retrieve
   * @return the retrieved product, or null if not available.
   * @throws Exception the last ContentTypeNotSupportedException,
   *                   InvalidSignatureException or FileNotFoundException seen
   *                   when no notification produced the product; other
   *                   per-notification failures are logged and result in null.
   *                   Exceptions from the lock, the notification index or the
   *                   storage outside the per-notification handling propagate.
   */
  public Product retrieveProduct(ProductId id) throws Exception {
    Product product = null;
    final String productIdString = id.toString();

    LOGGER.finest("[" + getName() + "] acquiring retrieve lock id=" + productIdString);
    retrieveLocks.acquireLock(id);
    LOGGER.finest("[" + getName() + "] retrieve lock acquired id=" + productIdString);
    try {
      if (productStorage.hasProduct(id)) {
        try {
          LOGGER.finest("[" + getName() + "] loading product from storage id=" + productIdString);
          product = productStorage.getProduct(id);
          LOGGER.finest("[" + getName() + "] product loaded from storage id=" + productIdString);
        } catch (Exception e) {
          // fall through to the notification search below
          LOGGER.log(Level.FINE, "[" + getName() + "] storage claims hasProduct, but threw exception", e);
          product = null;
        }
      }

      if (product == null) {
        LOGGER.finer("[" + getName() + "] don't have product yet, searching notifications");
        // don't have product yet, search notifications
        Iterator<Notification> iter = notificationIndex.findNotifications(id).iterator();

        Exception caughtException = null;
        while (product == null && iter.hasNext()) {
          Notification notification = iter.next();
          if (!(notification instanceof URLNotification)) {
            // only URL notifications include location info
            continue;
          }

          // every stream opened for this notification is assigned here so the
          // finally block closes it on all paths
          InputStream in = null;
          try {
            final URL productURL = ((URLNotification) notification).getProductURL();
            final boolean isDataUrl = "data".equals(productURL.getProtocol());
            // data urls embed the whole product; never log their full text
            final String urlDescription = isDataUrl ? "data url" : productURL.toString();

            final ProductSource productSource;
            Date beginDownload = new Date();
            if (isDataUrl) {
              // JSON notification with embedded product
              in = StreamUtils.getInputStream(productURL);
              Product tempProduct = new JsonProduct().getProduct(Json.createReader(in).readObject());
              LOGGER.finer("[" + getName() + "] parsed json notification for " + tempProduct.getId().toString());
              productSource = new ObjectProductSource(tempProduct);
            } else {
              // URL notification
              LOGGER.finer("[" + getName() + "] notification url " + productURL.toString());

              in = StreamUtils.getURLInputStream(productURL, connectTimeout, readTimeout);
              // restart the clock once connected, so only transfer time is reported
              beginDownload = new Date();
              // size limit stream with a negative limit, as in the original code
              productSource = IOUtil.autoDetectProductSource(new SizeLimitInputStream(in, -1));
            }

            Notification storedNotification = storeProductSource(productSource);

            final long downloadTime = new Date().getTime() - beginDownload.getTime();

            LOGGER.fine("[" + getName() + "] receiver retrieved product" + " id=" + id.toString() + " (time = "
                + downloadTime + " ms)" + " from " + urlDescription);

            LOGGER.finest("[" + getName() + "] after store product, notification=" + storedNotification);

            if (productStorage.hasProduct(id)) {
              LOGGER.finer("[" + getName() + "] getting product from storage");
              product = productStorage.getProduct(id);
              LOGGER.finest("[" + getName() + "] after getProduct, product=" + product);

              LOGGER.fine("[" + getName() + "] product downloaded from " + urlDescription);
            } else {
              LOGGER.finer("[" + getName() + "] product not in storage id=" + productIdString);
            }
          } catch (ProductAlreadyInStorageException productAlreadyInStorageException) {
            LOGGER.finer(() -> String.format("[%s] product already in storage id=%s", getName(), productIdString));
            product = productStorage.getProduct(id);
          } catch (ContentTypeNotSupportedException | InvalidSignatureException e) {
            // Product was unable to be stored
            LOGGER.warning(() -> String.format("[%s] exception storing product source, %s", getName(), e.getMessage()));
            caughtException = e;
          } catch (FileNotFoundException fileNotFoundException) {
            LOGGER.warning(() -> String.format("[%s] exception while retrieving product, file not found", getName()));
            caughtException = fileNotFoundException;
          } catch (Exception e) {
            if (e.getCause() instanceof ProductAlreadyInStorageException) {
              LOGGER.finer(() -> String.format("[%s] product already in storage id=%s", getName(), productIdString));
              product = productStorage.getProduct(id);
            } else {
              // log the exception itself so the cause is not lost; as before,
              // it is not remembered for rethrow and the next notification is tried
              LOGGER.log(Level.WARNING, "[" + getName() + "] exception while retrieving product", e);
            }
          } finally {
            StreamUtils.closeStream(in);
          }
        }

        // If product was not set after iterating through all the notifications, throw
        // the last exception caught to allow upstream caller to handle appropriately
        if (product == null && caughtException != null) {
          throw caughtException;
        }
      }
    } finally {
      LOGGER.finest("[" + getName() + "] releasing retrieve lock id=" + productIdString);
      retrieveLocks.releaseLock(id);
      LOGGER.finest("[" + getName() + "] retrieve lock released id=" + productIdString);
    }

    // return product
    return product;
  }

  /**
   * Store a product source in the current <code>ProductStorage</code> and
   * record it in the notification index.
   *
   * When storage reports it has the product after storing, a
   * <code>DefaultNotification</code> expiring <code>productStorageMaxAge</code>
   * milliseconds from now is added to the index.
   *
   * @param source The <code>ProductSource</code> to store.
   * @return the <code>Notification</code> added to the index, or null when
   *         storage does not report having the product after storing.
   * @throws Exception if the product storage or notification index throws.
   * @see gov.usgs.earthquake.distribution.ProductStorage
   */
  protected Notification storeProductSource(ProductSource source) throws Exception {
    final ProductId storedId = productStorage.storeProductSource(source);

    if (!productStorage.hasProduct(storedId)) {
      // not in storage, so there is nothing to index
      return null;
    }

    // the product expires from storage max-age milliseconds from now
    final Date expires = new Date(new Date().getTime() + productStorageMaxAge);
    final Notification stored = new DefaultNotification(storedId, expires);
    notificationIndex.addNotification(stored);
    return stored;
  }

  /**
   * Replay matching notifications to one listener.
   *
   * Queries the NotificationIndex with the given filters and calls the
   * listener's onNotification directly, in the calling thread, once per
   * notification found.
   *
   * @param listener the listener to receive a NotificationEvent for each found
   *                 notification.
   * @param sources  sources to include, or null for all.
   * @param types    types to include, or null for all.
   * @param codes    codes to include, or null for all.
   * @throws Exception if the notification index or notification listener throw an
   *                   exception.
   */
  public void sendNotifications(NotificationListener listener, List<String> sources, List<String> types,
      List<String> codes) throws Exception {
    for (final Notification match : notificationIndex.findNotifications(sources, types, codes)) {
      listener.onNotification(new NotificationEvent(this, match));
    }
  }

  /**
   * Configure this receiver from a config section.
   *
   * Reads the notification index ("index", or the "indexFile" shortcut), the
   * product storage ("storage", or the "storageDirectory" shortcut), the
   * storage max age, the cleanup interval, the connect and read timeouts, and
   * optionally the listener notifier type. When no notifier type is given the
   * existing notifier is kept.
   *
   * @param config the configuration section for this receiver.
   * @throws Exception a ConfigurationException when a required property is
   *                   missing, a referenced object is not configured, or the
   *                   notifier type is unknown; a NumberFormatException when a
   *                   numeric property cannot be parsed.
   */
  public void configure(Config config) throws Exception {
    String notificationIndexName = config.getProperty(NOTIFICATION_INDEX_PROPERTY);
    String notificationIndexFile = config.getProperty(INDEX_FILE_PROPERTY);
    if (notificationIndexName == null && notificationIndexFile == null) {
      throw new ConfigurationException("[" + getName() + "] 'index' is a required configuration property");
    }
    if (notificationIndexName != null) {
      // a named index takes precedence over the indexFile shortcut
      LOGGER.config("[" + getName() + "] loading notification index '" + notificationIndexName + "'");
      notificationIndex = (NotificationIndex) Config.getConfig().getObject(notificationIndexName);
      if (notificationIndex == null) {
        throw new ConfigurationException(
            "[" + getName() + "] index '" + notificationIndexName + "' is not properly configured");
      }
    } else {
      LOGGER.config("[" + getName() + "] using notification index '" + notificationIndexFile + "'");
      notificationIndex = new JDBCNotificationIndex(notificationIndexFile);
    }

    String productStorageName = config.getProperty(PRODUCT_STORAGE_PROPERTY);
    String storageDirectory = config.getProperty(STORAGE_DIRECTORY_PROPERTY);
    if (productStorageName == null && storageDirectory == null) {
      throw new ConfigurationException("[" + getName() + "] 'storage' is a required configuration property");
    }
    if (productStorageName != null) {
      // a named storage takes precedence over the storageDirectory shortcut
      LOGGER.config("[" + getName() + "] loading product storage '" + productStorageName + "'");
      productStorage = (ProductStorage) Config.getConfig().getObject(productStorageName);
      if (productStorage == null) {
        throw new ConfigurationException(
            "[" + getName() + "] storage '" + productStorageName + "' is not properly configured");
      }
    } else {
      LOGGER.config("[" + getName() + "] using storage directory '" + storageDirectory + "'");
      productStorage = new FileProductStorage(new File(storageDirectory));
    }

    productStorageMaxAge = Long.parseLong(config.getProperty(PRODUCT_STORAGE_MAX_AGE_PROPERTY,
        // previously all lower-case
        config.getProperty(PRODUCT_STORAGE_MAX_AGE_PROPERTY.toLowerCase(), DEFAULT_PRODUCT_STORAGE_MAX_AGE)));
    LOGGER.config("[" + getName() + "] storage max age " + productStorageMaxAge + " ms");

    receiverCleanupInterval = Long.parseLong(config.getProperty(RECEIVER_CLEANUP_PROPERTY, DEFAULT_RECEIVER_CLEANUP));
    LOGGER.config("[" + getName() + "] receiver cleanup interval " + receiverCleanupInterval + " ms");

    connectTimeout = Integer.parseInt(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
    LOGGER.config("[" + getName() + "] receiver connect timeout " + connectTimeout + " ms");

    readTimeout = Integer.parseInt(config.getProperty(READ_TIMEOUT_PROPERTY, DEFAULT_READ_TIMEOUT));
    LOGGER.config("[" + getName() + "] receiver read timeout " + readTimeout + " ms");

    String notifierType = config.getProperty(LISTENER_NOTIFIER_PROPERTY);
    if (notifierType != null) {
      if (notifierType.equals(EXECUTOR_LISTENER_NOTIFIER)) {
        notifier = new ExecutorListenerNotifier(this);
        LOGGER.config("[" + getName() + "] using executor listener notifier");
      } else if (notifierType.equals(FUTURE_LISTENER_NOTIFIER)) {
        notifier = new FutureListenerNotifier(this);
        // logged like the other notifier types
        LOGGER.config("[" + getName() + "] using future listener notifier");
      } else if (notifierType.equals(ROUNDROBIN_LISTENER_NOTIFIER)) {
        notifier = new RoundRobinListenerNotifier(this);
        LOGGER.config("[" + getName() + "] using round-robin listener notifier");
      } else {
        // prefix with the receiver name, like every other configuration error here
        throw new ConfigurationException("[" + getName() + "] Unknown notifier type " + notifierType);
      }
    }
  }

  /**
   * Shut down this receiver and the components it owns.
   *
   * Cancels the cleanup timer, then shuts down the notification cleanup (if
   * one was created), the listener notifier, the notification index and the
   * product storage. Shutdown is best effort: a failure in one component is
   * logged at FINE and does not prevent the remaining components from being
   * shut down. Components that were never set are skipped.
   *
   * @throws Exception declared by the interface; component failures are logged
   *                   rather than propagated.
   */
  public void shutdown() throws Exception {
    receiverCleanupTimer.cancel();
    if (notificationCleanup != null) {
      try {
        notificationCleanup.shutdown();
      } catch (Exception e) {
        LOGGER.log(Level.FINE, "[" + getName() + "] exception shutting down notification cleanup", e);
      } finally {
        // always drop the reference so a later cleanup run starts a fresh instance
        notificationCleanup = null;
      }
    }
    if (notifier != null) {
      try {
        notifier.shutdown();
      } catch (Exception e) {
        LOGGER.log(Level.FINE, "[" + getName() + "] exception shutting down listener notifier", e);
      }
    }
    if (notificationIndex != null) {
      try {
        notificationIndex.shutdown();
      } catch (Exception e) {
        LOGGER.log(Level.FINE, "[" + getName() + "] exception shutting down notification index", e);
      }
    }
    if (productStorage != null) {
      try {
        productStorage.shutdown();
      } catch (Exception e) {
        LOGGER.log(Level.FINE, "[" + getName() + "] exception shutting down product storage", e);
      }
    }
  }

  /**
   * Start this receiver.
   *
   * Verifies that storage and index are set, starts them, schedules the
   * periodic cleanup task when the cleanup interval is greater than zero, and
   * finally starts the listener notifier.
   *
   * @throws Exception a ConfigurationException when storage or index is
   *                   missing, or any exception thrown while starting storage,
   *                   index or notifier.
   */
  public void startup() throws Exception {
    if (productStorage == null) {
      throw new ConfigurationException("[" + getName() + "] storage has not been configured properly");
    }
    if (notificationIndex == null) {
      throw new ConfigurationException("[" + getName() + "] index has not been configured properly");
    }

    productStorage.startup();
    notificationIndex.startup();

    // only schedule cleanup if interval is non-zero
    if (receiverCleanupInterval > 0) {
      final TimerTask cleanupTask = new TimerTask() {
        @Override
        public void run() {
          try {
            removeExpiredNotifications();
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "[" + getName() + "] exception during receiver cleanup", e);
          }
        }
      };
      // first run happens immediately, then at the configured interval
      receiverCleanupTimer.scheduleAtFixedRate(cleanupTask, 0, receiverCleanupInterval);
    }

    // do this last since it may start processing
    notifier.startup();

    // Listeners are deliberately not started here; per the earlier note in
    // this method, ProductClient already starts them.
  }

  /**
   * Get the index used to track received notifications.
   *
   * @return the notificationIndex, or null if none has been set.
   */
  public NotificationIndex getNotificationIndex() {
    return this.notificationIndex;
  }

  /**
   * Set the index used to track received notifications.
   *
   * @param notificationIndex the notificationIndex to set
   */
  public void setNotificationIndex(final NotificationIndex notificationIndex) {
    this.notificationIndex = notificationIndex;
  }

  /**
   * Get the storage used for retrieved products.
   *
   * @return the productStorage, or null if none has been set.
   */
  public ProductStorage getProductStorage() {
    return this.productStorage;
  }

  /**
   * Set the storage used for retrieved products.
   *
   * @param productStorage the productStorage to set
   */
  public void setProductStorage(final ProductStorage productStorage) {
    this.productStorage = productStorage;
  }

  /**
   * Get how long retrieved products are kept in storage.
   *
   * @return the productStorageMaxAge, in milliseconds.
   */
  public Long getProductStorageMaxAge() {
    return this.productStorageMaxAge;
  }

  /**
   * Set how long retrieved products are kept in storage.
   *
   * @param productStorageMaxAge the productStorageMaxAge to set, in
   *                             milliseconds.
   */
  public void setProductStorageMaxAge(final Long productStorageMaxAge) {
    this.productStorageMaxAge = productStorageMaxAge;
  }

  /**
   * Get the queue status reported by the notifier.
   *
   * @return the status map from the notifier when it is an
   *         ExecutorListenerNotifier; null for any other notifier.
   */
  public Map<String, Integer> getQueueStatus() {
    if (!(notifier instanceof ExecutorListenerNotifier)) {
      // only the executor notifier exposes a status
      return null;
    }
    final ExecutorListenerNotifier executorNotifier = (ExecutorListenerNotifier) notifier;
    return executorNotifier.getStatus();
  }

  /**
   * Ask the notifier to throttle its queues.
   *
   * Only an ExecutorListenerNotifier supports this; with any other notifier
   * the call does nothing.
   *
   * @throws InterruptedException if the notifier's throttleQueues call is
   *                              interrupted.
   */
  public void throttleQueues() throws InterruptedException {
    if (!(notifier instanceof ExecutorListenerNotifier)) {
      return;
    }
    final ExecutorListenerNotifier executorNotifier = (ExecutorListenerNotifier) notifier;
    executorNotifier.throttleQueues();
  }

  /**
   * Get the time between checks for expired notifications/products.
   *
   * @return receiverCleanupInterval, in milliseconds.
   */
  public Long getReceiverCleanupInterval() {
    return this.receiverCleanupInterval;
  }

  /**
   * Set the time between checks for expired notifications/products.
   *
   * @param receiverCleanupInterval the receiverCleanupInterval to set, in
   *                                milliseconds.
   */
  public void setReceiverCleanupInterval(final Long receiverCleanupInterval) {
    this.receiverCleanupInterval = receiverCleanupInterval;
  }

  /**
   * Get the connect timeout used when opening product URLs.
   *
   * @return connectTimeout, in milliseconds.
   */
  public int getConnectTimeout() {
    return this.connectTimeout;
  }

  /**
   * Set the connect timeout used when opening product URLs.
   *
   * @param connectTimeout the connect timeout to set, in milliseconds.
   */
  public void setConnectTimeout(final int connectTimeout) {
    this.connectTimeout = connectTimeout;
  }

  /**
   * Get the notifier that delivers notification events to listeners.
   *
   * @return the current ListenerNotifier.
   */
  public ListenerNotifier getNotifier() {
    return notifier;
  }

  /**
   * Replace the notifier that delivers notification events to listeners.
   *
   * The previous notifier is not shut down by this method.
   *
   * @param notifier ListenerNotifier to set
   */
  public void setNotifier(final ListenerNotifier notifier) {
    this.notifier = notifier;
  }

  /**
   * Get the read timeout used when opening product URLs.
   *
   * @return readTimeout, in milliseconds.
   */
  public int getReadTimeout() {
    return this.readTimeout;
  }

  /**
   * Set the read timeout used when opening product URLs.
   *
   * @param readTimeout the read timeout to set, in milliseconds.
   */
  public void setReadTimeout(final int readTimeout) {
    this.readTimeout = readTimeout;
  }

}