AwsProductReceiver.java

package gov.usgs.earthquake.aws;

import java.io.IOException;
import java.io.StringReader;
import java.net.http.HttpClient;
import java.net.URI;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Objects;
import java.util.Optional;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.naming.ConfigurationException;
import javax.websocket.CloseReason;
import javax.websocket.Session;

import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.earthquake.distribution.HeartbeatListener;
import gov.usgs.earthquake.distribution.WebSocketClient;
import gov.usgs.earthquake.distribution.WebSocketListener;
import gov.usgs.earthquake.product.InvalidProductIdException;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;

/**
 * Receives notifications from a PDL notification web socket.
 *
 * After initial connection, ignores broadcasts until catch up process is
 * complete.
 *
 * Catch up involves sending a "products_created_after" request with the latest
 * notification "created" timestamp, and processing products until either the
 * last product matches the last broadcast or there are no more products after
 * the latest notification "created" timestamp.
 */
public class AwsProductReceiver extends DefaultNotificationReceiver implements Runnable, WebSocketListener {

  /** Initialzation of logger. For us later in file. */
  public static final Logger LOGGER = Logger.getLogger(AwsProductReceiver.class.getName());
  /** Variable for URI string */
  public static final String URI_PROPERTY = "url";
  /** Variable for createdAfter string */
  public static final String CREATED_AFTER_PROPERTY = "createdAfter";
  /** Variable for trackingIndex string */
  public static final String TRACKING_INDEX_PROPERTY = "trackingIndex";
  /** Variable for trackingFileName string */
  public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
  /** Variable for connectAttempts string */
  public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
  /** Variable for connectTimeout string */
  public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
  /** Variable for initialCatchUpAge string */
  public static final String INITIAL_CATCHUP_AGE_PROPERTY = "initialCatchUpAge";
  /** Variable for productsCreatedAfterClient string */
  public static final String PRODUCTS_CREATED_AFTER_CLIENT_PROPERTY = "productsCreatedAfterClient";
  /** Variable for pingIntervalMillis string */
  public static final String PING_INTERVAL = "pingInterval";
  /** Variable for pingWait string */
  public static final String PING_WAIT = "pingWait";
  /** Variable for anyMessageIntervalMillis string */
  public static final String ANY_MESSAGE_INTERVAL = "anyMessageInterval";

  /** Variable for tracking file. Links to data/AwsReceiver.json */
  public static final String DEFAULT_TRACKING_FILE_NAME = "data/AwsReceiver.json";
  /** Variable for connect attempts. Set to 5 */
  public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
  /** Variable for timeout. Set to 1000 */
  public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
  /** Variable for catchup age. Set to 7.0 */
  public static final String DEFAULT_INITIAL_CATCHUP_AGE = "7.0";
  /** Default for time in between pings to server */
  public static final String DEFAULT_PING_INTERVAL_MILLIS = "15000";
  /**
   * Default for how long to wait for pong response to ping before closing or
   * restarting connection
   */
  public static final String DEFAULT_PING_WAIT_MILLIS = "4000";
  /** Default for interval for any message */
  public static final String DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS = "0";

  private URI uri = null;
  private String trackingFileName;
  private int attempts;
  private long timeout;
  private long pingIntervalMillis;
  private long pingWaitMillis;
  private long anyMessageIntervalMillis;

  private TrackingIndex trackingIndex;
  private WebSocketClient client;
  private ProductsCreatedAfterClient restClient;

  /** Microsecond timestamp of last message that has been processed */
  protected Instant createdAfter = null;

  /** How far back to check when first connecting. */
  protected double initialCatchUpAge = Double.valueOf(DEFAULT_INITIAL_CATCHUP_AGE);

  /** last broadcast message that has been processed (used for catch up) */
  protected JsonNotification previousBroadcast = null;
  /** whether to process broadcast messages (after catching up). */
  protected boolean processBroadcast = false;

  /** whether currenting catching up. */
  protected boolean catchUpRunning = false;
  /** sync object for catchUp state. */
  protected final Object catchUpSync = new Object();
  /** thread where catch up process runs. */
  protected Thread catchUpThread = null;
  /** whether thread should continue running (shutdown flag) */
  protected boolean catchUpThreadRunning = false;
  /** last catch up message sent (for response timeouts) */
  protected Instant lastCatchUpSent = null;

