DefaultNotificationReceiver.java
/*
* DefaultNotificationReceiver
*/
package gov.usgs.earthquake.distribution;
import gov.usgs.earthquake.aws.JsonNotification;
import gov.usgs.earthquake.distribution.roundrobinnotifier.RoundRobinListenerNotifier;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.IOUtil;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.earthquake.product.io.ObjectProductSource;
import gov.usgs.earthquake.product.io.ProductSource;
import gov.usgs.earthquake.util.SizeLimitInputStream;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.ObjectLock;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URL;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Logger;
import javax.json.Json;
import java.util.logging.Level;
/**
* The core of product distribution.
*
* A DefaultNotificationReceiver receives notifications and notifies listeners
* of received notifications. NotificationListeners use the NotificationReceiver
* to retrieve products referenced by notifications.
*
* The NotificationReceiver uses a NotificationIndex to track received
* notifications, and a ProductStorage to store retrieved products.
*
* The DefaultNotificationReceiver implements the Configurable interface and
* uses the following configuration parameters:
*
* Each listener has a separate queue of notifications. Each listener is
* allocated one thread to process notifications from this queue.
*/
public class DefaultNotificationReceiver extends DefaultConfigurable
implements NotificationReceiver, NotificationIndexCleanup.Listener {
/** Logging object. */
private static final Logger LOGGER = Logger.getLogger(DefaultNotificationReceiver.class.getName());
/** Property referencing a notification index config section. */
public static final String NOTIFICATION_INDEX_PROPERTY = "index";
/** Shortcut to create a SQLite JDBCNotificationIndex. */
public static final String INDEX_FILE_PROPERTY = "indexFile";
/** Property referencing a product storage config section. */
public static final String PRODUCT_STORAGE_PROPERTY = "storage";
/** Shortcut to create a FileProductStorage. */
public static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";
/** Property referencing how long to store products in milliseconds. */
public static final String PRODUCT_STORAGE_MAX_AGE_PROPERTY = "storageAge";
/** Default max age to store products, 3600000 milliseconds = 1 hour. */
public static final String DEFAULT_PRODUCT_STORAGE_MAX_AGE = "3600000";
/**
* Property referencing how long to wait until checking for expired
* notifications/products.
*/
public static final String RECEIVER_CLEANUP_PROPERTY = "cleanupInterval";
/**
* Default time between checking for expired notifications/products, 900000
* milliseconds = 15 minutes.
*/
public static final String DEFAULT_RECEIVER_CLEANUP = "900000";
/** Property for connection Timeout */
public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
/** Default connection timeout. 15 seconds */
public static final String DEFAULT_CONNECT_TIMEOUT = "15000";
/** Property for read timeout */
public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
/** default read timeout. 15 seconds */
public static final String DEFAULT_READ_TIMEOUT = "15000";
/** Property for listener notifier */
public static final String LISTENER_NOTIFIER_PROPERTY = "listenerNotifier";
/** Property for listener notifier to set to executor */
public static final String EXECUTOR_LISTENER_NOTIFIER = "executor";
/** Property to listener notifier to set to future */
public static final String FUTURE_LISTENER_NOTIFIER = "future";
/** Property to listener notifier to set to roundrobin */
public static final String ROUNDROBIN_LISTENER_NOTIFIER = "roundrobin";
/** The notification index where received notifications are stored. */
private NotificationIndex notificationIndex;
/** The product storage where retrieved products are stored. */
private ProductStorage productStorage;
/** How long to store retrieved product, in milliseconds. */
private Long productStorageMaxAge = 0L;
/** How long to wait until checking for expired notifications/products. */
private Long receiverCleanupInterval = 0L;
/** Timer that schedules receiver cleanup task. */
private Timer receiverCleanupTimer = new Timer();
/** Notification cleanup */
private NotificationIndexCleanup notificationCleanup = null;
private int connectTimeout = Integer.parseInt(DEFAULT_CONNECT_TIMEOUT);
private int readTimeout = Integer.parseInt(DEFAULT_READ_TIMEOUT);
private ListenerNotifier notifier;
/** A lock that is acquired when a product is being retrieved. */
private ObjectLock<ProductId> retrieveLocks = new ObjectLock<ProductId>();
/** Creates new ExecutorListenerNotifier to var notifier */
public DefaultNotificationReceiver() {
notifier = new ExecutorListenerNotifier(this);
}
/**
* Add a new notification listener.
*
* @param listener the listener to add. When notifications are received, this
* listener will be notified.
* @throws Exception exception
*/
public void addNotificationListener(NotificationListener listener) throws Exception {
notifier.addNotificationListener(listener);
}
/**
* Remove an existing notification listener.
*
* Any currently queued notifications are processed before shutting down.
*
* @param listener the listener to remove. When notifications are receive, this
* listener will no longer be notified.
* @throws Exception exception
*/
public void removeNotificationListener(NotificationListener listener) throws Exception {
notifier.removeNotificationListener(listener);
}
/**
* Store a notification and notify listeners.
*
* Updates the notification index before notifying listeners of the newly
* available product.
*
* @param notification the notification being received.
* @throws Exception if the notificationIndex throws an Exception.
*/
public void receiveNotification(Notification notification) throws Exception {
LOGGER.fine("[" + getName() + "] Notification Received " + notification.getProductId().toString());
if (notification.getExpirationDate().before(new Date())) {
LOGGER.finer("[" + getName() + "] skipping already expired notification for product id="
+ notification.getProductId().toString() + ", expiration=" + notification.getExpirationDate().toString());
} else {
// add notification to index
notificationIndex.addNotification(notification);
if (notification instanceof JsonNotification) {
LOGGER.finer("[" + getName() + "] json notification " + notification.getProductId());
} else if (notification instanceof URLNotification) {
LOGGER.finer(
"[" + getName() + "] notification URL=" + ((URLNotification) notification).getProductURL().toString());
}
notifyListeners(notification);
}
}
/**
* Send a notification to all registered NotificationListeners.
*
* Creates a NotificationEvent, with a reference to this object and calls each
* notificationListeners onNotification method in separate threads.
*
* This method usually returns before registered NotificationListeners have
* completed processing a notification.
*
* @param notification the notification being sent to listeners.
* @throws Exception exception
*/
protected void notifyListeners(final Notification notification) throws Exception {
LOGGER.finest("[" + getName() + "] notifying listeners for product id=" + notification.getProductId().toString());
// queue notification for listeners
NotificationEvent event = new NotificationEvent(this, notification);
notifier.notifyListeners(event);
}
/** @return "Using notifier" */
public String getListenerQueueStatus() {
return "Using notifier";
}
/**
* Search the notification index for expired notifications, removing any that
* are found. When a notification in the index is not a URLNotification, it
* represents a product in storage that will also be removed.
*
* @throws Exception if NotificationIndexCleanup throws an Exception.
*/
public void removeExpiredNotifications() throws Exception {
LOGGER.fine("[" + getName() + "] running receiver cleanup");
// use NotificationIndexCleanup to manage cleanup in separate thread
if (this.notificationCleanup == null) {
this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this);
this.notificationCleanup.startup();
} else {
this.notificationCleanup.wakeUp();
}
}
/**
* Callback from the NotificationIndexCleanup thread.
*
* Checks if Notification refers to a product in storage, which should also be
* removed.
*
* @param notification expired notification about to be removed.
* @throws Exception if error occurs sttempting to removing product
* @see ProductStorage
*/
public void onExpiredNotification(final Notification notification) throws Exception {
if (!(notification instanceof URLNotification)) {
// if it isn't a url notification, it's also in storage
productStorage.removeProduct(notification.getProductId());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("[" + getName() + "] removed expired product from receiver cache "
+ notification.getProductId().toString());
}
}
}
/**
* Retrieve a product by id.
*
* If this product is already in storage, load and return the product.
* Otherwise, search notifications for this product, and download the product
* into storage.
*
* @param id the product to retrieve
* @return the retrieved product, or null if not available.
* @throws Exception exception
*/
public Product retrieveProduct(ProductId id) throws Exception {
Product product = null;
String productIdString = id.toString();
LOGGER.finest("[" + getName() + "] acquiring retrieve lock id=" + productIdString);
retrieveLocks.acquireLock(id);
LOGGER.finest("[" + getName() + "] retrieve lock acquired id=" + productIdString);
try {
if (productStorage.hasProduct(id)) {
try {
LOGGER.finest("[" + getName() + "] storing product id=" + productIdString);
product = productStorage.getProduct(id);
LOGGER.finest("[" + getName() + "] product stored id=" + productIdString);
} catch (Exception e) {
LOGGER.log(Level.FINE, "[" + getName() + "] storage claims hasProduct, but threw exception", e);
product = null;
}
}
if (product == null) {
LOGGER.finer("[" + getName() + "] don't have product yet, searching notifications");
// don't have product yet, search notifications
Iterator<Notification> iter = notificationIndex.findNotifications(id).iterator();
Exception caughtException = null;
while (product == null && iter.hasNext()) {
Notification notification = iter.next();
if (!(notification instanceof URLNotification)) {
// only URL notifications include location info
continue;
}
InputStream in = null;
try {
URL productURL = ((URLNotification) notification).getProductURL();
ProductSource productSource = null;
SizeLimitInputStream sizeIn = null;
final Date beginConnect = new Date();
Date beginDownload = new Date();
if (productURL.getProtocol().equals("data")) {
Product tempProduct = new JsonProduct()
.getProduct(Json.createReader(StreamUtils.getInputStream(productURL)).readObject());
// JSON notification with embedded product
LOGGER.finer("[" + getName() + "] parsed json notification for " + tempProduct.getId().toString());
productSource = new ObjectProductSource(tempProduct);
} else {
// URL notification
LOGGER.finer("[" + getName() + "] notification url " + productURL.toString());
in = StreamUtils.getURLInputStream(productURL, connectTimeout, readTimeout);
beginDownload = new Date();
// use size limit with negative limit to count transfer size
sizeIn = new SizeLimitInputStream(in, -1);
productSource = IOUtil.autoDetectProductSource(sizeIn);
}
Notification storedNotification = storeProductSource(productSource);
final Date endDownload = new Date();
final long downloadTime = endDownload.getTime() - beginDownload.getTime();
LOGGER.fine(
"[" + getName() + "] receiver retrieved product" + " id=" + id.toString() + " (time = "
+ downloadTime + " ms)" + " from "
+ (productURL.getProtocol().equals("data") ? "data url" : productURL.toString()));
LOGGER.finest("[" + getName() + "] after store product, notification=" + storedNotification);
if (productStorage.hasProduct(id)) {
LOGGER.finer("[" + getName() + "] getting product from storage");
product = productStorage.getProduct(id);
LOGGER.finest("[" + getName() + "] after getProduct, product=" + product);
LOGGER.fine("[" + getName() + "] product downloaded from "
+ (productURL.getProtocol().equals("data") ? "data url" : productURL.toString()));
} else {
LOGGER.finer("[" + getName() + "] product not in storage id=" + productIdString);
}
} catch (ProductAlreadyInStorageException productAlreadyInStorageException) {
LOGGER.finer(() -> String.format("[%s] product already in storage id=%s", getName(), productIdString));
product = productStorage.getProduct(id);
continue;
} catch (ContentTypeNotSupportedException | InvalidSignatureException e) {
// Product was unable to be stored
LOGGER.warning(() -> String.format("[%s] exception storing product source, %s", getName(), e.getMessage()));
caughtException = e;
} catch (FileNotFoundException fileNotFoundException) {
LOGGER.warning(() -> String.format("[%s] exception while retrieving product, file not found", getName()));
caughtException = fileNotFoundException;
} catch (Exception e) {
if (e.getCause() instanceof ProductAlreadyInStorageException) {
LOGGER.finer(() -> String.format("[%s] product already in storage id=%s", getName(), productIdString));
product = productStorage.getProduct(id);
} else {
// log any exception that happened while retrieving product
LOGGER.warning(() -> String.format("[%s] exception while retrieving product", getName()));
}
} finally {
StreamUtils.closeStream(in);
}
}
// If product was not set after iterating through all the notifications, throw
// the last exception caught to allow upstream caller to handle appropriately
if (Objects.isNull(product) && !Objects.isNull(caughtException)) {
throw caughtException;
}
}
} finally {
LOGGER.finest("[" + getName() + "] releasing retrieve lock id=" + productIdString);
retrieveLocks.releaseLock(id);
LOGGER.finest("[" + getName() + "] retrieve lock released id=" + productIdString);
}
// return product
return product;
}
/**
* Calls the current <code>ProductStorage.storeProductSource</code> method.
*
* @param source The <code>ProductSource</code> to store.
* @return The <code>ProductId</code> of the product referenced by the given
* <code>ProductSource</code>.
* @throws Exception
* @see gov.usgs.earthquake.distribution.ProductStorage
*/
protected Notification storeProductSource(ProductSource source) throws Exception {
Notification notification = null;
// store product input
ProductId id = productStorage.storeProductSource(source);
// check if stored
if (productStorage.hasProduct(id)) {
// calculate storage expiration date
Date expirationDate = new Date(new Date().getTime() + productStorageMaxAge);
// update notification index
notification = new DefaultNotification(id, expirationDate);
notificationIndex.addNotification(notification);
}
return notification;
}
/**
* Send matching notifications to listener.
*
* Searches the NotificationIndex for matching notifications, and sends a
* NotificationEvent for each notification found.
*
* @param listener the listener to receive a NotificationEvent for each found
* notification.
* @param sources sources to include, or null for all.
* @param types types to include, or null for all.
* @param codes codes to include, or null for all.
* @throws Exception if the notification index or notification listener throw an
* exception.
*/
public void sendNotifications(NotificationListener listener, List<String> sources, List<String> types,
List<String> codes) throws Exception {
List<Notification> notifications = notificationIndex.findNotifications(sources, types, codes);
Iterator<Notification> iter = notifications.iterator();
while (iter.hasNext()) {
listener.onNotification(new NotificationEvent(this, iter.next()));
}
}
public void configure(Config config) throws Exception {
String notificationIndexName = config.getProperty(NOTIFICATION_INDEX_PROPERTY);
String notificationIndexFile = config.getProperty(INDEX_FILE_PROPERTY);
if (notificationIndexName == null && notificationIndexFile == null) {
throw new ConfigurationException("[" + getName() + "] 'index' is a required configuration property");
}
if (notificationIndexName != null) {
LOGGER.config("[" + getName() + "] loading notification index '" + notificationIndexName + "'");
notificationIndex = (NotificationIndex) Config.getConfig().getObject(notificationIndexName);
if (notificationIndex == null) {
throw new ConfigurationException(
"[" + getName() + "] index '" + notificationIndexName + "' is not properly configured");
}
} else {
LOGGER.config("[" + getName() + "] using notification index '" + notificationIndexFile + "'");
notificationIndex = new JDBCNotificationIndex(notificationIndexFile);
}
String productStorageName = config.getProperty(PRODUCT_STORAGE_PROPERTY);
String storageDirectory = config.getProperty(STORAGE_DIRECTORY_PROPERTY);
if (productStorageName == null && storageDirectory == null) {
throw new ConfigurationException("[" + getName() + "] 'storage' is a required configuration property");
}
if (productStorageName != null) {
LOGGER.config("[" + getName() + "] loading product storage '" + productStorageName + "'");
productStorage = (ProductStorage) Config.getConfig().getObject(productStorageName);
if (productStorage == null) {
throw new ConfigurationException(
"[" + getName() + "] storage '" + productStorageName + "' is not properly configured");
}
} else {
LOGGER.config("[" + getName() + "] using storage directory '" + storageDirectory + "'");
productStorage = new FileProductStorage(new File(storageDirectory));
}
productStorageMaxAge = Long.parseLong(config.getProperty(PRODUCT_STORAGE_MAX_AGE_PROPERTY,
// previously all lower-case
config.getProperty(PRODUCT_STORAGE_MAX_AGE_PROPERTY.toLowerCase(), DEFAULT_PRODUCT_STORAGE_MAX_AGE)));
LOGGER.config("[" + getName() + "] storage max age " + productStorageMaxAge + " ms");
receiverCleanupInterval = Long.parseLong(config.getProperty(RECEIVER_CLEANUP_PROPERTY, DEFAULT_RECEIVER_CLEANUP));
LOGGER.config("[" + getName() + "] receiver cleanup interval " + receiverCleanupInterval + " ms");
connectTimeout = Integer.parseInt(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
LOGGER.config("[" + getName() + "] receiver connect timeout " + connectTimeout + " ms");
readTimeout = Integer.parseInt(config.getProperty(READ_TIMEOUT_PROPERTY, DEFAULT_READ_TIMEOUT));
LOGGER.config("[" + getName() + "] receiver read timeout " + readTimeout + " ms");
String notifierType = config.getProperty(LISTENER_NOTIFIER_PROPERTY);
if (notifierType != null) {
if (notifierType.equals(EXECUTOR_LISTENER_NOTIFIER)) {
notifier = new ExecutorListenerNotifier(this);
LOGGER.config("[" + getName() + "] using executor listener notifier");
} else if (notifierType.equals(FUTURE_LISTENER_NOTIFIER)) {
notifier = new FutureListenerNotifier(this);
} else if (notifierType.equals(ROUNDROBIN_LISTENER_NOTIFIER)) {
notifier = new RoundRobinListenerNotifier(this);
LOGGER.config("[" + getName() + "] using round-robin listener notifier");
} else {
throw new ConfigurationException("Unknown notifier type " + notifierType);
}
}
}
public void shutdown() throws Exception {
receiverCleanupTimer.cancel();
if (notificationCleanup != null) {
try {
notificationCleanup.shutdown();
notificationCleanup = null;
} catch (Exception ignore) {
}
}
try {
notifier.shutdown();
} catch (Exception ignore) {
}
try {
notificationIndex.shutdown();
} catch (Exception ignore) {
}
try {
productStorage.shutdown();
} catch (Exception ignore) {
}
}
public void startup() throws Exception {
if (productStorage == null) {
throw new ConfigurationException("[" + getName() + "] storage has not been configured properly");
}
if (notificationIndex == null) {
throw new ConfigurationException("[" + getName() + "] index has not been configured properly");
}
productStorage.startup();
notificationIndex.startup();
// only schedule cleanup if interval is non-zero
if (receiverCleanupInterval > 0) {
receiverCleanupTimer.scheduleAtFixedRate(new TimerTask() {
public void run() {
try {
removeExpiredNotifications();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "[" + getName() + "] exception during receiver cleanup", e);
}
}
}, 0, receiverCleanupInterval);
}
// do this last since it may start processing
notifier.startup();
// ProductClient already started these listeners...
// Iterator<NotificationListener> iter = notificationListeners.keySet()
// .iterator();
// while (iter.hasNext()) {
// iter.next().startup();
// }
}
/**
* @return the notificationIndex
*/
public NotificationIndex getNotificationIndex() {
return notificationIndex;
}
/**
* @param notificationIndex the notificationIndex to set
*/
public void setNotificationIndex(NotificationIndex notificationIndex) {
this.notificationIndex = notificationIndex;
}
/**
* @return the productStorage
*/
public ProductStorage getProductStorage() {
return productStorage;
}
/**
* @param productStorage the productStorage to set
*/
public void setProductStorage(ProductStorage productStorage) {
this.productStorage = productStorage;
}
/**
* @return the productStorageMaxAge
*/
public Long getProductStorageMaxAge() {
return productStorageMaxAge;
}
/**
* @param productStorageMaxAge the productStorageMaxAge to set
*/
public void setProductStorageMaxAge(Long productStorageMaxAge) {
this.productStorageMaxAge = productStorageMaxAge;
}
/**
* @return the QueueStatus or null if ExecutorListenerNotifier doesn't exist
*/
public Map<String, Integer> getQueueStatus() {
if (notifier instanceof ExecutorListenerNotifier) {
return ((ExecutorListenerNotifier) notifier).getStatus();
}
return null;
}
/**
* Throttle notifier queues
*
* @throws InterruptedException InterruptedException
*/
public void throttleQueues() throws InterruptedException {
if (notifier instanceof ExecutorListenerNotifier) {
((ExecutorListenerNotifier) notifier).throttleQueues();
}
}
/**
* @return receiverCleanupInterval
*/
public Long getReceiverCleanupInterval() {
return receiverCleanupInterval;
}
/**
* @param receiverCleanupInterval the receiverCleanupInterval to set
*/
public void setReceiverCleanupInterval(Long receiverCleanupInterval) {
this.receiverCleanupInterval = receiverCleanupInterval;
}
/**
* @return connectionTimeout
*/
public int getConnectTimeout() {
return connectTimeout;
}
/**
* @param connectTimeout int connectionTimeout to set
*/
public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}
/**
* @return ListenerNotifier
*/
public ListenerNotifier getNotifier() {
return this.notifier;
}
/**
* @param notifier ListenerNotifier to set
*/
public void setNotifier(final ListenerNotifier notifier) {
this.notifier = notifier;
}
/**
* @return readTimeout
*/
public int getReadTimeout() {
return readTimeout;
}
/**
* @param readTimeout int readTimeout to set
*/
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}
}