Package gov.usgs.earthquake.aws
Class AwsProductReceiver
java.lang.Object
gov.usgs.util.DefaultConfigurable
gov.usgs.earthquake.distribution.DefaultNotificationReceiver
gov.usgs.earthquake.aws.AwsProductReceiver
- All Implemented Interfaces:
NotificationIndexCleanup.Listener,NotificationReceiver,WebSocketListener,Configurable,Runnable
public class AwsProductReceiver
extends DefaultNotificationReceiver
implements Runnable, WebSocketListener
Receives notifications from a PDL notification web socket.
After initial connection, ignores broadcasts until catch up process is
complete.
Catch up involves sending a "products_created_after" request with the latest
notification "created" timestamp, and processing products until either the
last product matches the last broadcast or there are no more products after
the latest notification "created" timestamp.
-
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final StringVariable for anyMessageIntervalMillis stringprotected booleanwhether currently catching up.protected final Objectsync object for catchUp state.protected Threadthread where catch up process runs.protected booleanwhether thread should continue running (shutdown flag)static final StringVariable for createdAfter stringprotected InstantMicrosecond timestamp of last message that has been processedprotected ProductIdProductId of last message that has been processedstatic final StringVariable for catchup age.static final StringVariable for tracking file.static final StringVariable for initialCatchUpAge stringprotected doubleHow far back to check when first connecting.protected Instantlast catch up message sent (for response timeouts)static final LoggerInitialzation of logger.static final StringVariable for pingIntervalMillis stringstatic final StringVariable for pingWait stringprotected JsonNotificationlast broadcast message that has been processed (used for catch up)protected booleanwhether to process broadcast messages (after catching up).static final StringVariable for productsCreatedAfterClient stringstatic final StringVariable for trackingFileName stringstatic final StringVariable for trackingIndex stringstatic final StringVariable for URI stringstatic final Stringstatic final StringDeprecated.static final StringDeprecated.Fields inherited from class gov.usgs.earthquake.distribution.DefaultNotificationReceiver
CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT, DEFAULT_PRODUCT_STORAGE_MAX_AGE, DEFAULT_READ_TIMEOUT, DEFAULT_RECEIVER_CLEANUP, EXECUTOR_LISTENER_NOTIFIER, FUTURE_LISTENER_NOTIFIER, INDEX_FILE_PROPERTY, LISTENER_NOTIFIER_PROPERTY, NOTIFICATION_INDEX_PROPERTY, PRODUCT_STORAGE_MAX_AGE_PROPERTY, PRODUCT_STORAGE_PROPERTY, READ_TIMEOUT_PROPERTY, RECEIVER_CLEANUP_PROPERTY, ROUNDROBIN_LISTENER_NOTIFIER, STORAGE_DIRECTORY_PROPERTY -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionvoidProcess configuration settings.Getter for createdAfterGetter for returning previous broadcastGetter for rest clientGetter for trackingFileNamegetURI()Getter for URIbooleanGetter for returning if receiver is processing broadcastsprotected voidonBroadcast(javax.json.JsonObject json) Handle a message with "action"="broadcast".voidonClose(javax.websocket.Session session, javax.websocket.CloseReason closeReason) Called when connection is closed, either because shutdown on this end or closed by server.voidInterface method to be overriden by WebSocket files and AwsProductReceivervoidonJsonMessage(javax.json.JsonObject json) Inspect the received json message and take appropriate action.protected voidonJsonNotification(JsonNotification notification) Process a received notification and update current "created" timestamp.voidMessage handler function passed to WebSocketClient Parses the message as JSON, and checks "action" property to route message for handling.voidonOpen(javax.websocket.Session session) Called when connection is first opened.protected voidonProduct(javax.json.JsonObject json) Handle a message with "action"="product", which is received during catch up.protected voidonProductsCreatedAfter(javax.json.JsonObject json) Handle a message with "action"="products_created_after", which is received during catch up.voidInterface method to be overriden by WebSocket files and AwsProductReceiverjavax.json.JsonObjectReads tracking file.voidrun()Catch up process.protected voidSend a catch-up request to the REST endpoint.voidsetCreatedAfter(Instant createdAfter) Setter for createdAftervoidsetPreviousBroadcast(JsonNotification previousBroadcast) Setter for previous broadcastvoidsetProcessBroadcast(boolean processBroadcast) Setter for processing broadcastsvoidsetRestClient(ProductsCreatedAfterClient restClient) Setter for rest clientvoidsetTrackingFileName(String trackingFileName) Setter for trackingFileNamevoidSetter for URIvoidshutdown()Closes web socketprotected voidNotify running background thread to start catch up process.protected voidStart background thread for catch up process.voidstartup()Reads createdAfter from a tracking file if it exists, then connects to web socket.protected voidNotify running background thread to stop catch up process.protected voidStop background thread for catch up process.voidWrites tracking file.Methods inherited from class gov.usgs.earthquake.distribution.DefaultNotificationReceiver
addNotificationListener, getConnectTimeout, getListenerQueueStatus, getNotificationIndex, getNotifier, getProductStorage, getProductStorageMaxAge, getQueueStatus, getReadTimeout, getReceiverCleanupInterval, notifyListeners, onExpiredNotification, receiveNotification, removeExpiredNotifications, removeNotificationListener, retrieveProduct, sendNotifications, setConnectTimeout, setNotificationIndex, setNotifier, setProductStorage, setProductStorageMaxAge, setReadTimeout, setReceiverCleanupInterval, storeProductSource, throttleQueuesMethods inherited from class gov.usgs.util.DefaultConfigurable
getName, setNameMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface gov.usgs.util.Configurable
getName, setName
-
Field Details
-
LOGGER
Initialzation of logger. For us later in file. -
URI_PROPERTY
Variable for URI string- See Also:
-
CREATED_AFTER_PROPERTY
Variable for createdAfter string- See Also:
-
TRACKING_INDEX_PROPERTY
Variable for trackingIndex string- See Also:
-
TRACKING_FILE_NAME_PROPERTY
Variable for trackingFileName string- See Also:
-
INITIAL_CATCHUP_AGE_PROPERTY
Variable for initialCatchUpAge string- See Also:
-
PRODUCTS_CREATED_AFTER_CLIENT_PROPERTY
Variable for productsCreatedAfterClient string- See Also:
-
PING_INTERVAL
Variable for pingIntervalMillis string- See Also:
-
PING_WAIT
Variable for pingWait string- See Also:
-
ANY_MESSAGE_INTERVAL
Variable for anyMessageIntervalMillis string- See Also:
-
DEFAULT_TRACKING_FILE_NAME
Variable for tracking file. Links to data/AwsReceiver.json- See Also:
-
DEFAULT_INITIAL_CATCHUP_AGE
Variable for catchup age. Set to 7.0- See Also:
-
WEBSOCKET_CONNECT_TIMEOUT_PROPERTY
Deprecated.- See Also:
-
WEBSOCKET_CONNECT_RETRIES_PROPERTY
Deprecated.- See Also:
-
WEBSOCKET_CLIENT_PROPERTY
- See Also:
-
createdAfter
Microsecond timestamp of last message that has been processed -
createdAfterId
ProductId of last message that has been processed -
initialCatchUpAge
protected double initialCatchUpAgeHow far back to check when first connecting. -
previousBroadcast
last broadcast message that has been processed (used for catch up) -
processBroadcast
protected boolean processBroadcastwhether to process broadcast messages (after catching up). -
catchUpRunning
protected boolean catchUpRunningwhether currently catching up. -
catchUpSync
sync object for catchUp state. -
catchUpThread
thread where catch up process runs. -
catchUpThreadRunning
protected boolean catchUpThreadRunningwhether thread should continue running (shutdown flag) -
lastCatchUpSent
last catch up message sent (for response timeouts)
-
-
Constructor Details
-
AwsProductReceiver
public AwsProductReceiver()
-
-
Method Details
-
configure
Description copied from class:DefaultConfigurableProcess configuration settings. Called before startup().- Specified by:
configurein interfaceConfigurable- Overrides:
configurein classDefaultNotificationReceiver- Parameters:
config- the Config object with settings.- Throws:
Exception- if configuration exceptions occur.
-
onOpen
Called when connection is first opened. Start catch up process.- Specified by:
onOpenin interfaceWebSocketListener- Parameters:
session- Session to open- Throws:
IOException- IOException
-
onClose
public void onClose(javax.websocket.Session session, javax.websocket.CloseReason closeReason) Called when connection is closed, either because shutdown on this end or closed by server.- Specified by:
onClosein interfaceWebSocketListener- Parameters:
session- Session to closecloseReason- Reason for closing session
-
onJsonMessage
Inspect the received json message and take appropriate action.- Parameters:
json- json formatted message- Throws:
Exception- when format error
-
onMessage
Message handler function passed to WebSocketClient Parses the message as JSON, and checks "action" property to route message for handling. Synchronized to process messages in order, since onProductsCreatedAfter compares state of latest product to determine whether caught up and if broadcasts should be processed.- Specified by:
onMessagein interfaceWebSocketListener- Parameters:
message- Message notification - string- Throws:
IOException- IOException
-
onBroadcast
Handle a message with "action"="broadcast". If caught up process notification as usual, otherwise save notification to help detect when caught up.- Parameters:
json- JSON Message- Throws:
Exception- Exception
-
onJsonNotification
Process a received notification and update current "created" timestamp.- Parameters:
notification- JSON Notification- Throws:
Exception- Exception
-
onProduct
Handle a message with "action"="product", which is received during catch up.- Parameters:
json- JSON Message- Throws:
Exception- Exception
-
onProductsCreatedAfter
Handle a message with "action"="products_created_after", which is received during catch up. Indicates the end of a response from a "products_created_after" request. Check whether caught up, and either switch to broadcast mode or continue catch up process.- Parameters:
json- JSON Message- Throws:
Exception- Exception
-
run
public void run()Catch up process. Do not run directly, usestartCatchUpThread()andstopCatchUpThread()to start and stop the process. Process waits untilstartCatchUp()is called, and usesDefaultNotificationReceiver.throttleQueues()between sends. -
sendProductsCreatedAfter
Send a catch-up request to the REST endpoint. The server will reply with a REST payload containing zero or more "action"="product" messages, and then one "action"="products_created_after" message to indicate the request is complete.- Throws:
IOException- IOException
-
startCatchUp
protected void startCatchUp()Notify running background thread to start catch up process. -
startCatchUpThread
protected void startCatchUpThread()Start background thread for catch up process. -
stopCatchUp
protected void stopCatchUp()Notify running background thread to stop catch up process. -
stopCatchUpThread
protected void stopCatchUpThread()Stop background thread for catch up process. -
onConnectFail
public void onConnectFail()Description copied from interface:WebSocketListenerInterface method to be overriden by WebSocket files and AwsProductReceiver- Specified by:
onConnectFailin interfaceWebSocketListener
-
onReconnectFail
public void onReconnectFail()Description copied from interface:WebSocketListenerInterface method to be overriden by WebSocket files and AwsProductReceiver- Specified by:
onReconnectFailin interfaceWebSocketListener
-
startup
Reads createdAfter from a tracking file if it exists, then connects to web socket.- Specified by:
startupin interfaceConfigurable- Overrides:
startupin classDefaultNotificationReceiver- Throws:
Exception- Exception
-
shutdown
Closes web socket- Specified by:
shutdownin interfaceConfigurable- Overrides:
shutdownin classDefaultNotificationReceiver- Throws:
Exception- Exception
-
readTrackingData
Reads tracking file.- Returns:
- JsonObject tracking file
- Throws:
Exception- Exception
-
writeTrackingData
Writes tracking file.- Throws:
Exception- Exception
-
getRestClient
Getter for rest client- Returns:
- ProductsCreatedAfterClient
-
setRestClient
Setter for rest client- Parameters:
restClient- object
-
getURI
Getter for URI- Returns:
- URI
-
setURI
Setter for URI- Parameters:
uri- URI
-
getTrackingFileName
Getter for trackingFileName- Returns:
- name of tracking file
-
setTrackingFileName
Setter for trackingFileName- Parameters:
trackingFileName- trackingFileName
-
getCreatedAfter
Getter for createdAfter- Returns:
- createdAfter
-
setCreatedAfter
Setter for createdAfter- Parameters:
createdAfter- createdAfter
-
getPreviousBroadcast
Getter for returning previous broadcast- Returns:
- JsonNotification
-
setPreviousBroadcast
Setter for previous broadcast- Parameters:
previousBroadcast- the JsonNotification of the most recent broadcast
-
isProcessBroadcast
public boolean isProcessBroadcast()Getter for returning if receiver is processing broadcasts- Returns:
- JsonNotification
-
setProcessBroadcast
public void setProcessBroadcast(boolean processBroadcast) Setter for processing broadcasts
-