DefaultNotificationListener.java

/*
 * DefaultNotificationListener
 */
package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.product.AbstractListener;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
import gov.usgs.util.ObjectLock;
import gov.usgs.util.StringUtils;

import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A base implementation of a notification listener. Implements functionality
 * that is useful for most notification listeners.
 *
 * Sub classes should override the onProduct(Product) method to add custom
 * processing.
 *
 * The DefaultNotificationListener extends the AbstractListener and can use any
 * of those configuration parameters.
 *
 * @see gov.usgs.earthquake.product.AbstractListener
 */
public class DefaultNotificationListener extends AbstractListener
    implements NotificationListener, NotificationIndexCleanup.Listener {

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(DefaultNotificationListener.class.getName());

  /** Property referencing a notification index config section. */
  public static final String NOTIFICATION_INDEX_PROPERTY = "listenerIndex";
  /** Property for listener index file */
  public static final String INDEX_FILE_PROPERTY = "listenerIndexFile";

  /** How long to wait until checking for expired notifications/products. */
  public static final String CLEANUP_INTERVAL_PROPERTY = "cleanupInterval";
  /** Default time to wait for cleanup. 1h */
  public static final String DEFAULT_CLEANUP_INTERVAL = "3600000";

  /** Property for concurrentProducts */
  public static final String CONCURRENT_PRODUCTS_PROPERTY = "concurrentProducts";
  /** How many products to process at a time. */
  public static final String DEFAULT_CONCURRENT_PRODUCTS = "1";

  /** Whether or not to process products more than once. */
  public static final String PROCESS_DUPLICATES = "processDuplicates";
  /** Default for process duplicates. False */
  public static final String DEFAULT_PROCESS_DUPLICATES = "false";

  /** Filter products based on content paths they contain. */
  public static final String INCLUDE_PATHS_PROPERTY = "includePaths";
  /** Property for exludePaths */
  public static final String EXCLUDE_PATHS_PROPERTY = "excludePaths";

  /** Optional notification index. */
  private NotificationIndex notificationIndex = null;

  /** How often to run cleanup task, in ms, <=0 = off. */
  private Long cleanupInterval = 0L;

  /** Timer that schedules sender cleanup task. */
  private Timer cleanupTimer = null;

  /** Notification index cleanup. */
  private NotificationIndexCleanup notificationCleanup = null;

  /** How many products to process at the same time. */
  private int concurrentProducts = 1;

  /** Whether or not to process products that have already been processed. */
  private boolean processDuplicates = false;

  /** Array of content paths to search. */
  private final ArrayList<String> includePaths = new ArrayList<String>();

  /** Array of content paths to search. */
  private final ArrayList<String> excludePaths = new ArrayList<String>();

  /**
   * Locks used to keep concurrent listener from processing product more than once
   * at the same time.
   */
  private ObjectLock<ProductId> storageLocks = new ObjectLock<ProductId>();

  /**
   * Implement the NotificationListener interface.
   *
   * This method calls accept, and if accept returns true, retrieves the product
   * and calls onProduct.
   */
  public void onNotification(final NotificationEvent event) throws Exception {
    Notification notification = event.getNotification();
    ProductId id = notification.getProductId();
    String productId = id.toString();

    LOGGER.finest("[" + getName() + "] received notification for id=" + productId);

    if (!accept(id)) {
      LOGGER.finest("[" + getName() + "] rejected notification for id=" + productId);
      return;
    }

    // only allow one thread to process a product
    storageLocks.acquireLock(id);
    try {
      if (!onBeforeProcessNotification(notification)) {
        return;
      }

      LOGGER.finer("[" + getName() + "] processing notification for id=" + productId);

      Product product;
      try {
        product = event.getProduct();
      } catch (ContentTypeNotSupportedException | InvalidSignatureException | FileNotFoundException exception) {
        // Catching these exceptions stops us from continueing on to onProduct
        LOGGER.warning(
            String.format("[%s] not processing product for id=%s, %s", getName(), productId, exception.getMessage()));
        return;
      }

      if (product == null) {
        throw new ContinuableListenerException("retrieved product null," + " notification id=" + productId);
      }

      if (!onBeforeProcessProduct(product)) {
        return;
      }
      LOGGER.finer("[" + getName() + "] processing product for id=" + productId);
      onProduct(product);

      onAfterProcessNotification(notification);
    } finally {
      // be sure to release lock when done/error
      storageLocks.releaseLock(id);
    }
  }

  /**
   * Called by onNotification when a product is retrieved.
   *
   * @param product a product whose notification was accepted.
   * @throws Exception if error occurs
   */
  public void onProduct(final Product product) throws Exception {
    // subclasses do stuff here
    ProductId id = product.getId();
    StringBuffer b = new StringBuffer("[" + getName() + "] product processed source=" + id.getSource() + ", type="
        + id.getType() + ", code=" + id.getCode() + ", updateTime=" + id.getUpdateTime().toString());

    Map<String, String> properties = product.getProperties();
    Iterator<String> iter = properties.keySet().iterator();
    while (iter.hasNext()) {
      String name = iter.next();
      b.append(", ").append(name).append("=").append(properties.get(name));
    }

    LOGGER.info(b.toString());
    System.out.println(b.toString());
  }

  /**
   * Called just before this listener processes a notification.
   *
   * @param notification notification about to be processed.
   * @return true to process the notification, false to skip
   * @throws Exception if error occurs
   */
  protected boolean onBeforeProcessNotification(final Notification notification) throws Exception {
    // only check if we care
    if (!processDuplicates && this.notificationIndex != null) {
      List<Notification> notifications = this.notificationIndex.findNotifications(notification.getProductId());
      if (notifications.size() > 0) {
        LOGGER.finer("[" + getName() + "] skipping existing product " + notification.getProductId().toString());
        return false;
      }
    }

    return true;
  }

  /**
   * Called after a product has been downloaded, but before onProduct is called.
   *
   * Sometimes a listener cannot tell whether it should process a product until
   * its contents are available. This is where the "includePaths" and
   * "excludePaths" are evaluated.
   *
   * @param product product about to be processed.
   * @return true to process the product, false to skip
   * @throws Exception if error occurs
   */
  protected boolean onBeforeProcessProduct(final Product product) throws Exception {
    if (excludePaths.size() != 0) {
      Map<String, Content> contents = product.getContents();
      Iterator<String> pathIter = excludePaths.iterator();
      while (pathIter.hasNext()) {
        String path = pathIter.next();
        if (contents.containsKey(path)) {
          // contains at least one matching include path
          LOGGER.fine("[" + getName() + "] skipping product " + product.getId().toString() + ", '" + path
              + "' matches excludePaths");
          return false;
        }
      }
    }

    if (includePaths.size() != 0) {
      boolean containsPath = false;
      Map<String, Content> contents = product.getContents();
      Iterator<String> pathIter = includePaths.iterator();
      while (pathIter.hasNext()) {
        String path = pathIter.next();
        if (contents.containsKey(path)) {
          // contains at least one matching include path
          containsPath = true;
          break;
        }
      }
      if (!containsPath) {
        LOGGER.fine(
            "[" + getName() + "] skipping product " + product.getId().toString() + ", does not match includePaths");
        return false;
      }
    }

    return true;
  }

  /**
   * Called when this listener has successfully processed a notification.
   *
   * @param notification notification that was processed.
   * @throws Exception if error occurs
   */
  protected void onAfterProcessNotification(final Notification notification) throws Exception {
    if (this.notificationIndex != null) {
      this.notificationIndex.addNotification(notification);
    }
  }

  /**
   * Called when an expired notification is being removed from the index.
   *
   * @param notification to be removed
   * @throws Exception if error occurs
   */
  @Override
  public void onExpiredNotification(final Notification notification) throws Exception {
    // nothing to do
  }

  /**
   * Periodic cleanup task.
   *
   * Called every cleanupInterval milliseconds.
   *
   * @throws Exception if could not create new notificationCleanup
   */
  public void cleanup() throws Exception {
    LOGGER.finer("[" + getName() + "] running listener cleanup");
    if (this.notificationCleanup == null) {
      this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this);
      this.notificationCleanup.startup();
    } else {
      this.notificationCleanup.wakeUp();
    }
  }

  @Override
  public void startup() throws Exception {
    super.startup();
    if (this.notificationIndex != null) {
      this.notificationIndex.startup();
    }

    // only schedule cleanup if interval is non-zero
    if (cleanupInterval > 0) {
      cleanupTimer = new Timer();
      cleanupTimer.scheduleAtFixedRate(new TimerTask() {
        public void run() {
          try {
            cleanup();
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "[" + getName() + "] exception during cleanup", e);
          }
        }
      }, 0, cleanupInterval);
    }
  }

  @Override
  public void shutdown() throws Exception {
    super.shutdown();
    if (this.notificationCleanup != null) {
      try {
        this.notificationCleanup.shutdown();
      } catch (Exception e) {
        LOGGER.log(Level.INFO, "[" + getName() + "] exception stopping notification cleanup", e);
      } finally {
        this.notificationCleanup = null;
      }
    }
    try {
      this.notificationIndex.shutdown();
    } catch (Exception e) {
      // ignore
    }
    try {
      this.cleanupTimer.cancel();
    } catch (Exception e) {
      // ignore
    }
  }

  @Override
  public void configure(final Config config) throws Exception {
    super.configure(config);

    String notificationIndexName = config.getProperty(NOTIFICATION_INDEX_PROPERTY);
    String notificationIndexFile = config.getProperty(INDEX_FILE_PROPERTY);
    if (notificationIndexName != null) {
      LOGGER.config("[" + getName() + "] loading notification index '" + notificationIndexName + "'");
      notificationIndex = (NotificationIndex) Config.getConfig().getObject(notificationIndexName);
      if (notificationIndex == null) {
        throw new ConfigurationException(
            "[" + getName() + "] index '" + notificationIndexName + "' is not properly configured");
      }
    } else if (notificationIndexFile != null) {
      LOGGER.config("[" + getName() + "] using notification index '" + notificationIndexFile + "'");
      notificationIndex = new JDBCNotificationIndex(notificationIndexFile);
    }

    cleanupInterval = Long.parseLong(config.getProperty(CLEANUP_INTERVAL_PROPERTY, DEFAULT_CLEANUP_INTERVAL));
    LOGGER.config("[" + getName() + "] cleanup interval = " + processDuplicates);

    concurrentProducts = Integer
        .parseInt(config.getProperty(CONCURRENT_PRODUCTS_PROPERTY, DEFAULT_CONCURRENT_PRODUCTS));
    LOGGER.config("[" + getName() + "] concurrent products = " + concurrentProducts);

    processDuplicates = Boolean.valueOf(config.getProperty(PROCESS_DUPLICATES, DEFAULT_PROCESS_DUPLICATES));
    LOGGER.config("[" + getName() + "] process duplicates = " + processDuplicates);

    includePaths.addAll(StringUtils.split(config.getProperty(INCLUDE_PATHS_PROPERTY), ","));
    LOGGER.config("[" + getName() + "] include paths = " + includePaths);

    excludePaths.addAll(StringUtils.split(config.getProperty(EXCLUDE_PATHS_PROPERTY), ","));
    LOGGER.config("[" + getName() + "] exclude paths = " + excludePaths);
  }

  /** @return notificationIndex */
  public NotificationIndex getNotificationIndex() {
    return notificationIndex;
  }

  /** @param notificationIndex to set */
  public void setNotificationIndex(NotificationIndex notificationIndex) {
    this.notificationIndex = notificationIndex;
  }

  /** @return cleanupInterval */
  public Long getCleanupInterval() {
    return cleanupInterval;
  }

  /** @param cleanupInterval long to set */
  public void setCleanupInterval(Long cleanupInterval) {
    this.cleanupInterval = cleanupInterval;
  }

  /** @return concurrentProducts */
  public int getConcurrentProducts() {
    return concurrentProducts;
  }

  /** @param concurrentProducts int to set */
  public void setConcurrentProducts(int concurrentProducts) {
    this.concurrentProducts = concurrentProducts;
  }

  /** @return processDuplicates */
  public boolean isProcessDuplicates() {
    return processDuplicates;
  }

  /** @param processDuplicates boolean to set */
  public void setProcessDuplicates(boolean processDuplicates) {
    this.processDuplicates = processDuplicates;
  }

  /**
   * @return the includePaths
   */
  public ArrayList<String> getIncludePaths() {
    return includePaths;
  }

  /**
   * @return the excludePaths
   */
  public ArrayList<String> getExcludePaths() {
    return excludePaths;
  }

}