SearchResponseXmlProductSource.java

/*
 * SearchResponseXmlProductSource
 */
package gov.usgs.earthquake.indexer;

import java.util.logging.Level;
import java.util.logging.Logger;

import org.xml.sax.Attributes;
import org.xml.sax.SAXException;

import gov.usgs.earthquake.distribution.FileProductStorage;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.ProductHandler;
import gov.usgs.earthquake.product.io.XmlProductHandler;
import gov.usgs.earthquake.product.io.XmlProductSource;

/**
 * Used by SearchResponseParser to store products during parsing.
 *
 * Creates a "background" storage thread for storing, while this classes
 * startElement, characters, and endElement methods are called by the
 * "foreground" xml parsing thread.
 */
public class SearchResponseXmlProductSource extends XmlProductSource {

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(SearchResponseXmlProductSource.class.getName());

  /** The storage where the product is streamed. */
  private FileProductStorage storage;

  /** The stored id. */
  private Product storedProduct = null;

  /** The thread where storage does its thing (current thread is xml parsing). */
  private Thread storageThread;

  /** */
  private final Object waitForSetHandlerSync = new Object();

  /** */
  private final Object waitForStreamToSync = new Object();

  /** start product attributes, for acquiring writelock in background thread */
  private String uri;
  private String localName;
  private String qName;
  private Attributes attributes;
  private SAXException exception;

  /**
   * Construct a SearchResponseXmlProductSource.
   *
   * @param storage the storage where the parsed product is stored.
   */
  public SearchResponseXmlProductSource(final FileProductStorage storage) {
    super((ProductHandler) null);
    this.setStorage(storage);
  }

  /**
   * Called by the underlying product storage as part os storeProductSource.
   *
   * This method notifies the XML parsing thread that parsing may continue, since
   * the handler is now setup.
   */
  @Override
  public void streamTo(final ProductHandler handler) {
    this.setHandler(handler);

    try {
      // start the product in this thread, so the write lock is acquired
      // and released in the same thread
      super.startElement(uri, localName, qName, attributes);
    } catch (SAXException e) {
      exception = e;
    }
    // clear references that are no longer needed
    this.uri = null;
    this.localName = null;
    this.qName = null;
    this.attributes = null;

    synchronized (waitForSetHandlerSync) {
      // notify xml parsing thread that handler is all set
      waitForSetHandlerSync.notify();
    }

    synchronized (waitForStreamToSync) {
      try {
        // wait for xml parsing thread to notify streamTo is complete
        waitForStreamToSync.wait();
      } catch (Exception e) {
        // ignore
      }
    }
  }

  @Override
  public void startElement(final String uri, final String localName, final String qName, final Attributes attributes)
      throws SAXException {
    boolean startElementAlreadySent = false;

    if (uri.equals(XmlProductHandler.PRODUCT_XML_NAMESPACE) && localName.equals(XmlProductHandler.PRODUCT_ELEMENT)) {
      // save these to write lock can be acquired by correct thread.
      this.uri = uri;
      this.localName = localName;
      this.qName = qName;
      this.attributes = attributes;

      // starting a product, set up the storage handler/thread
      // reference used by storage thread to set product
      final SearchResponseXmlProductSource thisSource = this;
      storageThread = new Thread() {
        public void run() {
          try {
            ProductId id = storage.storeProductSource(thisSource);
            thisSource.setProduct(storage.getProduct(id));
          } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Exception while storing product", e);
            thisSource.setProduct(null);
          }
        }
      };

      synchronized (waitForSetHandlerSync) {
        storageThread.start();
        try {
          // wait for storage thread to call streamTo with
          // handler
          waitForSetHandlerSync.wait();
        } catch (InterruptedException e) {
          // ignore
        }
        // handler set, ready to continue parsing

        // signal that we've already sent the startElement
        startElementAlreadySent = true;

        // if an exception was generated in background thread, throw
        // it here
        if (exception != null) {
          throw exception;
        }
      }
    }

    if (!startElementAlreadySent) {
      // forward call to parser
      super.startElement(uri, localName, qName, attributes);
    }
  }

  public void endElement(final String uri, final String localName, final String qName) throws SAXException {

    // forward call to parser
    super.endElement(uri, localName, qName);

    if (!uri.equals(XmlProductHandler.PRODUCT_XML_NAMESPACE)) {
      return;
    }

    if (localName.equals(XmlProductHandler.PRODUCT_ELEMENT)) {
      // done parsing the product

      synchronized (waitForStreamToSync) {
        // notify storageThread streamTo is complete
        waitForStreamToSync.notify();
      }

      try {
        // wait for storageThread to complete so storage will have
        // called setProduct before returning
        storageThread.join();
      } catch (InterruptedException e) {
        // ignore
      } finally {
        storageThread = null;
      }
    }
  }

  /** @param storage FileProductStorage to set */
  public void setStorage(FileProductStorage storage) {
    this.storage = storage;
  }

  /** @return FileProductStorage */
  public FileProductStorage getStorage() {
    return storage;
  }

  /**
   * @return the parsed, stored product.
   */
  public Product getProduct() {
    return this.storedProduct;
  }

  /**
   * Method used by storage to provide the parsed product.
   *
   * @param product to set
   */
  protected void setProduct(final Product product) {
    this.storedProduct = product;
  }

}