WebSocketNotificationReceiver.java
package gov.usgs.earthquake.distribution;
import gov.usgs.util.Config;
import gov.usgs.util.FileUtils;
import gov.usgs.util.StreamUtils;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.websocket.CloseReason;
import javax.websocket.Session;
/**
 * Receives notifications from an arbitrary web socket.
 *
 * <p>
 * Each incoming message is parsed as a JSON object; its {@code data} member is
 * converted to a {@link URLNotification} and received, and its {@code sequence}
 * member is persisted to a tracking file so that the next startup can resume
 * from the last processed sequence.
 */
public class WebSocketNotificationReceiver extends DefaultNotificationReceiver implements WebSocketListener {

  /** Logger for use in the file */
  public static final Logger LOGGER = Logger.getLogger(WebSocketNotificationReceiver.class.getName());

  /** Property for serverHost */
  public static final String SERVER_HOST_PROPERTY = "serverHost";
  /** Property for serverPort */
  public static final String SERVER_PORT_PROPERTY = "serverPort";
  /** Property for serverPath */
  public static final String SERVER_PATH_PROPERTY = "serverPath";
  /** Property for sequence */
  public static final String SEQUENCE_PROPERTY = "sequence";
  /** Property for timestamp */
  public static final String TIMESTAMP_PROPERTY = "timestamp";
  /** Property for trackingFileName */
  public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
  /** Property for connectAttempts */
  public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
  /** Property for connectTimeout */
  public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
  /** Property for retryOnClose */
  public static final String RETRY_ON_CLOSE_PROPERTY = "retryOnClose";
  /** Property for pingIntervalMillis */
  public static final String PING_INTERVAL = "pingInterval";
  /** Property for pingWaitMillis */
  public static final String PING_WAIT = "pingWait";
  /** Property for anyMessageIntervalMillis */
  public static final String ANY_MESSAGE_INTERVAL = "anyMessageInterval";

  /** Default server host */
  public static final String DEFAULT_SERVER_HOST = "http://www.google.com";
  /** Default server port */
  public static final String DEFAULT_SERVER_PORT = "4222";
  /** Default server path */
  public static final String DEFAULT_SERVER_PATH = "/sequence/";
  /** Default tracking file */
  public static final String DEFAULT_TRACKING_FILE_NAME = "data/WebSocketReceiverInfo";
  /** Default number of connect attempts */
  public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
  /** Default timeout in ms */
  public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
  /** Default condition for retry on close */
  public static final String DEFAULT_RETRY_ON_CLOSE = "true";
  /** attribute for data */
  public static final String ATTRIBUTE_DATA = "data";
  /** Default for time in between pings to server */
  public static final String DEFAULT_PING_INTERVAL_MILLIS = "15000";
  /**
   * Default for how long to wait for pong response to ping before closing or
   * restarting connection
   */
  public static final String DEFAULT_PING_WAIT_MILLIS = "4000";
  /** Default for interval for any message */
  public static final String DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS = "0";

  private String serverHost;
  private String serverPort;
  private String serverPath;
  private String trackingFileName;
  private int attempts;
  private long timeout;
  private long pingIntervalMillis;
  private long pingWaitMillis;
  private long anyMessageIntervalMillis;

  /** Web socket client; created in {@link #startup()}, null before that. */
  private WebSocketClient client;
  /** Last processed sequence; appended to the server path when connecting. */
  private String sequence = "0";

