WebSocketClient.java

package gov.usgs.earthquake.distribution;

import javax.websocket.*;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Manages a simple connection to a websocket. Can also be overridden for more
 * complex behavior.
 *
 * <p>
 * Connection events and text messages are forwarded to a
 * {@link WebSocketListener}. Two optional watchdogs run on a single-threaded
 * scheduler owned by this client:
 * <ul>
 * <li>a ping/pong check, which sends a ping every {@code pingIntervalMillis}
 * and closes the session if no pong arrives within {@code pingWaitMillis};
 * and</li>
 * <li>an "any message" check, which closes the session if no text message
 * arrives within {@code anyMessageIntervalMillis}.</li>
 * </ul>
 *
 * <p>
 * Callbacks from the websocket container and the scheduler thread both touch
 * the state of this object, so the shared flags are declared {@code volatile}
 * for visibility. Compound updates are not atomic.
 */
@ClientEndpoint
public class WebSocketClient implements Runnable {

  private static final Logger LOGGER = Logger.getLogger(WebSocketClient.class.getName());

  /** Maximum length of a close reason phrase, in UTF-8 encoded bytes. */
  private static final int MAX_CLOSE_REASON_BYTES = 123;
  /** Maximum number of characters of ping failure detail put in a close reason. */
  private static final int MAX_PING_FAIL_DETAIL_CHARS = 90;

  private volatile Session session;

  private final URI endpoint;
  private volatile WebSocketListener listener;
  private final int attempts;
  private final long timeoutMillis;
  private volatile boolean retryOnClose;
  /** Null when neither watchdog is enabled. */
  private final ScheduledExecutorService scheduledExecutor;
  private final long pingIntervalMillis;
  private final long pingWaitMillis;
  private volatile boolean pingSent = false;
  private volatile boolean pongReceived = false;
  private volatile long timePingSentMillis;
  private volatile ScheduledFuture<?> pingTask;
  private volatile String pingFailMessage = "";
  private volatile ScheduledFuture<?> anyMessageTask;
  private final long anyMessageIntervalMillis;
  private volatile boolean anyMessageReceived = false;

  // Default Attempts and Timeout have been set to values that essentially mean
  // the client will continue to retry "forever".
  // This is the most common way the client will probably be run, as a continual
  // process meant to be up at all times.
  /** Default number of attempts */
  public static final int DEFAULT_ATTEMPTS = 100000;
  /** Default timeout in ms */
  public static final long DEFAULT_TIMEOUT_MILLIS = 500;
  /** Default for trying to retry on close */
  public static final boolean DEFAULT_RETRY_ON_CLOSE = true;
  /** Default for time in between pings to server */
  public static final long DEFAULT_PING_INTERVAL_MILLIS = 15000;
  /**
   * Default for how long to wait for pong response to ping before closing or
   * restarting connection
   */
  public static final long DEFAULT_PING_WAIT_MILLIS = 4000;
  /**
   * Default for how long to wait for any message (excluding pongs). A value of 0
   * disables this check.
   */
  public static final long DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS = 0;

  /**
   * Constructs the client. Also connects to the server.
   *
   * @param endpoint      the URI to connect to
   * @param listener      a WebSocketListener to handle incoming messages
   * @param attempts      an integer number of times to try the connection
   * @param timeoutMillis a long for the wait time between attempts
   * @param retryOnClose  boolean for if the connection should retry when closed
   * @throws Exception on thread interrupt or connection failure
   * @deprecated use
   *             {@link #WebSocketClient(URI, WebSocketListener, int, long, boolean, long, long, long)},
   *             includes ping configuration
   */
  @Deprecated
  public WebSocketClient(URI endpoint, WebSocketListener listener, int attempts, long timeoutMillis,
      boolean retryOnClose) throws Exception {
    this(endpoint, listener, attempts, timeoutMillis, retryOnClose, DEFAULT_PING_INTERVAL_MILLIS,
        DEFAULT_PING_WAIT_MILLIS, DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS);
  }

