// WebSocketClient.java
package gov.usgs.earthquake.distribution;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.*;
import java.nio.ByteBuffer;
/**
 * Manages a simple connection to a websocket. Can also be overridden for more
 * complex behavior.
 *
 * <p>
 * In addition to relaying open/close/message events to a
 * {@link WebSocketListener}, the client can optionally:
 * <ul>
 * <li>send periodic pings and close the session when no pong arrives within
 * {@code pingWaitMillis} (the client itself is the {@link Runnable} that
 * drives this ping/pong cycle);</li>
 * <li>close the session when no text message arrives within
 * {@code anyMessageIntervalMillis}.</li>
 * </ul>
 * Both checks run on a single-thread scheduled executor that is only created
 * when at least one of the intervals is positive.
 */
@ClientEndpoint
public class WebSocketClient implements Runnable {

  /**
   * Maximum size, in UTF-8 bytes, of the reason phrase accepted by
   * {@link CloseReason}.
   */
  private static final int MAX_CLOSE_REASON_BYTES = 123;

  // The fields marked volatile are written by websocket callbacks and read by
  // tasks on the scheduled executor (or vice versa). These presumably run on
  // different threads, so plain fields would give no visibility guarantee.
  private volatile Session session;
  private URI endpoint;
  private WebSocketListener listener;
  private int attempts;
  private long timeoutMillis;
  private volatile boolean retryOnClose;
  private ScheduledExecutorService scheduledExecutor;
  private long pingIntervalMillis = 0;
  private long pingWaitMillis = 0;
  private volatile boolean pingSent = false;
  private volatile boolean pongReceived = false;
  private long timePingSentMillis;
  private volatile ScheduledFuture<?> pingTask;
  private String pingFailMessage = "";
  private volatile ScheduledFuture<?> anyMessageTask;
  private long anyMessageIntervalMillis = 0;
  private volatile boolean anyMessageReceived = false;

  // Default Attempts and Timeout have been set to values that essentially mean
  // the client will continue to retry "forever".
  // This is the most common way the client will probably be run, as a continual
  // process meant to be up at all times.

  /** Default number of attempts */
  public static final int DEFAULT_ATTEMPTS = 100000;
  /** Default timeout in ms */
  public static final long DEFAULT_TIMEOUT_MILLIS = 500;
  /** Default for trying to retry on close */
  public static final boolean DEFAULT_RETRY_ON_CLOSE = true;
  /** Default for time in between pings to server */
  public static final long DEFAULT_PING_INTERVAL_MILLIS = 15000;
  /**
   * Default for how long to wait for pong response to ping before closing or
   * restarting connection
   */
  public static final long DEFAULT_PING_WAIT_MILLIS = 4000;
  /**
   * Default for how long to wait for any Message (excluding Pongs). A value of 0
   * disables this function.
   */
  public static final long DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS = 0;

  /**
   * Constructs the client. Also connects to the server.
   *
   * @param endpoint      the URI to connect to
   * @param listener      a WebSocketListener to handle incoming messages
   * @param attempts      an integer number of times to try the connection
   * @param timeoutMillis a long for the wait time between attempts
   * @param retryOnClose  boolean for if the connection should retry when closed
   * @throws Exception on thread interrupt or connection failure
   * @deprecated use
   *             {@link #WebSocketClient(URI, WebSocketListener, int, long, boolean, long, long, long)},
   *             includes ping configuration
   */
  @Deprecated
  public WebSocketClient(URI endpoint, WebSocketListener listener, int attempts, long timeoutMillis,
      boolean retryOnClose) throws Exception {
    this(endpoint, listener, attempts, timeoutMillis, retryOnClose, DEFAULT_PING_INTERVAL_MILLIS,
        DEFAULT_PING_WAIT_MILLIS, DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS);
  }

