RelayProductListener.java

/*
 * RelayProductListener
 */
package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.product.Product;
import gov.usgs.util.Config;
import gov.usgs.util.StringUtils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Listens for products and relays each received product through any number
 * of configured {@link ProductSender}s.
 *
 * <p>
 * Relaying and shutdown are best-effort: a failure in one sender is logged
 * and does not prevent the remaining senders from being used.
 */
public class RelayProductListener extends DefaultNotificationListener {

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(RelayProductListener.class.getName());

  /** Property for list of senders */
  public static final String SENDERS_PROPERTY = "senders";

  /** Senders used to relay products; never null. */
  private List<ProductSender> senders = new ArrayList<>();

  /**
   * Empty constructor for configurable.
   */
  public RelayProductListener() {
  }

  /**
   * Construct a RelayProductListener using a list of ProductSenders.
   *
   * @param senders the senders to use; the list is used as-is (not copied),
   *                and {@code null} is treated as an empty list.
   */
  public RelayProductListener(final List<ProductSender> senders) {
    this.senders = (senders != null) ? senders : new ArrayList<ProductSender>();
  }

  /**
   * Send a product with each sender.
   *
   * <p>
   * An exception from one sender is logged at WARNING level and the product
   * is still offered to the remaining senders.
   *
   * @param product product to send
   */
  public void onProduct(final Product product) {
    LOGGER.info("Relaying product " + product.getId().toString() + " " + product.getId().getUpdateTime());
    for (final ProductSender sender : senders) {
      try {
        sender.sendProduct(product);
      } catch (Exception e) {
        // describe() is null-safe, so a null list entry cannot make the
        // handler itself throw and abort the relay to remaining senders
        LOGGER.log(Level.WARNING, "Error relaying product with sender: " + describe(sender), e);
      }
    }
  }

  /**
   * Configure listener and add any senders provided.
   *
   * <p>
   * Reads the comma separated {@link #SENDERS_PROPERTY} value and looks up
   * each named object in the global configuration. A sender that cannot be
   * loaded is logged at WARNING level and skipped; it does not fail
   * configuration of this listener.
   *
   * @param config the configuration for this listener
   * @throws Exception if the superclass configuration fails
   */
  public void configure(Config config) throws Exception {
    // read DefaultNotificationListener properties
    super.configure(config);
    for (final String senderName : StringUtils.split(config.getProperty(SENDERS_PROPERTY), ",")) {
      try {
        LOGGER.config("Loading sender '" + senderName + "'");
        ProductSender sender = (ProductSender) Config.getConfig().getObject(senderName);
        if (sender == null) {
          throw new ConfigurationException("Sender " + senderName + " is not properly configured");
        }
        senders.add(sender);
      } catch (Exception e) {
        // best-effort: keep loading the remaining senders
        LOGGER.log(Level.WARNING, "Exception while trying to configure sender: " + senderName, e);
      }
    }
  }

  /**
   * Set the name of this listener.
   *
   * <p>
   * Delegates to the superclass; the names of the senders are not changed.
   *
   * @param name the name for this listener
   */
  public void setName(final String name) {
    super.setName(name);
  }

  /**
   * Shut down this listener, then call each sender's shutdown method.
   *
   * <p>
   * Senders are shut down even when the superclass shutdown throws. An
   * exception from one sender is logged at WARNING level and the remaining
   * senders are still shut down.
   *
   * @throws Exception if the superclass shutdown fails
   */
  public void shutdown() throws Exception {
    try {
      super.shutdown();
    } finally {
      // release senders even if the superclass shutdown failed
      for (final ProductSender sender : senders) {
        try {
          sender.shutdown();
        } catch (Exception e) {
          LOGGER.log(Level.WARNING, "Exception while trying to shut down sender: " + describe(sender), e);
        }
      }
    }
  }

  /**
   * Start this listener, then call each sender's startup method.
   *
   * <p>
   * Unlike relaying and shutdown, startup is fail-fast: the first sender
   * that fails to start propagates its exception and later senders are not
   * started.
   *
   * @throws Exception if the superclass or any sender fails to start
   */
  public void startup() throws Exception {
    super.startup();
    for (final ProductSender sender : senders) {
      sender.startup();
    }
  }

  /**
   * @return the list of senders used by this listener (the internal list,
   *         not a copy).
   */
  public List<ProductSender> getSenders() {
    return senders;
  }

  /**
   * Replace the list of senders used by this listener.
   *
   * @param senders the senders to use; the list is used as-is (not copied),
   *                and {@code null} is treated as an empty list.
   */
  public void setSenders(List<ProductSender> senders) {
    this.senders = (senders != null) ? senders : new ArrayList<ProductSender>();
  }

  /**
   * Null-safe sender name for log messages.
   *
   * @param sender the sender to describe, may be null
   * @return the sender's name, or the text "null" when sender is null
   */
  private static String describe(final ProductSender sender) {
    return (sender != null) ? sender.getName() : "null";
  }

}