ContentPublisher.java
- package gov.usgs.earthquake.aws;
- import java.io.IOException;
- import java.net.http.HttpRequest;
- import java.nio.ByteBuffer;
- import java.util.concurrent.Flow;
- import gov.usgs.earthquake.product.Content;
- /**
- * A simple publisher that wraps an InputStreamPublisher for primary
- * implementation, but overrides the contentLength method since this value is
- * actually known for Product Content.
- *
- * This class is meant to be used by the java.net.http.HttpRequest as a
- * BodyPublisher for product content. Typically this might be used for PUT or
- * POST requests and was specifically designed to work witht AWS S3 presigned
- * URL file uploads.
- *
- * This is necessary because we do not want to buffer [potentially large]
- * product content in memory, thus streaming is desired. The default streaming
- * publisher cannot know the content length ahead of time and thus uses a
- * `Transfer-Encoding: chunked` header, but the presigned URLs do not support
- * this. By using knowledge of the Content to set the content length value, a
- * `Content-Length: LENGTH` header is set and the presigned URLs work as
- * expected.
- *
- * This is implemented in this way ("has-a" vs "extends") because the parent
- * class (InputStreamPublisher) is a Java internal from which we cannot extend
- * directly.
- */
- public class ContentPublisher implements HttpRequest.BodyPublisher {
- private final Content content;
- private final HttpRequest.BodyPublisher publisher;
- /**
- * Create a new ContentPublisher
- *
- * @param content The product content to be published
- */
- public ContentPublisher(Content content) {
- this.content = content;
- this.publisher = HttpRequest.BodyPublishers.ofInputStream(() -> {
- try {
- return this.content.getInputStream();
- } catch (IOException iox) {
- // Returning null will internally cause an IOException to be published
- // to the subscriber for subsequent handling
- return null;
- }
- });
- }
- /**
- * Subscribes the given subscriber to this publisher. Implementation is
- * delegated to internal InputStreamPublisher.
- *
- * @param subscriber The object requesting to be subscribed to this publisher
- */
- @Override
- public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
- this.publisher.subscribe(subscriber);
- }
- /**
- * Returns the total content length to be published.
- *
- * @return The total content length
- */
- @Override
- public long contentLength() {
- return this.content.getLength();
- }
- }