ProductsCreatedAfterClient.java
package gov.usgs.earthquake.aws;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Objects;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonValue;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
/**
* Client to encapsulate REST interactions with server providing
* `products_created_after` information.
*
*/
public class ProductsCreatedAfterClient extends DefaultConfigurable {
/** Logger to log across instances */
public static Logger LOGGER = Logger.getLogger(ProductsCreatedAfterClient.class.getName());
/** static property string */
public static final String BATCH_LIMIT_PROPERTY = "batchLimit";
/**
* The template for making REST requests. This URL supports the following
* replacement strings: - {CREATED_AFTER} - {BATCH_LIMIT}
*/
public static final String URL_TEMPLATE_PROPERTY = "urlTemplate";
/** static property default value */
public static final String DEFAULT_BATCH_LIMIT = "100";
/** static property default URL */
public static final String DEFAULT_URL_TEMPLATE = "https://earthquake.usgs.gov/pdl/west/products_create_after?created_after={CREATED_AFTER}&limit={BATCH_LIMIT}";
/** Property key for connect timeout property */
public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
/** Default value to use for connect timeout property if not configured */
private static final long DEFAULT_CONNECT_TIMEOUT = 5000;
/** Property key for read timeout property */
public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
/** Default value to use for read timeout property if not configured */
private static final long DEFAULT_READ_TIMEOUT = 30000;
/** Property key for max retry attempts property */
public static final String MAX_RETRY_ATTEMPTS_PROPERTY = "maxRetryAttempts";
/** Default value to use for max retry attempts property if not configured */
private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 5;
/** settable internal batch limit */
protected int batchLimit;
protected HttpClient httpClient;
protected long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
protected long readTimeout = DEFAULT_READ_TIMEOUT;
protected int maxRetryAttempts = DEFAULT_MAX_RETRY_ATTEMPTS;
/** list of aws product receivers */
protected List<AwsProductReceiver> receivers = new ArrayList<AwsProductReceiver>();
/** URL template string */
protected String urlTemplate = null;
/**
* default constructor
*/
public ProductsCreatedAfterClient() {
super();
}
/**
* Add the given receiver as a listener to be notified of messages.
*
* @param receiver aws product receiver
*/
public void addReceiver(final AwsProductReceiver receiver) {
synchronized (this.receivers) {
this.receivers.add(receiver);
}
}
@Override
public void configure(Config config) throws Exception {
super.configure(config);
LOGGER.config("[" + this.getName() + "] " + "Starting configuration");
this.urlTemplate = config.getProperty(URL_TEMPLATE_PROPERTY, DEFAULT_URL_TEMPLATE);
LOGGER.config("[" + this.getName() + "] " + "urlTemplate=" + this.urlTemplate);
this.batchLimit = Integer.parseInt(config.getProperty(BATCH_LIMIT_PROPERTY, DEFAULT_BATCH_LIMIT));
LOGGER.config("[" + this.getName() + "] " + "batchLimit=" + String.valueOf(this.batchLimit));
String connectTimeout = config.getProperty(CONNECT_TIMEOUT_PROPERTY);
if (Objects.nonNull(connectTimeout)) {
this.connectTimeout = Long.parseLong(connectTimeout);
LOGGER.config("[" + this.getName() + "] " + "connectTimeout=" + String.valueOf(this.connectTimeout));
}
String readTimeout = config.getProperty(READ_TIMEOUT_PROPERTY);
if (Objects.nonNull(readTimeout)) {
this.readTimeout = Long.parseLong(readTimeout);
LOGGER.config("[" + this.getName() + "] " + "readTimeout=" + String.valueOf(this.readTimeout));
}
String maxRetryAttempts = config.getProperty(MAX_RETRY_ATTEMPTS_PROPERTY);
if (Objects.nonNull(maxRetryAttempts)) {
this.maxRetryAttempts = Math.max(0, Integer.parseInt(maxRetryAttempts));
LOGGER.config("[" + this.getName() + "] " + "maxRetryAttempts=" + String.valueOf(this.maxRetryAttempts));
}
this.httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofMillis(this.connectTimeout)).build();
}
/**
* Getter
*
* @return my batch limit as an int
*/
public int getBatchLimit() {
return this.batchLimit;
}
/**
* Getter
*
* @return my url template as a string
*/
public String getUrlTemplate() {
return this.urlTemplate;
}
/**
* Notify current receives with the message.
*
* @param message json formatted message
*/
public void notify(JsonObject message) {
synchronized (this.receivers) {
for (AwsProductReceiver receiver : this.receivers) {
try {
receiver.onJsonMessage(message);
} catch (Exception ex) {
LOGGER.log(Level.WARNING, "[" + receiver.getName() + "] exception while processing message '" + message + "'",
ex);
}
}
}
}
/**
* Called on success within the running async task. Parses the raw response and
* notifies configured receivers about each message in the raw payload.
*
* @param json json formatted message
* @param durationMs duration in milliseconds
* @param createdAfter single point in time
*/
public void onResponse(final JsonObject json, final long durationMs, final Instant createdAfter) {
JsonArray notifications = json.getJsonArray("notifications");
LOGGER.info("[PCATask] Received products_create_after payload, size=" + json.toString().length()
+ " bytes, duration=" + durationMs + " ms, count=" + notifications.size());
for (final JsonValue notification : notifications) {
try {
JsonObject message = Json.createObjectBuilder().add("action", "product").add("notification", notification)
.build();
this.notify(message);
} catch (Exception ex) {
LOGGER.log(Level.WARNING, "[PCATask] exception while processing message '" + notification.toString() + "'", ex);
}
}
// "send" a products_created_after response message as well
JsonObject productsCreatedAfter = Json.createObjectBuilder().add("action", "products_created_after")
.add("created_after", createdAfter.toString()).add("count", notifications.size()).build();
try {
this.notify(productsCreatedAfter);
} catch (Exception ex) {
ProductsCreatedAfterClient.LOGGER.log(Level.WARNING,
"[PCATask] exception while processing message '" + productsCreatedAfter.toString() + '"', ex);
}
}
/**
* Remove a receiver from the list of currently configured receivers
*
* @param receiver aws product receiver
*/
public void removeReceiver(final AwsProductReceiver receiver) {
synchronized (this.receivers) {
this.receivers.remove(receiver);
}
}
/**
* Send an async Http request
*
* @param createdAfter Instant to retrieve products after
* @throws URISyntaxException thrown if an invalid URI is created
*/
public void send(final Instant createdAfter) throws URISyntaxException {
URI uri = new URI(this.urlTemplate.replace("{CREATED_AFTER}", createdAfter.toString()).replace("{BATCH_LIMIT}",
String.valueOf(this.batchLimit)));
HttpRequest httpRequest = HttpRequest.newBuilder().uri(uri).GET().timeout(Duration.ofMillis(this.readTimeout))
.build();
sendWithRetry(httpRequest, createdAfter, 0);
}
private void sendWithRetry(HttpRequest httpRequest, final Instant createdAfter, int attempts) {
if (attempts > this.maxRetryAttempts) {
LOGGER.log(Level.WARNING, "[" + this.getName() + "] " + "Reached max number of retry attempts");
return;
}
Instant start = Instant.now();
httpClient.sendAsync(httpRequest, BodyHandlers.ofString()).whenComplete((response, error) -> {
if (Objects.isNull(error) && response.statusCode() == 200) {
try (JsonReader reader = Json.createReader(new StringReader(response.body()))) {
onResponse(reader.readObject(), Duration.between(start, Instant.now()).toMillis(), createdAfter);
} catch (Exception ex) {
LOGGER.log(Level.WARNING, () -> String
.format("[%s] " + "Failed to handle products created after request: %s", this.getName(), ex));
}
} else {
String failureReason = !Objects.isNull(error) ? error.getMessage() : String.valueOf(response.statusCode());
LOGGER.log(Level.WARNING, () -> String.format("[%s] " + "Received %s response from %s, retrying",
this.getName(), failureReason, httpRequest.uri().toString()));
this.sendWithRetry(httpRequest, createdAfter, attempts + 1);
}
});
}
/**
* Setter
*
* @param batchLimit integer indicating what my batch limit should be set to
*/
public void setBatchLimit(int batchLimit) {
this.batchLimit = batchLimit;
}
/**
* Setter
*
* @param urlTemplate a string representing what my url template should be set
* to
*/
public void setUrlTemplate(String urlTemplate) {
this.urlTemplate = urlTemplate;
}
/**
* Setter
*
* @param httpClient an HttpClient used to send requests and receive responses
*/
public void setHttpClient(HttpClient httpClient) {
this.httpClient = httpClient;
}
/**
* Getter
*
*/
public HttpClient getHttpClient() {
return this.httpClient;
}
}