BinaryProductSource.java

package gov.usgs.earthquake.product.io;

import gov.usgs.earthquake.product.InputStreamContent;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.ProductSignature;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.CryptoUtils.Version;

import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Parser for binary format for product data.
 */
public class BinaryProductSource implements ProductSource {

  /** product being parsed. */
  private ProductId id;
  /** stream being parsed. */
  private InputStream in;
  /** binary io utility. */
  private BinaryIO io;

  /**
   * Constructor. Sets up a new BinaryIO
   *
   * @param in an InputStream
   */
  public BinaryProductSource(final InputStream in) {
    this.in = in;
    this.io = new BinaryIO();
  }

  @Override
  public void streamTo(ProductHandler out) throws Exception {
    try {
      while (true) {
        String next = io.readString(in);

        if (next.equals(BinaryProductHandler.HEADER)) {
          // begin product
          id = ProductId.parse(io.readString(in));
          String status = io.readString(in);
          out.onBeginProduct(id, status);
        } else if (next.equals(BinaryProductHandler.PROPERTY)) {
          String name = io.readString(in);
          String value = io.readString(in);
          out.onProperty(id, name, value);
        } else if (next.equals(BinaryProductHandler.LINK)) {
          String relation = io.readString(in);
          URI href = new URI(io.readString(in));
          out.onLink(id, relation, href);
        } else if (next.equals(BinaryProductHandler.CONTENT)) {
          String path = io.readString(in);
          String contentType = io.readString(in);
          Date lastModified = io.readDate(in);
          Long length = io.readLong(in);

          // use a piped output stream to deliver content to separate
          // processing thread. this thread will continue to read
          // InputStream, transfer content to pipedOutputStream.
          // Background thread calls onContent, and reads from
          // pipedInputStream.
          PipedOutputStream pipedOut = new PipedOutputStream();
          PipedInputStream pipedIn = new PipedInputStream(pipedOut);

          final InputStreamContent content = new InputStreamContent(pipedIn);
          content.setContentType(contentType);
          content.setLastModified(lastModified);
          content.setLength(length);

          // background thread delivers content object to product handler
          ContentOutputThread outputThread = new ContentOutputThread(out, id, path, content);

          try {
            outputThread.start();

            // read stream content
            io.readStream(length, in, pipedOut);
          } finally {
            // done reading content, close piped stream to signal EOF.
            StreamUtils.closeStream(pipedOut);
            pipedOut = null;
            try {
              // wait for background thread to complete
              outputThread.join();
            } catch (Exception e) {
              // ignore
            }
            outputThread = null;
            content.close();
          }

        } else if (next.equals(BinaryProductHandler.SIGNATUREVERSION)) {
          Version version = Version.fromString(io.readString(in));
          out.onSignatureVersion(id, version);
        } else if (next.equals(BinaryProductHandler.SIGNATURE)) {
          String signature = io.readString(in);
          out.onSignature(id, signature);
        } else if (next.equals(BinaryProductHandler.SIGNATUREHISTORY)) {
          List<ProductSignature> signatureHistory = new ArrayList<>();
          try {
            while (io.readString(in).equals(BinaryProductHandler.SIGNATUREHISTORYENTRY)) {
              String signature = io.readString(in);
              String signatureVersion = io.readString(in);
              ProductSignature temp = new ProductSignature(signature, Version.fromString(signatureVersion));
              signatureHistory.add(temp);
            }

          } catch (Exception e) {
          }
          out.onSignatureHistory(id, signatureHistory);
        } else if (next.equals(BinaryProductHandler.FOOTER)) {
          out.onEndProduct(id);
          id = null;

          // end of product stream
          break;
        }
      }
    } finally {
      StreamUtils.closeStream(in);
    }
  }

  /**
   * Free any resources associated with this source.
   */
  @Override
  public void close() {
    StreamUtils.closeStream(in);
    in = null;
  }

}