// JsonProductStorage.java
package gov.usgs.earthquake.aws;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.Json;
import gov.usgs.earthquake.distribution.ContentTypeNotSupportedException;
import gov.usgs.earthquake.distribution.InvalidSignatureException;
import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
import gov.usgs.earthquake.distribution.ProductKeyChain;
import gov.usgs.earthquake.distribution.ProductStorage;
import gov.usgs.earthquake.distribution.SignatureVerifier;
import gov.usgs.earthquake.distribution.StorageEvent;
import gov.usgs.earthquake.distribution.StorageListener;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.FileContent;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.JsonProduct;
import gov.usgs.earthquake.product.io.ObjectProductHandler;
import gov.usgs.earthquake.product.io.ObjectProductSource;
import gov.usgs.earthquake.product.io.ProductSource;
import gov.usgs.earthquake.util.JDBCConnection;
import gov.usgs.util.Config;
/**
 * Store Products in a database.
 *
 * Note that this storage does not store Product Content, and is intended for
 * Products that use URLContent and can be serialized using JsonProduct.
 *
 * Only SQLITE or local development should rely on createSchema. Products (data
 * column) have exceeded 64kb, plan accordingly.
 *
 * The product read/write methods ({@link #getProduct(ProductId)},
 * {@link #storeProduct(Product)}, {@link #removeProduct(ProductId)}) are
 * synchronized on this instance and each run inside their own transaction.
 *
 * Mysql Schema Example:<br>
 *
 * <pre>
 * CREATE TABLE IF NOT EXISTS indexer_storage
 * (id INTEGER PRIMARY KEY AUTO_INCREMENT
 * , source VARCHAR(255)
 * , type VARCHAR(255)
 * , code VARCHAR(255)
 * , updatetime BIGINT
 * , data LONGTEXT
 * , UNIQUE KEY product_index (source, type, code, updatetime)
 * ) ENGINE=innodb CHARSET=utf8;
 * </pre>
 */
public class JsonProductStorage extends JDBCConnection implements ProductStorage {

  private static final Logger LOGGER = Logger.getLogger(JsonProductStorage.class.getName());

  /** Variable for the default driver */
  public static final String DEFAULT_DRIVER = "org.sqlite.JDBC";
  /** Variable for the default table */
  public static final String DEFAULT_TABLE = "product";
  /** Variable for the default URL */
  public static final String DEFAULT_URL = "jdbc:sqlite:json_product_index.db";

  /** Database table name. */
  private String table;
  /** SignatureVerifier to test for valid signatures */
  private SignatureVerifier verifier = new SignatureVerifier();

  /**
   * Create a JsonProductStorage using defaults.
   */
  public JsonProductStorage() {
    this(DEFAULT_DRIVER, DEFAULT_URL);
  }

  /**
   * Create a JsonProductStorage with a default table.
   *
   * @param driver Driver to use
   * @param url    URL to use
   */
  public JsonProductStorage(final String driver, final String url) {
    this(driver, url, DEFAULT_TABLE);
  }

  /**
   * Create a JsonProductStorage with a custom driver, url, and table.
   *
   * @param driver Driver to use
   * @param url    URL to use
   * @param table  Table to use
   */
  public JsonProductStorage(final String driver, final String url, final String table) {
    super(driver, url);
    this.table = table;
  }

  /** @return table */
  public String getTable() {
    return this.table;
  }

  /** @param table Table to set */
  public void setTable(final String table) {
    this.table = table;
  }

  /**
   * Configure the signature verifier and connection, falling back to the
   * default driver/url when the parent configuration leaves them unset, and
   * read the "table" property (defaults to {@link #DEFAULT_TABLE}).
   *
   * @param config configuration to read
   * @throws Exception if the verifier or parent configuration fails
   */
  @Override
  public void configure(final Config config) throws Exception {
    verifier.configure(config);
    super.configure(config);
    if (getDriver() == null) {
      setDriver(DEFAULT_DRIVER);
    }
    if (getUrl() == null) {
      setUrl(DEFAULT_URL);
    }
    setTable(config.getProperty("table", DEFAULT_TABLE));
    LOGGER.config("[" + getName() + "] driver=" + getDriver());
    LOGGER.config("[" + getName() + "] table=" + getTable());
    // do not log url, it may contain user/pass
  }

  /**
   * After normal startup, check whether schema exists and attempt to create.
   *
   * @throws Exception if error occurs
   */
  @Override
  public void startup() throws Exception {
    super.startup();
    // make sure schema exists
    if (!schemaExists()) {
      LOGGER.warning("[" + getName() + "] schema not found, creating");
      createSchema();
    }
  }