  /**
   * Creates a Websocket Client and connects to the server. The default values
   * for attempts and timeoutMillis create an instance which is designed to be up
   * and running at all times.
   *
   * @param endpoint                 the URI to connect to
   * @param listener                 a WebSocketListener to handle incoming
   *                                 messages
   * @param attempts                 an integer number of times to try the
   *                                 connection
   * @param timeoutMillis            a long for the wait time between attempts
   * @param retryOnClose             boolean for if the connection should retry
   *                                 when closed
   * @param pingIntervalMillis       how often to send ping in milliseconds. If
   *                                 you don't want to send pings set to 0 or
   *                                 negative value.
   * @param pingWaitMillis           how long to wait, in milliseconds, before
   *                                 declaring socket down and closing and
   *                                 retrying if retryOnClose is set.
   * @param anyMessageIntervalMillis how long to wait, in milliseconds, for any
   *                                 message (excluding Pongs) before closing the
   *                                 session; 0 or negative disables the check.
   * @throws Exception on thread interrupt or connection failure
   */
  public WebSocketClient(URI endpoint, WebSocketListener listener, int attempts, long timeoutMillis,
      boolean retryOnClose, long pingIntervalMillis, long pingWaitMillis, long anyMessageIntervalMillis)
      throws Exception {
    this.pingIntervalMillis = pingIntervalMillis;
    this.pingWaitMillis = pingWaitMillis;
    this.anyMessageIntervalMillis = anyMessageIntervalMillis;
    // The scheduler is only needed when pinging or the any-message check is on.
    if (this.pingIntervalMillis > 0 || this.anyMessageIntervalMillis > 0) {
      this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    }
    this.listener = listener;
    this.endpoint = endpoint;
    this.attempts = attempts;
    this.timeoutMillis = timeoutMillis;
    this.retryOnClose = retryOnClose;
    connect();
  }

  /**
   * Constructs the client with all default settings. Also connects to the
   * server.
   *
   * @param endpoint the URI to connect to
   * @param listener a WebSocketListener to handle incoming messages
   * @throws Exception thread interrupt or connection failure
   */
  public WebSocketClient(URI endpoint, WebSocketListener listener) throws Exception {
    this(endpoint, listener, DEFAULT_ATTEMPTS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_RETRY_ON_CLOSE,
        DEFAULT_PING_INTERVAL_MILLIS, DEFAULT_PING_WAIT_MILLIS, DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS);
  }

