WebSocketNotificationReceiver.java

  1. package gov.usgs.earthquake.distribution;

  2. import gov.usgs.util.Config;
  3. import gov.usgs.util.FileUtils;
  4. import gov.usgs.util.StreamUtils;

  5. import javax.json.Json;
  6. import javax.json.JsonObject;
  7. import javax.json.JsonReader;
  8. import javax.websocket.CloseReason;
  9. import javax.websocket.Session;
  10. import java.io.ByteArrayInputStream;
  11. import java.io.File;
  12. import java.io.InputStream;
  13. import java.net.URI;
  14. import java.util.logging.Level;
  15. import java.util.logging.Logger;

  16. /**
  17.  * Receives notifications from an arbitrary web socket.
  18.  */
  19. public class WebSocketNotificationReceiver extends DefaultNotificationReceiver implements WebSocketListener {

  20.   /** Logger for use in the file */
  21.   public static final Logger LOGGER = Logger.getLogger(WebSocketNotificationReceiver.class.getName());

  22.   /** Property for serverHost */
  23.   public static final String SERVER_HOST_PROPERTY = "serverHost";
  24.   /** Property for serverPort */
  25.   public static final String SERVER_PORT_PROPERTY = "serverPort";
  26.   /** Property for serverPath */
  27.   public static final String SERVER_PATH_PROPERTY = "serverPath";
  28.   /** Property for sequence */
  29.   public static final String SEQUENCE_PROPERTY = "sequence";
  30.   /** Property for timestamp */
  31.   public static final String TIMESTAMP_PROPERTY = "timestamp";
  32.   /** Property for trackingFileName */
  33.   public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
  34.   /** Property for connectAttempts */
  35.   public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
  36.   /** Property for connectTimeout */
  37.   public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
  38.   /** Property for retryOnClose */
  39.   public static final String RETRY_ON_CLOSE_PROPERTY = "retryOnClose";
  40.   /** Variable for pingIntervalMillis string */
  41.   public static final String PING_INTERVAL = "pingInterval";
  42.   /** Variable for pingWait string */
  43.   public static final String PING_WAIT = "pingWait";
  44.   /** Variable for anyMessageIntervalMillis string */
  45.   public static final String ANY_MESSAGE_INTERVAL = "anyMessageInterval";

  46.   /** Default server host */
  47.   public static final String DEFAULT_SERVER_HOST = "http://www.google.com";
  48.   /** Default server port */
  49.   public static final String DEFAULT_SERVER_PORT = "4222";
  50.   /** Default server path */
  51.   public static final String DEFAULT_SERVER_PATH = "/sequence/";
  52.   /** Default tracking file */
  53.   public static final String DEFAULT_TRACKING_FILE_NAME = "data/WebSocketReceiverInfo";
  54.   /** Default number of connect attempts */
  55.   public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
  56.   /** Default timeout in ms */
  57.   public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
  58.   /** Default condiction for retry on close */
  59.   public static final String DEFAULT_RETRY_ON_CLOSE = "true";
  60.   /** attribute for data */
  61.   public static final String ATTRIBUTE_DATA = "data";
  62.   /** Default for time in between pings to server */
  63.   public static final String DEFAULT_PING_INTERVAL_MILLIS = "15000";
  64.   /**
  65.    * Default for how long to wait for pong response to ping before closing or
  66.    * restarting connection
  67.    */
  68.   public static final String DEFAULT_PING_WAIT_MILLIS = "4000";
  69.   /** Default for interval for any message */
  70.   public static final String DEFAULT_ANY_MESSAGE_INTERVAL_MILLIS = "0";

  71.   private String serverHost;
  72.   private String serverPort;
  73.   private String serverPath;
  74.   private String trackingFileName;
  75.   private int attempts;
  76.   private long timeout;
  77.   private long pingIntervalMillis;
  78.   private long pingWaitMillis;
  79.   private long anyMessageIntervalMillis;

  80.   private WebSocketClient client;
  81.   private String sequence = "0";

  82.   @Override
  83.   public void configure(Config config) throws Exception {
  84.     super.configure(config);

  85.     serverHost = config.getProperty(SERVER_HOST_PROPERTY, DEFAULT_SERVER_HOST);
  86.     serverPort = config.getProperty(SERVER_PORT_PROPERTY, DEFAULT_SERVER_PORT);
  87.     serverPath = config.getProperty(SERVER_PATH_PROPERTY, DEFAULT_SERVER_PATH);
  88.     attempts = Integer.parseInt(config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
  89.     timeout = Long.parseLong(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
  90.     trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME);
  91.     pingIntervalMillis = Long.parseLong(config.getProperty(PING_INTERVAL, DEFAULT_PING_INTERVAL_MILLIS));
  92.     pingWaitMillis = Long.parseLong(config.getProperty(PING_WAIT, DEFAULT_PING_WAIT_MILLIS));
  93.   }

  94.   /**
  95.    * Reads a sequence from a tracking file if it exists. Otherwise, starting
  96.    * sequence is 0. Connects to web socket
  97.    *
  98.    * @throws Exception if error occurs
  99.    */
  100.   @Override
  101.   public void startup() throws Exception {
  102.     super.startup();

  103.     // read sequence from tracking file if other parameters agree
  104.     JsonObject json = readTrackingFile();
  105.     if (json != null && json.getString(SERVER_HOST_PROPERTY).equals(serverHost)
  106.         && json.getString(SERVER_PORT_PROPERTY).equals(serverPort)
  107.         && json.getString(SERVER_PATH_PROPERTY).equals(serverPath)) {
  108.       sequence = json.getString(SEQUENCE_PROPERTY);
  109.     }

  110.     // open websocket
  111.     client = new WebSocketClient(new URI(serverHost + ":" + serverPort + serverPath + sequence), this, attempts,
  112.         timeout, true, pingIntervalMillis, pingWaitMillis, anyMessageIntervalMillis);
  113.   }

  114.   /**
  115.    * Closes web socket
  116.    *
  117.    * @throws Exception if error occurs
  118.    */
  119.   @Override
  120.   public void shutdown() throws Exception {
  121.     // close socket
  122.     client.shutdown();
  123.     super.shutdown();
  124.   }

  125.   /**
  126.    * Writes tracking file to disc, storing latest sequence
  127.    *
  128.    * @throws Exception if error occurs
  129.    */
  130.   public void writeTrackingFile() throws Exception {
  131.     JsonObject json = Json.createObjectBuilder().add(SERVER_HOST_PROPERTY, serverHost)
  132.         .add(SERVER_PATH_PROPERTY, serverPath).add(SERVER_PORT_PROPERTY, serverPort).add(SEQUENCE_PROPERTY, sequence)
  133.         .build();

  134.     FileUtils.writeFileThenMove(new File(trackingFileName + "_tmp.json"), new File(trackingFileName + ".json"),
  135.         json.toString().getBytes());
  136.   }

  137.   /**
  138.    * Reads tracking file from disc
  139.    *
  140.    * @return JsonObject tracking file
  141.    * @throws Exception if error occurs
  142.    */
  143.   public JsonObject readTrackingFile() throws Exception {
  144.     JsonObject json = null;

  145.     File trackingFile = new File(trackingFileName + ".json");
  146.     if (trackingFile.exists()) {
  147.       InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
  148.       JsonReader jsonReader = Json.createReader(contents);
  149.       json = jsonReader.readObject();
  150.       jsonReader.close();
  151.     }
  152.     return json;
  153.   }

  154.   @Override
  155.   public void onOpen(Session session) {
  156.     // do nothing
  157.   }

  158.   /**
  159.    * Message handler function passed to WebSocketClient Parses the message as
  160.    * JSON, receives the contained URL notification, and writes the tracking file.
  161.    *
  162.    * @param message String
  163.    */
  164.   @Override
  165.   public void onMessage(String message) {
  166.     JsonObject json;
  167.     try (InputStream in = StreamUtils.getInputStream(message); JsonReader reader = Json.createReader(in)) {
  168.       // parse input as json
  169.       json = reader.readObject();
  170.     } catch (Exception e) {
  171.       LOGGER.log(Level.WARNING, "[" + getName() + "] exception while receiving notification; is it encoded as JSON? ",
  172.           e);
  173.       return;
  174.     }
  175.     try {
  176.       // convert to URLNotification and receive
  177.       JsonObject dataJson = json.getJsonObject(ATTRIBUTE_DATA);
  178.       URLNotification notification = URLNotificationJSONConverter.parseJSON(dataJson);
  179.       receiveNotification(notification);

  180.       // send heartbeat
  181.       HeartbeatListener.sendHeartbeatMessage(getName(), "nats notification timestamp",
  182.           json.getString(TIMESTAMP_PROPERTY));

  183.       // write tracking file
  184.       sequence = json.getJsonNumber(SEQUENCE_PROPERTY).toString();
  185.       writeTrackingFile();
  186.     } catch (Exception e) {
  187.       LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing URLNotification ", e);
  188.     }
  189.   }

  190.   @Override
  191.   public void onClose(Session session, CloseReason closeReason) {
  192.     // do nothing
  193.   }

  194.   @Override
  195.   public void onConnectFail() {
  196.     // do nothing
  197.   }

  198.   @Override
  199.   public void onReconnectFail() {
  200.     // do nothing
  201.   }

  202.   /** @return serverHost */
  203.   public String getServerHost() {
  204.     return serverHost;
  205.   }

  206.   /** @param serverHost to set */
  207.   public void setServerHost(String serverHost) {
  208.     this.serverHost = serverHost;
  209.   }

  210.   /** @return serverPort */
  211.   public String getServerPort() {
  212.     return serverPort;
  213.   }

  214.   /** @param serverPort to set */
  215.   public void setServerPort(String serverPort) {
  216.     this.serverPort = serverPort;
  217.   }

  218.   /** @return serverPath */
  219.   public String getServerPath() {
  220.     return serverPath;
  221.   }

  222.   /** @param serverPath to set */
  223.   public void setServerPath(String serverPath) {
  224.     this.serverPath = serverPath;
  225.   }

  226.   /** @return trackingFileName */
  227.   public String getTrackingFileName() {
  228.     return trackingFileName;
  229.   }

  230.   /** @param trackingFileName to set */
  231.   public void setTrackingFileName(String trackingFileName) {
  232.     this.trackingFileName = trackingFileName;
  233.   }

  234.   /** @return sequence */
  235.   public String getSequence() {
  236.     return sequence;
  237.   }

  238.   /** @param sequence to set */
  239.   public void setSequence(String sequence) {
  240.     this.sequence = sequence;
  241.   }

  242.   /** @return attempts */
  243.   public int getAttempts() {
  244.     return attempts;
  245.   }

  246.   /** @param attempts to set */
  247.   public void setAttempts(int attempts) {
  248.     this.attempts = attempts;
  249.   }

  250.   /** @return timeout */
  251.   public long getTimeout() {
  252.     return timeout;
  253.   }

  254.   /** @param timeout to set */
  255.   public void setTimeout(long timeout) {
  256.     this.timeout = timeout;
  257.   }
  258. }