NotificationIndexCleanup.java
package gov.usgs.earthquake.distribution;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* NotificationIndexCleanup manages cleaning up expired notifications.
*
* Uses background thread to remove expired notifications while they exist, then
* uses wait/notify to pause until shutdown() or wakeUp() methods are called.
*
* NOTE: this class does not schedule periodic cleanup, and the wakeUp() method
* must be called periodically.
*/
public class NotificationIndexCleanup implements Runnable {
/** Static Logger to log across all instances */
private static final Logger LOGGER = Logger.getLogger(NotificationIndexCleanup.class.getName());
/** Notification Index */
public final NotificationIndex index;
/** listener that can take additional actions during cleanup */
public final Listener listener;
/** object used to synchronize state access between threads */
public final Object syncObject = new Object();
/** thread where cleanup loop runs */
public Thread cleanupThread = null;
/** whether thread should stop running */
private boolean stopThread = false;
/**
* Constructor
*
* @param index notificaiton index
* @param listener the listener
*/
public NotificationIndexCleanup(final NotificationIndex index, final Listener listener) {
this.index = index;
this.listener = listener;
}
/**
* Notification cleanup thread loop.
*
* This method blocks and should probably not be called by you.
*/
public void run() {
final String indexName = this.index.getName();
LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup starting");
// run until thread stopped
while (!stopThread) {
List<Notification> expiredNotifications = null;
synchronized (syncObject) {
try {
expiredNotifications = this.index.findExpiredNotifications();
} catch (Exception e) {
LOGGER.log(Level.INFO, e, () -> "[" + indexName + "] exception finding expired notifications");
}
if (expiredNotifications == null || expiredNotifications.size() == 0) {
// Wait for expired notifications to process
try {
syncObject.wait();
} catch (InterruptedException ignore) {
// signal from another thread (stopThread checked above)
continue;
}
}
}
// remove batch of expired notifications
final List<Notification> removed = new ArrayList<>(expiredNotifications.size());
if (this.listener == null) {
removed.addAll(expiredNotifications);
} else {
// notify listener, remove only those successfully processed by listener
for (final Notification expired : expiredNotifications) {
synchronized (syncObject) {
if (stopThread) {
break;
}
}
try {
this.listener.onExpiredNotification(expired);
removed.add(expired);
} catch (Exception e) {
LOGGER.log(Level.WARNING, e,
() -> "[" + indexName + "] Listener exception processing expired notification");
}
}
}
try {
// remove in batch
this.index.removeNotifications(removed);
LOGGER.fine(() -> "[" + indexName + "] Removed " + removed.size() + " expired notifications");
} catch (Exception e) {
LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Exception removing expired notifications");
}
}
LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup exiting");
this.cleanupThread = null;
}
/**
* Start cleanup process.
*
* @throws Exception if clean up is already in progress
*/
public void startup() throws Exception {
synchronized (syncObject) {
if (this.cleanupThread != null) {
throw new IllegalStateException("Already started");
}
// start thread
stopThread = false;
this.cleanupThread = new Thread(this);
}
this.cleanupThread.start();
}
/**
* Stop cleanup process.
*
* @throws Exception if already stopped
*/
public void shutdown() throws Exception {
synchronized (syncObject) {
if (this.cleanupThread == null) {
throw new IllegalStateException("Already stopped");
}
// stop thread
stopThread = true;
this.cleanupThread.interrupt();
}
this.cleanupThread.join();
}
/**
* Wake up the background thread if it is waiting.
*/
public void wakeUp() {
synchronized (syncObject) {
syncObject.notify();
}
}
/**
* Interface for cleanup listeners to take additional steps before a
* notification is removed.
*/
public static interface Listener {
/**
* Interface specialization method to take additional steps when a notification
* is removed
*
* @param expired input of expired notification
* @throws Exception if error occurred
*/
public void onExpiredNotification(final Notification expired) throws Exception;
}
}