  /**
   * Reads connection, tracking file, and ping settings from the configuration,
   * falling back to the {@code DEFAULT_*} constants for missing properties.
   *
   * @param config configuration to read
   * @throws Exception if the parent configuration fails or a numeric property
   *                   cannot be parsed
   */
  @Override
  public void configure(Config config) throws Exception {
    super.configure(config);

    serverHost = config.getProperty(SERVER_HOST_PROPERTY, DEFAULT_SERVER_HOST);
    serverPort = config.getProperty(SERVER_PORT_PROPERTY, DEFAULT_SERVER_PORT);
    serverPath = config.getProperty(SERVER_PATH_PROPERTY, DEFAULT_SERVER_PATH);
    attempts = Integer.parseInt(config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
    timeout = Long.parseLong(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
    trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME);
    pingIntervalMillis = Long.parseLong(config.getProperty(PING_INTERVAL, DEFAULT_PING_INTERVAL_MILLIS));
    pingWaitMillis = Long.parseLong(config.getProperty(PING_WAIT, DEFAULT_PING_WAIT_MILLIS));
    // previously never read, so a configured anyMessageInterval was ignored
    anyMessageIntervalMillis = Long
        .parseLong(config.getProperty(ANY_MESSAGE_INTERVAL, DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS));
  }

  /**
   * Reads a sequence from a tracking file if it exists. Otherwise, starting
   * sequence is 0. Connects to web socket
   *
   * <p>
   * The stored sequence is only used when the tracking file's host, port, and
   * path all equal the configured values; a tracking file that lacks any of
   * those values is treated as not matching.
   *
   * @throws Exception if error occurs
   */
  @Override
  public void startup() throws Exception {
    super.startup();

    // read sequence from tracking file if other parameters agree
    JsonObject json = readTrackingFile();
    if (json != null && trackingValueMatches(json, SERVER_HOST_PROPERTY, serverHost)
        && trackingValueMatches(json, SERVER_PORT_PROPERTY, serverPort)
        && trackingValueMatches(json, SERVER_PATH_PROPERTY, serverPath)) {
      // keep the current sequence when the file has no usable sequence value
      sequence = json.getString(SEQUENCE_PROPERTY, sequence);
    }

    // open websocket
    // NOTE(review): RETRY_ON_CLOSE_PROPERTY / DEFAULT_RETRY_ON_CLOSE are declared
    // but never read; the literal true below is presumably the retry-on-close
    // flag -- confirm against WebSocketClient before wiring it to config.
    URI endpoint = new URI(serverHost + ":" + serverPort + serverPath + sequence);
    client = new WebSocketClient(endpoint, this, attempts, timeout, true, pingIntervalMillis, pingWaitMillis,
        anyMessageIntervalMillis);
  }

  /**
   * Checks whether a tracking file value equals the expected value.
   *
   * @param json     tracking file contents
   * @param key      name of the value to compare
   * @param expected configured value
   * @return true only if the value is present as a string and equals expected
   */
  private static boolean trackingValueMatches(JsonObject json, String key, String expected) {
    // getString(key, null) yields null instead of throwing for a missing or
    // non-string value
    return expected != null && expected.equals(json.getString(key, null));
  }

  /**
   * Closes web socket
   *
   * <p>
   * Safe to call when the client was never created; the parent shutdown always
   * runs, even if closing the client throws.
   *
   * @throws Exception if error occurs
   */
  @Override
  public void shutdown() throws Exception {
    try {
      // close socket
      if (client != null) {
        client.shutdown();
      }
    } finally {
      super.shutdown();
    }
  }

  /**
   * Writes tracking file to disk, storing latest sequence
   *
   * <p>
   * The file is named {@code trackingFileName + ".json"} and holds the server
   * host, path, port, and sequence; contents are written to a
   * {@code "_tmp.json"} file first and then moved into place.
   *
   * @throws Exception if error occurs
   */
  public void writeTrackingFile() throws Exception {
    JsonObject json = Json.createObjectBuilder()
        .add(SERVER_HOST_PROPERTY, serverHost)
        .add(SERVER_PATH_PROPERTY, serverPath)
        .add(SERVER_PORT_PROPERTY, serverPort)
        .add(SEQUENCE_PROPERTY, sequence)
        .build();

    // encode explicitly as UTF-8 rather than the platform default charset
    byte[] contents = json.toString().getBytes(StandardCharsets.UTF_8);
    FileUtils.writeFileThenMove(new File(trackingFileName + "_tmp.json"), new File(trackingFileName + ".json"),
        contents);
  }

  /**
   * Reads tracking file from disk
   *
   * @return JsonObject tracking file, or null if the file does not exist
   * @throws Exception if error occurs
   */
  public JsonObject readTrackingFile() throws Exception {
    File trackingFile = new File(trackingFileName + ".json");
    if (!trackingFile.exists()) {
      return null;
    }

    // try-with-resources closes the reader even when parsing fails
    try (InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
        JsonReader jsonReader = Json.createReader(contents)) {
      return jsonReader.readObject();
    }
  }

  /**
   * Called when the web socket opens; no action is taken.
   *
   * @param session the opened session
   */
  @Override
  public void onOpen(Session session) {
    // do nothing
  }

  /**
   * Message handler function passed to WebSocketClient. Parses the message as
   * JSON, receives the contained URL notification, and writes the tracking file.
   *
   * <p>
   * Failures are logged at WARNING level and not rethrown. The sequence and
   * tracking file are only updated after the notification has been received
   * and the heartbeat sent.
   *
   * @param message String
   */
  @Override
  public void onMessage(String message) {
    JsonObject json;
    try (InputStream in = StreamUtils.getInputStream(message); JsonReader reader = Json.createReader(in)) {
      // parse input as json
      json = reader.readObject();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception while receiving notification; is it encoded as JSON? ",
          e);
      return;
    }

    try {
      // convert to URLNotification and receive
      JsonObject dataJson = json.getJsonObject(ATTRIBUTE_DATA);
      URLNotification notification = URLNotificationJSONConverter.parseJSON(dataJson);
      receiveNotification(notification);

      // send heartbeat
      HeartbeatListener.sendHeartbeatMessage(getName(), "nats notification timestamp",
          json.getString(TIMESTAMP_PROPERTY));

      // write tracking file
      sequence = json.getJsonNumber(SEQUENCE_PROPERTY).toString();
      writeTrackingFile();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing URLNotification ", e);
    }
  }

