ProductsCreatedAfterClient.java
- package gov.usgs.earthquake.aws;
- import java.io.StringReader;
- import java.net.URI;
- import java.net.URISyntaxException;
- import java.net.http.HttpClient;
- import java.net.http.HttpRequest;
- import java.net.http.HttpResponse.BodyHandlers;
- import java.time.Duration;
- import java.time.Instant;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.logging.Level;
- import java.util.logging.Logger;
- import java.util.Objects;
- import javax.json.Json;
- import javax.json.JsonArray;
- import javax.json.JsonObject;
- import javax.json.JsonReader;
- import javax.json.JsonValue;
- import gov.usgs.util.Config;
- import gov.usgs.util.DefaultConfigurable;
- /**
- * Client to encapsulate REST interactions with server providing
- * `products_created_after` information.
- *
- */
- public class ProductsCreatedAfterClient extends DefaultConfigurable {
- /** Logger to log across instances */
- public static Logger LOGGER = Logger.getLogger(ProductsCreatedAfterClient.class.getName());
- /** static property string */
- public static final String BATCH_LIMIT_PROPERTY = "batchLimit";
- /**
- * The template for making REST requests. This URL supports the following
- * replacement strings: - {CREATED_AFTER} - {BATCH_LIMIT}
- */
- public static final String URL_TEMPLATE_PROPERTY = "urlTemplate";
- /** static property default value */
- public static final String DEFAULT_BATCH_LIMIT = "100";
- /** static property default URL */
- public static final String DEFAULT_URL_TEMPLATE = "https://earthquake.usgs.gov/pdl/west/products_create_after?created_after={CREATED_AFTER}&limit={BATCH_LIMIT}";
- /** Property key for connect timeout property */
- public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
- /** Default value to use for connect timeout property if not configured */
- private static final long DEFAULT_CONNECT_TIMEOUT = 5000;
- /** Property key for read timeout property */
- public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
- /** Default value to use for read timeout property if not configured */
- private static final long DEFAULT_READ_TIMEOUT = 30000;
- /** Property key for max retry attempts property */
- public static final String MAX_RETRY_ATTEMPTS_PROPERTY = "maxRetryAttempts";
- /** Default value to use for max retry attempts property if not configured */
- private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 5;
- /** settable internal batch limit */
- protected int batchLimit;
- protected HttpClient httpClient;
- protected long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
- protected long readTimeout = DEFAULT_READ_TIMEOUT;
- protected int maxRetryAttempts = DEFAULT_MAX_RETRY_ATTEMPTS;
- /** list of aws product receivers */
- protected List<AwsProductReceiver> receivers = new ArrayList<AwsProductReceiver>();
- /** URL template string */
- protected String urlTemplate = null;
- /**
- * default constructor
- */
- public ProductsCreatedAfterClient() {
- super();
- }
- /**
- * Add the given receiver as a listener to be notified of messages.
- *
- * @param receiver aws product receiver
- */
- public void addReceiver(final AwsProductReceiver receiver) {
- synchronized (this.receivers) {
- this.receivers.add(receiver);
- }
- }
- @Override
- public void configure(Config config) throws Exception {
- super.configure(config);
- LOGGER.config("[" + this.getName() + "] " + "Starting configuration");
- this.urlTemplate = config.getProperty(URL_TEMPLATE_PROPERTY, DEFAULT_URL_TEMPLATE);
- LOGGER.config("[" + this.getName() + "] " + "urlTemplate=" + this.urlTemplate);
- this.batchLimit = Integer.parseInt(config.getProperty(BATCH_LIMIT_PROPERTY, DEFAULT_BATCH_LIMIT));
- LOGGER.config("[" + this.getName() + "] " + "batchLimit=" + String.valueOf(this.batchLimit));
- String connectTimeout = config.getProperty(CONNECT_TIMEOUT_PROPERTY);
- if (Objects.nonNull(connectTimeout)) {
- this.connectTimeout = Long.parseLong(connectTimeout);
- LOGGER.config("[" + this.getName() + "] " + "connectTimeout=" + String.valueOf(this.connectTimeout));
- }
- String readTimeout = config.getProperty(READ_TIMEOUT_PROPERTY);
- if (Objects.nonNull(readTimeout)) {
- this.readTimeout = Long.parseLong(readTimeout);
- LOGGER.config("[" + this.getName() + "] " + "readTimeout=" + String.valueOf(this.readTimeout));
- }
- String maxRetryAttempts = config.getProperty(MAX_RETRY_ATTEMPTS_PROPERTY);
- if (Objects.nonNull(maxRetryAttempts)) {
- this.maxRetryAttempts = Math.max(0, Integer.parseInt(maxRetryAttempts));
- LOGGER.config("[" + this.getName() + "] " + "maxRetryAttempts=" + String.valueOf(this.maxRetryAttempts));
- }
- this.httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofMillis(this.connectTimeout)).build();
- }
- /**
- * Getter
- *
- * @return my batch limit as an int
- */
- public int getBatchLimit() {
- return this.batchLimit;
- }
- /**
- * Getter
- *
- * @return my url template as a string
- */
- public String getUrlTemplate() {
- return this.urlTemplate;
- }
- /**
- * Notify current receives with the message.
- *
- * @param message json formatted message
- */
- public void notify(JsonObject message) {
- synchronized (this.receivers) {
- for (AwsProductReceiver receiver : this.receivers) {
- try {
- receiver.onJsonMessage(message);
- } catch (Exception ex) {
- LOGGER.log(Level.WARNING, "[" + receiver.getName() + "] exception while processing message '" + message + "'",
- ex);
- }
- }
- }
- }
- /**
- * Called on success within the running async task. Parses the raw response and
- * notifies configured receivers about each message in the raw payload.
- *
- * @param json json formatted message
- * @param durationMs duration in milliseconds
- * @param createdAfter single point in time
- */
- public void onResponse(final JsonObject json, final long durationMs, final Instant createdAfter) {
- JsonArray notifications = json.getJsonArray("notifications");
- LOGGER.info("[PCATask] Received products_create_after payload, size=" + json.toString().length()
- + " bytes, duration=" + durationMs + " ms, count=" + notifications.size());
- for (final JsonValue notification : notifications) {
- try {
- JsonObject message = Json.createObjectBuilder().add("action", "product").add("notification", notification)
- .build();
- this.notify(message);
- } catch (Exception ex) {
- LOGGER.log(Level.WARNING, "[PCATask] exception while processing message '" + notification.toString() + "'", ex);
- }
- }
- // "send" a products_created_after response message as well
- JsonObject productsCreatedAfter = Json.createObjectBuilder().add("action", "products_created_after")
- .add("created_after", createdAfter.toString()).add("count", notifications.size()).build();
- try {
- this.notify(productsCreatedAfter);
- } catch (Exception ex) {
- ProductsCreatedAfterClient.LOGGER.log(Level.WARNING,
- "[PCATask] exception while processing message '" + productsCreatedAfter.toString() + '"', ex);
- }
- }
- /**
- * Remove a receiver from the list of currently configured receivers
- *
- * @param receiver aws product receiver
- */
- public void removeReceiver(final AwsProductReceiver receiver) {
- synchronized (this.receivers) {
- this.receivers.remove(receiver);
- }
- }
- /**
- * Send an async Http request
- *
- * @param createdAfter Instant to retrieve products after
- * @throws URISyntaxException thrown if an invalid URI is created
- */
- public void send(final Instant createdAfter) throws URISyntaxException {
- URI uri = new URI(this.urlTemplate.replace("{CREATED_AFTER}", createdAfter.toString()).replace("{BATCH_LIMIT}",
- String.valueOf(this.batchLimit)));
- HttpRequest httpRequest = HttpRequest.newBuilder().uri(uri).GET().timeout(Duration.ofMillis(this.readTimeout))
- .build();
- sendWithRetry(httpRequest, createdAfter, 0);
- }
- private void sendWithRetry(HttpRequest httpRequest, final Instant createdAfter, int attempts) {
- if (attempts > this.maxRetryAttempts) {
- LOGGER.log(Level.WARNING, "[" + this.getName() + "] " + "Reached max number of retry attempts");
- return;
- }
- Instant start = Instant.now();
- httpClient.sendAsync(httpRequest, BodyHandlers.ofString()).whenComplete((response, error) -> {
- if (Objects.isNull(error) && response.statusCode() == 200) {
- try (JsonReader reader = Json.createReader(new StringReader(response.body()))) {
- onResponse(reader.readObject(), Duration.between(start, Instant.now()).toMillis(), createdAfter);
- } catch (Exception ex) {
- LOGGER.log(Level.WARNING, () -> String
- .format("[%s] " + "Failed to handle products created after request: %s", this.getName(), ex));
- }
- } else {
- String failureReason = !Objects.isNull(error) ? error.getMessage() : String.valueOf(response.statusCode());
- LOGGER.log(Level.WARNING, () -> String.format("[%s] " + "Received %s response from %s, retrying",
- this.getName(), failureReason, httpRequest.uri().toString()));
- this.sendWithRetry(httpRequest, createdAfter, attempts + 1);
- }
- });
- }
- /**
- * Setter
- *
- * @param batchLimit integer indicating what my batch limit should be set to
- */
- public void setBatchLimit(int batchLimit) {
- this.batchLimit = batchLimit;
- }
- /**
- * Setter
- *
- * @param urlTemplate a string representing what my url template should be set
- * to
- */
- public void setUrlTemplate(String urlTemplate) {
- this.urlTemplate = urlTemplate;
- }
- /**
- * Setter
- *
- * @param httpClient an HttpClient used to send requests and receive responses
- */
- public void setHttpClient(HttpClient httpClient) {
- this.httpClient = httpClient;
- }
- /**
- * Getter
- *
- */
- public HttpClient getHttpClient() {
- return this.httpClient;
- }
- }