  /**
   * Creates a websocket client and connects to the server. The default values
   * for attempts and timeoutMillis create an instance which is designed to be up
   * and running at all times.
   *
   * @param endpoint                 the URI to connect to
   * @param listener                 a WebSocketListener to handle incoming
   *                                 messages
   * @param attempts                 an integer number of times to try the
   *                                 connection; must be positive
   * @param timeoutMillis            a long for the wait time between attempts
   * @param retryOnClose             boolean for if the connection should retry
   *                                 when closed
   * @param pingIntervalMillis       how often to send ping in milliseconds. If
   *                                 you don't want to send pings set to 0 or
   *                                 negative value.
   * @param pingWaitMillis           how long to wait, in milliseconds, before
   *                                 declaring socket down and closing and
   *                                 retrying if retryOnClose is set.
   * @param anyMessageIntervalMillis how long, in milliseconds, to wait for a
   *                                 text message (pongs do not count) before
   *                                 closing the session. Set to 0 or a negative
   *                                 value to disable.
   * @throws Exception on thread interrupt or connection failure
   */
  public WebSocketClient(URI endpoint, WebSocketListener listener, int attempts, long timeoutMillis,
      boolean retryOnClose, long pingIntervalMillis, long pingWaitMillis, long anyMessageIntervalMillis)
      throws Exception {
    this.pingIntervalMillis = pingIntervalMillis;
    this.pingWaitMillis = pingWaitMillis;
    this.anyMessageIntervalMillis = anyMessageIntervalMillis;
    // the scheduler is only needed when at least one watchdog is enabled
    this.scheduledExecutor = (pingIntervalMillis > 0 || anyMessageIntervalMillis > 0)
        ? Executors.newSingleThreadScheduledExecutor()
        : null;

    this.listener = listener;
    this.endpoint = endpoint;
    this.attempts = attempts;
    this.timeoutMillis = timeoutMillis;
    this.retryOnClose = retryOnClose;

    try {
      connect();
    } catch (Exception e) {
      // the caller never receives this instance, so nobody else could stop the
      // scheduler; release it before propagating the failure
      if (this.scheduledExecutor != null) {
        this.scheduledExecutor.shutdownNow();
      }
      throw e;
    }
  }

  /**
   * Constructs the client using the default attempts, timeout, retry, ping and
   * any-message settings. Also connects to the server.
   *
   * @param endpoint the URI to connect to
   * @param listener a WebSocketListener to handle incoming messages
   * @throws Exception thread interrupt or connection failure
   */
  public WebSocketClient(URI endpoint, WebSocketListener listener) throws Exception {
    this(endpoint, listener, DEFAULT_ATTEMPTS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_RETRY_ON_CLOSE,
        DEFAULT_PING_INTERVAL_MILLIS, DEFAULT_PING_WAIT_MILLIS, DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS);
  }

