RelayProductListener.java
/*
* RelayProductListener
*/
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.product.Product;
import gov.usgs.util.Config;
import gov.usgs.util.StringUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Listens for products and uses any number of given product senders to send
* products as they are received.
*/
public class RelayProductListener extends DefaultNotificationListener {

    /** Logging object. */
    private static final Logger LOGGER = Logger.getLogger(RelayProductListener.class.getName());

    /** Configuration property holding a comma-separated list of sender names. */
    public static final String SENDERS_PROPERTY = "senders";

    /** Senders used to relay products; never {@code null}. */
    private List<ProductSender> senders = new ArrayList<>();

    /**
     * Empty constructor for configurable. Senders are added later by
     * {@link #configure(Config)} or {@link #setSenders(List)}.
     */
    public RelayProductListener() {
    }

    /**
     * Construct a RelayProductListener using a list of ProductSenders.
     *
     * @param senders the senders to use; the list is stored by reference (not
     *                copied). A {@code null} list is treated as "no senders".
     */
    public RelayProductListener(final List<ProductSender> senders) {
        this.senders = orEmpty(senders);
    }

    /**
     * Send a product with each sender.
     *
     * A failure in one sender is logged at WARNING level and does not prevent
     * the remaining senders from being tried.
     *
     * @param product product to send
     */
    public void onProduct(final Product product) {
        LOGGER.info("Relaying product " + product.getId().toString() + " " + product.getId().getUpdateTime());
        for (ProductSender sender : senders) {
            try {
                sender.sendProduct(product);
            } catch (Exception e) {
                // nameOf() is null-safe so the handler itself cannot throw and abort the loop
                LOGGER.log(Level.WARNING, "Error relaying product with sender: " + nameOf(sender), e);
            }
        }
    }

    /**
     * Configure listener and add any senders provided.
     *
     * Reads the {@link #SENDERS_PROPERTY} value from {@code config}, splits it
     * on commas, and looks each name up through {@code Config.getConfig()}.
     * A sender that cannot be loaded is logged at WARNING level and skipped;
     * the remaining names are still processed.
     *
     * @param config configuration for this listener
     * @throws Exception if the superclass configuration fails
     */
    public void configure(Config config) throws Exception {
        // read DefaultNotificationListener properties
        super.configure(config);

        Iterator<String> names = StringUtils.split(config.getProperty(SENDERS_PROPERTY), ",").iterator();
        while (names.hasNext()) {
            // advance outside the try so an iterator failure cannot be swallowed and retried forever
            final String senderName = names.next();
            try {
                LOGGER.config("Loading sender '" + senderName + "'");
                // the cast is inside the try so a non-sender object is reported like any other failure
                ProductSender sender = (ProductSender) Config.getConfig().getObject(senderName);
                if (sender == null) {
                    throw new ConfigurationException("Sender " + senderName + " is not properly configured");
                }
                senders.add(sender);
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, "Exception while trying to configure sender: " + senderName, e);
            }
        }
    }

    /**
     * Set the name of this listener.
     *
     * Only delegates to the superclass; sender names are not changed.
     *
     * @param name the name for this listener
     */
    public void setName(final String name) {
        super.setName(name);
    }

    /**
     * Call the senders shutdown method.
     *
     * The superclass is shut down first. Every sender is then shut down even
     * if the superclass or another sender fails; sender failures are logged at
     * WARNING level and not rethrown.
     *
     * @throws Exception if the superclass shutdown fails
     */
    public void shutdown() throws Exception {
        try {
            super.shutdown();
        } finally {
            // always release the senders, even when super.shutdown() throws
            for (ProductSender sender : senders) {
                try {
                    sender.shutdown();
                } catch (Exception e) {
                    LOGGER.log(Level.WARNING, "Exception while trying to shut down sender: " + nameOf(sender), e);
                }
            }
        }
    }

    /**
     * Call the senders startup method.
     *
     * Unlike {@link #shutdown()}, a sender failure is not caught here: the
     * first exception propagates and later senders are not started.
     *
     * @throws Exception if the superclass or any sender fails to start
     */
    public void startup() throws Exception {
        super.startup();
        for (ProductSender sender : senders) {
            sender.startup();
        }
    }

    /**
     * @return the list of senders used by this listener (the internal list,
     *         not a copy); never {@code null}.
     */
    public List<ProductSender> getSenders() {
        return senders;
    }

    /**
     * Replace the senders used by this listener.
     *
     * @param senders the senders to use; the list is stored by reference (not
     *                copied). A {@code null} list is treated as "no senders".
     */
    public void setSenders(List<ProductSender> senders) {
        this.senders = orEmpty(senders);
    }

    /** Returns the given list, or a new empty mutable list when it is null. */
    private static List<ProductSender> orEmpty(final List<ProductSender> list) {
        return list == null ? new ArrayList<ProductSender>() : list;
    }

    /** Returns a printable name for a sender, tolerating a null sender. */
    private static String nameOf(final ProductSender sender) {
        return sender == null ? "null" : String.valueOf(sender.getName());
    }
}