XmlProductSource.java

/*
 * XmlProductSource
 */
package gov.usgs.earthquake.product.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

import gov.usgs.util.StreamUtils;
import gov.usgs.util.XmlUtils;
import gov.usgs.util.CryptoUtils.Version;
import gov.usgs.earthquake.product.InputStreamContent;
import gov.usgs.earthquake.product.URLContent;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.ProductSignature;

/**
 * Load a product from an InputStream containing XML.
 *
 * <p>
 * The source can be used in two ways: constructed with an {@link InputStream}
 * and driven through {@link #streamTo(ProductHandler)}, or constructed with a
 * {@link ProductHandler} and embedded in another SAX handler that forwards
 * its events to this object.
 *
 * <p>
 * Embedded (non-URL) content is delivered to the handler by a background
 * {@code ContentOutputThread}, while the parsing thread feeds the element text
 * into a pipe from {@link #characters(char[], int, int)}.
 */
public class XmlProductSource extends DefaultHandler implements ProductSource {

  /** The input stream where xml is read. */
  private InputStream in;

  /** The ProductHandler where events are sent. */
  private ProductHandler out;

  /** Id of the product being parsed. */
  private ProductId id;

  /** Used to deliver content to the ProductHandler while parsing continues. */
  private ContentOutputThread contentOutputThread;

  /** Write end of the pipe that feeds content to the ProductHandler. */
  private PipedOutputStream contentOutputStream;

  /** Embedded content currently being read. */
  private InputStreamContent content;

  /**
   * Accumulates signature element text; non-null only while inside a signature
   * element. Only touched from the synchronized SAX callbacks.
   */
  private StringBuilder signatureBuffer;

  /** Signature history entries collected for the product being parsed. */
  private List<ProductSignature> signatureHistory;

  /**
   * Create a new XmlProductSource.
   *
   * @param in the input stream where xml is read.
   */
  public XmlProductSource(final InputStream in) {
    this.in = in;
  }

  /**
   * Create a new XmlProductSource for embedding in another default handler.
   *
   * @param out the ProductHandler to receive product events.
   */
  public XmlProductSource(final ProductHandler out) {
    this.out = out;
  }

  /**
   * Begin reading the input stream, sending events to out.
   *
   * The input stream is closed when parsing finishes, whether or not it
   * succeeded.
   *
   * @param out the receiving ProductHandler.
   * @throws Exception if parsing fails.
   */
  public synchronized void streamTo(ProductHandler out) throws Exception {
    try {
      this.out = out;
      XmlUtils.parse(in, this);
    } finally {
      StreamUtils.closeStream(in);
    }
  }

  /**
   * Override DefaultHandler startElement. Elements in the product xml
   * namespace are translated into ProductHandler events; all other elements
   * are ignored.
   *
   * @param uri        element uri.
   * @param localName  element localName.
   * @param qName      element qName.
   * @param attributes element attributes.
   * @throws SAXException if the ProductHandler, or setting up embedded
   *                      content, throws an exception.
   */
  @Override
  public synchronized void startElement(final String uri, final String localName, final String qName,
      final Attributes attributes) throws SAXException {
    if (!XmlProductHandler.PRODUCT_XML_NAMESPACE.equals(uri)) {
      return;
    }

    if (XmlProductHandler.PRODUCT_ELEMENT.equals(localName)) {
      startProduct(uri, attributes);
    } else if (XmlProductHandler.PROPERTY_ELEMENT.equals(localName)) {
      startProperty(uri, attributes);
    } else if (XmlProductHandler.LINK_ELEMENT.equals(localName)) {
      startLink(uri, attributes);
    } else if (XmlProductHandler.CONTENT_ELEMENT.equals(localName)) {
      startContent(uri, attributes);
    } else if (XmlProductHandler.SIGNATURE_ELEMENT.equals(localName)) {
      startSignature(uri, attributes);
    } else if (XmlProductHandler.SIGNATURE_HISTORY_ENTRY_ELEMENT.equals(localName)) {
      startSignatureHistoryEntry(uri, attributes);
    }
  }

  /** Handle the start of a product element: parse the id and notify the handler. */
  private void startProduct(final String uri, final Attributes attributes) throws SAXException {
    // history entries belong to one product; start fresh so entries from a
    // previously parsed product are neither re-sent nor mutated.
    signatureHistory = null;

    id = ProductId.parse(XmlUtils.getAttribute(attributes, uri, XmlProductHandler.PRODUCT_ATTRIBUTE_ID));
    id.setUpdateTime(
        XmlUtils.getDate(XmlUtils.getAttribute(attributes, uri, XmlProductHandler.PRODUCT_ATTRIBUTE_UPDATED)));

    String status = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.PRODUCT_ATTRIBUTE_STATUS);