  /**
   * Closes the current session because no message arrived within the
   * any-message interval. If the session is already not open, {@link #onClose}
   * is invoked directly instead.
   */
  private void anyMessageClose() {
    // Read the field once so the null check and the use see the same session.
    final Session current = this.session;
    if (current == null) {
      // The session is only cleared by onClose, so there is nothing to close.
      return;
    }
    try {
      final CloseReason reason = new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, "Any Message Failed!");
      if (current.isOpen()) {
        current.close(reason);
      } else {
        onClose(current, reason);
      }
    } catch (Exception ignored) {
      // Best effort: this runs on the scheduler, there is no caller to report to.
    }
  }

  /**
   * Called by Websocket interface when a Pong is received in response to a ping
   * that was sent.
   *
   * @param pongMessage Message that was populated from ping.
   * @param session     The websocket session.
   */
  @OnMessage
  public void catchPong(PongMessage pongMessage, Session session) {
    this.pongReceived = true;
  }

  /**
   * Connect to server. Tries up to {@code attempts} times, sleeping
   * {@code timeoutMillis} after every failed attempt.
   *
   * @throws Exception the last connection failure if every attempt failed, an
   *                   {@link IllegalStateException} if {@code attempts} is 0 so
   *                   that no attempt was made, or an
   *                   {@link InterruptedException} if interrupted while waiting
   */
  public void connect() throws Exception {
    // try to connect to server
    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
    int failedAttempts = 0;
    Exception lastExcept = null;
    for (int i = 0; i < attempts; i++) {
      try {
        container.connectToServer(this, endpoint);
        break;
      } catch (Exception e) {
        // increment failed attempts, sleep
        failedAttempts++;
        lastExcept = e;
        Thread.sleep(timeoutMillis);
      }
    }
    // throw connect exception if all attempts fail
    if (failedAttempts == attempts) {
      this.listener.onConnectFail();
      if (lastExcept == null) {
        // attempts == 0: the loop never ran, so there is no failure to rethrow.
        // Previously this executed "throw null" and surfaced as a bare
        // NullPointerException.
        throw new IllegalStateException("No connection attempt was made (attempts=" + attempts + ")");
      }
      throw lastExcept;
    }
  }

  /**
   * Reports whether a text message has been received since the any-message
   * flag was last reset.
   *
   * @return true if a message has been flagged as received, false otherwise
   */
  private boolean hasReceivedAnyMessage() {
    return this.anyMessageReceived;
  }

  /**
   * Checks if there is an open session
   *
   * @return true if a session exists and is open
   * @throws IOException if IO error occurs
   */
  public boolean isConnected() throws IOException {
    // Single read of the field: onClose may clear it at any time.
    final Session current = this.session;
    return current != null && current.isOpen();
  }

  /**
   * Stores the session, starts the ping and any-message checks when they are
   * enabled, and notifies the listener.
   *
   * @param session Session
   * @throws IOException if IO error occurs
   */
  @OnOpen
  public void onOpen(Session session) throws IOException {
    this.session = session;
    if (pingIntervalMillis > 0) {
      pingTask = this.scheduledExecutor.schedule(this, pingIntervalMillis, TimeUnit.MILLISECONDS);
    }
    if (anyMessageIntervalMillis > 0) {
      startAnyMessageReceived();
    }
    this.listener.onOpen(session);
    this.anyMessageReceived = false;
  }

  /**
   * Notifies the listener of the close, clears the stored session, resets the
   * ping state and cancels pending checks. Then reconnects if retryOnClose is
   * set.
   *
   * @param session Session
   * @param reason  for close
   * @throws IOException if IO error occurs
   */
  @OnClose
  public void onClose(Session session, CloseReason reason) throws IOException {
    this.listener.onClose(session, reason);
    this.session = null;
    pingSent = false;
    pongReceived = false;
    final ScheduledFuture<?> ping = pingTask;
    if (ping != null) {
      ping.cancel(true);
    }
    final ScheduledFuture<?> anyMessage = anyMessageTask;
    if (anyMessage != null) {
      anyMessage.cancel(true);
    }
    if (retryOnClose) {
      try {
        this.connect();
      } catch (Exception e) {
        // failed to reconnect
        this.listener.onReconnectFail();
      }
    }
  }

  /**
   * Gives listener the message, and restarts the any-message check when it is
   * enabled.
   *
   * @param message String
   * @throws IOException if IO error occurs
   */
  @OnMessage
  public void onMessage(String message) throws IOException {
    this.anyMessageReceived = true;
    if (this.anyMessageIntervalMillis > 0) {
      startAnyMessageReceived();
    }
    this.listener.onMessage(message);
  }

  /**
   * If a ping has failed this method will close the session in the proper
   * manner, then clears the stored ping failure message.
   *
   * @param session         the session to close; nothing is closed when null
   * @param pingFailMessage detail appended to the close reason; a default text
   *                        is used when blank
   */
  private void pingFailClose(Session session, String pingFailMessage) {
    String detail = pingFailMessage;
    if (detail.isBlank()) {
      detail = "Pong not received!";
    }
    // String for CloseReason has to be less than 123 bytes.
    if (detail.length() > 90) {
      detail = detail.substring(0, 90);
    }
    // The limit is in UTF-8 bytes, not chars: a non-ASCII message of 90 chars can
    // still be too long, which would make the CloseReason constructor throw and
    // (being swallowed below) leave the dead connection open.
    final String reasonPhrase = truncateToUtf8Bytes("Ping/Pong Failed! " + detail, MAX_CLOSE_REASON_BYTES);
    try {
      // A null session means onClose already ran; there is nothing left to close.
      if (session != null) {
        final CloseReason reason = new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, reasonPhrase);
        if (session.isOpen()) {
          session.close(reason);
        } else {
          onClose(session, reason);
        }
      }
    } catch (Exception ignored) {
      // Best effort: this runs on the scheduler, there is no caller to report to.
    } finally {
      // Reset the FIELD (the original reset only the parameter), so a stale
      // "Sending Ping failed" text is not reported for a later, unrelated timeout.
      this.pingFailMessage = "";
    }
  }

  /**
   * Shortens text until its UTF-8 encoding fits in the given number of bytes.
   *
   * @param text     the text to shorten
   * @param maxBytes maximum encoded size in bytes
   * @return text, or the longest leading part of it that fits
   */
  private static String truncateToUtf8Bytes(String text, int maxBytes) {
    String result = text;
    while (result.getBytes(java.nio.charset.StandardCharsets.UTF_8).length > maxBytes) {
      result = result.substring(0, result.length() - 1);
    }
    return result;
  }

  /**
   * Runs one step of the ping/pong cycle. When no ping is outstanding, sends a
   * ping and schedules the pong check after pingWaitMillis. Otherwise checks for
   * the pong: closes the session if it did not arrive, or schedules the next
   * ping after pingIntervalMillis if it did.
   */
  @Override
  public void run() {
    if (!pingSent) {
      // Add timestamp to ping message that could be evaluated when pong is received
      this.timePingSentMillis = System.currentTimeMillis();
      // Explicit charset so the payload does not depend on the platform default.
      byte[] data = Long.toString(this.timePingSentMillis).getBytes(java.nio.charset.StandardCharsets.UTF_8);
      try {
        pongReceived = false;
        // Set pingSent=true here because the pingTask will always be started and the
        // failure will occur when the pong has not been received even if the ping
        // fails to send.
        pingSent = true;
        this.session.getAsyncRemote().sendPing(ByteBuffer.wrap(data));
      } catch (Exception e) {
        // This message is used to indicate that the failure was actually in the
        // sending of the ping.
        pingFailMessage = "Sending Ping failed. E=" + e.getLocalizedMessage();
      }
      pingTask = this.scheduledExecutor.schedule(this, pingWaitMillis, TimeUnit.MILLISECONDS);
    } else {
      if (!pongReceived) {
        pingFailClose(session, pingFailMessage);
      } else {
        pingSent = false;
        pongReceived = false;
        pingTask = this.scheduledExecutor.schedule(this, pingIntervalMillis, TimeUnit.MILLISECONDS);
      }
    }
  }

  /**
   * Sets retry to false, then closes the session (if there is one) and stops the
   * scheduler.
   *
   * @throws Exception if error occurs
   */
  public void shutdown() throws Exception {
    this.retryOnClose = false;
    try {
      // The session is null after a close, e.g. when reconnecting failed.
      final Session current = this.session;
      if (current != null) {
        current.close();
      }
    } finally {
      // Stop the scheduler even when closing the session throws.
      if (this.scheduledExecutor != null) {
        this.scheduledExecutor.shutdownNow();
      }
    }
  }

  /** @param listener set WebSocketListener */
  public void setListener(WebSocketListener listener) {
    this.listener = listener;
  }

  /**
   * Start a task to check if any message has been received during the
   * anyMessageIntervalMillis time period. Clears the received flag and replaces
   * any check that is already pending.
   */
  private void startAnyMessageReceived() {
    anyMessageReceived = false;
    final ScheduledFuture<?> pending = anyMessageTask;
    if (pending != null) {
      pending.cancel(false);
    }
    anyMessageTask = this.scheduledExecutor.schedule(new AnyMessageRunner(this), anyMessageIntervalMillis,
        TimeUnit.MILLISECONDS);
  }

  /**
   * This class is used to asynchronously handle tracking if any message has been
   * received.
   */
  private static final class AnyMessageRunner implements Runnable {
    private final WebSocketClient webSocketClient;

    public AnyMessageRunner(WebSocketClient webSocketClient) {
      this.webSocketClient = webSocketClient;
    }

    /**
     * Closes the session when no message was flagged since the check was
     * started, otherwise starts the next check.
     */
    @Override
    public void run() {
      if (!webSocketClient.hasReceivedAnyMessage()) {
        webSocketClient.anyMessageClose();
      } else {
        webSocketClient.startAnyMessageReceived();
      }
    }
  }
}