Class AwsProductReceiver

All Implemented Interfaces:
NotificationIndexCleanup.Listener, NotificationReceiver, WebSocketListener, Configurable, Runnable

public class AwsProductReceiver extends DefaultNotificationReceiver implements Runnable, WebSocketListener
Receives notifications from a PDL notification web socket. After initial connection, ignores broadcasts until catch up process is complete. Catch up involves sending a "products_created_after" request with the latest notification "created" timestamp, and processing products until either the last product matches the last broadcast or there are no more products after the latest notification "created" timestamp.
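The two stopping conditions described above can be sketched as a single predicate. This is a minimal illustration, not the receiver's actual code: the class `CatchUpCheck`, the method `isCaughtUp`, and the use of plain string ids to compare a product with a broadcast are all assumptions made for the example.

```java
/** Hypothetical helper for illustration; not part of AwsProductReceiver. */
public class CatchUpCheck {

    /**
     * Decide whether catch up is complete after one "products_created_after" response.
     *
     * @param responseCount   number of products returned by the request
     * @param lastProductId   id of the last product in the response, or null if none
     * @param lastBroadcastId id of the most recent broadcast seen, or null if none
     * @return true if the receiver could switch to processing broadcasts
     */
    public static boolean isCaughtUp(int responseCount, String lastProductId, String lastBroadcastId) {
        // No more products after the latest "created" timestamp.
        if (responseCount == 0) {
            return true;
        }
        // The last product received matches the last broadcast.
        return lastProductId != null && lastProductId.equals(lastBroadcastId);
    }
}
```

If neither condition holds, the receiver would presumably send another request using the newest "created" timestamp it has processed.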
  • Field Details

    • LOGGER

      public static final Logger LOGGER
      Initialization of logger. For use later in file.
    • URI_PROPERTY

      public static final String URI_PROPERTY
      Variable for URI string
    • CREATED_AFTER_PROPERTY

      public static final String CREATED_AFTER_PROPERTY
      Variable for createdAfter string
    • TRACKING_INDEX_PROPERTY

      public static final String TRACKING_INDEX_PROPERTY
      Variable for trackingIndex string
    • TRACKING_FILE_NAME_PROPERTY

      public static final String TRACKING_FILE_NAME_PROPERTY
      Variable for trackingFileName string
    • CONNECT_ATTEMPTS_PROPERTY

      public static final String CONNECT_ATTEMPTS_PROPERTY
      Variable for connectAttempts string
    • CONNECT_TIMEOUT_PROPERTY

      public static final String CONNECT_TIMEOUT_PROPERTY
      Variable for connectTimeout string
    • INITIAL_CATCHUP_AGE_PROPERTY

      public static final String INITIAL_CATCHUP_AGE_PROPERTY
      Variable for initialCatchUpAge string
    • PRODUCTS_CREATED_AFTER_CLIENT_PROPERTY

      public static final String PRODUCTS_CREATED_AFTER_CLIENT_PROPERTY
      Variable for productsCreatedAfterClient string
    • PING_INTERVAL

      public static final String PING_INTERVAL
      Variable for pingIntervalMillis string
    • PING_WAIT

      public static final String PING_WAIT
      Variable for pingWait string
    • ANY_MESSAGE_INTERVAL

      public static final String ANY_MESSAGE_INTERVAL
      Variable for anyMessageIntervalMillis string
    • DEFAULT_TRACKING_FILE_NAME

      public static final String DEFAULT_TRACKING_FILE_NAME
      Variable for tracking file. Links to data/AwsReceiver.json
    • DEFAULT_CONNECT_ATTEMPTS

      public static final String DEFAULT_CONNECT_ATTEMPTS
      Variable for connect attempts. Set to 5
    • DEFAULT_CONNECT_TIMEOUT

      public static final String DEFAULT_CONNECT_TIMEOUT
      Variable for timeout. Set to 1000
    • DEFAULT_INITIAL_CATCHUP_AGE

      public static final String DEFAULT_INITIAL_CATCHUP_AGE
      Variable for catchup age. Set to 7.0
    • DEFAULT_PING_INTERVAL_MILLIS

      public static final String DEFAULT_PING_INTERVAL_MILLIS
      Default for time in between pings to server
    • DEFAULT_PING_WAIT_MILLIS

      public static final String DEFAULT_PING_WAIT_MILLIS
      Default for how long to wait for pong response to ping before closing or restarting connection
    • DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS

      public static final String DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS
      Default for interval for any message
    • createdAfter

      protected Instant createdAfter
      Microsecond timestamp of last message that has been processed
    • initialCatchUpAge

      protected double initialCatchUpAge
      How far back to check when first connecting.
    • previousBroadcast

      protected JsonNotification previousBroadcast
      last broadcast message that has been processed (used for catch up)
    • processBroadcast

      protected boolean processBroadcast
      whether to process broadcast messages (after catching up).
    • catchUpRunning

      protected boolean catchUpRunning
      whether currently catching up.
    • catchUpSync

      protected final Object catchUpSync
      sync object for catchUp state.
    • catchUpThread

      protected Thread catchUpThread
      thread where catch up process runs.
    • catchUpThreadRunning

      protected boolean catchUpThreadRunning
      whether thread should continue running (shutdown flag)
    • lastCatchUpSent

      protected Instant lastCatchUpSent
      last catch up message sent (for response timeouts)
  • Constructor Details

    • AwsProductReceiver

      public AwsProductReceiver()
  • Method Details

    • configure

      public void configure(Config config) throws Exception
      Description copied from class: DefaultConfigurable
      Process configuration settings. Called before startup().
      Specified by:
      configure in interface Configurable
      Overrides:
      configure in class DefaultNotificationReceiver
      Parameters:
      config - the Config object with settings.
      Throws:
      Exception - if configuration exceptions occur.
    • onOpen

      public void onOpen(javax.websocket.Session session) throws IOException
      Called when connection is first opened. Start catch up process.
      Specified by:
      onOpen in interface WebSocketListener
      Parameters:
      session - Session to open
      Throws:
      IOException - IOException
    • onClose

      public void onClose(javax.websocket.Session session, javax.websocket.CloseReason closeReason)
      Called when connection is closed, either because of a shutdown on this end or because the server closed it.
      Specified by:
      onClose in interface WebSocketListener
      Parameters:
      session - Session to close
      closeReason - Reason for closing session
    • onJsonMessage

      public void onJsonMessage(javax.json.JsonObject json) throws Exception
      Inspect the received json message and take appropriate action.
      Parameters:
      json - json formatted message
      Throws:
      Exception - if the message format is invalid
    • onMessage

      public void onMessage(String message) throws IOException
      Message handler function passed to WebSocketClient. Parses the message as JSON and checks the "action" property to route the message for handling. Synchronized to process messages in order, since onProductsCreatedAfter compares the state of the latest product to determine whether the receiver is caught up and whether broadcasts should be processed.
      Specified by:
      onMessage in interface WebSocketListener
      Parameters:
      message - Message notification - string
      Throws:
      IOException - IOException
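The routing on the "action" property described for onMessage might look roughly like the sketch below. It is an illustration only: JSON parsing is left out and the action is passed in as a plain string, the class `ActionRouter` and its methods are hypothetical, and ignoring unknown or missing actions is an assumption rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of action-based routing; not the receiver's code. */
public class ActionRouter {
    /** Names of the handlers reached so far, in arrival order. */
    private final List<String> handled = new ArrayList<>();

    /**
     * Route one message by its "action" value.
     * Synchronized so that messages are handled strictly one at a time, in order.
     *
     * @param action value of the "action" property, or null if absent
     * @return name of the handler the message would be passed to
     */
    public synchronized String route(String action) {
        String handler;
        if (action == null) {
            handler = "ignored"; // assumption: messages without an action are skipped
        } else {
            switch (action) {
                case "broadcast":
                    handler = "onBroadcast";
                    break;
                case "product":
                    handler = "onProduct";
                    break;
                case "products_created_after":
                    handler = "onProductsCreatedAfter";
                    break;
                default:
                    handler = "ignored"; // assumption: unknown actions are skipped
            }
        }
        handled.add(handler);
        return handler;
    }

    /** Copy of the handler names recorded so far. */
    public synchronized List<String> getHandled() {
        return new ArrayList<>(handled);
    }
}
```

Keeping the whole method synchronized mirrors the ordering requirement in the description: the end-of-response handler can only compare against the latest product if earlier messages have already been fully handled.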
    • onBroadcast

      protected void onBroadcast(javax.json.JsonObject json) throws Exception
      Handle a message with "action"="broadcast". If caught up, process notification as usual; otherwise, save notification to help detect when caught up.
      Parameters:
      json - JSON Message
      Throws:
      Exception - Exception
    • onJsonNotification

      protected void onJsonNotification(JsonNotification notification) throws Exception
      Process a received notification and update current "created" timestamp.
      Parameters:
      notification - JSON Notification
      Throws:
      Exception - Exception
    • onProduct

      protected void onProduct(javax.json.JsonObject json) throws Exception
      Handle a message with "action"="product", which is received during catch up.
      Parameters:
      json - JSON Message
      Throws:
      Exception - Exception
    • onProductsCreatedAfter

      protected void onProductsCreatedAfter(javax.json.JsonObject json) throws Exception
      Handle a message with "action"="products_created_after", which is received during catch up. Indicates the end of a response from a "products_created_after" request. Check whether caught up, and either switch to broadcast mode or continue catch up process.
      Parameters:
      json - JSON Message
      Throws:
      Exception - Exception
    • run

      public void run()
      Catch up process. Do not run directly, use startCatchUpThread() and stopCatchUpThread() to start and stop the process. Process waits until startCatchUp() is called, and uses DefaultNotificationReceiver.throttleQueues() between sends.
      Specified by:
      run in interface Runnable
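The start, wait, and stop pattern described for run() can be sketched with a plain monitor object. This is a simplified model under stated assumptions, not the receiver's implementation: the class `CatchUpWorker`, its field names, and the `awaitRequests` helper are hypothetical, and sending a request is reduced to incrementing a counter where the real receiver would throttle and send.

```java
/** Hypothetical model of a background catch up thread; not the receiver's code. */
public class CatchUpWorker implements Runnable {
    private final Object sync = new Object();
    private boolean requestPending = false; // set by startCatchUp()
    private boolean threadRunning = false;  // shutdown flag
    private int requestsSent = 0;
    private Thread thread;

    /** Start the background thread; it idles until startCatchUp() is called. */
    public void startThread() {
        synchronized (sync) {
            threadRunning = true;
        }
        thread = new Thread(this, "catch-up");
        thread.setDaemon(true);
        thread.start();
    }

    /** Notify the background thread that one request should be sent. */
    public void startCatchUp() {
        synchronized (sync) {
            requestPending = true;
            sync.notifyAll();
        }
    }

    /** Clear the running flag, wake the thread, and wait for it to exit. */
    public void stopThread() {
        synchronized (sync) {
            threadRunning = false;
            sync.notifyAll();
        }
        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void run() {
        while (true) {
            synchronized (sync) {
                // Sleep until there is work to do or shutdown is requested.
                while (threadRunning && !requestPending) {
                    try {
                        sync.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (!threadRunning) {
                    return;
                }
                requestPending = false;
            }
            // A real receiver would throttle and send its request here.
            synchronized (sync) {
                requestsSent++;
                sync.notifyAll();
            }
        }
    }

    /** Wait up to timeoutMillis for at least n requests; returns whether that happened. */
    public boolean awaitRequests(int n, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (sync) {
            while (requestsSent < n) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    sync.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    public int getRequestsSent() {
        synchronized (sync) {
            return requestsSent;
        }
    }
}
```

Calling run() directly would block the caller in the wait loop, which is one reason to go through the start and stop methods instead.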
    • sendProductsCreatedAfter

      protected void sendProductsCreatedAfter() throws IOException
      Send a catch-up request to the REST endpoint. The server will reply with a REST payload containing zero or more "action"="product" messages, and then one "action"="products_created_after" message to indicate the request is complete.
      Throws:
      IOException - IOException
    • startCatchUp

      protected void startCatchUp()
      Notify running background thread to start catch up process.
    • startCatchUpThread

      protected void startCatchUpThread()
      Start background thread for catch up process.
    • stopCatchUp

      protected void stopCatchUp()
      Notify running background thread to stop catch up process.
    • stopCatchUpThread

      protected void stopCatchUpThread()
      Stop background thread for catch up process.
    • onConnectFail

      public void onConnectFail()
      Description copied from interface: WebSocketListener
      Interface method to be overridden by WebSocket files and AwsProductReceiver
      Specified by:
      onConnectFail in interface WebSocketListener
    • onReconnectFail

      public void onReconnectFail()
      Description copied from interface: WebSocketListener
      Interface method to be overridden by WebSocket files and AwsProductReceiver
      Specified by:
      onReconnectFail in interface WebSocketListener
    • startup

      public void startup() throws Exception
      Reads createdAfter from a tracking file if it exists, then connects to web socket.
      Specified by:
      startup in interface Configurable
      Overrides:
      startup in class DefaultNotificationReceiver
      Throws:
      Exception - Exception
    • shutdown

      public void shutdown() throws Exception
      Closes web socket
      Specified by:
      shutdown in interface Configurable
      Overrides:
      shutdown in class DefaultNotificationReceiver
      Throws:
      Exception - Exception
    • readTrackingData

      public javax.json.JsonObject readTrackingData() throws Exception
      Reads tracking file.
      Returns:
      JsonObject tracking file
      Throws:
      Exception - Exception
    • writeTrackingData

      public void writeTrackingData() throws Exception
      Writes tracking file.
      Throws:
      Exception - Exception
    • getRestClient

      public ProductsCreatedAfterClient getRestClient()
      Getter for rest client
      Returns:
      ProductsCreatedAfterClient
    • setRestClient

      public void setRestClient(ProductsCreatedAfterClient restClient)
      Setter for rest client
      Parameters:
      restClient - object
    • getURI

      public URI getURI()
      Getter for URI
      Returns:
      URI
    • setURI

      public void setURI(URI uri)
      Setter for URI
      Parameters:
      uri - URI
    • getTrackingFileName

      public String getTrackingFileName()
      Getter for trackingFileName
      Returns:
      name of tracking file
    • setTrackingFileName

      public void setTrackingFileName(String trackingFileName)
      Setter for trackingFileName
      Parameters:
      trackingFileName - trackingFileName
    • getCreatedAfter

      public Instant getCreatedAfter()
      Getter for createdAfter
      Returns:
      createdAfter
    • setCreatedAfter

      public void setCreatedAfter(Instant createdAfter)
      Setter for createdAfter
      Parameters:
      createdAfter - createdAfter
    • getAttempts

      public int getAttempts()
      Getter for attempts
      Returns:
      attempts
    • setAttempts

      public void setAttempts(int attempts)
      Setter for attempts
      Parameters:
      attempts - attempts
    • getTimeout

      public long getTimeout()
      Getter for timeout
      Returns:
      timeout
    • setTimeout

      public void setTimeout(long timeout)
      Setter for timeout
      Parameters:
      timeout - long timeout
    • getPreviousBroadcast

      public JsonNotification getPreviousBroadcast()
      Getter for returning previous broadcast
      Returns:
      JsonNotification
    • setPreviousBroadcast

      public void setPreviousBroadcast(JsonNotification previousBroadcast)
      Setter for previous broadcast
      Parameters:
      previousBroadcast - the JsonNotification of the most recent broadcast
    • isProcessBroadcast

      public boolean isProcessBroadcast()
      Getter for returning if receiver is processing broadcasts
      Returns:
      true if the receiver is processing broadcasts
    • setProcessBroadcast

      public void setProcessBroadcast(boolean processBroadcast)
      Setter for processing broadcasts