  /**
   * Check whether schema exists.
   *
   * Runs a one-row select against the configured table; any failure of that
   * query is treated as "schema does not exist".
   *
   * @return true if the table could be queried, false otherwise
   * @throws Exception if error occurs
   */
  public boolean schemaExists() throws Exception {
    final String sql = "select * from " + this.table + " limit 1";
    beginTransaction();
    try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
      test.setQueryTimeout(60);
      // should throw exception if table does not exist
      try (final ResultSet rs = test.executeQuery()) {
        rs.next();
      }
      commitTransaction();
      // schema exists
      return true;
    } catch (Exception e) {
      rollbackTransaction();
      return false;
    }
  }

  /**
   * Attempt to create schema.
   *
   * Only supports sqlite or mysql. When not using sqlite, relying on this method
   * is only recommended for local development.
   *
   * @throws Exception if error occurs
   */
  public void createSchema() throws Exception {
    // create schema
    beginTransaction();
    try (final Statement statement = getConnection().createStatement()) {
      String autoIncrement = "";
      String engine = "";
      String dataType = "TEXT";
      if (getDriver().contains("mysql")) {
        autoIncrement = " AUTO_INCREMENT";
        engine = " ENGINE=innodb CHARSET=utf8";
        // mysql TEXT is limited to 64KB and products have exceeded that size
        dataType = "LONGTEXT";
      }
      statement.executeUpdate(
          "CREATE TABLE " + this.table + " (id INTEGER PRIMARY KEY" + autoIncrement + ", source VARCHAR(255)"
              + ", type VARCHAR(255)" + ", code VARCHAR(255)" + ", updatetime BIGINT" + ", data " + dataType + ")"
              + engine);
      statement
          .executeUpdate("CREATE UNIQUE INDEX product_index ON " + this.table + " (source, type, code, updatetime)");
      commitTransaction();
    } catch (Exception e) {
      rollbackTransaction();
      throw e;
    }
  }

  /**
   * Check whether product found in storage.
   *
   * Delegates to {@link #getProduct(ProductId)}, so a database error that
   * getProduct logs and swallows is reported here as "not found".
   *
   * @param id The product to look for.
   * @return true if getProduct returned a product.
   */
  @Override
  public boolean hasProduct(ProductId id) throws Exception {
    return getProduct(id) != null;
  }

  /**
   * Get a product from storage.
   *
   * SQL errors are logged and result in a null return; any other failure
   * (for example stored data that cannot be parsed) rolls back the transaction
   * and is rethrown.
   *
   * @param id The product to get.
   * @return product if found, otherwise null.
   */
  @Override
  public synchronized Product getProduct(ProductId id) throws Exception {
    Product product = null;
    final String sql = "SELECT * FROM " + this.table + " WHERE source=? AND type=? AND code=? AND updatetime=?";
    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      statement.setQueryTimeout(60);
      // set parameters
      statement.setString(1, id.getSource());
      statement.setString(2, id.getType());
      statement.setString(3, id.getCode());
      statement.setLong(4, id.getUpdateTime().getTime());
      // execute
      try (final ResultSet rs = statement.executeQuery()) {
        if (rs.next()) {
          // found product
          final String data = rs.getString("data");
          // encode explicitly as UTF-8; the platform default charset may not
          // be one the JSON reader can detect
          product = new JsonProduct().getProduct(
              Json.createReader(new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))).readObject());
        }
      }
      commitTransaction();
    } catch (SQLException e) {
      rollbackQuietly();
      LOGGER.log(Level.INFO, "[" + getName() + "] exception in getProduct(" + id.toString() + ")", e);
    } catch (Exception e) {
      // do not leave the transaction open when parsing (or anything else) fails
      rollbackQuietly();
      throw e;
    }
    return product;
  }

  /**
   * Add product to storage.
   *
   * Rejects products containing FileContent, verifies the signature, then
   * inserts the JSON-serialized product in its own transaction.
   *
   * @param product The product to store.
   * @return id of the stored product.
   * @throws ContentTypeNotSupportedException if product content not supported.
   * @throws InvalidSignatureException        if signature is invalid.
   * @throws ProductAlreadyInStorageException if product already in storage.
   */
  @Override
  public synchronized ProductId storeProduct(Product product)
      throws ContentTypeNotSupportedException, InvalidSignatureException, ProductAlreadyInStorageException, Exception {
    try {
      Map<String, Content> contents = product.getContents();
      for (Content c : contents.values()) {
        if (c instanceof FileContent) {
          throw new ContentTypeNotSupportedException("Unsupported file type: FileContent");
        }
        // TODO: add other unsupported content types
      }
      verifier.verifySignature(product);
    } catch (Exception e) {
      LOGGER.warning(e.getMessage() + " not storing product");
      throw e;
    }

    // serialize before opening the transaction, so a serialization failure
    // cannot leave a transaction open
    final ProductId id = product.getId();
    final String json = new JsonProduct().getJsonObject(product).toString();

    // prepare statement
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(
        "INSERT INTO " + this.table + " (source, type, code, updatetime, data)" + " VALUES (?, ?, ?, ?, ?)")) {
      statement.setQueryTimeout(60);
      // set parameters
      statement.setString(1, id.getSource());
      statement.setString(2, id.getType());
      statement.setString(3, id.getCode());
      statement.setLong(4, id.getUpdateTime().getTime());
      statement.setString(5, json);
      // execute
      statement.executeUpdate();
      commitTransaction();
      return id;
    } catch (SQLException e) {
      rollbackQuietly();
      if (isDuplicateKeyError(e)) {
        throw new ProductAlreadyInStorageException(e.toString());
      }
      throw e;
    } catch (Exception e) {
      rollbackQuietly();
      throw e;
    }
  }

  /**
   * Get a ProductSource for product in database.
   *
   * @param id The product to get.
   * @return ObjectProductSource or null if product not found.
   */
  @Override
  public ProductSource getProductSource(ProductId id) throws Exception {
    final Product product = getProduct(id);
    if (product == null) {
      return null;
    }
    return new ObjectProductSource(product);
  }

  /**
   * Store a ProductSource.
   *
   * Uses ObjectProductHandler to read Product, then calls storeProduct. Uses
   * SignatureVerifier to check for signed products. Checks that the content type
   * is allowed (currently only FileContent does not work).
   *
   * @param input The product source to read and store.
   * @return id of the stored product.
   * @throws ContentTypeNotSupportedException if product content not supported.
   * @throws InvalidSignatureException        if signature is invalid.
   * @throws ProductAlreadyInStorageException if product already in storage.
   */
  @Override
  public ProductId storeProductSource(ProductSource input)
      throws ContentTypeNotSupportedException, InvalidSignatureException, ProductAlreadyInStorageException, Exception {
    final ObjectProductHandler handler = new ObjectProductHandler();
    input.streamTo(handler);
    return storeProduct(handler.getProduct());
  }

  /**
   * Remove product from storage.
   *
   * @param id The product to remove.
   * @throws Exception if the delete fails; the transaction is rolled back first.
   */
  @Override
  public synchronized void removeProduct(ProductId id) throws Exception {
    // prepare statement
    final String sql = "DELETE FROM " + this.table + " WHERE source=? AND type=? AND code=? AND updatetime=?";
    beginTransaction();
    try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
      statement.setQueryTimeout(60);
      // set parameters
      statement.setString(1, id.getSource());
      statement.setString(2, id.getType());
      statement.setString(3, id.getCode());
      statement.setLong(4, id.getUpdateTime().getTime());
      // execute
      statement.executeUpdate();
      commitTransaction();
    } catch (Exception e) {
      rollbackQuietly();
      throw e;
    }
  }

  /** Listeners are not supported; this method does nothing. */
  @Override
  public void notifyListeners(StorageEvent event) {
    // listeners not supported
  }

  /** Listeners are not supported; this method does nothing. */
  @Override
  public void addStorageListener(StorageListener listener) {
    // listeners not supported
  }

  /** Listeners are not supported; this method does nothing. */
  @Override
  public void removeStorageListener(StorageListener listener) {
    // listeners not supported
  }

  /**
   * @return the rejectInvalidSignatures
   */
  public boolean isRejectInvalidSignatures() {
    return verifier.isRejectInvalidSignatures();
  }

  /**
   * @param rejectInvalidSignatures the rejectInvalidSignatures to set
   */
  public void setRejectInvalidSignatures(boolean rejectInvalidSignatures) {
    verifier.setRejectInvalidSignatures(rejectInvalidSignatures);
  }

  /**
   * @return the testSignatures
   */
  public boolean isTestSignatures() {
    return verifier.isTestSignatures();
  }

  /**
   * @param testSignatures the testSignatures to set
   */
  public void setTestSignatures(boolean testSignatures) {
    verifier.setTestSignatures(testSignatures);
  }

  /**
   * @return the keychain
   */
  public ProductKeyChain getKeychain() {
    return verifier.getKeychain();
  }

  /**
   * @param keychain the keychain to set
   */
  public void setKeychain(ProductKeyChain keychain) {
    verifier.setKeychain(keychain);
  }

  /**
   * @return the SignatureVerifier used by this storage
   */
  public SignatureVerifier getVerifier() {
    return verifier;
  }

  /**
   * Roll back the current transaction without masking the exception that
   * triggered the rollback; a rollback failure is only logged.
   */
  private void rollbackQuietly() {
    try {
      rollbackTransaction();
    } catch (Exception ignored) {
      // the original failure is more useful to the caller than this one
      LOGGER.log(Level.FINE, "[" + getName() + "] exception rolling back transaction", ignored);
    }
  }

  /**
   * Check whether an insert failed because the unique product index already
   * contains the row.
   *
   * Matches on driver message text: "Duplicate entry" (mysql) and the
   * unique constraint messages reported for sqlite.
   *
   * @param e exception thrown by the insert
   * @return true if the exception text indicates a duplicate row
   */
  private static boolean isDuplicateKeyError(final SQLException e) {
    final String text = e.toString();
    return text.contains("Duplicate entry") || text.contains("UNIQUE constraint failed")
        || text.contains("SQLITE_CONSTRAINT_UNIQUE");
  }

}