ContentPublisher.java

  1. package gov.usgs.earthquake.aws;

  2. import java.io.IOException;
  3. import java.net.http.HttpRequest;
  4. import java.nio.ByteBuffer;
  5. import java.util.concurrent.Flow;

  6. import gov.usgs.earthquake.product.Content;

  7. /**
  8.  * A simple publisher that wraps an InputStreamPublisher for primary
  9.  * implementation, but overrides the contentLength method since this value is
  10.  * actually known for Product Content.
  11.  *
  12.  * This class is meant to be used by the java.net.http.HttpRequest as a
  13.  * BodyPublisher for product content. Typically this might be used for PUT or
  14.  * POST requests and was specifically designed to work witht AWS S3 presigned
  15.  * URL file uploads.
  16.  *
  17.  * This is necessary because we do not want to buffer [potentially large]
  18.  * product content in memory, thus streaming is desired. The default streaming
  19.  * publisher cannot know the content length ahead of time and thus uses a
  20.  * `Transfer-Encoding: chunked` header, but the presigned URLs do not support
  21.  * this. By using knowledge of the Content to set the content length value, a
  22.  * `Content-Length: LENGTH` header is set and the presigned URLs work as
  23.  * expected.
  24.  *
  25.  * This is implemented in this way ("has-a" vs "extends") because the parent
  26.  * class (InputStreamPublisher) is a Java internal from which we cannot extend
  27.  * directly.
  28.  */
  29. public class ContentPublisher implements HttpRequest.BodyPublisher {
  30.   private final Content content;
  31.   private final HttpRequest.BodyPublisher publisher;

  32.   /**
  33.    * Create a new ContentPublisher
  34.    *
  35.    * @param content The product content to be published
  36.    */
  37.   public ContentPublisher(Content content) {
  38.     this.content = content;
  39.     this.publisher = HttpRequest.BodyPublishers.ofInputStream(() -> {
  40.       try {
  41.         return this.content.getInputStream();
  42.       } catch (IOException iox) {
  43.         // Returning null will internally cause an IOException to be published
  44.         // to the subscriber for subsequent handling
  45.         return null;
  46.       }
  47.     });
  48.   }

  49.   /**
  50.    * Subscribes the given subscriber to this publisher. Implementation is
  51.    * delegated to internal InputStreamPublisher.
  52.    *
  53.    * @param subscriber The object requesting to be subscribed to this publisher
  54.    */
  55.   @Override
  56.   public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
  57.     this.publisher.subscribe(subscriber);
  58.   }

  59.   /**
  60.    * Returns the total content length to be published.
  61.    *
  62.    * @return The total content length
  63.    */
  64.   @Override
  65.   public long contentLength() {
  66.     return this.content.getLength();
  67.   }
  68. }