// ContentPublisher.java

package gov.usgs.earthquake.aws;

import java.io.IOException;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;

import gov.usgs.earthquake.product.Content;

/**
 * A simple publisher that wraps an InputStreamPublisher for primary
 * implementation, but overrides the contentLength method since this value is
 * actually known for Product Content.
 *
 * This class is meant to be used by the java.net.http.HttpRequest as a
 * BodyPublisher for product content. Typically this might be used for PUT or
 * POST requests and was specifically designed to work with AWS S3 presigned
 * URL file uploads.
 *
 * This is necessary because we do not want to buffer [potentially large]
 * product content in memory, thus streaming is desired. The default streaming
 * publisher cannot know the content length ahead of time and thus uses a
 * `Transfer-Encoding: chunked` header, but the presigned URLs do not support
 * this. By using knowledge of the Content to set the content length value, a
 * `Content-Length: LENGTH` header is set and the presigned URLs work as
 * expected.
 *
 * This is implemented in this way ("has-a" vs "extends") because the parent
 * class (InputStreamPublisher) is a Java internal from which we cannot extend
 * directly.
 */
public class ContentPublisher implements HttpRequest.BodyPublisher {
  /** Logger used to record failures opening the content input stream. */
  private static final Logger LOGGER = Logger.getLogger(ContentPublisher.class.getName());

  /** The product content whose bytes and length are published. */
  private final Content content;

  /** Streaming publisher that all subscriptions are delegated to. */
  private final HttpRequest.BodyPublisher publisher;

  /**
   * Create a new ContentPublisher
   *
   * The content input stream is not opened here; it is requested from the
   * content through a supplier handed to the wrapped publisher.
   *
   * @param content The product content to be published, must not be null
   * @throws NullPointerException if content is null
   */
  public ContentPublisher(Content content) {
    // Fail fast rather than deferring the failure to subscribe/contentLength
    this.content = Objects.requireNonNull(content, "content");
    this.publisher = HttpRequest.BodyPublishers.ofInputStream(() -> {
      try {
        return this.content.getInputStream();
      } catch (IOException iox) {
        // The supplier cannot throw a checked exception. Record the real cause
        // here, because the IOException generated for a null stream does not
        // carry it.
        LOGGER.log(Level.WARNING, "Unable to open product content input stream", iox);
        // Returning null will internally cause an IOException to be published
        // to the subscriber for subsequent handling
        return null;
      }
    });
  }

  /**
   * Subscribes the given subscriber to this publisher. Implementation is
   * delegated to internal InputStreamPublisher.
   *
   * @param subscriber The object requesting to be subscribed to this publisher
   */
  @Override
  public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    this.publisher.subscribe(subscriber);
  }

  /**
   * Returns the total content length to be published, as reported by the
   * wrapped product content.
   *
   * @return The total content length
   */
  @Override
  public long contentLength() {
    return this.content.getLength();
  }
}