JsonProductStorage.java

package gov.usgs.earthquake.aws;

import java.io.ByteArrayInputStream;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.json.Json;

import gov.usgs.earthquake.distribution.ContentTypeNotSupportedException;
import gov.usgs.earthquake.distribution.InvalidSignatureException;
import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
import gov.usgs.earthquake.distribution.ProductKeyChain;
import gov.usgs.earthquake.distribution.ProductStorage;
import gov.usgs.earthquake.distribution.SignatureVerifier;
import gov.usgs.earthquake.distribution.StorageEvent;
import gov.usgs.earthquake.distribution.StorageListener;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.FileContent;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.earthquake.product.io.ObjectProductHandler;
import gov.usgs.earthquake.product.io.ObjectProductSource;
import gov.usgs.earthquake.product.io.ProductSource;
import gov.usgs.earthquake.util.JDBCConnection;
import gov.usgs.util.Config;

/**
 * Store Products in a database.
 *
 * Note that this storage does not store Product Content, and is intended for
 * Products that use URLContent and can be serialized using JsonProduct.
 *
 * Only SQLITE or local development should rely on createSchema. Products (data
 * column) have exceeded 64kb, plan accordingly.
 *
 * Mysql Schema Example:<br>
 *
 * <pre>
 * CREATE TABLE IF NOT EXISTS indexer_storage
 * (id INTEGER PRIMARY KEY AUTO_INCREMENT
 * , source VARCHAR(255)
 * , type VARCHAR(255)
 * , code VARCHAR(255)
 * , updatetime BIGINT
 * , data LONGTEXT
 * , UNIQUE KEY product_index (source, type, code, updatetime)
 * ) ENGINE=innodb CHARSET=utf8;
 * </pre>
 */
public class JsonProductStorage extends JDBCConnection implements ProductStorage {

  private static final Logger LOGGER = Logger.getLogger(JsonProductStorage.class.getName());

  /** Variable for the default driver */
  public static final String DEFAULT_DRIVER = "org.sqlite.JDBC";
  /** Variable for the default table */
  public static final String DEFAULT_TABLE = "product";
  /** Variable for the default URL */
  public static final String DEFAULT_URL = "jdbc:sqlite:json_product_index.db";

  /** Database table name. */
  private String table;

  /** SignatureVerifier to test for valid signatures */
  private SignatureVerifier verifier = new SignatureVerifier();

  /**
   * Create a JsonProductStorage using defaults.
   */
  public JsonProductStorage() {
    this(DEFAULT_DRIVER, DEFAULT_URL);
  }

  /**
   * Create a JsonProductStorage with a default table.
   *
   * @param driver Driver to use
   * @param url    URL to use
   */
  public JsonProductStorage(final String driver, final String url) {
    this(driver, url, DEFAULT_TABLE);
  }

  /**
   * Create a JsonProductStorage with a custom driver, url, and table.
   *
   * @param driver Driver to use
   * @param url    URL to use
   * @param table  Table to use
   */
  public JsonProductStorage(final String driver, final String url, final String table) {
    super(driver, url);
    this.table = table;
  }

  /** @return table */
  public String getTable() {
    return this.table;
  }

  /** @param table Table to set */
  public void setTable(final String table) {
    this.table = table;
  }

  @Override
  public void configure(final Config config) throws Exception {

    verifier.configure(config);

    super.configure(config);
    if (getDriver() == null) {
      setDriver(DEFAULT_DRIVER);
    }
    if (getUrl() == null) {
      setUrl(DEFAULT_URL);
    }

    setTable(config.getProperty("table", DEFAULT_TABLE));
    LOGGER.config("[" + getName() + "] driver=" + getDriver());
    LOGGER.config("[" + getName() + "] table=" + getTable());
    // do not log url, it may contain user/pass
  }

  /**
   * After normal startup, check whether schema exists and attempt to create.
   *
   * @throws Exception if error occurs
   */
  @Override
  public void startup() throws Exception {
    super.startup();
    // make sure schema exists
    if (!schemaExists()) {
      LOGGER.warning("[" + getName() + "] schema not found, creating");
      createSchema();
    }
  }

