ContentPublisher.java
package gov.usgs.earthquake.aws;
import java.io.IOException;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
import gov.usgs.earthquake.product.Content;
/**
* A simple publisher that wraps an InputStreamPublisher for primary
* implementation, but overrides the contentLength method since this value is
* actually known for Product Content.
*
* This class is meant to be used by the java.net.http.HttpRequest as a
* BodyPublisher for product content. Typically this might be used for PUT or
* POST requests and was specifically designed to work witht AWS S3 presigned
* URL file uploads.
*
* This is necessary because we do not want to buffer [potentially large]
* product content in memory, thus streaming is desired. The default streaming
* publisher cannot know the content length ahead of time and thus uses a
* `Transfer-Encoding: chunked` header, but the presigned URLs do not support
* this. By using knowledge of the Content to set the content length value, a
* `Content-Length: LENGTH` header is set and the presigned URLs work as
* expected.
*
* This is implemented in this way ("has-a" vs "extends") because the parent
* class (InputStreamPublisher) is a Java internal from which we cannot extend
* directly.
*/
public class ContentPublisher implements HttpRequest.BodyPublisher {
private final Content content;
private final HttpRequest.BodyPublisher publisher;
/**
* Create a new ContentPublisher
*
* @param content The product content to be published
*/
public ContentPublisher(Content content) {
this.content = content;
this.publisher = HttpRequest.BodyPublishers.ofInputStream(() -> {
try {
return this.content.getInputStream();
} catch (IOException iox) {
// Returning null will internally cause an IOException to be published
// to the subscriber for subsequent handling
return null;
}
});
}
/**
* Subscribes the given subscriber to this publisher. Implementation is
* delegated to internal InputStreamPublisher.
*
* @param subscriber The object requesting to be subscribed to this publisher
*/
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
this.publisher.subscribe(subscriber);
}
/**
* Returns the total content length to be published.
*
* @return The total content length
*/
@Override
public long contentLength() {
return this.content.getLength();
}
}