WebSocketClient.java

  1. package gov.usgs.earthquake.distribution;

  2. import javax.websocket.*;

  3. import gov.usgs.util.Config;
  4. import gov.usgs.util.DefaultConfigurable;

  5. import java.io.IOException;
  6. import java.net.URI;
  7. import java.util.Objects;
  8. import java.util.concurrent.*;
  9. import java.util.logging.Logger;
  10. import java.nio.ByteBuffer;

  11. /**
  12.  * Manages a simple connection to a websocket. Can also be overridden for more
  13.  * complex behavior.
  14.  */
  15. @ClientEndpoint
  16. public class WebSocketClient extends DefaultConfigurable implements Runnable {

  17.   /** Initialzation of logger. For us later in file. */
  18.   public static final Logger LOGGER = Logger.getLogger(WebSocketClient.class.getName());

  19.   private Session session;

  20.   private URI endpoint;
  21.   private WebSocketListener listener;
  22.   private int attempts = WebSocketClient.DEFAULT_ATTEMPTS;
  23.   private long timeoutMillis = WebSocketClient.DEFAULT_TIMEOUT_MILLIS;
  24.   private boolean retryOnClose = WebSocketClient.DEFAULT_RETRY_ON_CLOSE;
  25.   private ScheduledExecutorService scheduledExector;
  26.   private long pingIntervalMillis = WebSocketClient.DEFAULT_PING_INTERVAL_MILLIS;
  27.   private long pingWaitMillis = WebSocketClient.DEFAULT_PING_WAIT_MILLIS;
  28.   private boolean pingSent = false;
  29.   private boolean pongReceived = false;
  30.   private long timePingSentMillis;
  31.   private ScheduledFuture<?> pingTask;
  32.   private String pingFailMessage = "";
  33.   private ScheduledFuture<?> anyMessageTask;
  34.   private long anyMessageIntervalMillis = WebSocketClient.DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS;
  35.   private boolean anyMessageReceived = false;

  36.   // Default Attempts and Timeout have been set to values that essentially mean
  37.   // the client will continue to retry "forever".
  38.   // This is the most common way the client will probably be run, as a continual
  39.   // process meant to be up at all times.
  40.   /** Default number of attempts */
  41.   public static final int DEFAULT_ATTEMPTS = 100000;
  42.   /** Default timeout in ms */
  43.   public static final long DEFAULT_TIMEOUT_MILLIS = 500;
  44.   /** Default for trying to retry on close */
  45.   public static final boolean DEFAULT_RETRY_ON_CLOSE = true;
  46.   /** Default for time in between pings to server */
  47.   public static final long DEFAULT_PING_INTERVAL_MILLIS = 15000;
  48.   /**
  49.    * Default for how long to wait for pong response to ping before closing or
  50.    * restarting connection
  51.    */
  52.   public static final long DEFAULT_PING_WAIT_MILLIS = 4000;
  53.   /** Default for how long to wait for any Message(excluding Pongs) */
  54.   // A value of 0 disables this function
  55.   public static final long DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS = 0;

  56.   /**
  57.    * Property name to configure the pingWaitMillis value for this WebSocketClient
  58.    */
  59.   public static final String PING_WAIT_MILLIS_PROPERTY = "pingWaitMillis";

  60.   /**
  61.    * Property name to configure the anyMessageIntervalMillis value for this
  62.    * WebSocketClient
  63.    */
  64.   public static final String ANY_MESSAGE_INTERVAL_MILLIS_PROPERTY = "anyMessageIntervalMillis";

  65.   /**
  66.    * Property name to configure the pingIntervalMillis value for this
  67.    * WebSocketClient
  68.    */
  69.   public static final String PING_INTERVAL_MILLIS_PROPERTY = "pingIntervalMillis";

  70.   /**
  71.    * Property name to configure the connectRetries value for this WebSocketClient
  72.    */
  73.   public static final String CONNECT_RETRIES_PROPERTY = "connectRetries";

  74.   /** Property name to configure the url value for this WebSocketClient */
  75.   public static final String URI_PROPERTY = "url";

  76.   /**
  77.    * Property name to configure the connectTimeout value for this WebSocketClient
  78.    */
  79.   public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";

  80.   /**
  81.    * Property name to configure the retryOnClose value for this WebSocketClient
  82.    */
  83.   public static final String RETRY_ON_CLOSE_PROPERTY = "retryOnClose";

  /**
   * Default constructor required for configurable interface. The body is
   * empty, so every field keeps its declared default until
   * {@link #configure(Config)} is called.
   *
   * @throws Exception declared by the signature; the empty body itself throws
   *                   nothing
   */
  public WebSocketClient() throws Exception {
    // Must call configure method to set all necessary attributes
    // If not configured properly, expect crazy exceptions at runtime
  }

  /**
   * Constructs the client using the default ping and any-message settings.
   * This constructor only stores configuration; it does not connect. Call
   * {@link #connect()} or {@link #startup()} to open the connection.
   *
   * @param endpoint      the URI to connect to
   * @param listener      a WebSocketListener to handle incoming messages
   * @param attempts      an integer number of times to try the connection
   * @param timeoutMillis a long for the wait time between attempts
   * @param retryOnClose  boolean for if the connection should retry when closed
   * @throws Exception if the delegated constructor throws
   * @deprecated use
   *             {@link #WebSocketClient(URI, WebSocketListener, int, long, boolean, long, long, long)},
   *             includes ping configuration
   */
  @Deprecated
  public WebSocketClient(URI endpoint, WebSocketListener listener, int attempts, long timeoutMillis,
      boolean retryOnClose) throws Exception {
    this(endpoint, listener, attempts, timeoutMillis, retryOnClose, DEFAULT_PING_INTERVAL_MILLIS,
        DEFAULT_PING_WAIT_MILLIS, DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS);
  }

  /**
   * Creates a websocket client. The default values for attempts and
   * timeoutMillis produce an instance that is designed to be up and running at
   * all times. This constructor only stores and logs the settings; it does not
   * connect.
   *
   * @param endpoint                 the URI to connect to
   * @param listener                 a WebSocketListener to handle incoming
   *                                 messages
   * @param attempts                 an integer number of times to try the
   *                                 connection
   * @param timeoutMillis            a long for the wait time between attempts
   * @param retryOnClose             boolean for if the connection should retry
   *                                 when closed
   * @param pingIntervalMillis       how often to send ping in milliseconds. If
   *                                 you don't want to send pings set to 0 or
   *                                 negative value.
   * @param pingWaitMillis           how long to wait, in milliseconds, before
   *                                 declaring socket down and closing and
   *                                 retrying if retryOnClose is set.
   * @param anyMessageIntervalMillis how often, in milliseconds, to check
   *                                 whether any message (excluding pongs) has
   *                                 arrived, closing the connection if none
   *                                 has. A value of 0 or less disables the
   *                                 check.
   * @throws Exception if one of the interval setters rejects its value
   */
  public WebSocketClient(URI endpoint, WebSocketListener listener, int attempts, long timeoutMillis,
      boolean retryOnClose, long pingIntervalMillis, long pingWaitMillis, long anyMessageIntervalMillis)
      throws Exception {
    // the name is set first because every log line below includes getName()
    this.setName("websocket_client");

    this.pingWaitMillis = pingWaitMillis;
    LOGGER.config(() -> String.format("[%s] pingWaitMillis=%s", getName(), this.pingWaitMillis));

    // the interval setters also create the scheduled executor when the
    // interval is positive and no executor exists yet
    this.setAnyMessageIntervalMillis(anyMessageIntervalMillis);
    LOGGER
        .config(() -> String.format("[%s] anyMessageIntervalMillis=%s", getName(), this.getAnyMessageIntervalMillis()));

    this.setPingIntervalMillis(pingIntervalMillis);
    LOGGER
        .config(() -> String.format("[%s] pingIntervalMillis=%s", getName(), this.getPingIntervalMillis()));

    this.listener = listener;

    this.endpoint = endpoint;
    LOGGER.config(() -> String.format("[%s] url=%s", getName(), endpoint));

    this.attempts = attempts;
    LOGGER.config(() -> String.format("[%s] attempts=%s", getName(), this.attempts));

    this.timeoutMillis = timeoutMillis;
    LOGGER.config(() -> String.format("[%s] timeoutMillis=%s", getName(), this.timeoutMillis));

    this.retryOnClose = retryOnClose;
    LOGGER.config(() -> String.format("[%s] retryOnClose=%s", getName(), this.retryOnClose));
  }

  /**
   * Constructs the client with the default attempts, timeout, retry, ping and
   * any-message settings. It does not connect; call {@link #connect()} or
   * {@link #startup()} for that.
   *
   * @param endpoint the URI to connect to
   * @param listener a WebSocketListener to handle incoming messages
   * @throws Exception if the delegated constructor throws
   */
  public WebSocketClient(URI endpoint, WebSocketListener listener) throws Exception {
    this(endpoint, listener, DEFAULT_ATTEMPTS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_RETRY_ON_CLOSE,
        DEFAULT_PING_INTERVAL_MILLIS, DEFAULT_PING_WAIT_MILLIS, DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS);
  }

  168.   private void anyMessageClose() {
  169.     try {
  170.       if (this.session.isOpen()) {
  171.         session.close(new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, "Any Message Failed!"));
  172.       } else {
  173.         onClose(this.session, new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, "Any Message Failed!"));
  174.       }
  175.     } catch (Exception e) {
  176.     }
  177.   }

  /**
   * Called by the websocket interface when a pong is received. Records the
   * pong by setting pongReceived, which {@link #run()} checks when deciding
   * whether the previous ping succeeded.
   *
   * @param pongMessage the received pong; its contents are not inspected here
   * @param session     the websocket session; not used here
   */
  @OnMessage
  public void catchPong(PongMessage pongMessage, Session session) {
    this.pongReceived = true;
  }

  189.   /**
  190.    * Connect to server
  191.    *
  192.    * @throws Exception if error occurs
  193.    */
  194.   public void connect() throws Exception {
  195.     // try to connect to server
  196.     WebSocketContainer container = ContainerProvider.getWebSocketContainer();
  197.     int failedAttempts = 0;
  198.     Exception lastExcept = null;
  199.     for (int i = 0; i < attempts; i++) {
  200.       try {
  201.         int connectAttempt = i + 1;
  202.         LOGGER
  203.             .info(() -> String.format("[%s] attempt %s out of %s connecting to %s", getName(), connectAttempt, attempts,
  204.                 this.endpoint.toString()));
  205.         container.connectToServer(this, endpoint);
  206.         break;
  207.       } catch (Exception e) {
  208.         // increment failed attempts, sleep
  209.         failedAttempts++;
  210.         lastExcept = e;
  211.         // Sleep longer and longer between attempts up to a max of one minute
  212.         long sleepInterval = Math.min(60000, (failedAttempts * timeoutMillis));
  213.         LOGGER.info(() -> String.format("[%s] failed to connect to %s, retrying in %s", getName(),
  214.             this.endpoint.toString(), sleepInterval));
  215.         Thread.sleep(sleepInterval);
  216.       }
  217.     }

  218.     // throw connect exception if all attempts fail
  219.     if (failedAttempts == attempts) {
  220.       this.listener.onConnectFail();
  221.       throw lastExcept;
  222.     }
  223.   }

  /**
   * Reports the any-message flag. The flag is set by
   * {@link #onMessage(String)} and cleared by startAnyMessageReceived() and
   * onOpen.
   *
   * @return the current value of anyMessageReceived
   */
  private boolean hasReceivedAnyMessage() {
    return this.anyMessageReceived;
  }

  233.   /**
  234.    * Checks if there is an open session
  235.    *
  236.    * @return boolean
  237.    * @throws IOException if IO error occurs
  238.    */
  239.   public boolean isConnected() throws IOException {
  240.     return this.session != null && this.session.isOpen();
  241.   }

  242.   /**
  243.    * Sets the session and listener
  244.    *
  245.    * @param session Session
  246.    * @throws IOException if IO error occurs
  247.    */
  248.   @OnOpen
  249.   public void onOpen(Session session) throws IOException {
  250.     this.session = session;
  251.     if (pingIntervalMillis > 0) {
  252.       pingTask = this.scheduledExector.schedule(this, pingIntervalMillis, TimeUnit.MILLISECONDS);
  253.     }
  254.     if (anyMessageIntervalMillis > 0) {
  255.       startAnyMessageReceived();
  256.     }
  257.     this.listener.onOpen(session);
  258.     this.anyMessageReceived = false;
  259.   }

  /**
   * Notifies the listener that the session closed, clears the per-connection
   * state (session, ping flags, scheduled tasks) and, when retryOnClose is
   * set, tries to reconnect.
   *
   * The per-connection state is cleared even when the listener throws, so that
   * no scheduled task keeps running against a closed session. In that case the
   * listener's exception propagates and no reconnect is attempted.
   *
   * @param session Session
   * @param reason  for close
   * @throws IOException if the listener throws it, or wrapping the failure when
   *                     the reconnect does not succeed
   */
  @OnClose
  public void onClose(Session session, CloseReason reason) throws IOException {
    try {
      this.listener.onClose(session, reason);
    } finally {
      this.session = null;
      pingSent = false;
      pongReceived = false;
      // Cancel without interrupting. This method is also called directly from
      // the scheduled tasks (pingFailClose and anyMessageClose); in that case
      // the task being cancelled is the one currently executing this method,
      // and cancel(true) would interrupt this very thread right before the
      // reconnect below. This matches startAnyMessageReceived, which also
      // cancels with false.
      if (pingTask != null) {
        pingTask.cancel(false);
      }
      if (anyMessageTask != null) {
        anyMessageTask.cancel(false);
      }
    }

    if (retryOnClose) {
      try {
        this.connect();
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          // the interruption is reported as an IOException below; keep the
          // thread's interrupt status so callers can still observe it
          Thread.currentThread().interrupt();
        }
        // failed to reconnect
        this.listener.onReconnectFail();
        // propagate this failure
        throw new IOException(e);
      }
    }
  }

  291.   /**
  292.    * Gives listener the message
  293.    *
  294.    * @param message String
  295.    * @throws IOException if IO error occurs
  296.    */
  297.   @OnMessage
  298.   public void onMessage(String message) throws IOException {
  299.     this.anyMessageReceived = true;
  300.     if (this.anyMessageIntervalMillis > 0) {
  301.       startAnyMessageReceived();
  302.     }
  303.     this.listener.onMessage(message);
  304.   }

  305.   /**
  306.    * If a ping has failed this method will close the session in the proper manner.
  307.    *
  308.    * @param session
  309.    * @param pingFailMessage
  310.    */
  311.   private void pingFailClose(Session session, String pingFailMessage) {
  312.     if (pingFailMessage.isBlank()) {
  313.       pingFailMessage = "Pong not received!";
  314.     }
  315.     // String for CloseReason has to be lest than 123 bytes.
  316.     if (pingFailMessage.length() > 90) {
  317.       pingFailMessage = pingFailMessage.substring(0, 90);
  318.     }
  319.     try {
  320.       if (session.isOpen()) {
  321.         session.close(new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, "Ping/Pong Failed! " + pingFailMessage));
  322.       } else {
  323.         onClose(session,
  324.             new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, "Ping/Pong Failed! " + pingFailMessage));
  325.       }
  326.     } catch (Exception e) {
  327.     }
  328.     pingFailMessage = "";
  329.   }

  330.   @Override
  331.   public void run() {
  332.     if (!pingSent) {
  333.       // Add timestamp to ping message that could be evaluated when pong is received
  334.       this.timePingSentMillis = System.currentTimeMillis();
  335.       byte[] data = ("" + this.timePingSentMillis).getBytes();
  336.       try {
  337.         pongReceived = false;
  338.         // Set pingSent=true here because the pingTask will always be started and the
  339.         // failure
  340.         // will occur when the pong has not been received even if the ping fails to
  341.         // send.
  342.         pingSent = true;
  343.         this.session.getAsyncRemote().sendPing(ByteBuffer.wrap(data));

  344.       } catch (Exception e) {
  345.         // This message is used to indicate that the failure was actually in the sending
  346.         // of the ping.
  347.         pingFailMessage = "Sending Ping failed.  E=" + e.getLocalizedMessage();
  348.       }

  349.       pingTask = this.scheduledExector.schedule(this, pingWaitMillis, TimeUnit.MILLISECONDS);
  350.     } else {
  351.       if (!pongReceived) {
  352.         pingFailClose(session, pingFailMessage);
  353.       } else {
  354.         pingSent = false;
  355.         pongReceived = false;
  356.         pingTask = this.scheduledExector.schedule(this, pingIntervalMillis, TimeUnit.MILLISECONDS);
  357.       }
  358.     }
  359.   }

  360.   @Override
  361.   public void configure(Config config) throws Exception {
  362.     this.pingWaitMillis = Long.parseLong(
  363.         config.getProperty(WebSocketClient.PING_WAIT_MILLIS_PROPERTY,
  364.             String.valueOf(WebSocketClient.DEFAULT_PING_WAIT_MILLIS)));
  365.     LOGGER.config(() -> String.format("[%s] pingWaitMillis=%s", getName(), this.pingWaitMillis));

  366.     long anyMessageIntervalMillis = Long.parseLong(
  367.         config.getProperty(WebSocketClient.ANY_MESSAGE_INTERVAL_MILLIS_PROPERTY,
  368.             String.valueOf(WebSocketClient.DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS)));
  369.     this.setAnyMessageIntervalMillis(anyMessageIntervalMillis);

  370.     LOGGER
  371.         .config(() -> String.format("[%s] anyMessageIntervalMillis=%s", getName(), this.getAnyMessageIntervalMillis()));

  372.     long pingIntervalMillis = Long.parseLong(
  373.         config.getProperty(WebSocketClient.PING_INTERVAL_MILLIS_PROPERTY,
  374.             String.valueOf(WebSocketClient.DEFAULT_PING_INTERVAL_MILLIS)));
  375.     this.setPingIntervalMillis(pingIntervalMillis);
  376.     LOGGER
  377.         .config(() -> String.format("[%s] pingIntervalMillis=%s", getName(), this.getPingIntervalMillis()));

  378.     String uri = config.getProperty(WebSocketClient.URI_PROPERTY);
  379.     if (Objects.isNull(uri)) {
  380.       throw new ConfigurationException(
  381.           String.format("[%s] missing required property %s", this.getName(), WebSocketClient.URI_PROPERTY));
  382.     }
  383.     try {
  384.       this.endpoint = new URI(uri);
  385.     } catch (Exception e) {
  386.       throw new ConfigurationException(
  387.           String.format("[%s] invalid url given for %s", this.getName(), WebSocketClient.URI_PROPERTY));
  388.     }

  389.     LOGGER.config(() -> String.format("[%s] url=%s", getName(), uri));

  390.     this.attempts = Integer.parseInt(
  391.         config.getProperty(WebSocketClient.CONNECT_RETRIES_PROPERTY,
  392.             String.valueOf(WebSocketClient.DEFAULT_ATTEMPTS)));

  393.     LOGGER.config(() -> String.format("[%s] attempts=%s", getName(), this.attempts));

  394.     this.timeoutMillis = Long.parseLong(config.getProperty(WebSocketClient.CONNECT_TIMEOUT_PROPERTY,
  395.         String.valueOf(WebSocketClient.DEFAULT_TIMEOUT_MILLIS)));
  396.     LOGGER.config(() -> String.format("[%s] timeoutMillis=%s", getName(), this.timeoutMillis));

  397.     this.retryOnClose = Boolean.parseBoolean(config.getProperty(WebSocketClient.RETRY_ON_CLOSE_PROPERTY,
  398.         String.valueOf(WebSocketClient.DEFAULT_RETRY_ON_CLOSE)));
  399.     LOGGER.config(() -> String.format("[%s] retryOnClose=%s", getName(), this.retryOnClose));
  400.   }

  /**
   * Connect the client by delegating to {@link #connect()}.
   *
   * @throws Exception if {@link #connect()} fails
   */
  @Override
  public void startup() throws Exception {
    this.connect();
  }

  410.   /**
  411.    * Sets retry to false, then closes session
  412.    *
  413.    * @throws Exception if error occurs
  414.    */
  415.   @Override
  416.   public void shutdown() throws Exception {
  417.     this.retryOnClose = false;
  418.     this.session.close();
  419.     if (this.scheduledExector != null) {
  420.       this.scheduledExector.shutdownNow();
  421.     }
  422.   }

  /**
   * Replaces the listener that receives this client's open, close, message and
   * failure notifications.
   *
   * @param listener set WebSocketListener
   */
  public void setListener(WebSocketListener listener) {
    this.listener = listener;
  }

  427.   /**
  428.    * Start a task to check if any message has been received during the
  429.    * anyMessageIntervalMillis time period.
  430.    */
  431.   private void startAnyMessageReceived() {
  432.     anyMessageReceived = false;
  433.     if (anyMessageTask != null) {
  434.       anyMessageTask.cancel(false);
  435.     }
  436.     anyMessageTask = this.scheduledExector.schedule(new AnyMessageRunner(this), anyMessageIntervalMillis,
  437.         TimeUnit.MILLISECONDS);
  438.   }

  439.   /**
  440.    * This class is used to asyncronously handle tracking if any message has been
  441.    * received.
  442.    */
  443.   private class AnyMessageRunner implements Runnable {
  444.     private WebSocketClient webSocketClient;

  445.     public AnyMessageRunner(WebSocketClient webSocketClient) {
  446.       this.webSocketClient = webSocketClient;
  447.     }

  448.     @Override
  449.     public void run() {
  450.       if (!webSocketClient.hasReceivedAnyMessage()) {
  451.         webSocketClient.anyMessageClose();
  452.       } else {
  453.         webSocketClient.startAnyMessageReceived();
  454.       }
  455.     }
  456.   }

  /** @return the interval of the any-message check, in milliseconds */
  public long getAnyMessageIntervalMillis() {
    return anyMessageIntervalMillis;
  }

  /** @return the number of connection attempts {@link #connect()} makes */
  public int getAttempts() {
    return attempts;
  }

  /** @return the URI this client connects to */
  public URI getEndpoint() {
    return this.endpoint;
  }

  /** @return the delay before a ping is sent, in milliseconds */
  public long getPingIntervalMillis() {
    return this.pingIntervalMillis;
  }

  /** @return how long to wait for a pong after a ping, in milliseconds */
  public long getPingWaitMillis() {
    return pingWaitMillis;
  }

  /**
   * @return the executor used for the ping and any-message tasks; null until
   *         one of the interval setters has been given a positive value
   */
  public ScheduledExecutorService getScheduledExector() {
    return scheduledExector;
  }

  /** @return the base wait between connection attempts, in milliseconds */
  public long getTimeoutMillis() {
    return timeoutMillis;
  }

  /** @return whether the client tries to reconnect after a close */
  public boolean isRetryOnClose() {
    return retryOnClose;
  }

  481.   public void setPingIntervalMillis(long pingIntervalMillis) throws Exception {
  482.     if (this.isConnected()) {
  483.       throw new ConfigurationException("Can not change pingIntervalMillis after client is connected");
  484.     }
  485.     this.pingIntervalMillis = pingIntervalMillis;

  486.     if (this.pingIntervalMillis > 0 && this.scheduledExector == null) {
  487.       this.scheduledExector = Executors.newSingleThreadScheduledExecutor();
  488.     }
  489.   }

  490.   public void setAnyMessageIntervalMillis(long anyMessageIntervalMillis) throws Exception {
  491.     if (this.isConnected()) {
  492.       throw new ConfigurationException("Can not change anyMessageIntervalMillis after client is connected");
  493.     }
  494.     this.anyMessageIntervalMillis = anyMessageIntervalMillis;

  495.     if (this.anyMessageIntervalMillis > 0 && this.scheduledExector == null) {
  496.       this.scheduledExector = Executors.newSingleThreadScheduledExecutor();
  497.     }
  498.   }

  499. }