  /**
   * Check whether schema exists.
   *
   * @return boolean
   * @throws Exception if error occurs
   */
  public boolean schemaExists() throws Exception {
    final String sql = "select * from " + this.table + " limit 1";
    beginTransaction();
    try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
      test.setQueryTimeout(60);
      // should throw exception if table does not exist
      try (final ResultSet rs = test.executeQuery()) {
        rs.next();
      }
      commitTransaction();
      // schema exists
      return true;
    } catch (Exception e) {
      rollbackTransaction();
      return false;
    }
  }

  /**
   * Attempt to create schema.
   *
   * Only supports sqlite or mysql. When not using sqlite, relying on this method
   * is only recommended for local development.
   *
   * @throws Exception if error occurs
   */
  public void createSchema() throws Exception {
    // create schema
    beginTransaction();
    try (final Statement statement = getConnection().createStatement()) {
      String autoIncrement = "";
      String engine = "";
      if (getDriver().contains("mysql")) {
        autoIncrement = " AUTO_INCREMENT";
        engine = " ENGINE=innodb CHARSET=utf8";
      }
      statement.executeUpdate(
          "CREATE TABLE " + this.table + " (id INTEGER PRIMARY KEY" + autoIncrement + ", source VARCHAR(255)"
              + ", type VARCHAR(255)" + ", code VARCHAR(255)" + ", updatetime BIGINT" + ", data TEXT" + ")" + engine);
      statement
          .executeUpdate("CREATE UNIQUE INDEX product_index ON " + this.table + " (source, type, code, updatetime)");
      commitTransaction();
    } catch (Exception e) {
      rollbackTransaction();
      throw e;
    }
  }

  /**
   * Check whether product found in storage.
   */
  @Override
  public boolean hasProduct(ProductId id) throws Exception {
    return getProduct(id) != null;
  }

  /**
   * Get a product from storage.
   *
   * @param id The product to get.
   * @return product if found, otherwise null.
   */
  @Override
  public synchronized Product getProduct(ProductId id) throws Exception {
    Product product = null;
    final String sql = "SELECT * FROM " + this.table + " WHERE source=? AND type=? AND code=? AND updatetime=?";
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      statement.setQueryTimeout(60);
      // set parameters
      statement.setString(1, id.getSource());
      statement.setString(2, id.getType());
      statement.setString(3, id.getCode());
      statement.setLong(4, id.getUpdateTime().getTime());
      // execute
      try (final ResultSet rs = statement.executeQuery()) {
        if (rs.next()) {
          // found product
          final String data = rs.getString("data");
          product = new JsonProduct()
              .getProduct(Json.createReader(new ByteArrayInputStream(data.getBytes())).readObject());
        }
      }
      commitTransaction();
    } catch (SQLException e) {
      try {
        // otherwise roll back
        rollbackTransaction();
      } catch (SQLException e2) {
        // ignore
      }
      LOGGER.log(Level.INFO, "[" + getName() + "] exception in getProduct(" + id.toString() + ")", e);
    }
    return product;
  }

  /**
   * Add product to storage.
   *
   * @throws ContentTypeNotSupportedException if product content not supported.
   * @throws InvalidSignatureException        if signature is invalid.
   * @throws ProductAlreadyInStorageException if product already in storage.
   */
  @Override
  public synchronized ProductId storeProduct(Product product)
      throws ContentTypeNotSupportedException, InvalidSignatureException, ProductAlreadyInStorageException, Exception {
    try {
      Map<String, Content> contents = product.getContents();
      for (Content c : contents.values()) {
        if (c instanceof FileContent) {
          throw new ContentTypeNotSupportedException("Unsupported file type: FileContent");
        }
        // TODO: add other unsupported content types
      }
      verifier.verifySignature(product);
    } catch (Exception e) {
      LOGGER.warning(e.getMessage() + " not storing product");
      throw e;
    }
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(
        "INSERT INTO " + this.table + " (source, type, code, updatetime, data)" + " VALUES (?, ?, ?, ?, ?)")) {
      statement.setQueryTimeout(60);
      final ProductId id = product.getId();
      // set parameters
      statement.setString(1, id.getSource());
      statement.setString(2, id.getType());
      statement.setString(3, id.getCode());
      statement.setLong(4, id.getUpdateTime().getTime());
      statement.setString(5, product != null ? new JsonProduct().getJsonObject(product).toString() : "");
      // execute
      statement.executeUpdate();
      commitTransaction();
      return id;
    } catch (SQLException e) {
      try {
        // otherwise roll back
        rollbackTransaction();
      } catch (SQLException e2) {
        // ignore
      }
      if (e.toString().contains("Duplicate entry")) {
        throw new ProductAlreadyInStorageException(e.toString());
      }
      throw e;
    }
  }

  /**
   * Get a ProductSource for product in database.
   *
   * @return ObjectProductSource or null if product not found.
   */
  @Override
  public ProductSource getProductSource(ProductId id) throws Exception {
    final Product product = getProduct(id);
    if (product == null) {
      return null;
    }
    return new ObjectProductSource(product);
  }

  /**
   * Store a ProductSource.
   *
   * Uses ObjectProductHandler to read Product, then calls storeProduct. Uses
   * SignatureVerifier to check for signed products. Checks that the content type
   * is allowed (currently only FileContent does not work).
   *
   * @throws ContentTypeNotSupportedException if product content not supported.
   * @throws InvalidSignatureException        if signature is invalid.
   * @throws ProductAlreadyInStorageException if product already in storage.
   * 
   */
  @Override
  public ProductId storeProductSource(ProductSource input)
      throws ContentTypeNotSupportedException, InvalidSignatureException, ProductAlreadyInStorageException, Exception {
    final ObjectProductHandler handler = new ObjectProductHandler();
    input.streamTo(handler);
    return storeProduct(handler.getProduct());

  }

  /**
   * Remove product from storage.
   */
  @Override
  public synchronized void removeProduct(ProductId id) throws Exception {
    // prepare statement
    final String sql = "DELETE FROM " + this.table + " WHERE source=? AND type=? AND code=? AND updatetime=?";
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      statement.setQueryTimeout(60);
      // set parameters
      statement.setString(1, id.getSource());
      statement.setString(2, id.getType());
      statement.setString(3, id.getCode());
      statement.setLong(4, id.getUpdateTime().getTime());
      // execute
      statement.executeUpdate();
      commitTransaction();
    } catch (SQLException e) {
      try {
        // otherwise roll back
        rollbackTransaction();
      } catch (SQLException e2) {
        // ignore
      }
      throw e;
    }
  }

  @Override
  public void notifyListeners(StorageEvent event) {
    // listeners not supported
  }

  @Override
  public void addStorageListener(StorageListener listener) {
    // listeners not supported
  }

  @Override
  public void removeStorageListener(StorageListener listener) {
    // listeners not supported
  }

  /**
   * @return the rejectInvalidSignatures
   */
  public boolean isRejectInvalidSignatures() {
    return verifier.isRejectInvalidSignatures();
  }

  /**
   * @param rejectInvalidSignatures the rejectInvalidSignatures to set
   */
  public void setRejectInvalidSignatures(boolean rejectInvalidSignatures) {
    verifier.setRejectInvalidSignatures(rejectInvalidSignatures);
  }

  /**
   * @return the testSignatures
   */
  public boolean isTestSignatures() {
    return verifier.isTestSignatures();
  }

  /**
   * @param testSignatures the testSignatures to set
   */
  public void setTestSignatures(boolean testSignatures) {
    verifier.setTestSignatures(testSignatures);
  }

  /**
   * @return the keychain
   */
  public ProductKeyChain getKeychain() {
    return verifier.getKeychain();
  }

  /**
   * @param keychain the keychain to set
   */
  public void setKeychain(ProductKeyChain keychain) {
    verifier.setKeychain(keychain);
  }

  public SignatureVerifier getVerifier() {
    return verifier;
  }

}