NATSStreamingNotificationReceiver.java
package gov.usgs.earthquake.nats;
import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
import gov.usgs.earthquake.distribution.URLNotification;
import gov.usgs.earthquake.distribution.URLNotificationJSONConverter;
import gov.usgs.util.Config;
import gov.usgs.util.FileUtils;
import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Connects directly to a NATS streaming server to receive notifications using a
 * NATSClient.
 *
 * <p>
 * The sequence number of the most recently handled message is persisted in a
 * JSON tracking file, together with the server host/port, cluster id, client
 * id and subject it belongs to. On startup the stored sequence is reused only
 * when all of those values match the current configuration; otherwise the
 * subscription starts from the in-memory sequence (initially 0).
 *
 * <p>
 * NOTE(review): {@link #onMessage(Message)} is presumably invoked on a NATS
 * client thread while {@link #shutdown()} runs on another; the fields shared
 * between them are not synchronized - confirm whether that matters here.
 */
public class NATSStreamingNotificationReceiver extends DefaultNotificationReceiver implements MessageHandler {

  // NOTE(review): the logger is registered under DefaultNotificationReceiver's
  // name rather than this class's name. Left unchanged so existing logging
  // configuration keeps applying - confirm whether this is intentional.
  private static final Logger LOGGER = Logger.getLogger(DefaultNotificationReceiver.class.getName());

  /** Property for tracking file name */
  public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFile";
  /** Property on if update sequence should occur after exception */
  public static final String UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "updateSequenceAfterException";
  /** Property for sequence */
  public static final String SEQUENCE_PROPERTY = "sequence";
  /** Name of default tracking file */
  public static final String DEFAULT_TRACKING_FILE_NAME_PROPERTY = "data/STANReceiverInfo.json";
  /** Default state of update after exception */
  public static final String DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY = "true";

  private NATSClient client = new NATSClient();
  private Subscription subscription;

  private String subject;
  /** Sequence of the last message whose position was recorded. */
  private long sequence = 0;
  private String trackingFileName;
  /** Whether the sequence keeps advancing after a message failed. */
  private boolean updateSequenceAfterException;
  /** Set once any message has failed; never reset by this class. */
  private boolean exceptionThrown = false;

  /**
   * Configures receiver based on included properties
   *
   * @param config The user-defined configuration
   *
   * @throws Exception If required properties are ignored
   */
  @Override
  public void configure(Config config) throws Exception {
    super.configure(config);
    client.configure(config);

    subject = config.getProperty(NATSClient.SUBJECT_PROPERTY);
    if (subject == null) {
      throw new ConfigurationException(NATSClient.SUBJECT_PROPERTY + " is a required parameter");
    }

    trackingFileName = config.getProperty(TRACKING_FILE_NAME_PROPERTY, DEFAULT_TRACKING_FILE_NAME_PROPERTY);
    updateSequenceAfterException = Boolean.parseBoolean(
        config.getProperty(UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY, DEFAULT_UPDATE_SEQUENCE_AFTER_EXCEPTION_PROPERTY));
  }

  /**
   * Does initial tracking file management and subscribes to server. With a
   * tracking file that matches the current configuration, resumes from the
   * stored sequence.
   *
   * <p>
   * A tracking file that lacks any expected property, or holds it with a
   * non-string type, is treated as not matching instead of aborting startup.
   *
   * @throws Exception if the parent or client fails to start, the tracking file
   *                   cannot be read or parsed, or subscribing fails
   */
  @Override
  public void startup() throws Exception {
    super.startup();

    // Start client
    client.startup();

    // Reuse the stored sequence only when the tracking file describes this
    // exact server/cluster/client/subject combination
    JsonObject properties = readTrackingFile();
    if (properties != null) {
      if (trackingFileMatches(properties) && properties.get(SEQUENCE_PROPERTY) != null) {
        sequence = Long.parseLong(properties.get(SEQUENCE_PROPERTY).toString());
      } else {
        LOGGER.log(Level.INFO, "[" + getName() + "] tracking file " + trackingFileName
            + " does not match current configuration; starting at sequence " + sequence);
      }
    }

    // Always starts at stored sequence; initialized to 0 and overwritten by storage
    subscription = client.getConnection().subscribe(subject, this,
        new SubscriptionOptions.Builder().startAtSequence(sequence).build());
  }

  /**
   * Checks whether a tracking file was written for the current server host,
   * server port, cluster id, client id and subject.
   *
   * @param properties parsed tracking file contents
   * @return true only if every identifying property is present and equal
   */
  private boolean trackingFileMatches(JsonObject properties) {
    return propertyEquals(properties, NATSClient.SERVER_HOST_PROPERTY, client.getServerHost())
        && propertyEquals(properties, NATSClient.SERVER_PORT_PROPERTY, client.getServerPort())
        && propertyEquals(properties, NATSClient.CLUSTER_ID_PROPERTY, client.getClusterId())
        && propertyEquals(properties, NATSClient.CLIENT_ID_PROPERTY, client.getClientId())
        && propertyEquals(properties, NATSClient.SUBJECT_PROPERTY, subject);
  }

  /**
   * Null-safe comparison of a stored string property against an expected value.
   *
   * @param properties parsed tracking file contents
   * @param name       property to look up
   * @param expected   value the property must equal
   * @return false when the property is absent or not a JSON string
   */
  private static boolean propertyEquals(JsonObject properties, String name, Object expected) {
    // the defaulting overload returns null instead of throwing on a missing or
    // non-string value
    String stored = properties.getString(name, null);
    return stored != null && stored.equals(expected);
  }

  /**
   * Writes state in tracking file, then closes subscription and connection.
   * Tracking-file and unsubscribe failures are logged and do not prevent the
   * remaining steps; the parent shutdown runs even if the client shutdown
   * throws.
   *
   * @throws Exception if the client or the parent receiver fails to shut down
   */
  @Override
  public void shutdown() throws Exception {
    try {
      writeTrackingFile();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] failed to write to tracking file", e);
    }

    // subscription is null when startup never completed
    if (subscription != null) {
      try {
        subscription.unsubscribe();
      } catch (Exception e) {
        LOGGER.log(Level.WARNING, "[" + getName() + "] failed to unsubscribe from NATS channel", e);
      }
      subscription = null;
    }

    try {
      client.shutdown();
    } finally {
      super.shutdown();
    }
  }

  /**
   * Writes pertinent configuration information to tracking file. The content is
   * written to a "_tmp" sibling first and then moved over the tracking file.
   *
   * @throws Exception if error occurs
   */
  public void writeTrackingFile() throws Exception {
    JsonObject json = Json.createObjectBuilder()
        .add(NATSClient.SERVER_HOST_PROPERTY, client.getServerHost())
        .add(NATSClient.SERVER_PORT_PROPERTY, client.getServerPort())
        .add(NATSClient.CLUSTER_ID_PROPERTY, client.getClusterId())
        .add(NATSClient.CLIENT_ID_PROPERTY, client.getClientId())
        .add(NATSClient.SUBJECT_PROPERTY, subject)
        .add(SEQUENCE_PROPERTY, sequence)
        .build();

    // explicit UTF-8 so the file does not depend on the platform default charset
    FileUtils.writeFileThenMove(new File(trackingFileName + "_tmp"), new File(trackingFileName),
        json.toString().getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Reads contents of tracking file
   *
   * @return JsonObject containing tracking file contents, or null if file doesn't
   *         exist
   * @throws Exception if error occurs
   */
  public JsonObject readTrackingFile() throws Exception {
    File trackingFile = new File(trackingFileName);
    if (!trackingFile.exists()) {
      return null;
    }

    InputStream contents = new ByteArrayInputStream(FileUtils.readFile(trackingFile));
    // reader is closed even when parsing fails
    try (JsonReader jsonReader = Json.createReader(contents)) {
      return jsonReader.readObject();
    }
  }

  /**
   * Defines behavior for message receipt. Attempts to process notifications, with
   * configurable behavior for exception handling.
   *
   * <p>
   * After a successful receive, the sequence and tracking file are updated
   * unless an earlier message failed and updateSequenceAfterException is false.
   * Exceptions are logged and never propagate to the caller.
   *
   * @param message The message received from the STAN server
   */
  @Override
  public void onMessage(Message message) {
    try {
      // parse message, send to listeners
      URLNotification notification = URLNotificationJSONConverter
          .parseJSON(new ByteArrayInputStream(message.getData()));
      receiveNotification(notification);

      // update sequence and tracking file if exception not thrown or we still want to
      // update sequence anyway
      if (!exceptionThrown || updateSequenceAfterException) {
        sequence = message.getSequence();
        writeTrackingFile();
      }
    } catch (Exception e) {
      exceptionThrown = true;
      // pass the throwable as well so handlers can print the full stack trace
      LOGGER.log(Level.WARNING,
          "[" + getName() + "] exception handling NATSStreaming message."
              + (!updateSequenceAfterException ? " Will no longer update sequence; restart PDL to reprocess." : "")
              + " Stack Trace: " + e,
          e);

      if (LOGGER.isLoggable(Level.FINE)) {
        // decode the payload; concatenating the byte[] would only log its identity
        byte[] data = message == null ? null : message.getData();
        String payload = data == null ? "null" : new String(data, StandardCharsets.UTF_8);
        LOGGER.log(Level.FINE, "[" + getName() + "] Message: " + payload);
      }
    }
  }

  /** @return trackingFileName */
  public String getTrackingFileName() {
    return trackingFileName;
  }

  /** @param trackingFileName to set */
  public void setTrackingFileName(String trackingFileName) {
    this.trackingFileName = trackingFileName;
  }

  /** @return NATSClient */
  public NATSClient getClient() {
    return client;
  }

  /** @param client NATSClient to set */
  public void setClient(NATSClient client) {
    this.client = client;
  }

  /** @return subject */
  public String getSubject() {
    return subject;
  }

  /** @param subject to set */
  public void setSubject(String subject) {
    this.subject = subject;
  }
}