  @Override
  public void configure(Config config) throws Exception {
    super.configure(config);

    uri = new URI(config.getProperty(URI_PROPERTY));

    attempts = Integer.parseInt(config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
    timeout = Long.parseLong(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
    initialCatchUpAge = Double.valueOf(config.getProperty(INITIAL_CATCHUP_AGE_PROPERTY, DEFAULT_INITIAL_CATCHUP_AGE));
    pingIntervalMillis = Long.parseLong(config.getProperty(PING_INTERVAL, DEFAULT_PING_INTERVAL_MILLIS));
    pingWaitMillis = Long.parseLong(config.getProperty(PING_WAIT, DEFAULT_PING_WAIT_MILLIS));
    anyMessageIntervalMillis = Long
        .parseLong(config.getProperty(ANY_MESSAGE_INTERVAL, DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS));

    String restClientName = config.getProperty(PRODUCTS_CREATED_AFTER_CLIENT_PROPERTY);
    if (restClientName != null) {
      this.restClient = (ProductsCreatedAfterClient) Config.getConfig().getObject(restClientName);
    } else {
      throw new ConfigurationException(
          "[" + this.getName() + "] invalid or missing ProductsCreatedAfterClient configuration");
    }

    final String trackingIndexName = config.getProperty(TRACKING_INDEX_PROPERTY);
    if (trackingIndexName != null) {
      LOGGER.config("[" + getName() + "] loading tracking index " + trackingIndexName);
      try {
        // read object from global config
        trackingIndex = (TrackingIndex) Config.getConfig().getObject(trackingIndexName);
      } catch (Exception e) {
        LOGGER.log(Level.WARNING, "[" + getName() + "] error loading tracking index " + trackingIndexName, e);
      }
    } else {
      trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY);
      if (trackingFileName != null) {
        LOGGER.config("[" + getName() + "] creating tracking index at" + trackingFileName);
        trackingIndex = new TrackingIndex(TrackingIndex.DEFAULT_DRIVER, "jdbc:sqlite:" + trackingFileName);
      }
    }
  }

  /**
   * Called when connection is first opened.
   *
   * Start catch up process.
   */
  @Override
  public void onOpen(Session session) throws IOException {
    LOGGER.info("[" + getName() + "] onOpen connection_id=" + session.getId() + " " + Instant.now().toString());

    // start catch up process
    LOGGER.info("[" + getName() + "] Starting catch up");
    // ignore broadcast until caught up
    this.setProcessBroadcast(false);
    startCatchUp();
  }

  /**
   * Called when connection is closed, either because shutdown on this end or
   * closed by server.
   */
  @Override
  public void onClose(Session session, CloseReason closeReason) {
    LOGGER.info("[" + getName() + "] onClose " + closeReason.toString() + " " + Instant.now().toString());

    // cannot catch up when not connected, restart in onOpen
    stopCatchUp();
  }

  /**
   * Inspect the received json message and take appropriate action.
   *
   * @param json json formatted message
   * @throws Exception when format error
   */
  synchronized public void onJsonMessage(JsonObject json) throws Exception {
    final String action = json.getString("action");

    if ("broadcast".equals(action)) {
      onBroadcast(json);
    } else if ("product".equals(action)) {
      onProduct(json);
    } else if ("products_created_after".equals(action)) {
      onProductsCreatedAfter(json);
    }
  }

  /**
   * Message handler function passed to WebSocketClient
   *
   * Parses the message as JSON, and checks "action" property to route message for
   * handling.
   *
   * Synchronized to process messages in order, since onProductsCreatedAfter
   * compares state of latest product to determine whether caught up and if
   * broadcasts should be processed.
   *
   * @param message Message notification - string
   */
  @Override
  synchronized public void onMessage(String message) throws IOException {
    try (final JsonReader reader = Json.createReader(new StringReader(message))) {
      // parse message
      final JsonObject json = reader.readObject();
      onJsonMessage(json);
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing message '" + message + "'", e);
      throw new IOException(e);
    }
  }

  /**
   * Handle a message with "action"="broadcast".
   *
   * If caught up process notification as usual, otherwise save notification to
   * help detect when caught up.
   *
   * @param json JSON Message
   * @throws Exception Exception
   */
  protected void onBroadcast(final JsonObject json) throws Exception {
    final JsonNotification notification = new JsonNotification(json.getJsonObject("notification"));

    Optional<ProductId> broadcastedPreviousProductId = Optional.empty();
    try {
      broadcastedPreviousProductId = Optional.of(ProductId.fromJson(json.getJsonObject("previousProductId")));
    } catch (InvalidProductIdException ignored) {
    }

    ProductId broadcastedProductId = notification.getProductId();

    LOGGER.finer("[" + getName() + "]" + " onBroadcast(" + notification.getProductId() + ")" + " sequence="
        + broadcastedProductId + ", previousBroadcast=" + this.previousBroadcast);

    boolean isBroadcastedInOrder = broadcastedPreviousProductId.isEmpty()
        || (broadcastedPreviousProductId.isPresent()
            && Objects.nonNull(previousBroadcast)
            && broadcastedPreviousProductId.get().equals(this.previousBroadcast.getProductId()));

    if (this.isProcessBroadcast() && !isBroadcastedInOrder) {
      // may have missed message
      LOGGER.info(
          "[" + getName() + "] broadcast ids out of sequence" + " (at " + broadcastedPreviousProductId.get()
              + ", received "
              + broadcastedProductId + ")" + ", switching to catch up mode");
      this.setProcessBroadcast(false);
      startCatchUp();
    }

    previousBroadcast = notification;

    // process message if not in catch up mode
    if (this.isProcessBroadcast()) {
      onJsonNotification(notification);
    }
  }

  /**
   * Process a received notification and update current "created" timestamp.
   *
   * @param notification JSON Notification
   * @throws Exception Exception
   */
  protected void onJsonNotification(final JsonNotification notification) throws Exception {
    // receive and notify listeners
    receiveNotification(notification);
    // update tracking file
    this.createdAfter = notification.created;
    writeTrackingData();
    // send heartbeat
    HeartbeatListener.sendHeartbeatMessage(getName(), "createdAfter", createdAfter.toString());
  }

  /**
   * Handle a message with "action"="product", which is received during catch up.
   *
   * @param json JSON Message
   * @throws Exception Exception
   */
  protected void onProduct(final JsonObject json) throws Exception {
    final JsonNotification notification = new JsonNotification(json.getJsonObject("notification"));
    LOGGER.finer("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
    onJsonNotification(notification);
  }

  /**
   * Handle a message with "action"="products_created_after", which is received
   * during catch up.
   *
   * Indicates the end of a response from a "products_created_after" request.
   * Check whether caught up, and either switch to broadcast mode or continue
   * catch up process.
   *
   * @param json JSON Message
   * @throws Exception Exception
   */
  protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
    final String after = json.getString("created_after");
    final int count = json.getInt("count");
    LOGGER.finer("[" + getName() + "] onProductsCreatedAfter(" + after + ", " + count + " products)");

    // notify background thread that a response was received,
    // as well as pausing messages until restarted below (if needed)
    stopCatchUp();

    // check whether caught up
    if (
    // if a broadcast received during catchup,
    (previousBroadcast != null &&
    // and createdAfter is at or after last broadcast
        createdAfter.compareTo(previousBroadcast.created) >= 0)
        // or no additional products returned
        || count == 0) {
      // caught up
      LOGGER.info("[" + getName() + "] Caught up, switching to broadcast");
      this.setProcessBroadcast(true);
    } else {
      // keep catching up
      startCatchUp();
    }
  }

  /**
   * Catch up process.
   *
   * Do not run directly, use {@link #startCatchUpThread()} and
   * {@link #stopCatchUpThread()} to start and stop the process.
   *
   * Process waits until {@link #startCatchUp()} is called, and uses
   * {@link #throttleQueues()} between sends.
   */
  @Override
  public void run() {
    while (catchUpThreadRunning) {
      try {
        synchronized (catchUpSync) {
          if (!catchUpRunning) {
            catchUpSync.wait();
            continue;
          }
          if (lastCatchUpSent != null) {
            // message already sent, wait for timeout
            Instant now = Instant.now();
            Instant timeout = lastCatchUpSent.plus(60, ChronoUnit.SECONDS);
            if (now.isBefore(timeout)) {
              catchUpSync.wait(now.until(timeout, ChronoUnit.MILLIS));
              continue;
            } else {
              // timed out
              LOGGER.warning("No products_created_after response" + ", sent at " + lastCatchUpSent.toString());
              // fall through
            }
          }
        }

        // ready to send, but block until done throttling
        throttleQueues();

        try {
          synchronized (catchUpSync) {
            // connection may have closed while throttling
            if (!catchUpRunning) {
              continue;
            }
            sendProductsCreatedAfter();
            // track when sent
            lastCatchUpSent = Instant.now();
          }
        } catch (IOException e) {
          LOGGER.log(Level.WARNING, "Exception sending products_created_after", e);
          if (catchUpThreadRunning && catchUpRunning) {
            // wait before next attempt
            try {
              Thread.sleep(1000L);
            } catch (InterruptedException ie) {
              // probably stopping
            }
          }
        }
      } catch (InterruptedException e) {
        // probably stopping
      }
    }
  }

  /**
   * Send a catch-up request to the REST endpoint.
   *
   * The server will reply with a REST payload containing zero or more
   * "action"="product" messages, and then one "action"="products_created_after"
   * message to indicate the request is complete.
   *
   * @throws IOException IOException
   */
  protected void sendProductsCreatedAfter() throws IOException {
    // set default for created after
    if (this.createdAfter == null) {
      this.createdAfter = Instant.now().minusSeconds(Math.round(initialCatchUpAge * 86400));
    }

    try {
      this.restClient.send(this.createdAfter);
    } catch (Exception ex) {
      LOGGER.log(Level.WARNING, "[" + this.getName() + "] failed to send products_created_after", ex);
      throw new IOException(ex);
    }

  }

  /**
   * Notify running background thread to start catch up process.
   */
  protected void startCatchUp() {
    // notify background thread to start catch up
    synchronized (catchUpSync) {
      catchUpRunning = true;
      // clear sent time
      lastCatchUpSent = null;
      catchUpSync.notify();
    }
  }

  /**
   * Start background thread for catch up process.
   */
  protected void startCatchUpThread() {
    if (catchUpThread != null) {
      throw new IllegalStateException("catchUp thread already started");
    }
    synchronized (catchUpSync) {
      catchUpThreadRunning = true;
      catchUpThread = new Thread(this);
    }
    catchUpThread.start();
  }

  /**
   * Notify running background thread to stop catch up process.
   */
  protected void stopCatchUp() {
    synchronized (catchUpSync) {
      // stop catch up
      catchUpRunning = false;
      // clear sent time
      lastCatchUpSent = null;
      catchUpSync.notify();
    }
  }

  /**
   * Stop background thread for catch up process.
   */
  protected void stopCatchUpThread() {
    if (catchUpThread == null) {
      return;
    }
    // stop catch up thread
    try {
      synchronized (catchUpSync) {
        // orderly shutdown
        catchUpThreadRunning = false;
        catchUpSync.notify();
      }
      // interrupt just in case
      catchUpThread.interrupt();
      catchUpThread.join();
    } catch (Exception e) {
      LOGGER.log(Level.INFO, "Error stopping catchUpThread", e);
    } finally {
      catchUpThread = null;
    }
  }

  @Override
  public void onConnectFail() {
    // client failed to connect
    LOGGER.info("[" + getName() + "] onConnectFail " + Instant.now().toString());
  }

  @Override
  public void onReconnectFail() {
    // failed to reconnect after close
    LOGGER.info("[" + getName() + "] onReconnectFail " + Instant.now().toString());
  }

  /**
   * Reads createdAfter from a tracking file if it exists, then connects to web
   * socket.
   *
   * @throws Exception Exception
   */
  @Override
  public void startup() throws Exception {
    super.startup();
    if (trackingIndex == null) {
      trackingIndex = new TrackingIndex();
    }
    trackingIndex.startup();

    // read sequence from tracking file if other parameters agree
    JsonObject json = readTrackingData();
    if (json != null && json.getString(URI_PROPERTY).equals(uri.toString())) {
      createdAfter = Instant.parse(json.getString(CREATED_AFTER_PROPERTY));
    }

    // Listen for messages from REST client
    this.restClient.addReceiver(this);
    this.restClient.startup();

    // open websocket
    client = new WebSocketClient(uri, this, attempts, timeout, true, pingIntervalMillis, pingWaitMillis,
        anyMessageIntervalMillis);

    // start catch up process
    startCatchUpThread();
  }

  /**
   * Closes web socket
   *
   * @throws Exception Exception
   */
  @Override
  public void shutdown() throws Exception {
    // stop catch up process
    stopCatchUpThread();

    // close socket
    try {
      client.shutdown();
    } catch (Exception e) {
    }

    // End REST client
    try {
      this.restClient.removeReceiver(this);
      this.restClient.shutdown();
    } catch (Exception ex) {
    }

    super.shutdown();
  }

  /**
   * Reads tracking file.
   *
   * @return JsonObject tracking file
   * @throws Exception Exception
   */
  public JsonObject readTrackingData() throws Exception {
    // use name as key
    return trackingIndex.getTrackingData(getName());
  }

  /**
   * Writes tracking file.
   *
   * @throws Exception Exception
   */
  public void writeTrackingData() throws Exception {
    JsonObject json = Json.createObjectBuilder().add(URI_PROPERTY, uri.toString())
        .add(CREATED_AFTER_PROPERTY, createdAfter.toString()).build();
    // use name as key
    trackingIndex.setTrackingData(getName(), json);
  }

  /**
   * Getter for rest client
   *
   * @return ProductsCreatedAfterClient
   */
  public ProductsCreatedAfterClient getRestClient() {
    return this.restClient;
  }

  /**
   * Setter for rest client
   *
   * @param restClient object
   */

  public void setRestClient(ProductsCreatedAfterClient restClient) {
    this.restClient = restClient;
  }

  /**
   * Getter for URI
   *
   * @return URI
   */
  public URI getURI() {
    return uri;
  }

  /**
   * Setter for URI
   *
   * @param uri URI
   */
  public void setURI(final URI uri) {
    this.uri = uri;
  }

  /**
   * Getter for trackingFileName
   *
   * @return name of tracking file
   */
  public String getTrackingFileName() {
    return trackingFileName;
  }

  /**
   * Setter for trackingFileName
   *
   * @param trackingFileName trackingFileName
   */
  public void setTrackingFileName(final String trackingFileName) {
    this.trackingFileName = trackingFileName;
  }

  /**
   * Getter for createdAfter
   *
   * @return createdAfter
   */
  public Instant getCreatedAfter() {
    return createdAfter;
  }

  /**
   * Setter for createdAfter
   *
   * @param createdAfter createdAfter
   */
  public void setCreatedAfter(final Instant createdAfter) {
    this.createdAfter = createdAfter;
  }

  /**
   * Getter for attempts
   *
   * @return attempts
   */
  public int getAttempts() {
    return attempts;
  }

  /**
   * Setter for attempts
   *
   * @param attempts attempts
   */
  public void setAttempts(final int attempts) {
    this.attempts = attempts;
  }

  /**
   * Getter for timeout
   *
   * @return timeout
   */
  public long getTimeout() {
    return timeout;
  }

  /**
   * Setter for timeout
   *
   * @param timeout long timeout
   */
  public void setTimeout(final long timeout) {
    this.timeout = timeout;
  }

  /**
   * Getter for returning previous broadcast
   * 
   * @return JsonNotification
   */
  public JsonNotification getPreviousBroadcast() {
    return previousBroadcast;
  }

  /**
   * Setter for previous broadcast
   * 
   * @param previousBroadcast the JsonNotification of the most recent broadcast
   */
  public void setPreviousBroadcast(JsonNotification previousBroadcast) {
    this.previousBroadcast = previousBroadcast;
  }

  /**
   * Getter for returning if receiver is processing broadcasts
   * 
   * @return JsonNotification
   */
  public boolean isProcessBroadcast() {
    return processBroadcast;
  }

  /**
   * Setter for processing broadcasts
   * 
   */
  public void setProcessBroadcast(boolean processBroadcast) {
    this.processBroadcast = processBroadcast;
  }

}