Package gov.usgs.earthquake.aws
Class AwsProductReceiver
java.lang.Object
gov.usgs.util.DefaultConfigurable
gov.usgs.earthquake.distribution.DefaultNotificationReceiver
gov.usgs.earthquake.aws.AwsProductReceiver
- All Implemented Interfaces:
NotificationIndexCleanup.Listener
,NotificationReceiver
,WebSocketListener
,Configurable
,Runnable
public class AwsProductReceiver
extends DefaultNotificationReceiver
implements Runnable, WebSocketListener
Receives notifications from a PDL notification web socket.
After initial connection, ignores broadcasts until catch up process is
complete.
Catch up involves sending a "products_created_after" request with the latest
notification "created" timestamp, and processing products until either the
last product matches the last broadcast or there are no more products after
the latest notification "created" timestamp.
-
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final String
Variable for anyMessageIntervalMillis stringprotected boolean
whether currenting catching up.protected final Object
sync object for catchUp state.protected Thread
thread where catch up process runs.protected boolean
whether thread should continue running (shutdown flag)static final String
Variable for connectAttempts stringstatic final String
Variable for connectTimeout stringstatic final String
Variable for createdAfter stringprotected Instant
Microsecond timestamp of last message that has been processedstatic final String
Default for interval for any messagestatic final String
Variable for connect attempts.static final String
Variable for timeout.static final String
Variable for catchup age.static final String
Default for time in between pings to serverstatic final String
Default for how long to wait for pong response to ping before closing or restarting connectionstatic final String
Variable for tracking file.static final String
Variable for initialCatchUpAge stringprotected double
How far back to check when first connecting.protected Instant
last catch up message sent (for response timeouts)static final Logger
Initialzation of logger.static final String
Variable for pingIntervalMillis stringstatic final String
Variable for pingWait stringprotected JsonNotification
last broadcast message that has been processed (used for catch up)protected boolean
whether to process broadcast messages (after catching up).static final String
Variable for productsCreatedAfterClient stringstatic final String
Variable for trackingFileName stringstatic final String
Variable for trackingIndex stringstatic final String
Variable for URI stringFields inherited from class gov.usgs.earthquake.distribution.DefaultNotificationReceiver
DEFAULT_PRODUCT_STORAGE_MAX_AGE, DEFAULT_READ_TIMEOUT, DEFAULT_RECEIVER_CLEANUP, EXECUTOR_LISTENER_NOTIFIER, FUTURE_LISTENER_NOTIFIER, INDEX_FILE_PROPERTY, LISTENER_NOTIFIER_PROPERTY, NOTIFICATION_INDEX_PROPERTY, PRODUCT_STORAGE_MAX_AGE_PROPERTY, PRODUCT_STORAGE_PROPERTY, READ_TIMEOUT_PROPERTY, RECEIVER_CLEANUP_PROPERTY, ROUNDROBIN_LISTENER_NOTIFIER, STORAGE_DIRECTORY_PROPERTY
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionvoid
Process configuration settings.int
Getter for attemptsGetter for createdAfterGetter for returning previous broadcastGetter for rest clientlong
Getter for timeoutGetter for trackingFileNamegetURI()
Getter for URIboolean
Getter for returning if receiver is processing broadcastsprotected void
onBroadcast
(javax.json.JsonObject json) Handle a message with "action"="broadcast".void
onClose
(javax.websocket.Session session, javax.websocket.CloseReason closeReason) Called when connection is closed, either because shutdown on this end or closed by server.void
Interface method to be overriden by WebSocket files and AwsProductReceivervoid
onJsonMessage
(javax.json.JsonObject json) Inspect the received json message and take appropriate action.protected void
onJsonNotification
(JsonNotification notification) Process a received notification and update current "created" timestamp.void
Message handler function passed to WebSocketClient Parses the message as JSON, and checks "action" property to route message for handling.void
onOpen
(javax.websocket.Session session) Called when connection is first opened.protected void
onProduct
(javax.json.JsonObject json) Handle a message with "action"="product", which is received during catch up.protected void
onProductsCreatedAfter
(javax.json.JsonObject json) Handle a message with "action"="products_created_after", which is received during catch up.void
Interface method to be overriden by WebSocket files and AwsProductReceiverjavax.json.JsonObject
Reads tracking file.void
run()
Catch up process.protected void
Send a catch-up request to the REST endpoint.void
setAttempts
(int attempts) Setter for attemptsvoid
setCreatedAfter
(Instant createdAfter) Setter for createdAftervoid
setPreviousBroadcast
(JsonNotification previousBroadcast) Setter for previous broadcastvoid
setProcessBroadcast
(boolean processBroadcast) Setter for processing broadcastsvoid
setRestClient
(ProductsCreatedAfterClient restClient) Setter for rest clientvoid
setTimeout
(long timeout) Setter for timeoutvoid
setTrackingFileName
(String trackingFileName) Setter for trackingFileNamevoid
Setter for URIvoid
shutdown()
Closes web socketprotected void
Notify running background thread to start catch up process.protected void
Start background thread for catch up process.void
startup()
Reads createdAfter from a tracking file if it exists, then connects to web socket.protected void
Notify running background thread to stop catch up process.protected void
Stop background thread for catch up process.void
Writes tracking file.Methods inherited from class gov.usgs.earthquake.distribution.DefaultNotificationReceiver
addNotificationListener, getConnectTimeout, getListenerQueueStatus, getNotificationIndex, getNotifier, getProductStorage, getProductStorageMaxAge, getQueueStatus, getReadTimeout, getReceiverCleanupInterval, notifyListeners, onExpiredNotification, receiveNotification, removeExpiredNotifications, removeNotificationListener, retrieveProduct, sendNotifications, setConnectTimeout, setNotificationIndex, setNotifier, setProductStorage, setProductStorageMaxAge, setReadTimeout, setReceiverCleanupInterval, storeProductSource, throttleQueues
Methods inherited from class gov.usgs.util.DefaultConfigurable
getName, setName
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface gov.usgs.util.Configurable
getName, setName
-
Field Details
-
LOGGER
Initialzation of logger. For us later in file. -
URI_PROPERTY
Variable for URI string- See Also:
-
CREATED_AFTER_PROPERTY
Variable for createdAfter string- See Also:
-
TRACKING_INDEX_PROPERTY
Variable for trackingIndex string- See Also:
-
TRACKING_FILE_NAME_PROPERTY
Variable for trackingFileName string- See Also:
-
CONNECT_ATTEMPTS_PROPERTY
Variable for connectAttempts string- See Also:
-
CONNECT_TIMEOUT_PROPERTY
Variable for connectTimeout string- See Also:
-
INITIAL_CATCHUP_AGE_PROPERTY
Variable for initialCatchUpAge string- See Also:
-
PRODUCTS_CREATED_AFTER_CLIENT_PROPERTY
Variable for productsCreatedAfterClient string- See Also:
-
PING_INTERVAL
Variable for pingIntervalMillis string- See Also:
-
PING_WAIT
Variable for pingWait string- See Also:
-
ANY_MESSAGE_INTERVAL
Variable for anyMessageIntervalMillis string- See Also:
-
DEFAULT_TRACKING_FILE_NAME
Variable for tracking file. Links to data/AwsReceiver.json- See Also:
-
DEFAULT_CONNECT_ATTEMPTS
Variable for connect attempts. Set to 5- See Also:
-
DEFAULT_CONNECT_TIMEOUT
Variable for timeout. Set to 1000- See Also:
-
DEFAULT_INITIAL_CATCHUP_AGE
Variable for catchup age. Set to 7.0- See Also:
-
DEFAULT_PING_INTERVAL_MILLIS
Default for time in between pings to server- See Also:
-
DEFAULT_PING_WAIT_MILLIS
Default for how long to wait for pong response to ping before closing or restarting connection- See Also:
-
DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS
Default for interval for any message- See Also:
-
createdAfter
Microsecond timestamp of last message that has been processed -
initialCatchUpAge
protected double initialCatchUpAgeHow far back to check when first connecting. -
previousBroadcast
last broadcast message that has been processed (used for catch up) -
processBroadcast
protected boolean processBroadcastwhether to process broadcast messages (after catching up). -
catchUpRunning
protected boolean catchUpRunningwhether currenting catching up. -
catchUpSync
sync object for catchUp state. -
catchUpThread
thread where catch up process runs. -
catchUpThreadRunning
protected boolean catchUpThreadRunningwhether thread should continue running (shutdown flag) -
lastCatchUpSent
last catch up message sent (for response timeouts)
-
-
Constructor Details
-
AwsProductReceiver
public AwsProductReceiver()
-
-
Method Details
-
configure
Description copied from class:DefaultConfigurable
Process configuration settings. Called before startup().- Specified by:
configure
in interfaceConfigurable
- Overrides:
configure
in classDefaultNotificationReceiver
- Parameters:
config
- the Config object with settings.- Throws:
Exception
- if configuration exceptions occur.
-
onOpen
Called when connection is first opened. Start catch up process.- Specified by:
onOpen
in interfaceWebSocketListener
- Parameters:
session
- Session to open- Throws:
IOException
- IOException
-
onClose
public void onClose(javax.websocket.Session session, javax.websocket.CloseReason closeReason) Called when connection is closed, either because shutdown on this end or closed by server.- Specified by:
onClose
in interfaceWebSocketListener
- Parameters:
session
- Session to closecloseReason
- Reason for closing session
-
onJsonMessage
Inspect the received json message and take appropriate action.- Parameters:
json
- json formatted message- Throws:
Exception
- when format error
-
onMessage
Message handler function passed to WebSocketClient Parses the message as JSON, and checks "action" property to route message for handling. Synchronized to process messages in order, since onProductsCreatedAfter compares state of latest product to determine whether caught up and if broadcasts should be processed.- Specified by:
onMessage
in interfaceWebSocketListener
- Parameters:
message
- Message notification - string- Throws:
IOException
- IOException
-
onBroadcast
Handle a message with "action"="broadcast". If caught up process notification as usual, otherwise save notification to help detect when caught up.- Parameters:
json
- JSON Message- Throws:
Exception
- Exception
-
onJsonNotification
Process a received notification and update current "created" timestamp.- Parameters:
notification
- JSON Notification- Throws:
Exception
- Exception
-
onProduct
Handle a message with "action"="product", which is received during catch up.- Parameters:
json
- JSON Message- Throws:
Exception
- Exception
-
onProductsCreatedAfter
Handle a message with "action"="products_created_after", which is received during catch up. Indicates the end of a response from a "products_created_after" request. Check whether caught up, and either switch to broadcast mode or continue catch up process.- Parameters:
json
- JSON Message- Throws:
Exception
- Exception
-
run
public void run()Catch up process. Do not run directly, usestartCatchUpThread()
andstopCatchUpThread()
to start and stop the process. Process waits untilstartCatchUp()
is called, and usesDefaultNotificationReceiver.throttleQueues()
between sends. -
sendProductsCreatedAfter
Send a catch-up request to the REST endpoint. The server will reply with a REST payload containing zero or more "action"="product" messages, and then one "action"="products_created_after" message to indicate the request is complete.- Throws:
IOException
- IOException
-
startCatchUp
protected void startCatchUp()Notify running background thread to start catch up process. -
startCatchUpThread
protected void startCatchUpThread()Start background thread for catch up process. -
stopCatchUp
protected void stopCatchUp()Notify running background thread to stop catch up process. -
stopCatchUpThread
protected void stopCatchUpThread()Stop background thread for catch up process. -
onConnectFail
public void onConnectFail()Description copied from interface:WebSocketListener
Interface method to be overriden by WebSocket files and AwsProductReceiver- Specified by:
onConnectFail
in interfaceWebSocketListener
-
onReconnectFail
public void onReconnectFail()Description copied from interface:WebSocketListener
Interface method to be overriden by WebSocket files and AwsProductReceiver- Specified by:
onReconnectFail
in interfaceWebSocketListener
-
startup
Reads createdAfter from a tracking file if it exists, then connects to web socket.- Specified by:
startup
in interfaceConfigurable
- Overrides:
startup
in classDefaultNotificationReceiver
- Throws:
Exception
- Exception
-
shutdown
Closes web socket- Specified by:
shutdown
in interfaceConfigurable
- Overrides:
shutdown
in classDefaultNotificationReceiver
- Throws:
Exception
- Exception
-
readTrackingData
Reads tracking file.- Returns:
- JsonObject tracking file
- Throws:
Exception
- Exception
-
writeTrackingData
Writes tracking file.- Throws:
Exception
- Exception
-
getRestClient
Getter for rest client- Returns:
- ProductsCreatedAfterClient
-
setRestClient
Setter for rest client- Parameters:
restClient
- object
-
getURI
Getter for URI- Returns:
- URI
-
setURI
Setter for URI- Parameters:
uri
- URI
-
getTrackingFileName
Getter for trackingFileName- Returns:
- name of tracking file
-
setTrackingFileName
Setter for trackingFileName- Parameters:
trackingFileName
- trackingFileName
-
getCreatedAfter
Getter for createdAfter- Returns:
- createdAfter
-
setCreatedAfter
Setter for createdAfter- Parameters:
createdAfter
- createdAfter
-
getAttempts
public int getAttempts()Getter for attempts- Returns:
- attempts
-
setAttempts
public void setAttempts(int attempts) Setter for attempts- Parameters:
attempts
- attempts
-
getTimeout
public long getTimeout()Getter for timeout- Returns:
- timeout
-
setTimeout
public void setTimeout(long timeout) Setter for timeout- Parameters:
timeout
- long timeout
-
getPreviousBroadcast
Getter for returning previous broadcast- Returns:
- JsonNotification
-
setPreviousBroadcast
Setter for previous broadcast- Parameters:
previousBroadcast
- the JsonNotification of the most recent broadcast
-
isProcessBroadcast
public boolean isProcessBroadcast()Getter for returning if receiver is processing broadcasts- Returns:
- JsonNotification
-
setProcessBroadcast
public void setProcessBroadcast(boolean processBroadcast) Setter for processing broadcasts
-