  /**
   * Closes the current session because no message arrived within the any-message
   * interval. If the session is already not open, {@link #onClose} is invoked
   * directly so the listener is notified and a reconnect can be attempted.
   */
  private void anyMessageClose() {
    final Session current = this.session;
    if (current == null) {
      // onClose has already run for this session; nothing left to close
      LOGGER.fine("Any-message check fired with no session; ignoring");
      return;
    }
    try {
      // NOTE(review): NO_STATUS_CODE (1005) is a reserved code in RFC 6455; kept
      // unchanged because listeners may inspect it - confirm before changing.
      final CloseReason reason = new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, "Any Message Failed!");
      if (current.isOpen()) {
        current.close(reason);
      } else {
        onClose(current, reason);
      }
    } catch (Exception e) {
      // best effort: this runs on the scheduler thread, there is no caller to notify
      LOGGER.log(Level.WARNING, "Unable to close session after any-message timeout", e);
    }
  }

  /**
   * Called by Websocket interface when a Pong is received in response to a ping
   * that was sent.
   *
   * @param pongMessage Message that was populated from ping.
   * @param session     The websocket session.
   */
  @OnMessage
  public void catchPong(PongMessage pongMessage, Session session) {
    this.pongReceived = true;
  }

  /**
   * Connect to server, trying up to {@code attempts} times and sleeping
   * {@code timeoutMillis} after each failed attempt.
   *
   * <p>
   * When every attempt fails the listener's {@code onConnectFail()} is called
   * and the last connection exception is thrown. If {@code attempts} is not
   * positive no attempt is made, which is reported the same way with an
   * {@link IllegalStateException}.
   *
   * @throws Exception if all attempts fail, or if the thread is interrupted
   *                   while waiting between attempts
   */
  public void connect() throws Exception {
    // try to connect to server
    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
    Exception lastExcept = null;
    for (int i = 0; i < attempts; i++) {
      try {
        container.connectToServer(this, endpoint);
        return;
      } catch (Exception e) {
        // remember the failure, wait, then try again
        lastExcept = e;
        Thread.sleep(timeoutMillis);
      }
    }

    // every attempt failed (or none was allowed)
    this.listener.onConnectFail();
    if (lastExcept == null) {
      throw new IllegalStateException("No connection attempted; attempts=" + attempts);
    }
    throw lastExcept;
  }

  /**
   * Reports whether a text message has been received since the any-message flag
   * was last cleared.
   *
   * @return true if the flag is currently set
   */
  private boolean hasReceivedAnyMessage() {
    return this.anyMessageReceived;
  }

  /**
   * Checks if there is an open session
   *
   * @return true if a session exists and is open
   * @throws IOException if IO error occurs
   */
  public boolean isConnected() throws IOException {
    final Session current = this.session;
    return current != null && current.isOpen();
  }

  /**
   * Stores the session, starts the enabled watchdogs, and notifies the listener.
   *
   * @param session Session
   * @throws IOException if IO error occurs
   */
  @OnOpen
  public void onOpen(Session session) throws IOException {
    this.session = session;
    if (pingIntervalMillis > 0) {
      pingTask = this.scheduledExecutor.schedule(this, pingIntervalMillis, TimeUnit.MILLISECONDS);
    }
    if (anyMessageIntervalMillis > 0) {
      startAnyMessageReceived();
    }
    this.listener.onOpen(session);
    this.anyMessageReceived = false;
  }

  /**
   * Notifies the listener of the close, clears the session and ping state,
   * cancels pending watchdog tasks, and reconnects if retryOnClose is set.
   *
   * @param session Session
   * @param reason  for close
   * @throws IOException if IO error occurs
   */
  @OnClose
  public void onClose(Session session, CloseReason reason) throws IOException {
    this.listener.onClose(session, reason);
    this.session = null;
    pingSent = false;
    pongReceived = false;
    // Cancel without interrupting: this method can run on the scheduler thread
    // from inside the very task being cancelled, and interrupting would set the
    // interrupt flag on the current thread just before the reconnect below.
    final ScheduledFuture<?> ping = pingTask;
    if (ping != null) {
      ping.cancel(false);
    }
    final ScheduledFuture<?> anyMessage = anyMessageTask;
    if (anyMessage != null) {
      anyMessage.cancel(false);
    }
    if (retryOnClose) {
      try {
        this.connect();
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          // keep the interrupt visible to whoever owns this thread
          Thread.currentThread().interrupt();
        }
        // failed to reconnect
        this.listener.onReconnectFail();
      }
    }
  }

  /**
   * Gives listener the message. When the any-message check is enabled, each
   * message also restarts its timer.
   *
   * @param message String
   * @throws IOException if IO error occurs
   */
  @OnMessage
  public void onMessage(String message) throws IOException {
    this.anyMessageReceived = true;
    if (this.anyMessageIntervalMillis > 0) {
      startAnyMessageReceived();
    }
    this.listener.onMessage(message);
  }

  /**
   * If a ping has failed this method will close the session in the proper
   * manner. If the session is already not open, {@link #onClose} is invoked
   * directly. The stored ping failure message is cleared afterwards.
   *
   * @param session         the session whose ping failed; may be null if it was
   *                        already closed
   * @param pingFailMessage detail about the failure; blank means the pong was
   *                        simply not received
   */
  private void pingFailClose(Session session, String pingFailMessage) {
    String detail = pingFailMessage;
    if (detail == null || detail.isBlank()) {
      detail = "Pong not received!";
    }
    if (detail.length() > MAX_PING_FAIL_DETAIL_CHARS) {
      detail = detail.substring(0, MAX_PING_FAIL_DETAIL_CHARS);
    }
    // The reason phrase limit is counted in UTF-8 bytes, not characters, so a
    // non-ASCII detail can still be too long after the character cap above.
    final String phrase = truncateUtf8("Ping/Pong Failed! " + detail, MAX_CLOSE_REASON_BYTES);
    try {
      if (session == null) {
        // onClose has already run for this session; nothing left to close
        LOGGER.fine("Ping check failed with no session; ignoring");
        return;
      }
      // NOTE(review): NO_STATUS_CODE (1005) is a reserved code in RFC 6455; kept
      // unchanged because listeners may inspect it - confirm before changing.
      final CloseReason reason = new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, phrase);
      if (session.isOpen()) {
        session.close(reason);
      } else {
        onClose(session, reason);
      }
    } catch (Exception e) {
      // best effort: this runs on the scheduler thread, there is no caller to notify
      LOGGER.log(Level.WARNING, "Unable to close session after ping failure", e);
    } finally {
      // clear the field (not the parameter) so a stale send failure is not
      // reported for a later, unrelated pong timeout
      this.pingFailMessage = "";
    }
  }

  /**
   * Returns the longest prefix of text whose UTF-8 encoding fits in maxBytes,
   * without splitting a code point.
   *
   * @param text     text to shorten
   * @param maxBytes maximum encoded size in bytes
   * @return text itself if it already fits, otherwise a prefix of it
   */
  private static String truncateUtf8(String text, int maxBytes) {
    if (text.getBytes(StandardCharsets.UTF_8).length <= maxBytes) {
      return text;
    }
    int usedBytes = 0;
    int end = 0;
    while (end < text.length()) {
      final int codePoint = text.codePointAt(end);
      final int size;
      if (codePoint < 0x80) {
        size = 1;
      } else if (codePoint < 0x800) {
        size = 2;
      } else if (codePoint < 0x10000) {
        size = 3;
      } else {
        size = 4;
      }
      if (usedBytes + size > maxBytes) {
        break;
      }
      usedBytes += size;
      end += Character.charCount(codePoint);
    }
    return text.substring(0, end);
  }

  /**
   * One step of the ping/pong watchdog, executed on the scheduler thread.
   *
   * <p>
   * If no ping is outstanding, sends one and schedules the next step after
   * pingWaitMillis. Otherwise checks for the pong: if it did not arrive the
   * session is closed, and if it did the next ping is scheduled after
   * pingIntervalMillis.
   */
  @Override
  public void run() {
    if (!pingSent) {
      // Add timestamp to ping message that could be evaluated when pong is received
      this.timePingSentMillis = System.currentTimeMillis();
      final byte[] data = Long.toString(this.timePingSentMillis).getBytes(StandardCharsets.UTF_8);
      pongReceived = false;
      // Set pingSent=true even if sending fails: the follow-up step always runs
      // and reports the failure when it finds that no pong has been received.
      pingSent = true;
      final Session current = this.session;
      if (current == null) {
        pingFailMessage = "Sending Ping failed.  E=no open session";
      } else {
        try {
          current.getAsyncRemote().sendPing(ByteBuffer.wrap(data));
        } catch (Exception e) {
          // This message is used to indicate that the failure was actually in the
          // sending of the ping.
          pingFailMessage = "Sending Ping failed.  E=" + e.getLocalizedMessage();
        }
      }

      pingTask = this.scheduledExecutor.schedule(this, pingWaitMillis, TimeUnit.MILLISECONDS);
    } else if (!pongReceived) {
      pingFailClose(session, pingFailMessage);
    } else {
      pingSent = false;
      pongReceived = false;
      pingTask = this.scheduledExecutor.schedule(this, pingIntervalMillis, TimeUnit.MILLISECONDS);
    }
  }

  /**
   * Sets retry to false, then closes the session if there is one. The scheduler,
   * if any, is stopped even when closing the session fails.
   *
   * @throws Exception if error occurs
   */
  public void shutdown() throws Exception {
    this.retryOnClose = false;
    try {
      final Session current = this.session;
      if (current != null) {
        current.close();
      }
    } finally {
      if (this.scheduledExecutor != null) {
        this.scheduledExecutor.shutdownNow();
      }
    }
  }

  /** @param listener set WebSocketListener */
  public void setListener(WebSocketListener listener) {
    this.listener = listener;
  }

  /**
   * Start a task to check if any message has been received during the
   * anyMessageIntervalMillis time period. Clears the received flag and replaces
   * any previously scheduled check.
   */
  private void startAnyMessageReceived() {
    anyMessageReceived = false;
    final ScheduledFuture<?> previous = anyMessageTask;
    if (previous != null) {
      previous.cancel(false);
    }
    anyMessageTask = this.scheduledExecutor.schedule(new AnyMessageRunner(this), anyMessageIntervalMillis,
        TimeUnit.MILLISECONDS);
  }

  /**
   * This class is used to asynchronously handle tracking if any message has been
   * received.
   */
  private static final class AnyMessageRunner implements Runnable {
    private final WebSocketClient webSocketClient;

    AnyMessageRunner(WebSocketClient webSocketClient) {
      this.webSocketClient = webSocketClient;
    }

    /**
     * Closes the session if no message was received during the interval,
     * otherwise starts the next interval.
     */
    @Override
    public void run() {
      if (!webSocketClient.hasReceivedAnyMessage()) {
        webSocketClient.anyMessageClose();
      } else {
        webSocketClient.startAnyMessageReceived();
      }
    }
  }

}