  /**
   * Called when the web socket closes; no action is taken.
   *
   * @param session     the closed session
   * @param closeReason reason the session closed
   */
  @Override
  public void onClose(Session session, CloseReason closeReason) {
    // do nothing
  }

  /** Called when the connection attempt fails; no action is taken. */
  @Override
  public void onConnectFail() {
    // do nothing
  }

  /** Called when a reconnect attempt fails; no action is taken. */
  @Override
  public void onReconnectFail() {
    // do nothing
  }

  /** @return serverHost */
  public String getServerHost() {
    return serverHost;
  }

  /** @param serverHost to set */
  public void setServerHost(String serverHost) {
    this.serverHost = serverHost;
  }

  /** @return serverPort */
  public String getServerPort() {
    return serverPort;
  }

  /** @param serverPort to set */
  public void setServerPort(String serverPort) {
    this.serverPort = serverPort;
  }

  /** @return serverPath */
  public String getServerPath() {
    return serverPath;
  }

  /** @param serverPath to set */
  public void setServerPath(String serverPath) {
    this.serverPath = serverPath;
  }

  /** @return trackingFileName */
  public String getTrackingFileName() {
    return trackingFileName;
  }

  /** @param trackingFileName to set */
  public void setTrackingFileName(String trackingFileName) {
    this.trackingFileName = trackingFileName;
  }

  /** @return sequence */
  public String getSequence() {
    return sequence;
  }

  /** @param sequence to set */
  public void setSequence(String sequence) {
    this.sequence = sequence;
  }

  /** @return attempts */
  public int getAttempts() {
    return attempts;
  }

  /** @param attempts to set */
  public void setAttempts(int attempts) {
    this.attempts = attempts;
  }

  /** @return timeout */
  public long getTimeout() {
    return timeout;
  }

  /** @param timeout to set */
  public void setTimeout(long timeout) {
    this.timeout = timeout;
  }
}