ProductsCreatedAfterClient.java

  1. package gov.usgs.earthquake.aws;

  2. import java.io.StringReader;
  3. import java.net.URI;
  4. import java.net.URISyntaxException;
  5. import java.net.http.HttpClient;
  6. import java.net.http.HttpRequest;
  7. import java.net.http.HttpResponse.BodyHandlers;
  8. import java.time.Duration;
  9. import java.time.Instant;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. import java.util.logging.Level;
  13. import java.util.logging.Logger;
  14. import java.util.Objects;

  15. import javax.json.Json;
  16. import javax.json.JsonArray;
  17. import javax.json.JsonObject;
  18. import javax.json.JsonReader;
  19. import javax.json.JsonValue;

  20. import gov.usgs.util.Config;
  21. import gov.usgs.util.DefaultConfigurable;

  22. /**
  23.  * Client to encapsulate REST interactions with server providing
  24.  * `products_created_after` information.
  25.  *
  26.  */
  27. public class ProductsCreatedAfterClient extends DefaultConfigurable {
  28.   /** Logger to log across instances */
  29.   public static Logger LOGGER = Logger.getLogger(ProductsCreatedAfterClient.class.getName());

  30.   /** static property string */
  31.   public static final String BATCH_LIMIT_PROPERTY = "batchLimit";

  32.   /**
  33.    * The template for making REST requests. This URL supports the following
  34.    * replacement strings: - {CREATED_AFTER} - {BATCH_LIMIT}
  35.    */
  36.   public static final String URL_TEMPLATE_PROPERTY = "urlTemplate";

  37.   /** static property default value */
  38.   public static final String DEFAULT_BATCH_LIMIT = "100";

  39.   /** static property default URL */
  40.   public static final String DEFAULT_URL_TEMPLATE = "https://earthquake.usgs.gov/pdl/west/products_create_after?created_after={CREATED_AFTER}&limit={BATCH_LIMIT}";

  41.   /** Property key for connect timeout property */
  42.   public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
  43.   /** Default value to use for connect timeout property if not configured */
  44.   private static final long DEFAULT_CONNECT_TIMEOUT = 5000;

  45.   /** Property key for read timeout property */
  46.   public static final String READ_TIMEOUT_PROPERTY = "readTimeout";
  47.   /** Default value to use for read timeout property if not configured */
  48.   private static final long DEFAULT_READ_TIMEOUT = 30000;

  49.   /** Property key for max retry attempts property */
  50.   public static final String MAX_RETRY_ATTEMPTS_PROPERTY = "maxRetryAttempts";
  51.   /** Default value to use for max retry attempts property if not configured */
  52.   private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 5;

  53.   /** settable internal batch limit */
  54.   protected int batchLimit;
  55.   protected HttpClient httpClient;

  56.   protected long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
  57.   protected long readTimeout = DEFAULT_READ_TIMEOUT;
  58.   protected int maxRetryAttempts = DEFAULT_MAX_RETRY_ATTEMPTS;

  59.   /** list of aws product receivers */
  60.   protected List<AwsProductReceiver> receivers = new ArrayList<AwsProductReceiver>();
  61.   /** URL template string */
  62.   protected String urlTemplate = null;

  63.   /**
  64.    * default constructor
  65.    */
  66.   public ProductsCreatedAfterClient() {
  67.     super();
  68.   }

  69.   /**
  70.    * Add the given receiver as a listener to be notified of messages.
  71.    *
  72.    * @param receiver aws product receiver
  73.    */
  74.   public void addReceiver(final AwsProductReceiver receiver) {
  75.     synchronized (this.receivers) {
  76.       this.receivers.add(receiver);
  77.     }
  78.   }

  79.   @Override
  80.   public void configure(Config config) throws Exception {
  81.     super.configure(config);
  82.     LOGGER.config("[" + this.getName() + "] " + "Starting configuration");

  83.     this.urlTemplate = config.getProperty(URL_TEMPLATE_PROPERTY, DEFAULT_URL_TEMPLATE);
  84.     LOGGER.config("[" + this.getName() + "] " + "urlTemplate=" + this.urlTemplate);

  85.     this.batchLimit = Integer.parseInt(config.getProperty(BATCH_LIMIT_PROPERTY, DEFAULT_BATCH_LIMIT));
  86.     LOGGER.config("[" + this.getName() + "] " + "batchLimit=" + String.valueOf(this.batchLimit));

  87.     String connectTimeout = config.getProperty(CONNECT_TIMEOUT_PROPERTY);
  88.     if (Objects.nonNull(connectTimeout)) {
  89.       this.connectTimeout = Long.parseLong(connectTimeout);
  90.       LOGGER.config("[" + this.getName() + "] " + "connectTimeout=" + String.valueOf(this.connectTimeout));
  91.     }

  92.     String readTimeout = config.getProperty(READ_TIMEOUT_PROPERTY);
  93.     if (Objects.nonNull(readTimeout)) {
  94.       this.readTimeout = Long.parseLong(readTimeout);
  95.       LOGGER.config("[" + this.getName() + "] " + "readTimeout=" + String.valueOf(this.readTimeout));
  96.     }

  97.     String maxRetryAttempts = config.getProperty(MAX_RETRY_ATTEMPTS_PROPERTY);
  98.     if (Objects.nonNull(maxRetryAttempts)) {
  99.       this.maxRetryAttempts = Math.max(0, Integer.parseInt(maxRetryAttempts));
  100.       LOGGER.config("[" + this.getName() + "] " + "maxRetryAttempts=" + String.valueOf(this.maxRetryAttempts));
  101.     }

  102.     this.httpClient = HttpClient.newBuilder().connectTimeout(Duration.ofMillis(this.connectTimeout)).build();
  103.   }

  104.   /**
  105.    * Getter
  106.    *
  107.    * @return my batch limit as an int
  108.    */
  109.   public int getBatchLimit() {
  110.     return this.batchLimit;
  111.   }

  112.   /**
  113.    * Getter
  114.    *
  115.    * @return my url template as a string
  116.    */
  117.   public String getUrlTemplate() {
  118.     return this.urlTemplate;
  119.   }

  120.   /**
  121.    * Notify current receives with the message.
  122.    *
  123.    * @param message json formatted message
  124.    */
  125.   public void notify(JsonObject message) {
  126.     synchronized (this.receivers) {
  127.       for (AwsProductReceiver receiver : this.receivers) {
  128.         try {
  129.           receiver.onJsonMessage(message);
  130.         } catch (Exception ex) {
  131.           LOGGER.log(Level.WARNING, "[" + receiver.getName() + "] exception while processing message '" + message + "'",
  132.               ex);
  133.         }
  134.       }
  135.     }
  136.   }

  137.   /**
  138.    * Called on success within the running async task. Parses the raw response and
  139.    * notifies configured receivers about each message in the raw payload.
  140.    *
  141.    * @param json         json formatted message
  142.    * @param durationMs   duration in milliseconds
  143.    * @param createdAfter single point in time
  144.    */
  145.   public void onResponse(final JsonObject json, final long durationMs, final Instant createdAfter) {
  146.     JsonArray notifications = json.getJsonArray("notifications");

  147.     LOGGER.info("[PCATask] Received products_create_after payload, size=" + json.toString().length()
  148.         + " bytes, duration=" + durationMs + " ms, count=" + notifications.size());

  149.     for (final JsonValue notification : notifications) {
  150.       try {
  151.         JsonObject message = Json.createObjectBuilder().add("action", "product").add("notification", notification)
  152.             .build();

  153.         this.notify(message);
  154.       } catch (Exception ex) {
  155.         LOGGER.log(Level.WARNING, "[PCATask] exception while processing message '" + notification.toString() + "'", ex);
  156.       }
  157.     }

  158.     // "send" a products_created_after response message as well
  159.     JsonObject productsCreatedAfter = Json.createObjectBuilder().add("action", "products_created_after")
  160.         .add("created_after", createdAfter.toString()).add("count", notifications.size()).build();

  161.     try {
  162.       this.notify(productsCreatedAfter);
  163.     } catch (Exception ex) {
  164.       ProductsCreatedAfterClient.LOGGER.log(Level.WARNING,
  165.           "[PCATask] exception while processing message '" + productsCreatedAfter.toString() + '"', ex);
  166.     }
  167.   }

  168.   /**
  169.    * Remove a receiver from the list of currently configured receivers
  170.    *
  171.    * @param receiver aws product receiver
  172.    */
  173.   public void removeReceiver(final AwsProductReceiver receiver) {
  174.     synchronized (this.receivers) {
  175.       this.receivers.remove(receiver);
  176.     }
  177.   }

  178.   /**
  179.    * Send an async Http request
  180.    *
  181.    * @param createdAfter Instant to retrieve products after
  182.    * @throws URISyntaxException thrown if an invalid URI is created
  183.    */
  184.   public void send(final Instant createdAfter) throws URISyntaxException {
  185.     URI uri = new URI(this.urlTemplate.replace("{CREATED_AFTER}", createdAfter.toString()).replace("{BATCH_LIMIT}",
  186.         String.valueOf(this.batchLimit)));

  187.     HttpRequest httpRequest = HttpRequest.newBuilder().uri(uri).GET().timeout(Duration.ofMillis(this.readTimeout))
  188.         .build();

  189.     sendWithRetry(httpRequest, createdAfter, 0);
  190.   }

  191.   private void sendWithRetry(HttpRequest httpRequest, final Instant createdAfter, int attempts) {
  192.     if (attempts > this.maxRetryAttempts) {
  193.       LOGGER.log(Level.WARNING, "[" + this.getName() + "] " + "Reached max number of retry attempts");
  194.       return;
  195.     }

  196.     Instant start = Instant.now();
  197.     httpClient.sendAsync(httpRequest, BodyHandlers.ofString()).whenComplete((response, error) -> {
  198.       if (Objects.isNull(error) && response.statusCode() == 200) {
  199.         try (JsonReader reader = Json.createReader(new StringReader(response.body()))) {
  200.           onResponse(reader.readObject(), Duration.between(start, Instant.now()).toMillis(), createdAfter);
  201.         } catch (Exception ex) {
  202.           LOGGER.log(Level.WARNING, () -> String
  203.               .format("[%s] " + "Failed to handle products created after request: %s", this.getName(), ex));
  204.         }
  205.       } else {
  206.         String failureReason = !Objects.isNull(error) ? error.getMessage() : String.valueOf(response.statusCode());
  207.         LOGGER.log(Level.WARNING, () -> String.format("[%s] " + "Received %s response from %s, retrying",
  208.             this.getName(), failureReason, httpRequest.uri().toString()));
  209.         this.sendWithRetry(httpRequest, createdAfter, attempts + 1);
  210.       }
  211.     });
  212.   }

  213.   /**
  214.    * Setter
  215.    *
  216.    * @param batchLimit integer indicating what my batch limit should be set to
  217.    */
  218.   public void setBatchLimit(int batchLimit) {
  219.     this.batchLimit = batchLimit;
  220.   }

  221.   /**
  222.    * Setter
  223.    *
  224.    * @param urlTemplate a string representing what my url template should be set
  225.    *                    to
  226.    */
  227.   public void setUrlTemplate(String urlTemplate) {
  228.     this.urlTemplate = urlTemplate;
  229.   }

  230.   /**
  231.    * Setter
  232.    *
  233.    * @param httpClient an HttpClient used to send requests and receive responses
  234.    */
  235.   public void setHttpClient(HttpClient httpClient) {
  236.     this.httpClient = httpClient;
  237.   }

  238.   /**
  239.    * Getter
  240.    *
  241.    */
  242.   public HttpClient getHttpClient() {
  243.     return this.httpClient;
  244.   }
  245. }