JDBCProductIndex.java
/*
* JDBCProductIndex
*/
package gov.usgs.earthquake.indexer;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.util.JDBCConnection;
import gov.usgs.util.Config;
import gov.usgs.util.JDBCUtils;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.StringUtils;
import java.io.File;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URL;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
/**
* JDBC Implementation of {@link ProductIndex}.
*/
public class JDBCProductIndex extends JDBCConnection implements ProductIndex {
/** Logging Utility **/
private static final Logger LOGGER = Logger.getLogger(Indexer.class.getName());
/** _____ First, set up some constants _____ */
/**
* Default index file. Copied into file system as JDBC_DEFAULT_FILE if doesn't
* already exist.
*/
private static final String JDBC_DEFAULT_INDEX = "etc/schema/productIndex.db";
private static final String JDBC_DEFAULT_DRIVER = JDBCUtils.SQLITE_DRIVER_CLASSNAME;
/**
* Default index file. Created by copying JDBC_DEFAULT_INDEX out of Jar if
* doesn't already exist in file system
*/
public static final String JDBC_DEFAULT_FILE = "productIndex.db";
/**
* Constant used to specify what the index file property should be called in to
* config file
*/
private static final String JDBC_FILE_PROPERTY = "indexfile";
/** Prefix for connecting to a sqlite database */
private static final String JDBC_CONNECTION_PREFIX = "jdbc:sqlite:";
/** Variables to store the event and product column names */
// private static final String EVENT_TABLE = "event";
private static final String EVENT_TABLE_ALIAS = "e";
// private static final String EVENT_INDEX_ID = "id";
// private static final String EVENT_CREATED = "created";
// private static final String EVENT_UPDATED = "updated";
// private static final String EVENT_SOURCE = "source";
// private static final String EVENT_SOURCE_CODE = "sourceCode";
private static final String EVENT_TIME = "eventTime";
private static final String EVENT_LATITUDE = "latitude";
private static final String EVENT_LONGITUDE = "longitude";
private static final String EVENT_DEPTH = "depth";
private static final String EVENT_MAGNITUDE = "magnitude";
// private static final String EVENT_STATUS = "status";
private static final String EVENT_STATUS_UPDATE = "UPDATE";
private static final String EVENT_STATUS_DELETE = "DELETE";
private static final String SUMMARY_TABLE = "productSummary";
private static final String SUMMARY_TABLE_ALIAS = "p";
// private static final String SUMMARY_CREATED = "created";
/** Public var for summary product index IDs */
public static final String SUMMARY_PRODUCT_INDEX_ID = "id";
private static final String SUMMARY_PRODUCT_ID = "productId";
// private static final String SUMMARY_EVENT_ID = "eventId";
private static final String SUMMARY_TYPE = "type";
private static final String SUMMARY_SOURCE = "source";
private static final String SUMMARY_CODE = "code";
private static final String SUMMARY_UPDATE_TIME = "updateTime";
private static final String SUMMARY_EVENT_SOURCE = "eventSource";
private static final String SUMMARY_EVENT_SOURCE_CODE = "eventSourceCode";
private static final String SUMMARY_EVENT_TIME = "eventTime";
private static final String SUMMARY_EVENT_LATITUDE = "eventLatitude";
private static final String SUMMARY_EVENT_LONGITUDE = "eventLongitude";
private static final String SUMMARY_EVENT_DEPTH = "eventDepth";
private static final String SUMMARY_EVENT_MAGNITUDE = "eventMagnitude";
private static final String SUMMARY_VERSION = "version";
private static final String SUMMARY_STATUS = "status";
private static final String SUMMARY_PREFERRED = "preferred";
// private static final String SUMMARY_PROPERTY_TABLE =
// "productSummaryProperty";
// private static final String SUMMARY_PROPERTY_ID = "productSummaryIndexId";
// private static final String SUMMARY_PROPERTY_NAME = "name";
// private static final String SUMMARY_PROPERTY_VALUE = "value";
// private static final String SUMMARY_LINK_TABLE = "productSummaryLink";
// private static final String SUMMARY_LINK_ID = "productSummaryIndexId";
// private static final String SUMMARY_LINK_RELATION = "relation";
// private static final String SUMMARY_LINK_URL = "url";
private String index_file;
/**
* Constructor. Sets index_file to the default value JDBC_DEFAULT_FILE
*
* @throws Exception if error occurs
*/
public JDBCProductIndex() throws Exception {
// Default index file, so calling configure() isn't required
index_file = JDBC_DEFAULT_FILE;
setDriver(JDBC_DEFAULT_DRIVER);
}
/**
* Constructor. Uses custom index_file
*
* @param sqliteFileName String for sqlite file name
* @throws Exception if error occurs
*/
public JDBCProductIndex(final String sqliteFileName) throws Exception {
index_file = sqliteFileName;
setDriver(JDBC_DEFAULT_DRIVER);
}
// ____________________________________
// Public Methods
// ____________________________________
/**
* Grab values from the Config object and put them into private variables.
*
* @param config Configuration for the product index
*/
@Override
public void configure(Config config) throws Exception {
super.configure(config);
index_file = config.getProperty(JDBC_FILE_PROPERTY);
if (getDriver() == null) {
setDriver(JDBC_DEFAULT_DRIVER);
}
if (index_file == null || "".equals(index_file)) {
index_file = JDBC_DEFAULT_FILE;
}
}
/**
* Return a connection to the database.
*
* @return Connection object
* @throws Exception if error occurs
*/
@Override
public Connection connect() throws Exception {
// If they are using the sqlite driver, we need to try to create the
// file
if (getDriver().equals(JDBCUtils.SQLITE_DRIVER_CLASSNAME)) {
// Make sure file exists or copy it out of the JAR
File indexFile = new File(index_file);
if (!indexFile.exists()) {
// extract schema from jar
URL schemaURL = JDBCProductIndex.class.getClassLoader().getResource(JDBC_DEFAULT_INDEX);
if (schemaURL != null) {
StreamUtils.transferStream(schemaURL, indexFile);
} else {
// Failed. Probably because we're not in a Jar file
File defaultIndex = new File(JDBC_DEFAULT_INDEX);
StreamUtils.transferStream(defaultIndex, indexFile);
}
}
indexFile = null;
// Build the JDBC url
setUrl(JDBC_CONNECTION_PREFIX + index_file);
}
return super.connect();
}
/**
* Return all events from the database that meet the parameters specified in the
* ProductIndexQuery object.
*
* @param query A description of which events to retrieve.
* @return List of Event objects
*/
@Override
public synchronized List<Event> getEvents(ProductIndexQuery query) throws Exception {
if (query == null) {
return new ArrayList<Event>();
}
// map of events (index id => event), so products can be added incrementally
final Map<Long, Event> events = new HashMap<>();
// all products for loading details
ArrayList<ProductSummary> products = new ArrayList<>();
// Build up our clause list like always
// These clauses may only match certain products within events,
// and are used to find a list of event ids
List<String> clauses = buildProductClauses(query);
// Build the SQL Query from our ProductIndexQuery object
String sql = "SELECT DISTINCT ps2.*" + " FROM productSummary ps2,"
+ " (SELECT DISTINCT e.id FROM event e, productSummary p" + " WHERE e.id=p.eventId";
// Add all appropriate where clauses
for (final String clause : clauses) {
sql = sql + " AND " + clause;
}
sql = sql + ") eventids" + " WHERE ps2.eventid=eventids.id";
// add current clause to outer query
if (query.getResultType() == ProductIndexQuery.RESULT_TYPE_CURRENT) {
sql = sql + " AND NOT EXISTS (" + " SELECT * FROM productSummary" + " WHERE source=ps2.source"
+ " AND type=ps2.type" + " AND code=ps2.code" + " AND updateTime>ps2.updateTime" + ")";
}
// load event products
try (final PreparedStatement statement = getConnection().prepareStatement(sql);
final ResultSet results = statement.executeQuery();) {
statement.setQueryTimeout(60);
while (results.next()) {
// eventid not part of product summary object,
// so need to do this as products are parsed...
final Long id = results.getLong("eventId");
Event event = events.get(id);
if (event == null) {
// create event to hold products
event = new Event();
event.setIndexId(id);
events.put(id, event);
}
final ProductSummary productSummary = parseProductSummary(results);
event.addProduct(productSummary);
products.add(productSummary);
}
}
// load product details
loadProductSummaries(products);
return events.values().stream().collect(Collectors.toList());
}
/**
* Add an event to the database
*
* @param event Event to store
* @return Event object with eventId set to the database id
*/
@Override
public synchronized Event addEvent(Event event) throws Exception {
Event e = null;
final String sql = "INSERT INTO event (created) VALUES (?)";
try (final PreparedStatement insertEvent = getConnection().prepareStatement(sql, new String[] { "id" });) {
insertEvent.setQueryTimeout(60);
// Add the values to the prepared statement
JDBCUtils.setParameter(insertEvent, 1, new Date().getTime(), Types.BIGINT);
// Execute the prepared statement
int rows = insertEvent.executeUpdate();
if (rows == 1) {
long id = 0;
try (final ResultSet keys = insertEvent.getGeneratedKeys()) {
while (keys.next()) {
id = keys.getLong(1);
}
e = new Event(event);
e.setIndexId(id);
}
LOGGER.finest("Added event id=" + id);
} else {
LOGGER.log(Level.WARNING, "[" + getName() + "] Exception when adding new event to database");
throw new Exception("Error adding new event to database");
}
}
LOGGER.log(Level.FINEST, "[" + getName() + "] Added event to Product Index");
return e;
}
/**
* Delete an event from the database.
*
* @param event Event to remove
* @return List containing all the ProductIds that were deleted by the method
* call
*/
@Override
public synchronized List<ProductId> removeEvent(Event event) throws Exception {
Long id = event.getIndexId();
// If there is no index id on the event, we can assume its
// not in the database
if (id == null) {
return null;
}
// remove event products
final List<ProductId> productIds = removeProductSummaries(event.getProductList());
// and now remove event
final String sql = "DELETE FROM event WHERE id=?";
try (final PreparedStatement deleteEvent = getConnection().prepareStatement(sql);) {
deleteEvent.setQueryTimeout(60);
JDBCUtils.setParameter(deleteEvent, 1, id, Types.BIGINT);
int rows = deleteEvent.executeUpdate();
// If we didn't delete a row, or we deleted more than 1 row, throw an
// exception
if (rows != 1) {
LOGGER.log(Level.WARNING, "[" + getName() + "] Exception when deleting an event from the database");
throw new Exception("Error deleting event from database");
}
LOGGER.finest("[" + getName() + "] Removed event id=" + id);
}
return productIds;
}
/**
* Return all products that aren't associated with an event.
*
* @param query ProductIndexQuery used to further limit the results
* @return List of unassociated Products
* @throws IllegalArgumentException when query event search type is
* SEARCH_EVENT_PREFERRED.
*/
@Override
public synchronized List<ProductSummary> getUnassociatedProducts(ProductIndexQuery query) throws Exception {
if (query.getEventSearchType() == ProductIndexQuery.SEARCH_EVENT_PREFERRED) {
throw new IllegalArgumentException("getUnassociatedProducts does not support SEARCH_EVENT_PREFERRED");
}
final ArrayList<ProductSummary> products = new ArrayList<ProductSummary>();
final List<String> clauseList = buildProductClauses(query);
// Add the unassociated quantifier to the clause list
clauseList.add("eventId IS NULL");
final String sql = buildProductQuery(query, clauseList);
try (final PreparedStatement statement = getConnection().prepareStatement(sql);) {
statement.setQueryTimeout(60);
try (final ResultSet results = statement.executeQuery();) {
// Now lets build product objects from each row in the result set
while (results.next()) {
products.add(parseProductSummary(results));
}
}
}
// load properties and links
loadProductSummaries(products);
return products;
}
/**
* Return all products that meet the parameters specified in the
* ProductIndexQuery object.
*
* @param query A description of which products to retrieve.
* @return List of ProductSummary objects
* @throws IllegalArgumentException when query event search type is
* SEARCH_EVENT_PREFERRED.
*/
@Override
public synchronized List<ProductSummary> getProducts(ProductIndexQuery query) throws Exception {
// load full product summaries by default
return getProducts(query, true);
}
/**
* Load product summaries.
*
* @param query product query
* @param loadDetails whether to call {@link #loadProductSummaries(List)}, which
* loads links and properties with additional queries.
* @return A list of loaded product summaries
* @throws Exception if error occurs
*/
public synchronized List<ProductSummary> getProducts(ProductIndexQuery query, final boolean loadDetails)
throws Exception {
final String sql = buildProductQuery(query);
final List<ProductSummary> products = new LinkedList<ProductSummary>();
LOGGER.finer("Executing query " + sql);
try (final PreparedStatement statement = getConnection().prepareStatement(sql);) {
statement.setQueryTimeout(60);
try (final ResultSet results = statement.executeQuery();) {
// Now lets build product objects from each row in the result set
while (results.next()) {
products.add(parseProductSummary(results));
}
}
}
if (loadDetails) {
// load properties and links
loadProductSummaries(products);
}
return products;
}
/**
* Check whether product summary is in index.
*
* @param id product to search.
*/
public synchronized boolean hasProduct(final ProductId id) throws Exception {
final String sql = "SELECT id FROM productSummary" + " WHERE source=? AND type=? AND code=? AND updateTime=?";
try (final PreparedStatement statement = getConnection().prepareStatement(sql);) {
statement.setQueryTimeout(60);
statement.setString(1, id.getSource());
statement.setString(2, id.getType());
statement.setString(3, id.getCode());
statement.setLong(4, id.getUpdateTime().getTime());
try (final ResultSet results = statement.executeQuery();) {
// return true if there is a matching row, false otherwise
return results.next();
}
}
}
/**
* Add a product summary to the database
*
* @param summary ProductSummary object to store. Must not be null.
* @return Copy of the product summary object with the indexId set to the newly
* inserted id.
* @throws Exception if error occurs
*/
@Override
public synchronized ProductSummary addProductSummary(ProductSummary summary) throws Exception {
// Add values to the prepared statement
long productId = 0;
final ProductId sid = summary.getId();
final String sql = "INSERT INTO productSummary" + "(created, productId, type, source, code"
+ ", updateTime, eventSource, eventSourceCode, eventTime"
+ ", eventLatitude, eventLongitude, eventDepth, eventMagnitude" + ", version, status, preferred"
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
try (final PreparedStatement insertSummary = getConnection().prepareStatement(sql, new String[] { "id" });) {
insertSummary.setQueryTimeout(60);
// Set the created timestamp
JDBCUtils.setParameter(insertSummary, 1, new Date().getTime(), Types.BIGINT);
if (sid != null) {
JDBCUtils.setParameter(insertSummary, 2, sid.toString(), Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 3, sid.getType(), Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 4, sid.getSource(), Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 5, sid.getCode(), Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 6, (sid.getUpdateTime() != null) ? sid.getUpdateTime().getTime() : null,
Types.BIGINT);
} else {
// Summary product id is null. Set all these parameter to null
JDBCUtils.setParameter(insertSummary, 2, null, Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 3, null, Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 4, null, Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 5, null, Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 6, null, Types.BIGINT);
}
JDBCUtils.setParameter(insertSummary, 7, summary.getEventSource(), Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 8, summary.getEventSourceCode(), Types.VARCHAR);
Date eventTime = summary.getEventTime();
JDBCUtils.setParameter(insertSummary, 9, (eventTime != null) ? eventTime.getTime() : null, Types.BIGINT);
JDBCUtils.setParameter(insertSummary, 10,
(summary.getEventLatitude() != null) ? summary.getEventLatitude().doubleValue() : null, Types.DECIMAL);
JDBCUtils.setParameter(insertSummary, 11,
(summary.getEventLongitude() != null) ? normalizeLongitude(summary.getEventLongitude().doubleValue()) : null,
Types.DECIMAL);
JDBCUtils.setParameter(insertSummary, 12,
(summary.getEventDepth() != null) ? summary.getEventDepth().doubleValue() : null, Types.DECIMAL);
JDBCUtils.setParameter(insertSummary, 13,
(summary.getEventMagnitude() != null) ? summary.getEventMagnitude().doubleValue() : null, Types.DECIMAL);
JDBCUtils.setParameter(insertSummary, 14, summary.getVersion(), Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 15, summary.getStatus(), Types.VARCHAR);
JDBCUtils.setParameter(insertSummary, 16, summary.getPreferredWeight(), Types.BIGINT);
// Execute the prepared statement
insertSummary.executeUpdate();
try (final ResultSet keys = insertSummary.getGeneratedKeys()) {
while (keys.next()) {
productId = keys.getLong(1);
}
}
}
// Now that the summary is stored, lets try to store the properties
addProductProperties(productId, summary.getProperties());
// And try to store the links
addProductLinks(productId, summary.getLinks());
ProductSummary p = new ProductSummary(summary);
p.setIndexId(productId);
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("[" + getName() + "] Added productSummary " + sid + ", indexid=" + productId + " to product index");
}
return p;
}
/**
* Delete a product summary from the database If the summary doesn't have an
* indexId value set, throw an exception
*
* @param summary ProductSummary object to delete
*/
@Override
public synchronized ProductId removeProductSummary(ProductSummary summary) throws Exception {
List<ProductId> removed = removeProductSummaries(Arrays.asList(summary));
return removed.get(0);
}
/**
* Create an association between the given event and product summary. This
* assumes that both the event and the product are already stored in their
* respective tables.
*
* @param event Event to add association to
* @param summary ProductSummary to add association to
* @return Copy of event with summary added to the event's products list
*/
@Override
public synchronized Event addAssociation(Event event, ProductSummary summary) throws Exception {
if (event.getIndexId() == null || summary.getIndexId() == null) {
throw new Exception(
"[" + getName() + "] Cannot add association between event or summary that are not already in index.");
}
final ProductId sid = summary.getId();
final String sql = "UPDATE productSummary" + " SET eventId=? WHERE source=? AND type=? AND code=?";
try (final PreparedStatement addAssociation = getConnection().prepareStatement(sql);) {
addAssociation.setQueryTimeout(60);
JDBCUtils.setParameter(addAssociation, 1, event.getIndexId(), Types.BIGINT);
// these will target EVERY version of the given product
JDBCUtils.setParameter(addAssociation, 2, sid.getSource(), Types.VARCHAR);
JDBCUtils.setParameter(addAssociation, 3, sid.getType(), Types.VARCHAR);
JDBCUtils.setParameter(addAssociation, 4, sid.getCode(), Types.VARCHAR);
addAssociation.executeUpdate();
}
final Event e = new Event(event);
e.addProduct(summary);
LOGGER.log(Level.FINER,
"[" + getName() + "] Added associations event id=" + event.getIndexId() + ", productSummary source="
+ sid.getSource() + ", type=" + sid.getType() + ", code=" + sid.getCode() + " (id=" + summary.getIndexId()
+ ")");
return e;
}
/**
* Delete the association, if it exists, between the given event and product
* summary.
*
* NOTE: this removes the association between the event and ALL versions of the
* product summary.
*
* @param event An event to remove an association with
* @param summary A ProductSummary to remove an association with
* @throws Exception if error occurs
*/
@Override
public synchronized Event removeAssociation(Event event, ProductSummary summary) throws Exception {
// Deleting the association is really just removing the foreign key
// on the products table
// First check that this summary and event are both in the database
// What happens if runtime objects are set up, but not added to index.
// This would return the event with the association in-tact. Is that
// okay?
Long eventIndexId = event.getIndexId();
Long productIndexId = summary.getIndexId();
if (eventIndexId == null || productIndexId == null) {
return event;
}
final ProductId sid = summary.getId();
final String sql = "UPDATE productSummary" + " SET eventId=? WHERE source=? AND type=? AND code=?";
try (final PreparedStatement removeAssociation = getConnection().prepareStatement(sql);) {
removeAssociation.setQueryTimeout(60);
// Now run the query
JDBCUtils.setParameter(removeAssociation, 1, null, Types.BIGINT);
// these will target EVERY version of the given product
JDBCUtils.setParameter(removeAssociation, 2, summary.getId().getSource(), Types.VARCHAR);
JDBCUtils.setParameter(removeAssociation, 3, summary.getId().getType(), Types.VARCHAR);
JDBCUtils.setParameter(removeAssociation, 4, summary.getId().getCode(), Types.VARCHAR);
int rows = removeAssociation.executeUpdate();
// Throw an exception if we didn't update any
if (rows < 1) {
LOGGER.log(Level.INFO, "[" + getName() + "] Failed to remove an association in the Product Index");
throw new Exception("Failed to remove association");
}
}
LOGGER.finer("[" + getName() + "] Removed associations event id=" + eventIndexId + ", productSummary source="
+ sid.getSource() + ", type=" + sid.getType() + ", code=" + sid.getCode() + " (id=" + productIndexId + ")");
// Should this method remove the summary from the event's list? Yes.
Event updatedEvent = new Event(event);
List<ProductSummary> productsList = updatedEvent.getAllProducts().get(summary.getType());
// pre 1.7.6 archive policies didn't always clean up after themselves
// handle it gracefully
if (productsList != null) {
// remove all product with given source, type, and code
Iterator<ProductSummary> iter = productsList.iterator();
while (iter.hasNext()) {
ProductId id = iter.next().getId();
if (id.isSameProduct(summary.getId())) {
iter.remove();
}
}
if (productsList.size() == 0) {
// if this was the last product of that type, remove the list
// too
updatedEvent.getAllProducts().remove(summary.getType());
}
} else {
LOGGER.warning(
"Products list is empty for summary type " + summary.getId().toString() + ", when removing association");
}
return updatedEvent;
}
// ____________________________________
// Protected Methods
// ____________________________________
/**
* Build a list of all the pieces of the WHERE clause relevant to the
* productSummary table. If the query doesn't set any properties, this method
* will return an empty list. It is up to the calling methods to check if the
* clause list is empty when they build their WHERE clause.
*
* @param query ProductIndexQuery
* @return list containing clauses in the form: column="value"
*/
protected List<String> buildProductClauses(ProductIndexQuery query) {
List<String> clauseList = new ArrayList<String>();
if (query == null) {
return clauseList; /* No query = No clauses */
}
// If they only want current products make a clause that contains a
// subquery
if (query.getResultType() == ProductIndexQuery.RESULT_TYPE_CURRENT) {
String queryCode, querySource, queryType;
queryCode = query.getProductCode();
querySource = query.getProductSource();
queryType = query.getProductType();
if (queryCode != null && querySource != null && queryType != null) {
// Better sub-select when these properties are specified
clauseList.add(String.format(
"%s.%s = (SELECT %s FROM %s ps WHERE ps.%s='%s' AND ps.%s='%s' AND ps.%s='%s' AND ps.%s <> 'DELETE' ORDER BY ps.%s DESC LIMIT 1)",
SUMMARY_TABLE_ALIAS, SUMMARY_PRODUCT_INDEX_ID, SUMMARY_PRODUCT_INDEX_ID, SUMMARY_TABLE, SUMMARY_SOURCE,
querySource, SUMMARY_TYPE, queryType, SUMMARY_CODE, queryCode, SUMMARY_STATUS, SUMMARY_UPDATE_TIME));
} else {
clauseList.add(String.format(
"NOT EXISTS (SELECT %s FROM %s ps WHERE ps.%s=p.%s AND ps.%s=p.%s AND ps.%s=p.%s AND ps.%s > p.%s AND ps.%s <> 'DELETE')",
SUMMARY_PRODUCT_INDEX_ID, SUMMARY_TABLE, SUMMARY_TYPE, SUMMARY_TYPE, SUMMARY_SOURCE, SUMMARY_SOURCE,
SUMMARY_CODE, SUMMARY_CODE, SUMMARY_UPDATE_TIME, SUMMARY_UPDATE_TIME, SUMMARY_STATUS));
}
}
// If they only want superseded products, make a slightly different
// clause that has a subquery
else if (query.getResultType() == ProductIndexQuery.RESULT_TYPE_SUPERSEDED) {
clauseList.add(String.format(
"EXISTS (SELECT %s FROM %s ps WHERE ps.%s=p.%s AND ps.%s=p.%s AND ps.%s=p.%s AND ps.%s > p.%s AND ps.%s <> 'DELETE')",
SUMMARY_PRODUCT_INDEX_ID, SUMMARY_TABLE, SUMMARY_TYPE, SUMMARY_TYPE, SUMMARY_SOURCE, SUMMARY_SOURCE,
SUMMARY_CODE, SUMMARY_CODE, SUMMARY_UPDATE_TIME, SUMMARY_UPDATE_TIME, SUMMARY_STATUS));
}
// Interested in "any" productId in the query.
Iterator<ProductId> productIter = query.getProductIds().iterator();
// If there are one or more productIds we should build this clause
if (productIter.hasNext()) {
// Begin an "IN" clause
StringBuilder clause = new StringBuilder();
clause.append(
String.format("%s.%s IN ('%s", SUMMARY_TABLE_ALIAS, SUMMARY_PRODUCT_ID, productIter.next().toString()));
// Loop over any remaining productIds and add them to clause
while (productIter.hasNext()) {
clause.append("', '");
clause.append(productIter.next().toString());
}
// Finish off our clause and add it to our clauseList
clause.append("')");
clauseList.add(clause.toString());
}
// Build clauses for all specified columns
String eventSource = query.getEventSource();
if (eventSource != null) {
clauseList.add(String.format("%s.%s='%s'", SUMMARY_TABLE_ALIAS, SUMMARY_EVENT_SOURCE, eventSource));
}
String eventSourceCode = query.getEventSourceCode();
if (eventSourceCode != null) {
clauseList.add(String.format("%s.%s='%s'", SUMMARY_TABLE_ALIAS, SUMMARY_EVENT_SOURCE_CODE, eventSourceCode));
}
String eventTimeColumn;
String eventLatitudeColumn;
String eventLongitudeColumn;
String eventMagnitudeColumn;
String eventDepthColumn;
// which table is used for event properties
if (query.getEventSearchType() == ProductIndexQuery.SEARCH_EVENT_PREFERRED) {
// search preferred event parameters in event table
eventTimeColumn = EVENT_TABLE_ALIAS + "." + EVENT_TIME;
eventLatitudeColumn = EVENT_TABLE_ALIAS + "." + EVENT_LATITUDE;
eventLongitudeColumn = EVENT_TABLE_ALIAS + "." + EVENT_LONGITUDE;
eventMagnitudeColumn = EVENT_TABLE_ALIAS + "." + EVENT_MAGNITUDE;
eventDepthColumn = EVENT_TABLE_ALIAS + "." + EVENT_DEPTH;
} else {
// search product summary parameters in summary table
eventTimeColumn = SUMMARY_TABLE_ALIAS + "." + SUMMARY_EVENT_TIME;
eventLatitudeColumn = SUMMARY_TABLE_ALIAS + "." + SUMMARY_EVENT_LATITUDE;
eventLongitudeColumn = SUMMARY_TABLE_ALIAS + "." + SUMMARY_EVENT_LONGITUDE;
eventMagnitudeColumn = SUMMARY_TABLE_ALIAS + "." + SUMMARY_EVENT_MAGNITUDE;
eventDepthColumn = SUMMARY_TABLE_ALIAS + "." + SUMMARY_EVENT_DEPTH;
}
Date minTime = query.getMinEventTime();
if (minTime != null) {
clauseList.add(String.format("%s>=%d", eventTimeColumn, minTime.getTime()));
}
Date maxTime = query.getMaxEventTime();
if (maxTime != null) {
clauseList.add(String.format("%s<=%d", eventTimeColumn, maxTime.getTime()));
}
BigDecimal minLat = query.getMinEventLatitude();
if (minLat != null) {
clauseList.add(String.format("%s>=%f", eventLatitudeColumn, minLat.doubleValue()));
}
BigDecimal maxLat = query.getMaxEventLatitude();
if (maxLat != null) {
clauseList.add(String.format("%s<=%f", eventLatitudeColumn, maxLat.doubleValue()));
}
BigDecimal minDepth = query.getMinEventDepth();
if (minDepth != null) {
clauseList.add(String.format("%s>=%f", eventDepthColumn, minDepth.doubleValue()));
}
BigDecimal maxDepth = query.getMaxEventDepth();
if (maxDepth != null) {
clauseList.add(String.format("%s<=%f", eventDepthColumn, maxDepth.doubleValue()));
}
BigDecimal minMag = query.getMinEventMagnitude();
if (minMag != null) {
clauseList.add(String.format("%s>=%f", eventMagnitudeColumn, minMag.doubleValue()));
}
BigDecimal maxMag = query.getMaxEventMagnitude();
if (maxMag != null) {
clauseList.add(String.format("%s<=%f", eventMagnitudeColumn, maxMag.doubleValue()));
}
Date minUpdateTime = query.getMinProductUpdateTime();
if (minUpdateTime != null) {
clauseList.add(String.format("%s>=%d", SUMMARY_UPDATE_TIME, minUpdateTime.getTime()));
}
Date maxUpdateTime = query.getMaxProductUpdateTime();
if (maxUpdateTime != null) {
clauseList.add(String.format("%s<=%d", SUMMARY_UPDATE_TIME, maxUpdateTime.getTime()));
}
String source = query.getProductSource();
if (source != null) {
clauseList.add(String.format("%s='%s'", SUMMARY_SOURCE, source));
}
String type = query.getProductType();
if (type != null) {
clauseList.add(String.format("%s='%s'", SUMMARY_TYPE, type));
}
String code = query.getProductCode();
if (code != null) {
clauseList.add(String.format("%s='%s'", SUMMARY_CODE, code));
}
String version = query.getProductVersion();
if (version != null) {
clauseList.add(String.format("%s='%s'", SUMMARY_VERSION, version));
}
String status = query.getProductStatus();
if (status != null) {
clauseList.add(String.format("%s='%s'", SUMMARY_STATUS, status));
}
Long minProductIndexId = query.getMinProductIndexId();
if (minProductIndexId != null) {
clauseList.add(String.format("%s>=%d", SUMMARY_PRODUCT_INDEX_ID, minProductIndexId));
}
BigDecimal minLon = query.getMinEventLongitude();
BigDecimal maxLon = query.getMaxEventLongitude();
// Normalize the longitudes between -180 and 180
minLon = normalizeLongitude(minLon);
maxLon = normalizeLongitude(maxLon);
if (minLon != null && maxLon != null) {
if (maxLon.doubleValue() < minLon.doubleValue()) {
// If the normalized maxLon is less than the normalized minLon,
// the
// span crosses
// the date line
Double minLonDouble = minLon.doubleValue();
Double maxLonDouble = maxLon.doubleValue();
// If the range crosses the date line, split it into 2 clauses
String lonClause = String.format("((%s > %f AND %s <= 180) OR (%s < %f AND %s > -180))", eventLongitudeColumn,
minLonDouble, eventLongitudeColumn, eventLongitudeColumn, maxLonDouble, eventLongitudeColumn);
clauseList.add(lonClause);
} else {
clauseList.add(String.format("%s>=%f and %s<=%f", eventLongitudeColumn, minLon.doubleValue(),
eventLongitudeColumn, maxLon.doubleValue()));
}
} else if (minLon != null) {
clauseList.add(String.format("%s>=%f", eventLongitudeColumn, minLon.doubleValue()));
} else if (maxLon != null) {
clauseList.add(String.format("%s<=%f", eventLongitudeColumn, maxLon.doubleValue()));
}
return clauseList;
}
/**
* Create the full SELECT query for the products table using the default
* clauseList.
*
* @param query Query to build.
* @return String containing the full SELECT query
* @see #buildProductClauses(ProductIndexQuery)
*/
protected String buildProductQuery(final ProductIndexQuery query) {
final List<String> clauseList = buildProductClauses(query);
return buildProductQuery(query, clauseList);
}
/**
* Create the full SELECT query for the products table using a custom
* clauseList.
*
* @param query Query to build.
* @param clauseList List of clauses for WHERE
* @return String containing the full SELECT query
*/
protected String buildProductQuery(final ProductIndexQuery query, final List<String> clauseList) {
final StringBuffer sql = new StringBuffer();
sql.append("SELECT * FROM " + SUMMARY_TABLE + " p");
// optional where
if (clauseList.size() > 0) {
sql.append(" WHERE ").append(String.join(" AND ", clauseList));
}
// optional order by
String queryOrderBy = query.getOrderBy();
if (queryOrderBy != null) {
sql.append(" ORDER BY ").append(queryOrderBy);
}
// limit is after order by
Integer queryLimit = query.getLimit();
if (queryLimit != null) {
sql.append(" LIMIT ").append(queryLimit);
}
return sql.toString();
}
/**
* Populate links and properties for provided product summaries.
*
* @param summaries List of ProductSummaries
* @throws Exception if error occurs
*/
protected synchronized void loadProductSummaries(final List<ProductSummary> summaries) throws Exception {
if (summaries.size() == 0) {
// nothing to load
return;
}
// index by id
final Map<Long, ProductSummary> summaryMap = new HashMap<>();
for (final ProductSummary summary : summaries) {
summaryMap.put(summary.getIndexId(), summary);
}
// load all links in one query
final String linkSql = "SELECT productSummaryIndexId as id, relation, url" + " FROM productSummaryLink"
+ " WHERE productSummaryIndexId IN ("
+ StringUtils.join(summaryMap.keySet().stream().collect(Collectors.toList()), ",") + ")";
try (final PreparedStatement statement = getConnection().prepareStatement(linkSql);) {
statement.setQueryTimeout(60);
try (final ResultSet results = statement.executeQuery();) {
while (results.next()) {
Long id = results.getLong("id");
String relation = results.getString("relation");
String uri = results.getString("url");
// add properties to existing objects
summaryMap.get(id).addLink(relation, new URI(uri));
}
}
}
// load all properties in one query
final String propertySql = "SELECT productSummaryIndexId as id, name, value" + " FROM productSummaryProperty"
+ " WHERE productSummaryIndexId IN ("
+ StringUtils.join(summaryMap.keySet().stream().collect(Collectors.toList()), ",") + ")";
try (final PreparedStatement statement = getConnection().prepareStatement(propertySql);) {
statement.setQueryTimeout(60);
try (final ResultSet results = statement.executeQuery();) {
while (results.next()) {
Long id = results.getLong("id");
String name = results.getString("name");
String value = results.getString("value");
// add properties to existing objects
summaryMap.get(id).getProperties().put(name, value);
}
}
}
}
/**
* Parse ProductSummary without loading links or properties.
*
* @param results ResultSet to parse
* @return ProductSummary object without links or properties.
* @throws Exception if error occurs
*/
protected ProductSummary parseProductSummary(ResultSet results) throws Exception {
ProductSummary p = new ProductSummary();
p.setIndexId(results.getLong(SUMMARY_PRODUCT_INDEX_ID));
ProductId pid = ProductId.parse(results.getString(SUMMARY_PRODUCT_ID));
p.setId(pid);
p.setEventSource(results.getString(SUMMARY_EVENT_SOURCE));
p.setEventSourceCode(results.getString(SUMMARY_EVENT_SOURCE_CODE));
try {
p.setEventTime(new Date(results.getLong(SUMMARY_EVENT_TIME)));
} catch (Exception e) {
p.setEventTime(null);
}
// getDouble() returns 0 if the value was actually NULL. In this case,
// we are going to set the value to null
String latitude = results.getString(SUMMARY_EVENT_LATITUDE);
if (latitude == null) {
p.setEventLatitude(null);
} else {
p.setEventLatitude(new BigDecimal(latitude));
}
String longitude = results.getString(SUMMARY_EVENT_LONGITUDE);
if (longitude == null) {
p.setEventLongitude(null);
} else {
p.setEventLongitude(new BigDecimal(longitude));
}
String depth = results.getString(SUMMARY_EVENT_DEPTH);
if (depth == null) {
p.setEventDepth(null);
} else {
p.setEventDepth(new BigDecimal(depth));
}
String magnitude = results.getString(SUMMARY_EVENT_MAGNITUDE);
if (magnitude == null) {
p.setEventMagnitude(null);
} else {
p.setEventMagnitude(new BigDecimal(magnitude));
}
p.setVersion(results.getString(SUMMARY_VERSION));
p.setStatus(results.getString(SUMMARY_STATUS));
p.setPreferredWeight(results.getLong(SUMMARY_PREFERRED));
return p;
}
/**
*
* @param summaries List of product summaries to remove
* @return List of ProductIds that were removed
* @throws Exception if error occurs
*/
public synchronized List<ProductId> removeProductSummaries(final List<ProductSummary> summaries) throws Exception {
// index by id
final ArrayList<ProductId> ids = new ArrayList<>();
// index by id
final Map<Long, ProductSummary> summaryMap = new HashMap<>();
for (final ProductSummary summary : summaries) {
if (summary.getIndexId() == null) {
LOGGER.log(Level.WARNING, "[" + getName() + "] Could not delete product summary. Index id not found");
throw new Exception("[" + getName() + "] Could not delete summary. Index id not found.");
}
summaryMap.put(summary.getIndexId(), summary);
ids.add(summary.getId());
}
if (summaries.size() == 0) {
return ids;
}
// remove all products in one query
// on delete cascade wasn't always set...
final String[] sqls = { "DELETE FROM productSummaryLink WHERE productSummaryIndexId IN",
"DELETE FROM productSummaryProperty WHERE productSummaryIndexId IN",
"DELETE FROM productSummary WHERE id IN", };
final String idsIn = " (" + StringUtils.join(summaryMap.keySet().stream().collect(Collectors.toList()), ",") + ")";
for (final String sql : sqls) {
try (final PreparedStatement statement = verifyConnection().prepareStatement(sql + idsIn);) {
statement.setQueryTimeout(60);
int rows = statement.executeUpdate();
LOGGER.log(Level.FINER, "[" + getName() + "] removed " + rows + " rows");
}
}
return ids;
}
/**
* Save the properties in the database and associate them to the given productId
*
* @param productId long product ID to associate to
* @param properties Map of properties to save
* @throws SQLException if SQL error occurs
*/
protected synchronized void addProductProperties(final long productId, final Map<String, String> properties)
throws SQLException {
// Loop through the properties list and add them all to the database
final String sql = "INSERT INTO productSummaryProperty" + " (productSummaryIndexId, name, value) VALUES (?, ?, ?)";
try (final PreparedStatement insertProperty = getConnection().prepareStatement(sql);) {
insertProperty.setQueryTimeout(60);
for (String key : properties.keySet()) {
JDBCUtils.setParameter(insertProperty, 1, productId, Types.BIGINT);
JDBCUtils.setParameter(insertProperty, 2, key, Types.VARCHAR);
JDBCUtils.setParameter(insertProperty, 3, properties.get(key), Types.VARCHAR);
insertProperty.addBatch();
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.log(Level.FINEST,
"[" + getName() + "] Added property " + key + ":" + properties.get(key) + " for product " + productId);
}
}
insertProperty.executeBatch();
}
}
/**
* Save the links in the database and associate them to the given productId
*
* @param productId Index id of the product to select
* @param links Map of relations to URIs
* @throws SQLException if sql error occurs
*/
protected synchronized void addProductLinks(long productId, Map<String, List<URI>> links) throws SQLException {
// Loop through the properties list and add them all to the database
final String sql = "INSERT INTO productSummaryLink" + " (productSummaryIndexId, relation, url) VALUES (?, ?, ?)";
try (final PreparedStatement insertLink = getConnection().prepareStatement(sql);) {
insertLink.setQueryTimeout(60);
for (final String relation : links.keySet()) {
for (final URI uri : links.get(relation)) {
JDBCUtils.setParameter(insertLink, 1, productId, Types.BIGINT);
JDBCUtils.setParameter(insertLink, 2, relation, Types.VARCHAR);
JDBCUtils.setParameter(insertLink, 3, uri.toString(), Types.VARCHAR);
insertLink.addBatch();
LOGGER.log(Level.FINEST,
"[" + getName() + "] Added link " + relation + ":" + uri.toString() + " for product " + productId);
}
}
insertLink.executeBatch();
}
}
/**
* Convert the given longitude to be between -180 and 180. If the given value is
* already in the range, this method just returns the value.
*
* @param lon Double longitude
* @return double normalized between -180 and 180
*/
public static double normalizeLongitude(double lon) {
double normalizedLon = lon;
if (normalizedLon <= 180 && normalizedLon > -180) {
return normalizedLon;
}
// If the value is above 180, make it negative by subtracting 360
if (normalizedLon > 180) {
normalizedLon = normalizedLon % 360;
normalizedLon = normalizedLon - 360;
return normalizedLon;
}
// If the value is below 180, make it positive by adding 360
if (normalizedLon <= -180) {
normalizedLon = normalizedLon % 360;
normalizedLon = normalizedLon + 360;
return normalizedLon;
}
return normalizedLon;
}
/**
* Wrapper to normalize BigDecimal longitudes
*
* @param lon BigDecimal Longitude
* @return Normalized BigDecimal latitude
*/
public static BigDecimal normalizeLongitude(BigDecimal lon) {
if (lon == null) {
return null;
}
return BigDecimal.valueOf(JDBCProductIndex.normalizeLongitude(lon.doubleValue()));
}
/**
* Called when the indexer is done updating events after a product is processed.
* Stores the preferred attributes for each event in the list
*
* @param events the events that have been updated.
*/
@Override
public synchronized void eventsUpdated(List<Event> events) throws Exception {
Long indexId = null;
final String deletedSql = "UPDATE event SET status=? WHERE id=?";
final String updatedSql = "UPDATE event" + " SET updated=?, source=?, sourceCode=?, eventTime=?"
+ " , latitude=?, longitude=?, depth=?, magnitude=?, status=?" + " WHERE id=?";
try (final PreparedStatement updateDeletedEvent = getConnection().prepareStatement(deletedSql);
final PreparedStatement updateEvent = getConnection().prepareStatement(updatedSql);) {
// big events take time...
updateDeletedEvent.setQueryTimeout(300);
updateEvent.setQueryTimeout(300);
Iterator<Event> iter = events.iterator();
while (iter.hasNext()) {
Event updated = iter.next();
indexId = updated.getIndexId();
LOGGER.finer("[" + getName() + "] Updating event indexid=" + indexId);
updated.log(LOGGER);
try {
if (updated.isDeleted()) {
// only update status if event deleted, leave other
// parameters intact
JDBCUtils.setParameter(updateDeletedEvent, 1, EVENT_STATUS_DELETE, Types.VARCHAR);
JDBCUtils.setParameter(updateDeletedEvent, 2, indexId, Types.BIGINT);
updateDeletedEvent.executeUpdate();
} else {
EventSummary summary = updated.getEventSummary();
// otherwise update event parameters
JDBCUtils.setParameter(updateEvent, 1, new Date().getTime(), Types.BIGINT);
JDBCUtils.setParameter(updateEvent, 2, summary.getSource(), Types.VARCHAR);
JDBCUtils.setParameter(updateEvent, 3, summary.getSourceCode(), Types.VARCHAR);
Long eventTime = null;
if (summary.getTime() != null) {
eventTime = summary.getTime().getTime();
}
JDBCUtils.setParameter(updateEvent, 4, eventTime, Types.BIGINT);
Double latitude = null;
if (summary.getLatitude() != null) {
latitude = summary.getLatitude().doubleValue();
}
JDBCUtils.setParameter(updateEvent, 5, latitude, Types.DOUBLE);
Double longitude = null;
if (summary.getLongitude() != null) {
longitude = summary.getLongitude().doubleValue();
}
JDBCUtils.setParameter(updateEvent, 6, longitude, Types.DOUBLE);
// these may be null, handle carefully
Double depth = null;
if (summary.getDepth() != null) {
depth = summary.getDepth().doubleValue();
}
JDBCUtils.setParameter(updateEvent, 7, depth, Types.DOUBLE);
Double magnitude = null;
if (summary.getMagnitude() != null) {
magnitude = summary.getMagnitude().doubleValue();
}
JDBCUtils.setParameter(updateEvent, 8, magnitude, Types.DOUBLE);
JDBCUtils.setParameter(updateEvent, 9, EVENT_STATUS_UPDATE, Types.VARCHAR);
JDBCUtils.setParameter(updateEvent, 10, indexId, Types.BIGINT);
updateEvent.executeUpdate();
}
LOGGER.log(Level.FINEST, "[" + getName() + "] Updated event properties in Product Index");
} catch (Exception e) {
LOGGER.log(Level.WARNING, "[" + getName() + "] Error updating event properties, eventid=" + indexId, e);
// trigger a rollback
throw e;
}
}
}
}
}