    try {
      out.onBeginProduct(id, status);
    } catch (Exception e) {
      throw new SAXException(e);
    }
  }

  /** Handle a property element: forward its name and value to the handler. */
  private void startProperty(final String uri, final Attributes attributes) throws SAXException {
    String name = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.PROPERTY_ATTRIBUTE_NAME);
    String value = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.PROPERTY_ATTRIBUTE_VALUE);

    try {
      out.onProperty(id, name, value);
    } catch (Exception e) {
      throw new SAXException(e);
    }
  }

  /** Handle a link element: forward its relation and href to the handler. */
  private void startLink(final String uri, final Attributes attributes) throws SAXException {
    String relation = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.LINK_ATTRIBUTE_RELATION);
    URI href;
    try {
      href = new URI(XmlUtils.getAttribute(attributes, uri, XmlProductHandler.LINK_ATTRIBUTE_HREF));
    } catch (Exception ignored) {
      // best effort: a link whose href is missing or not a valid URI is
      // skipped instead of failing the whole product.
      return;
    }

    try {
      out.onLink(id, relation, href);
    } catch (Exception e) {
      throw new SAXException(e);
    }
  }

  /**
   * Handle a content element. Content with an href is sent to the handler
   * immediately as URLContent; otherwise a pipe and a background thread are
   * set up so element text can be streamed to the handler as it is parsed.
   */
  private void startContent(final String uri, final Attributes attributes) throws SAXException {
    try {
      String type = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.CONTENT_ATTRIBUTE_TYPE);
      Long length = Long
          .valueOf(XmlUtils.getAttribute(attributes, uri, XmlProductHandler.CONTENT_ATTRIBUTE_LENGTH));
      Date modified = XmlUtils
          .getDate(XmlUtils.getAttribute(attributes, uri, XmlProductHandler.CONTENT_ATTRIBUTE_MODIFIED));
      String path = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.CONTENT_ATTRIBUTE_PATH);
      String encoded = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.CONTENT_ATTRIBUTE_ENCODED);
      String href = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.CONTENT_ATTRIBUTE_HREF);

      if (href != null) {
        // URL CONTENT
        URLContent urlContent = new URLContent(new URL(href));
        urlContent.setContentType(type);
        urlContent.setLength(length);
        urlContent.setLastModified(modified);

        out.onContent(id, path, urlContent);
        return;
      }

      // EMBEDDED CONTENT
      // set up a piped stream; "true".equals is already null-safe
      InputStream contentInputStream = openContentStream("true".equals(encoded));

      content = new InputStreamContent(contentInputStream);
      content.setContentType(type);
      content.setLength(length);
      content.setLastModified(modified);

      // call onContent in separate thread so parsing thread
      // can continue. Element content is fed during the
      // characters method.
      contentOutputThread = new ContentOutputThread(out, id, path, content);
      contentOutputThread.start();
    } catch (Exception e) {
      closeContent();
      throw new SAXException(e);
    }
  }

  /** Handle a signature element: report its version and start buffering its text. */
  private void startSignature(final String uri, final Attributes attributes) throws SAXException {
    String version = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.SIGNATURE_ATTRIBUTE_VERSION);
    try {
      // signatures without a version attribute are reported as SIGNATURE_V2
      out.onSignatureVersion(id, version == null ? Version.SIGNATURE_V2 : Version.fromString(version));
    } catch (Exception e) {
      throw new SAXException(e);
    }
    signatureBuffer = new StringBuilder();
  }

  /** Handle a signature history entry element: add it to the current history list. */
  private void startSignatureHistoryEntry(final String uri, final Attributes attributes) {
    if (signatureHistory == null) {
      signatureHistory = new ArrayList<>();
    }
    String signature = XmlUtils.getAttribute(attributes, uri,
        XmlProductHandler.SIGNATURE_HISTORY_SIGNATURE_ATTRIBUTE_);
    String version = XmlUtils.getAttribute(attributes, uri, XmlProductHandler.SIGNATURE_HISTORY_VERSION_ATTRIBUTE_);
    signatureHistory.add(new ProductSignature(signature, Version.fromString(version)));
  }

  /**
   * Override DefaultHandler endElement. Closes embedded content, and sends
   * signature, signature history, and end of product events to the
   * ProductHandler.
   *
   * @param uri       element uri.
   * @param localName element localName.
   * @param qName     element qName.
   * @throws SAXException if the ProductHandler throws an exception.
   */
  @Override
  public synchronized void endElement(final String uri, final String localName, final String qName)
      throws SAXException {
    if (!XmlProductHandler.PRODUCT_XML_NAMESPACE.equals(uri)) {
      return;
    }

    try {
      if (XmlProductHandler.CONTENT_ELEMENT.equals(localName)) {
        // done reading content, close piped stream to signal EOF.
        closeContent();
      } else if (XmlProductHandler.SIGNATURE_ELEMENT.equals(localName)) {
        String signature = signatureBuffer.toString();
        // stop buffering so later character data is not treated as signature
        signatureBuffer = null;
        out.onSignature(id, signature);
      } else if (XmlProductHandler.SIGNATURE_HISTORY_ELEMENT.equals(localName)) {
        out.onSignatureHistory(id, signatureHistory);
      } else if (XmlProductHandler.PRODUCT_ELEMENT.equals(localName)) {
        out.onEndProduct(id);
      }
    } catch (Exception e) {
      closeContent();
      throw new SAXException(e);
    }
  }

  /**
   * Override DefaultHandler characters. Character data is written to the
   * content pipe when embedded content is open, appended to the signature
   * buffer when inside a signature element, and ignored otherwise.
   *
   * @param ch     content.
   * @param start  position in content to read.
   * @param length length of content to read.
   * @throws SAXException if writing to the content pipe fails.
   */
  @Override
  public synchronized void characters(final char[] ch, final int start, final int length) throws SAXException {
    if (contentOutputStream != null) {
      try {
        // explicit charset so the bytes delivered do not depend on the
        // platform default encoding
        contentOutputStream.write(new String(ch, start, length).getBytes(StandardCharsets.UTF_8));
      } catch (Exception e) {
        // close the piped stream if there was an exception
        closeContent();
        throw new SAXException(e);
      }
    } else if (signatureBuffer != null) {
      signatureBuffer.append(ch, start, length);
    }
    // otherwise ignore, only interested in content or signature
  }

  /** @return ProductHandler */
  protected synchronized ProductHandler getHandler() {
    return out;
  }

  /** @param out ProductHandler to set */
  protected synchronized void setHandler(ProductHandler out) {
    this.out = out;
  }

  /**
   * Free any resources associated with this handler: open content, the input
   * stream, and the ProductHandler (when set).
   */
  @Override
  public void close() {
    closeContent();

    StreamUtils.closeStream(in);
    if (out != null) {
      out.close();
    }
  }

  /**
   * Set up a piped output stream used during parsing.
   *
   * The XmlProductSource parsing thread starts a background thread to deliver
   * content to the handler, then continues parsing XML and delivers parsed
   * content via the piped streams.
   *
   * If xml parsing completes as expected, the parsing thread will close the
   * connection in {@link #closeContent()}. If errors occur, the objects handling
   * the product source object call closeContent to ensure the resource is closed.
   *
   * @param encoded if it needs to decode base 64 content
   * @return the input stream connected to the piped output stream
   * @throws IOException if io error occurs
   */
  @SuppressWarnings("resource")
  public InputStream openContentStream(boolean encoded) throws IOException {
    // EMBEDDED CONTENT
    contentOutputStream = new PipedOutputStream();
    InputStream contentInputStream = new PipedInputStream(contentOutputStream);

    // decode base 64 encoded content
    if (encoded) {
      // this stream is closed by closeContent()
      // either in this thread if everything succeeds,
      // or by the objects using the ProductSource.
      contentInputStream = Base64.getDecoder().wrap(contentInputStream);
    }

    return contentInputStream;
  }

  /**
   * Close any open embedded content: closes the write end of the pipe, waits
   * for the content output thread to finish, then closes the content. Safe to
   * call when no content is open.
   */
  public void closeContent() {
    // closing the write end signals EOF to the reader
    StreamUtils.closeStream(contentOutputStream);
    contentOutputStream = null;
    if (contentOutputThread != null) {
      try {
        contentOutputThread.join();
      } catch (Exception e) {
        // still best effort, but preserve the interrupt status for callers
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
      }
    }
    contentOutputThread = null;
    if (content != null) {
      content.close();
      // drop the reference so repeated calls do not close it again
      content = null;
    }
  }

}