AwsProductReceiver.java
package gov.usgs.earthquake.aws;
import java.io.IOException;
import java.io.StringReader;
import java.net.http.HttpClient;
import java.net.URI;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Objects;
import java.util.Optional;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.naming.ConfigurationException;
import javax.websocket.CloseReason;
import javax.websocket.Session;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.earthquake.distribution.HeartbeatListener;
import gov.usgs.earthquake.distribution.WebSocketClient;
import gov.usgs.earthquake.distribution.WebSocketListener;
import gov.usgs.earthquake.product.InvalidProductIdException;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
/**
* Receives notifications from a PDL notification web socket.
*
* After initial connection, ignores broadcasts until catch up process is
* complete.
*
* Catch up involves sending a "products_created_after" request with the latest
* notification "created" timestamp, and processing products until either the
* last product matches the last broadcast or there are no more products after
* the latest notification "created" timestamp.
*/
public class AwsProductReceiver extends DefaultNotificationReceiver implements Runnable, WebSocketListener {
/** Initialzation of logger. For us later in file. */
public static final Logger LOGGER = Logger.getLogger(AwsProductReceiver.class.getName());
/** Variable for URI string */
public static final String URI_PROPERTY = "url";
/** Variable for createdAfter string */
public static final String CREATED_AFTER_PROPERTY = "createdAfter";
/** Variable for trackingIndex string */
public static final String TRACKING_INDEX_PROPERTY = "trackingIndex";
/** Variable for trackingFileName string */
public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
/** Variable for connectAttempts string */
public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
/** Variable for connectTimeout string */
public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
/** Variable for initialCatchUpAge string */
public static final String INITIAL_CATCHUP_AGE_PROPERTY = "initialCatchUpAge";
/** Variable for productsCreatedAfterClient string */
public static final String PRODUCTS_CREATED_AFTER_CLIENT_PROPERTY = "productsCreatedAfterClient";
/** Variable for pingIntervalMillis string */
public static final String PING_INTERVAL = "pingInterval";
/** Variable for pingWait string */
public static final String PING_WAIT = "pingWait";
/** Variable for anyMessageIntervalMillis string */
public static final String ANY_MESSAGE_INTERVAL = "anyMessageInterval";
/** Variable for tracking file. Links to data/AwsReceiver.json */
public static final String DEFAULT_TRACKING_FILE_NAME = "data/AwsReceiver.json";
/** Variable for connect attempts. Set to 5 */
public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
/** Variable for timeout. Set to 1000 */
public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
/** Variable for catchup age. Set to 7.0 */
public static final String DEFAULT_INITIAL_CATCHUP_AGE = "7.0";
/** Default for time in between pings to server */
public static final String DEFAULT_PING_INTERVAL_MILLIS = "15000";
/**
* Default for how long to wait for pong response to ping before closing or
* restarting connection
*/
public static final String DEFAULT_PING_WAIT_MILLIS = "4000";
/** Default for interval for any message */
public static final String DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS = "0";
private URI uri = null;
private String trackingFileName;
private int attempts;
private long timeout;
private long pingIntervalMillis;
private long pingWaitMillis;
private long anyMessageIntervalMillis;
private TrackingIndex trackingIndex;
private WebSocketClient client;
private ProductsCreatedAfterClient restClient;
/** Microsecond timestamp of last message that has been processed */
protected Instant createdAfter = null;
/** How far back to check when first connecting. */
protected double initialCatchUpAge = Double.valueOf(DEFAULT_INITIAL_CATCHUP_AGE);
/** last broadcast message that has been processed (used for catch up) */
protected JsonNotification previousBroadcast = null;
/** whether to process broadcast messages (after catching up). */
protected boolean processBroadcast = false;
/** whether currenting catching up. */
protected boolean catchUpRunning = false;
/** sync object for catchUp state. */
protected final Object catchUpSync = new Object();
/** thread where catch up process runs. */
protected Thread catchUpThread = null;
/** whether thread should continue running (shutdown flag) */
protected boolean catchUpThreadRunning = false;
/** last catch up message sent (for response timeouts) */
protected Instant lastCatchUpSent = null;
@Override
public void configure(Config config) throws Exception {
super.configure(config);
uri = new URI(config.getProperty(URI_PROPERTY));
attempts = Integer.parseInt(config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
timeout = Long.parseLong(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
initialCatchUpAge = Double.valueOf(config.getProperty(INITIAL_CATCHUP_AGE_PROPERTY, DEFAULT_INITIAL_CATCHUP_AGE));
pingIntervalMillis = Long.parseLong(config.getProperty(PING_INTERVAL, DEFAULT_PING_INTERVAL_MILLIS));
pingWaitMillis = Long.parseLong(config.getProperty(PING_WAIT, DEFAULT_PING_WAIT_MILLIS));
anyMessageIntervalMillis = Long
.parseLong(config.getProperty(ANY_MESSAGE_INTERVAL, DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS));
String restClientName = config.getProperty(PRODUCTS_CREATED_AFTER_CLIENT_PROPERTY);
if (restClientName != null) {
this.restClient = (ProductsCreatedAfterClient) Config.getConfig().getObject(restClientName);
} else {
throw new ConfigurationException(
"[" + this.getName() + "] invalid or missing ProductsCreatedAfterClient configuration");
}
final String trackingIndexName = config.getProperty(TRACKING_INDEX_PROPERTY);
if (trackingIndexName != null) {
LOGGER.config("[" + getName() + "] loading tracking index " + trackingIndexName);
try {
// read object from global config
trackingIndex = (TrackingIndex) Config.getConfig().getObject(trackingIndexName);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "[" + getName() + "] error loading tracking index " + trackingIndexName, e);
}
} else {
trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY);
if (trackingFileName != null) {
LOGGER.config("[" + getName() + "] creating tracking index at" + trackingFileName);
trackingIndex = new TrackingIndex(TrackingIndex.DEFAULT_DRIVER, "jdbc:sqlite:" + trackingFileName);
}
}
}
/**
* Called when connection is first opened.
*
* Start catch up process.
*/
@Override
public void onOpen(Session session) throws IOException {
LOGGER.info("[" + getName() + "] onOpen connection_id=" + session.getId() + " " + Instant.now().toString());
// start catch up process
LOGGER.info("[" + getName() + "] Starting catch up");
// ignore broadcast until caught up
this.setProcessBroadcast(false);
startCatchUp();
}
/**
* Called when connection is closed, either because shutdown on this end or
* closed by server.
*/
@Override
public void onClose(Session session, CloseReason closeReason) {
LOGGER.info("[" + getName() + "] onClose " + closeReason.toString() + " " + Instant.now().toString());
// cannot catch up when not connected, restart in onOpen
stopCatchUp();
}
/**
* Inspect the received json message and take appropriate action.
*
* @param json json formatted message
* @throws Exception when format error
*/
synchronized public void onJsonMessage(JsonObject json) throws Exception {
final String action = json.getString("action");
if ("broadcast".equals(action)) {
onBroadcast(json);
} else if ("product".equals(action)) {
onProduct(json);
} else if ("products_created_after".equals(action)) {
onProductsCreatedAfter(json);
}
}
/**
* Message handler function passed to WebSocketClient
*
* Parses the message as JSON, and checks "action" property to route message for
* handling.
*
* Synchronized to process messages in order, since onProductsCreatedAfter
* compares state of latest product to determine whether caught up and if
* broadcasts should be processed.
*
* @param message Message notification - string
*/
@Override
synchronized public void onMessage(String message) throws IOException {
try (final JsonReader reader = Json.createReader(new StringReader(message))) {
// parse message
final JsonObject json = reader.readObject();
onJsonMessage(json);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing message '" + message + "'", e);
throw new IOException(e);
}
}
/**
* Handle a message with "action"="broadcast".
*
* If caught up process notification as usual, otherwise save notification to
* help detect when caught up.
*
* @param json JSON Message
* @throws Exception Exception
*/
protected void onBroadcast(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(json.getJsonObject("notification"));
Optional<ProductId> broadcastedPreviousProductId = Optional.empty();
try {
broadcastedPreviousProductId = Optional.of(ProductId.fromJson(json.getJsonObject("previousProductId")));
} catch (InvalidProductIdException ignored) {
}
ProductId broadcastedProductId = notification.getProductId();
LOGGER.finer("[" + getName() + "]" + " onBroadcast(" + notification.getProductId() + ")" + " sequence="
+ broadcastedProductId + ", previousBroadcast=" + this.previousBroadcast);
boolean isBroadcastedInOrder = broadcastedPreviousProductId.isEmpty()
|| (broadcastedPreviousProductId.isPresent()
&& Objects.nonNull(previousBroadcast)
&& broadcastedPreviousProductId.get().equals(this.previousBroadcast.getProductId()));
if (this.isProcessBroadcast() && !isBroadcastedInOrder) {
// may have missed message
LOGGER.info(
"[" + getName() + "] broadcast ids out of sequence" + " (at " + broadcastedPreviousProductId.get()
+ ", received "
+ broadcastedProductId + ")" + ", switching to catch up mode");
this.setProcessBroadcast(false);
startCatchUp();
}
previousBroadcast = notification;
// process message if not in catch up mode
if (this.isProcessBroadcast()) {
onJsonNotification(notification);
}
}
/**
* Process a received notification and update current "created" timestamp.
*
* @param notification JSON Notification
* @throws Exception Exception
*/
protected void onJsonNotification(final JsonNotification notification) throws Exception {
// receive and notify listeners
receiveNotification(notification);
// update tracking file
this.createdAfter = notification.created;
writeTrackingData();
// send heartbeat
HeartbeatListener.sendHeartbeatMessage(getName(), "createdAfter", createdAfter.toString());
}
/**
* Handle a message with "action"="product", which is received during catch up.
*
* @param json JSON Message
* @throws Exception Exception
*/
protected void onProduct(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(json.getJsonObject("notification"));
LOGGER.finer("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
onJsonNotification(notification);
}
/**
* Handle a message with "action"="products_created_after", which is received
* during catch up.
*
* Indicates the end of a response from a "products_created_after" request.
* Check whether caught up, and either switch to broadcast mode or continue
* catch up process.
*
* @param json JSON Message
* @throws Exception Exception
*/
protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
final String after = json.getString("created_after");
final int count = json.getInt("count");
LOGGER.finer("[" + getName() + "] onProductsCreatedAfter(" + after + ", " + count + " products)");
// notify background thread that a response was received,
// as well as pausing messages until restarted below (if needed)
stopCatchUp();
// check whether caught up
if (
// if a broadcast received during catchup,
(previousBroadcast != null &&
// and createdAfter is at or after last broadcast
createdAfter.compareTo(previousBroadcast.created) >= 0)
// or no additional products returned
|| count == 0) {
// caught up
LOGGER.info("[" + getName() + "] Caught up, switching to broadcast");
this.setProcessBroadcast(true);
} else {
// keep catching up
startCatchUp();
}
}
/**
* Catch up process.
*
* Do not run directly, use {@link #startCatchUpThread()} and
* {@link #stopCatchUpThread()} to start and stop the process.
*
* Process waits until {@link #startCatchUp()} is called, and uses
* {@link #throttleQueues()} between sends.
*/
@Override
public void run() {
while (catchUpThreadRunning) {
try {
synchronized (catchUpSync) {
if (!catchUpRunning) {
catchUpSync.wait();
continue;
}
if (lastCatchUpSent != null) {
// message already sent, wait for timeout
Instant now = Instant.now();
Instant timeout = lastCatchUpSent.plus(60, ChronoUnit.SECONDS);
if (now.isBefore(timeout)) {
catchUpSync.wait(now.until(timeout, ChronoUnit.MILLIS));
continue;
} else {
// timed out
LOGGER.warning("No products_created_after response" + ", sent at " + lastCatchUpSent.toString());
// fall through
}
}
}
// ready to send, but block until done throttling
throttleQueues();
try {
synchronized (catchUpSync) {
// connection may have closed while throttling
if (!catchUpRunning) {
continue;
}
sendProductsCreatedAfter();
// track when sent
lastCatchUpSent = Instant.now();
}
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Exception sending products_created_after", e);
if (catchUpThreadRunning && catchUpRunning) {
// wait before next attempt
try {
Thread.sleep(1000L);
} catch (InterruptedException ie) {
// probably stopping
}
}
}
} catch (InterruptedException e) {
// probably stopping
}
}
}
/**
* Send a catch-up request to the REST endpoint.
*
* The server will reply with a REST payload containing zero or more
* "action"="product" messages, and then one "action"="products_created_after"
* message to indicate the request is complete.
*
* @throws IOException IOException
*/
protected void sendProductsCreatedAfter() throws IOException {
// set default for created after
if (this.createdAfter == null) {
this.createdAfter = Instant.now().minusSeconds(Math.round(initialCatchUpAge * 86400));
}
try {
this.restClient.send(this.createdAfter);
} catch (Exception ex) {
LOGGER.log(Level.WARNING, "[" + this.getName() + "] failed to send products_created_after", ex);
throw new IOException(ex);
}
}
/**
* Notify running background thread to start catch up process.
*/
protected void startCatchUp() {
// notify background thread to start catch up
synchronized (catchUpSync) {
catchUpRunning = true;
// clear sent time
lastCatchUpSent = null;
catchUpSync.notify();
}
}
/**
* Start background thread for catch up process.
*/
protected void startCatchUpThread() {
if (catchUpThread != null) {
throw new IllegalStateException("catchUp thread already started");
}
synchronized (catchUpSync) {
catchUpThreadRunning = true;
catchUpThread = new Thread(this);
}
catchUpThread.start();
}
/**
* Notify running background thread to stop catch up process.
*/
protected void stopCatchUp() {
synchronized (catchUpSync) {
// stop catch up
catchUpRunning = false;
// clear sent time
lastCatchUpSent = null;
catchUpSync.notify();
}
}
/**
* Stop background thread for catch up process.
*/
protected void stopCatchUpThread() {
if (catchUpThread == null) {
return;
}
// stop catch up thread
try {
synchronized (catchUpSync) {
// orderly shutdown
catchUpThreadRunning = false;
catchUpSync.notify();
}
// interrupt just in case
catchUpThread.interrupt();
catchUpThread.join();
} catch (Exception e) {
LOGGER.log(Level.INFO, "Error stopping catchUpThread", e);
} finally {
catchUpThread = null;
}
}
@Override
public void onConnectFail() {
// client failed to connect
LOGGER.info("[" + getName() + "] onConnectFail " + Instant.now().toString());
}
@Override
public void onReconnectFail() {
// failed to reconnect after close
LOGGER.info("[" + getName() + "] onReconnectFail " + Instant.now().toString());
}
/**
* Reads createdAfter from a tracking file if it exists, then connects to web
* socket.
*
* @throws Exception Exception
*/
@Override
public void startup() throws Exception {
super.startup();
if (trackingIndex == null) {
trackingIndex = new TrackingIndex();
}
trackingIndex.startup();
// read sequence from tracking file if other parameters agree
JsonObject json = readTrackingData();
if (json != null && json.getString(URI_PROPERTY).equals(uri.toString())) {
createdAfter = Instant.parse(json.getString(CREATED_AFTER_PROPERTY));
}
// Listen for messages from REST client
this.restClient.addReceiver(this);
this.restClient.startup();
// open websocket
client = new WebSocketClient(uri, this, attempts, timeout, true, pingIntervalMillis, pingWaitMillis,
anyMessageIntervalMillis);
// start catch up process
startCatchUpThread();
}
/**
* Closes web socket
*
* @throws Exception Exception
*/
@Override
public void shutdown() throws Exception {
// stop catch up process
stopCatchUpThread();
// close socket
try {
client.shutdown();
} catch (Exception e) {
}
// End REST client
try {
this.restClient.removeReceiver(this);
this.restClient.shutdown();
} catch (Exception ex) {
}
super.shutdown();
}
/**
* Reads tracking file.
*
* @return JsonObject tracking file
* @throws Exception Exception
*/
public JsonObject readTrackingData() throws Exception {
// use name as key
return trackingIndex.getTrackingData(getName());
}
/**
* Writes tracking file.
*
* @throws Exception Exception
*/
public void writeTrackingData() throws Exception {
JsonObject json = Json.createObjectBuilder().add(URI_PROPERTY, uri.toString())
.add(CREATED_AFTER_PROPERTY, createdAfter.toString()).build();
// use name as key
trackingIndex.setTrackingData(getName(), json);
}
/**
* Getter for rest client
*
* @return ProductsCreatedAfterClient
*/
public ProductsCreatedAfterClient getRestClient() {
return this.restClient;
}
/**
* Setter for rest client
*
* @param restClient object
*/
public void setRestClient(ProductsCreatedAfterClient restClient) {
this.restClient = restClient;
}
/**
* Getter for URI
*
* @return URI
*/
public URI getURI() {
return uri;
}
/**
* Setter for URI
*
* @param uri URI
*/
public void setURI(final URI uri) {
this.uri = uri;
}
/**
* Getter for trackingFileName
*
* @return name of tracking file
*/
public String getTrackingFileName() {
return trackingFileName;
}
/**
* Setter for trackingFileName
*
* @param trackingFileName trackingFileName
*/
public void setTrackingFileName(final String trackingFileName) {
this.trackingFileName = trackingFileName;
}
/**
* Getter for createdAfter
*
* @return createdAfter
*/
public Instant getCreatedAfter() {
return createdAfter;
}
/**
* Setter for createdAfter
*
* @param createdAfter createdAfter
*/
public void setCreatedAfter(final Instant createdAfter) {
this.createdAfter = createdAfter;
}
/**
* Getter for attempts
*
* @return attempts
*/
public int getAttempts() {
return attempts;
}
/**
* Setter for attempts
*
* @param attempts attempts
*/
public void setAttempts(final int attempts) {
this.attempts = attempts;
}
/**
* Getter for timeout
*
* @return timeout
*/
public long getTimeout() {
return timeout;
}
/**
* Setter for timeout
*
* @param timeout long timeout
*/
public void setTimeout(final long timeout) {
this.timeout = timeout;
}
/**
* Getter for returning previous broadcast
*
* @return JsonNotification
*/
public JsonNotification getPreviousBroadcast() {
return previousBroadcast;
}
/**
* Setter for previous broadcast
*
* @param previousBroadcast the JsonNotification of the most recent broadcast
*/
public void setPreviousBroadcast(JsonNotification previousBroadcast) {
this.previousBroadcast = previousBroadcast;
}
/**
* Getter for returning if receiver is processing broadcasts
*
* @return JsonNotification
*/
public boolean isProcessBroadcast() {
return processBroadcast;
}
/**
* Setter for processing broadcasts
*
*/
public void setProcessBroadcast(boolean processBroadcast) {
this.processBroadcast = processBroadcast;
}
}