/*
 * Indexer
 */
package gov.usgs.earthquake.indexer;

import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.DefaultNotificationListener;
import gov.usgs.earthquake.distribution.FileProductStorage;
import gov.usgs.earthquake.distribution.HeartbeatListener;
import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException;
import gov.usgs.earthquake.distribution.ProductStorage;
import gov.usgs.earthquake.geoserve.ANSSRegionsFactory;
import gov.usgs.earthquake.geoserve.GeoserveLayersService;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.util.CompareUtil;
import gov.usgs.util.Config;
import gov.usgs.util.Configurable;
import gov.usgs.util.FutureExecutorTask;
import gov.usgs.util.StringUtils;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The indexer receives products from Distribution, and adds them to the
 * EventIndex.
 *
 * This class provides the following configurable properties (in addition to
 * those inherited from DefaultNotificationListener):
 * <dl>
 * <dt>associator</dt>
 * <dd>An object that implements the Associator interface.</dd>
 *
 * <dt>storage</dt>
 * <dd>An object that implements the ProductStorage interface.</dd>
 *
 * <dt>index</dt>
 * <dd>An object that implements the ProductIndex interface.</dd>
 *
 * <dt>modules</dt>
 * <dd>A comma delimited list of objects that implement the IndexerModule
 * interface</dd>
 *
 * <dt>listeners</dt>
 * <dd>A comma delimited list of objects that implement the IndexerListener
 * interface</dd>
 * </dl>
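 *
 * Components are normally wired together from these configuration properties,
 * but the setters below can also be used directly. A minimal programmatic
 * sketch (the constructor already creates default storage, index, associator,
 * and module instances, so the setters only replace them; {@code listener} and
 * {@code product} are hypothetical; startup/shutdown come from the
 * Configurable lifecycle):
 *
 * <pre>{@code
 * Indexer indexer = new Indexer();
 * // optionally replace defaults created by the constructor
 * indexer.setProductStorage(new FileProductStorage());
 * indexer.setProductIndex(new JDBCProductIndex());
 * // register a hypothetical IndexerListener implementation
 * indexer.addListener(listener);
 * indexer.startup();
 * // store, summarize, and index a product received from distribution
 * indexer.onProduct(product);
 * indexer.shutdown();
 * }</pre>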
 */
public class Indexer extends DefaultNotificationListener {

  /** Logging Utility **/
  private static final Logger LOGGER = Logger.getLogger(Indexer.class.getName());

  /** Preferred weight for persistent trump. */
  public static final long TRUMP_PREFERRED_WEIGHT = 100000000;
  /** Product type for version specific trump products. */
  public static final String TRUMP_PRODUCT_TYPE = "trump";
  /** Prefix for persistent trump */
  public static final String PERSISTENT_TRUMP_PREFIX = "trump-";

  /** Property name to configure a custom associator. */
  public static final String ASSOCIATOR_CONFIG_PROPERTY = "associator";
  /** Property to associate using current products */
  public static final String ASSOCIATE_USING_CURRENT_PRODUCTS_PROPERTY = "associateUsingCurrentProducts";
  /** Default state for associate using current products */
  public static final String DEFAULT_ASSOCIATE_USING_CURRENT_PRODUCTS = "false";

  /** Property name to configure a custom storage. */
  public static final String STORAGE_CONFIG_PROPERTY = "storage";

  /** Shortcut name to configure a file product storage. */
  public static final String STORAGE_DIRECTORY_CONFIG_PROPERTY = "storageDirectory";

  /** Property name to configure a custom index. */
  public static final String INDEX_CONFIG_PROPERTY = "index";

  /** Shortcut name to configure a sqlite index. */
  public static final String INDEXFILE_CONFIG_PROPERTY = "indexFile";

  /** Property name to configure modules. */
  public static final String MODULES_CONFIG_PROPERTY = "modules";

  /** Property name to configure listeners. */
  public static final String LISTENERS_CONFIG_PROPERTY = "listeners";

  /** Property name to configure local regions file. */
  public static final String LOCAL_REGIONS_PROPERTY = "localRegionsFile";
  /** Default path to the local regions file. */
  public static final String DEFAULT_LOCAL_REGIONS = "regions.json";

  /** Property for geoserve endpoint */
  public static final String GEOSERVE_ENDPOINT_URL_PROPERTY = "geoserveLayersEndpointUrl";

  /** Property name to enable search socket. */
  public static final String ENABLE_SEARCH_PROPERTY = "enableSearch";
  /** Property name for search socket port. */
  public static final String SEARCH_PORT_PROPERTY = "searchPort";
  /** Property name for search socket thread pool size. */
  public static final String SEARCH_THREADS_PROPERTY = "searchThreads";

  /** Default value whether to enable search socket. */
  public static final String DEFAULT_ENABLE_SEARCH = "false";
  /** Default port where search socket listens. */
  public static final String DEFAULT_SEARCH_PORT = "11236";
  /** Default number of threads (concurrent searches) allowed. */
  public static final String DEFAULT_SEARCH_THREADS = "5";

  /** Utility used for associating products to events. */
  private Associator associator;

  /** Whether to associate using all products (false) or only current products (true). */
  private boolean associateUsingCurrentProducts = false;

  /** Where product contents are stored. */
  private ProductStorage productStorage;

  /** Index of stored products, and how they are related. */
  private ProductIndex productIndex;

  /** Read index for {@link #hasProductBeenIndexed(ProductId)} */
  private ProductIndex readProductIndex;

  /** Modules provide product specific functionality. */
  private List<IndexerModule> modules = new LinkedList<IndexerModule>();

  /** Listeners listen for changes to the event index. */
  private Map<IndexerListener, ExecutorService> listeners = new HashMap<IndexerListener, ExecutorService>();

  /** Local file where regions are stored. */
  private File localRegionsFile = new File(DEFAULT_LOCAL_REGIONS);

  /** Timer for archive policy thread. */
  private Timer archiveTimer = null;

  /** Task for archive policy thread. */
  private TimerTask archiveTask = null;

  /** Synchronization object for indexing. */
  private final Object indexProductSync = new Object();

  /**
   * Service used by FutureExecutorTask for execution. See
   * distribution.FutureListenerNotifier for more details.
   */
  private ExecutorService backgroundService;

  /** Whether to run archive policies (false) or disable them (true). */
  private boolean disableArchive = false;

  // -- Configurable property names -- //
  /** Configurable property for index archive interval */
  public static final String INDEX_ARCHIVE_INTERVAL_PROPERTY = "archiveInterval";
  /** Configurable property for index archive policy */
  public static final String INDEX_ARCHIVE_POLICY_PROPERTY = "archivePolicy";

  // -- Default configurable property values -- //
  private static final long INDEX_ARCHIVE_INTERVAL_DEFAULT = 300000L;

  // -- Configured member variables. Values set in configure() method. -- //
  private long archiveInterval = 0;

  private List<ArchivePolicy> archivePolicies = null;

  private SearchServerSocket searchSocket = null;

  private DefaultIndexerModule defaultModule = new DefaultIndexerModule();

  private String geoserveLayersEndpointUrl = null;

  /**
   * Default no-arg constructor. This gets called from the Configurable API. All
   * configuration parameters are set in the "configure" method.
   *
   * @throws Exception If the JDBCProductIndex throws an exception.
   */
  public Indexer() throws Exception {
    addModule(defaultModule);

    associator = new DefaultAssociator();
    productStorage = new FileProductStorage();
    productIndex = new JDBCProductIndex();
    archivePolicies = new LinkedList<ArchivePolicy>();
  }

  /**
   * Returns the current associator used to associate products to one another
   * and products to events.
   *
   * @return The current Associator.
   */
  public Associator getAssociator() {
    return associator;
  }

  /**
   * Sets the given associator as the current associator to associate products
   * to one another and products to events.
   *
   * @param associator The associator to use from this point forward.
   */
  public void setAssociator(Associator associator) {
    this.associator = associator;
  }

  /**
   * Returns the product storage component that is used to store products as they
   * are received.
   *
   * @return The current product storage component.
   */
  public ProductStorage getProductStorage() {
    return productStorage;
  }

  /**
   * Sets the current product storage component used to store products as they are
   * received.
   *
   * @param productStorage The product storage component to use from this point
   *                       forward.
   */
  public void setProductStorage(ProductStorage productStorage) {
    this.productStorage = productStorage;
  }

  /**
   * Returns the product index component used to index product information as it
   * is received.
   *
   * @return The current product index component.
   */
  public ProductIndex getProductIndex() {
    return productIndex;
  }

  /**
   * Sets the product index component used to index product information as it is
   * received.
   *
   * @param productIndex The product index component to use from this point
   *                     forward.
   */
  public void setProductIndex(ProductIndex productIndex) {
    this.productIndex = productIndex;
  }

  /**
   * Adds the given indexer module to the current list of modules used by the
   * indexer to handle products.
   *
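   * For example, a module that claims a specific product type might look like
   * the following sketch. {@link #getModule(Product)} picks whichever module
   * reports the highest support level; the numeric levels and the use of
   * DefaultIndexerModule as a base class here are illustrative assumptions.
   *
   * <pre>{@code
   * // hypothetical module that claims "losspager" products
   * public class PagerIndexerModule extends DefaultIndexerModule {
   *   public int getSupportLevel(Product product) {
   *     // higher than the default module for losspager, defer otherwise
   *     return "losspager".equals(product.getId().getType()) ? 2 : 0;
   *   }
   * }
   * indexer.addModule(new PagerIndexerModule());
   * }</pre>
   *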
   * @param toAdd The IndexerModule to add to our list.
   */
  public void addModule(final IndexerModule toAdd) {
    modules.add(toAdd);
  }

  /**
   * Removes the first occurrence of the given indexer module from the current
   * list of known modules.
   *
   * @param toRemove The module to remove.
   * @see java.util.LinkedList#remove(Object)
   */
  public void removeModule(final IndexerModule toRemove) {
    modules.remove(toRemove);
  }

  /**
   * This method checks each module's support level for the given product,
   * returning the first module with the highest support level.
   *
   * @param product the product to summarize.
   * @return module best suited to summarize product.
   */
  protected IndexerModule getModule(final Product product) {
    // mit is the module fetched off the iterator
    // m is the module to return
    IndexerModule mit = null, m = null;
    Iterator<IndexerModule> it = modules.iterator();

    // If there are no known modules, return null.
    if (it.hasNext()) {
      // Use first module so long as it exists
      m = it.next();

      // Check remaining modules if any offer greater support
      while (it.hasNext()) {
        mit = it.next();
        // We use strictly greater than (no equals)
        if (mit.getSupportLevel(product) > m.getSupportLevel(product)) {
          m = mit;
        }
      }
    }
    return m;
  }

  /**
   * Adds a listener to this indexer. Listeners are notified when an event is
   * added, updated, or deleted, or when a new product arrives that is not
   * associated with an event.
   *
   * @param toAdd The IndexerListener to add
   */
  public void addListener(final IndexerListener toAdd) {
    if (!listeners.containsKey(toAdd)) {
      ExecutorService listenerExecutor = Executors.newSingleThreadExecutor();
      listeners.put(toAdd, listenerExecutor);
    }
  }

  /**
   * Removes a listener from this indexer. Listeners are notified when an event
   * is added, updated, or deleted, or when a new product arrives that is not
   * associated with an event.
   *
   * @param toRemove The IndexerListener to remove
   */
  public void removeListener(final IndexerListener toRemove) {
    // Remove listener from map
    ExecutorService listenerExecutor = listeners.remove(toRemove);

    if (listenerExecutor != null) {
      // Shutdown executor thread
      listenerExecutor.shutdown();
    }
  }

  /**
   * Send an indexer event to all registered IndexerListeners.
   *
   * Submits a task for each registered IndexerListener so the event is
   * delivered on that listener's executor thread.
   *
   * This method usually returns before registered IndexerListeners have
   * completed processing the event.
   *
   * @param event The event that occurred to trigger the notification. Note: An
   *              IndexerEvent has a specific "type" to clarify the type of event
   *              that occurred.
   */
  protected synchronized void notifyListeners(final IndexerEvent event) {
    StringBuffer buf = new StringBuffer();
    Iterator<IndexerChange> changes = event.getIndexerChanges().iterator();
    while (changes.hasNext()) {
      IndexerChange change = changes.next();
      buf.append("\n").append(change.getType().toString()).append(" ");
      if (change.getOriginalEvent() == null) {
        buf.append("null");
      } else {
        buf.append(change.getOriginalEvent().getEventId());
      }
      buf.append(" => ");
      if (change.getNewEvent() == null) {
        buf.append("null");
      } else {
        buf.append(change.getNewEvent().getEventId());
      }
    }

    // Can't rely on event.getSummary because that might be null
    ProductSummary theSummary = event.getSummary();
    if (theSummary == null && event.getEvents().size() > 0) {
      theSummary = event.getEvents().get(0).getEventIdProduct();
    }

    if (theSummary != null) {
      LOGGER.log(Level.INFO, "[" + getName() + "] indexed product id=" + theSummary.getId().toString() + ", status="
          + theSummary.getStatus() + buf.toString());
    } else {
      LOGGER.log(Level.FINE, "[" + getName() + "] event summary was null. This probably "
          + "means the archive policy is notifying of an archived " + "event.");
    }

    Iterator<IndexerListener> it = listeners.keySet().iterator();
    while (it.hasNext()) {
      final IndexerListener listener = it.next();
      ExecutorService listenerExecutor = listeners.get(listener);
      FutureExecutorTask<Void> listenerTask = new FutureExecutorTask<Void>(backgroundService, listenerExecutor,
          listener.getMaxTries(), listener.getTimeout(), new IndexerListenerCallable(listener, event));
      listenerExecutor.submit(listenerTask);
    }
  }

  /**
   * Check whether this product is in the index.
   *
   * NOT synchronized to allow multiple threads to access.
   * readProductIndex.hasProduct is synchronized.
   *
   * @param id ProductId to check
   * @return true if product has already been indexed.
   */
  protected boolean hasProductBeenIndexed(final ProductId id) {
    try {
      if (readProductIndex == productIndex) {
        // synchronize on this if read and product index are same
        synchronized (indexProductSync) {
          readProductIndex.beginTransaction();
          try {
            boolean hasProduct = readProductIndex.hasProduct(id);
            readProductIndex.commitTransaction();
            return hasProduct;
          } catch (Exception e) {
            readProductIndex.rollbackTransaction();
          }
        }
      } else {
        // otherwise synchronize on readProductIndex
        // transaction reconnects if needed
        synchronized (readProductIndex) {
          readProductIndex.beginTransaction();
          try {
            boolean hasProduct = readProductIndex.hasProduct(id);
            readProductIndex.commitTransaction();
            return hasProduct;
          } catch (Exception e) {
            readProductIndex.rollbackTransaction();
          }
        }
      }
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception checking if product already indexed", e);
    }

    // default is it hasn't been processed
    return false;
  }

  /**
   * Override the DefaultNotificationListener accept method, to always process
   * products that may affect event association.
   *
   * @param id the product id to check.
   * @return boolean whether the product should be indexed.
   */
  @Override
  public boolean accept(final ProductId id) {
    final boolean superAccept = super.accept(id);

    if (!superAccept && isIncludeActuals()) {
      // automatically accept products that affect association
      // (if processing non-scenario products)
      final String type = id.getType();
      if (Event.ORIGIN_PRODUCT_TYPE.equals(type) || Event.ASSOCIATE_PRODUCT_TYPE.equals(type)
          || Event.DISASSOCIATE_PRODUCT_TYPE.equals(type) || type.startsWith(TRUMP_PRODUCT_TYPE)) {
        return true;
      }
    }

    return superAccept;
  }

  /**
   * Check whether to skip products that have already been indexed.
   */
  @Override
  protected boolean onBeforeProcessNotification(Notification notification) throws Exception {
    // try to short-circuit duplicates
    if (!isProcessDuplicates() && hasProductBeenIndexed(notification.getProductId())) {
      LOGGER.finer(
          "[" + getName() + "] notification already indexed, skipping " + notification.getProductId().toString());
      return false;
    }
    // otherwise, use default behavior
    return super.onBeforeProcessNotification(notification);
  }

  /**
   * This method receives a product from Product Distribution and adds it to the
   * index.
   *
   * Implementation follows from Product Indexer Diagram (pg.10) of
   * ProductIndexer.pdf document dated 09/09/2010.
   *
   * Calls onProduct(product, false), which will not reprocess already processed
   * products.
   *
   * @param product The product triggering the event.
   * @throws Exception if an exception occurs.
   */
  @Override
  public void onProduct(final Product product) throws Exception {
    onProduct(product, false);
  }

  /**
   * Receive a product and add it to the index, optionally reprocessing a
   * product that has already been processed.
   *
   * @param product The product triggering the event.
   * @param force   Whether to reprocess products that have already been processed
   *                (true), or skip (false).
   * @throws Exception if error occurs
   */
  public void onProduct(final Product product, final boolean force) throws Exception {
    ProductId id = product.getId();
    final long beginStore = new Date().getTime();

    // -------------------------------------------------------------------//
    // -- Step 1: Store product
    // -------------------------------------------------------------------//
    final Product storedProduct = storeProduct(product, force);
    if (storedProduct == null) {
      return;
    }

    // -------------------------------------------------------------------//
    // -- Step 2: Use product module to summarize product
    // -------------------------------------------------------------------//

    LOGGER.finer("[" + getName() + "] summarizing product id=" + id.toString());
    final ProductSummary productSummary = summarizeProduct(product);

    // -------------------------------------------------------------------//
    // -- Step 3: Add product summary to the product index
    // -------------------------------------------------------------------//

    LOGGER.finer("[" + getName() + "] indexing product id=" + id.toString());
    // measure time waiting to enter synchronized block
    final long beforeEnterSync = new Date().getTime();
    synchronized (indexProductSync) {
      final long afterEnterSync = new Date().getTime();

      try {
        indexProduct(productSummary);
      } finally {
        final long endIndex = new Date().getTime();
        LOGGER.fine("[" + getName() + "] indexer processed product id=" + id.toString() + " in "
            + (endIndex - beginStore) + " ms" + " (" + (afterEnterSync - beforeEnterSync) + " ms sync delay)");
      }
    }
  }

  /**
   * Stores a product
   *
   * @param product Product to store
   * @param force   whether to skip the already-indexed check
   * @return the product if it should be indexed, null if it was already indexed
   * @throws Exception if error occurs
   */
  public Product storeProduct(final Product product, final boolean force) throws Exception {
    final ProductId id = product.getId();
    final long beginStore = new Date().getTime();
    try {
      LOGGER.finest("[" + getName() + "] storing product id=" + id.toString());
      productStorage.storeProduct(product);
      LOGGER.finest("[" + getName() + "] stored product id=" + id.toString());
    } catch (ProductAlreadyInStorageException paise) {
      LOGGER.finer("[" + getName() + "] product already in indexer storage, checking if indexed");
      if (force) {
        LOGGER.finer("[" + getName() + "] force=true skipping check, (re)process product");
      } else if (hasProductBeenIndexed(id)) {
        LOGGER.fine("[" + getName() + "] product already indexed " + product.getId());
        // don't reindex for now
        return null;
      }
    }
    final long endStore = new Date().getTime();
    LOGGER.fine("[" + getName() + "] indexer downloaded product id=" + id.toString() + " in " + (endStore - beginStore)
        + " ms");
    return product;
  }

  /**
   * Use modules to summarize product.
   *
   * @param product To summarize
   * @return A product summary
   * @throws Exception if error occurs
   */
  public ProductSummary summarizeProduct(final Product product) throws Exception {
    // Find best available indexer module
    IndexerModule module = getModule(product);
    return module.getProductSummary(product);
  }

  /**
   * Add product summary to product index.
   *
   * @param productSummary to add
   * @return Summary added to index
   * @throws Exception if error occurs
   */
  protected synchronized ProductSummary indexProduct(ProductSummary productSummary) throws Exception {
    LOGGER.finest("[" + getName() + "] beginning index transaction");

    // The notification to be sent when we are finished with this product
    IndexerEvent notification = new IndexerEvent(this);
    notification.setIndex(getProductIndex());
    notification.setSummary(productSummary);

    // Start the product index transaction, only proceed if able
    productIndex.beginTransaction();

    try {
      LOGGER.finer("[" + getName() + "] finding previous version");
      // Check index for previous version of this product
      ProductSummary prevSummary = getPrevProductVersion(productSummary);

      LOGGER.finer("[" + getName() + "] finding previous event");
      Event prevEvent = null;
      boolean redundantProduct = isRedundantProduct(prevSummary, productSummary);
      if (!redundantProduct) {
        // Check index for existing event candidate. Redundant products skip
        // these association queries and reuse the existing association,
        // which is found via prevSummary in the next block.
        prevEvent = getPrevEvent(productSummary, true);
      }

      // may be an update/delete to a product that previously associated
      // to an event, even though this product isn't associating on its
      // own
      if (prevSummary != null && prevEvent == null) {
        // see if prevSummary associated with an event
        ProductIndexQuery prevEventQuery = new ProductIndexQuery();
        prevEventQuery.getProductIds().add(prevSummary.getId());
        if (associateUsingCurrentProducts) {
          prevEventQuery.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
        }
        List<Event> prevEvents = productIndex.getEvents(prevEventQuery);
        if (prevEvents.size() != 0) {
          // just use first (there can really only be one).
          prevEvent = prevEvents.get(0);
        }
      }

      // special handling to allow trump products to associate based on
      // a product link. Not used when eventsource/eventsourcecode set.
      if (prevEvent == null && productSummary.getId().getType().equals(TRUMP_PRODUCT_TYPE)
          && productSummary.getLinks().containsKey("product")
          && !productSummary.getStatus().equalsIgnoreCase(Product.STATUS_DELETE)) {
        // see if we can associate via another product
        ProductIndexQuery otherEventQuery = new ProductIndexQuery();
        otherEventQuery.getProductIds()
            .add(ProductId.parse(productSummary.getLinks().get("product").get(0).toString()));
        if (associateUsingCurrentProducts) {
          otherEventQuery.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
        }
        List<Event> prevEvents = productIndex.getEvents(otherEventQuery);
        if (prevEvents.size() != 0) {
          // just use first (there can really only be one).
          prevEvent = prevEvents.get(0);
        }
      }

      // Add the summary to the index
      LOGGER.finer("[" + getName() + "] adding summary to index");
      if (prevSummary != null && prevSummary.equals(productSummary)) {
        // same product id and update time implies force=true (reprocessing)

        // remove the previous version of this product summary
        // so the new one can take its place
        if (prevEvent != null) {
          productIndex.removeAssociation(prevEvent, prevSummary);
        } else {
          LOGGER.fine("[" + getName() + "] reprocessing unassociated summary");
        }
        productIndex.removeProductSummary(prevSummary);
      }
      productSummary = productIndex.addProductSummary(productSummary);

      Event event = null;
      if (prevEvent == null) {
        // No existing event, try to create one and associate
        event = createEvent(productSummary);
        if (event != null) {
          LOGGER.finer("[" + getName() + "] created event indexid=" + event.getIndexId());
          event.log(LOGGER);
        } else {
          LOGGER.finer("[" + getName() + "] unable to create event for product.");
        }
      } else {
        LOGGER.finer("[" + getName() + "] found existing event indexid=" + prevEvent.getIndexId());
        prevEvent.log(LOGGER);

        // Existing event found associate to it
        event = productIndex.addAssociation(prevEvent, productSummary);
      }

      // Can't split or merge a non-existent event
      if (prevEvent != null && event != null) {
        LOGGER.finer("[" + getName() + "] checking for event splits");
        // Check for event splits
        notification.addIndexerChanges(checkForEventSplits(productSummary, prevEvent, event));
      }

      // Note: checkForEventSplits may modify the event, and the unmodified
      // reference is then passed to checkForEventMerges. If this becomes a
      // problem, checkForEventSplits and checkForEventMerges could be
      // modified to accept the notification object (and add changes to it)
      // and return the potentially modified object by reference.

      if (event != null) {
        LOGGER.finer("[" + getName() + "] checking for event merges");
        // Check for event merges
        notification.addIndexerChanges(checkForEventMerges(productSummary, prevEvent, event));
      }

      // see if this is a trump product that needs special processing.
      event = checkForTrump(event, productSummary, prevSummary);

      // Set our notification indexer changes if not set yet
      if (notification.getIndexerChanges().size() == 0) {
        if (prevEvent == null && event != null) {
          // No previous event, so event added.
          notification.addIndexerChange(new IndexerChange(IndexerChange.EVENT_ADDED, prevEvent, event));
        } else if (prevEvent != null && event != null) {
          // Previous existed so event updated.
          notification.addIndexerChange(new IndexerChange(
              event.isDeleted() ? IndexerChange.EVENT_DELETED : IndexerChange.EVENT_UPDATED, prevEvent, event));
        } else if (prevEvent == null && event == null) {
          // No event existed or could be created.

          if (prevSummary == null) {
            // No previous summary, product added.
            notification.addIndexerChange(new IndexerChange(IndexerChange.PRODUCT_ADDED, null, null));
          } else {
            // Previous summary existed. Product updated.
            notification.addIndexerChange(new IndexerChange(
                productSummary.isDeleted() ? IndexerChange.PRODUCT_DELETED : IndexerChange.PRODUCT_UPDATED, null,
                null));
          }
        }
      }

      LOGGER.finer("[" + getName() + "] updating event summary parameters");
      // update preferred event parameters in index
      productIndex.eventsUpdated(notification.getEvents());

      LOGGER.finer("[" + getName() + "] committing transaction");
      // Commit our changes to the index (after updating summary attrs)
      productIndex.commitTransaction();
    } catch (Exception e) {
      LOGGER.log(Level.FINE, "[" + getName() + "] rolling back transaction", e);
      // just rollback since it wasn't successful
      productIndex.rollbackTransaction();

      // send heartbeat info
      HeartbeatListener.sendHeartbeatMessage(getName(), "index exception", productSummary.getId().toString());
      // send heartbeat info
      HeartbeatListener.sendHeartbeatMessage(getName(), "index exception class", e.getClass().getName());

      throw e;
    }

    try {
      LOGGER.fine("[" + getName() + "] notifying listeners");
      // ---------------------------------------------------------//
      // -- Step 5: Notify listeners with our indexer event
      // ---------------------------------------------------------//
      notifyListeners(notification);
    } catch (Exception e) {
      // this doesn't affect success of index transaction...
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception while notifying listeners", e);
    }

    // send heartbeat info
    HeartbeatListener.sendHeartbeatMessage(getName(), "indexed product", productSummary.getId().toString());

    // return summary after added to index
    return productSummary;
  }

  /**
   * Check whether two products are redundant, meaning the update would not
   * affect event associations and the indexer can skip split/merge steps.
   *
   * @param previous previous version of product.
   * @param current  current version of product.
   * @return true if products are equivalent for association purposes.
   */
  private boolean isRedundantProduct(final ProductSummary previous, final ProductSummary current) {
    if (previous == null || previous.equals(current) || !previous.getId().isSameProduct(current.getId())) {
      return false;
    }
    if (previous.getPreferredWeight() == current.getPreferredWeight()
        && CompareUtil.nullSafeCompare(previous.getStatus(), current.getStatus()) == 0
        && CompareUtil.nullSafeCompare(previous.getEventDepth(), current.getEventDepth()) == 0
        && CompareUtil.nullSafeCompare(previous.getEventLatitude(), current.getEventLatitude()) == 0
        && CompareUtil.nullSafeCompare(previous.getEventLongitude(), current.getEventLongitude()) == 0
        && CompareUtil.nullSafeCompare(previous.getEventMagnitude(), current.getEventMagnitude()) == 0
        && CompareUtil.nullSafeCompare(previous.getEventSource(), current.getEventSource()) == 0
        && CompareUtil.nullSafeCompare(previous.getEventSourceCode(), current.getEventSourceCode()) == 0
        && CompareUtil.nullSafeCompare(previous.getEventTime(), current.getEventTime()) == 0) {
      // these are the properties that would influence indexer associations
      // or preferred event properties.
      return true;
    }
    return false;
  }

  /**
   * Check for, and handle incoming trump products.
   *
   * Handles version specific trump products, and calls
   * {@link #checkForPersistentTrump(Event, ProductSummary)} to handle
   * "persistent" trump products.
   *
   * VERSION SPECIFIC TRUMP
   *
   * Version specific trump products include:
   * <ul>
   * <li>a link with relation "product" that is a product id urn.</li>
   * <li>a property "weight" that defines the new preferred weight.</li>
   * </ul>
   *
   * Finds the associated product and resummarizes it. If the trump is deleted,
   * the product is associated as is. If the trump is not deleted, its preferred
   * weight is set before reassociating.
   *
   * Preconditions:
   * <ul>
   * <li>The "trump" type product must associate with the correct event on its
   * own. The admin pages accomplish this by sending eventsource/eventsourcecode
   * of the associated product. This means that no product without an
   * eventsource/eventsourcecode property may be trumped.</li>
   * </ul>
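   *
   * For illustration, the values read from a hypothetical version specific
   * trump product summary, using the same accessors as the handling code
   * below (the example weight is made up):
   *
   * <pre>{@code
   * // trump is a hypothetical version specific trump ProductSummary
   * trump.getLinks().get("product").get(0); // URI of the trumped product id
   * trump.getProperties().get("weight");    // e.g. "1000", the new preferred weight
   * }</pre>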
   *
   * @param event          current event being updated.
   * @param productSummary product summary associated with event.
   * @param prevSummary    previous version of the product summary, or null.
   * @return updated event, or same event if not updated.
   * @throws Exception if error occurs
   * @see #checkForPersistentTrump(Event, ProductSummary)
   */
  private Event checkForTrump(Event event, ProductSummary productSummary, ProductSummary prevSummary) throws Exception {
    if (event == null) {
      return event;
    }

    String type = productSummary.getId().getType();
    if (type.equals(TRUMP_PRODUCT_TYPE)) {
      // version specific trump
      ProductId trumpedId = null;
      ProductSummary trumpedSummary = null;
      if (productSummary.isDeleted()) {
        // deleting version specific trump
        // reset preferred weight of referenced product
        // (it is a link in the previous version of trump)
        trumpedId = getTrumpedProductId(prevSummary);
        if (trumpedId == null) {
          LOGGER.warning("Unable to process trump delete, " + "missing 'product' link from previous version");
          return event;
        }
        trumpedSummary = getProductSummaryById(trumpedId);
        if (trumpedSummary == null) {
          // no matching product in index, possibly already deleted
          return event;
        }
        // resummarize product
        event = resummarizeProduct(event, trumpedSummary);
      } else {
        // updating product weight
        trumpedId = getTrumpedProductId(productSummary);
        // parse weight property; leave null if missing so the check below can warn
        String weightString = productSummary.getProperties().get("weight");
        Long weight = weightString == null ? null : Long.valueOf(weightString);
        if (trumpedId == null || weight == null) {
          LOGGER.warning("Invalid trump, " + "missing 'product' link or 'weight' property");
          return event;
        }
        trumpedSummary = getProductSummaryById(trumpedId);
        if (trumpedSummary == null) {
          // no matching product in index, possibly already deleted
          LOGGER.info("Unable to process trump, " + "product '" + trumpedId.toString() + "' not found");
          return event;
        }
        event = setSummaryWeight(event, trumpedSummary, weight);
      }
    } else {
      return checkForPersistentTrump(event, productSummary);
    }

    return event;
  }

  /**
   * Check for, and handle persistent trump products.
   *
   * PERSISTENT TRUMP
   *
   * Persistent trump products include:
   * <ul>
   * <li>a type "trump-PRODUCT" where PRODUCT is the type of product receiving
   * trump.</li>
   * <li>a property "trump-source" that is the source of the product receiving
   * trump.</li>
   * <li>a property "trump-code" that is the code of the product receiving
   * trump.</li>
   * </ul>
   *
   * Steps:
   * <ol>
   * <li>Find the preferred persistent trump product for the product being
   * associated (which may itself be a persistent trump product).</li>
   * <li>If a non-trump product is being associated, stop processing if it is
   * not affected by trump.</li>
   * <li>Set TRUMP_PREFERRED_WEIGHT on the product referenced by the preferred
   * persistent trump product; resummarize any other product that has
   * TRUMP_PREFERRED_WEIGHT.</li>
   * </ol>
   *
   * Preconditions:
   * <ul>
   * <li>The "trump" type product must associate with the correct event on its
   * own. The admin pages accomplish this by sending eventsource/eventsourcecode
   * of the associated product.</li>
   * </ul>
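   *
   * For illustration, a hypothetical persistent trump for an origin product
   * would have type "trump-origin" and properties like the following (source
   * and code values are made up):
   *
   * <pre>{@code
   * // persistentTrump is a hypothetical ProductSummary with type "trump-origin"
   * persistentTrump.getProperties().get("trump-source"); // e.g. "us"
   * persistentTrump.getProperties().get("trump-code");   // e.g. "us1234abcd"
   * }</pre>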
   *
   * @param event          current event being updated.
   * @param productSummary product summary associated with event.
   * @return updated event, or same event if not updated.
   * @throws Exception if error occurs
   */
  private Event checkForPersistentTrump(final Event event, final ProductSummary productSummary) throws Exception {
    Event updatedEvent = event;

    // the type of product currently being indexed
    String type = productSummary.getType();
    // the "trump-TYPE" product type
    String persistentTrumpType = null;
    // whether productSummary is a persistent trump product
    boolean associatingTrump = false;
    // the product source receiving trump
    String trumpSource = null;
    // the product type receiving trump
    String trumpType = null;
    // the product code receiving trump
    String trumpCode = null;

    // determine persistentTrumpType and trumpType
    if (type.startsWith(PERSISTENT_TRUMP_PREFIX)) {
      // incoming trump product
      persistentTrumpType = type;
      trumpType = type.replace(PERSISTENT_TRUMP_PREFIX, "");
      associatingTrump = true;
      // always set persistent trump preferred weight to 1,
      // so most recent updateTime is most preferred
      updatedEvent = setSummaryWeight(updatedEvent, productSummary, 1L);
    } else {
      // incoming product, possibly affected by existing trump
      persistentTrumpType = PERSISTENT_TRUMP_PREFIX + type;
      trumpType = type;
    }

    // find active persistent trump product for type
    ProductSummary persistentTrump = updatedEvent.getPreferredProduct(persistentTrumpType);
    if (persistentTrump != null) {
      trumpSource = persistentTrump.getProperties().get("trump-source");
      trumpCode = persistentTrump.getProperties().get("trump-code");
    }

    // if a non-trump product is coming in,
    // only continue processing if it is affected by persistentTrump.
    // (otherwise weights should already be set)
    if (!associatingTrump
        && !(productSummary.getSource().equals(trumpSource) && productSummary.getCode().equals(trumpCode))) {
      // not affected by trump
      return event;
    }

    // update products affected by trump
    List<ProductSummary> products = updatedEvent.getProducts(trumpType);
    if (products != null) {
      for (ProductSummary summary : products) {
        if (summary.getSource().equals(trumpSource) && summary.getCode().equals(trumpCode)) {
          // add trump to product
          updatedEvent = setSummaryWeight(updatedEvent, summary, TRUMP_PREFERRED_WEIGHT);
        } else if (summary.getPreferredWeight() == TRUMP_PREFERRED_WEIGHT) {
          // remove trump from previously trumped product.
          updatedEvent = resummarizeProduct(updatedEvent, summary);
        }
      }
    }
    // return updated event
    return updatedEvent;
  }

  /**
   * Get the productId referred to by a trump product.
   *
   * @param trumpSummary trump product with reference to product id.
   * @return product id, or null if unable to parse product id.
   */
  protected ProductId getTrumpedProductId(final ProductSummary trumpSummary) {
    try {
      // use 'product' link from previous summary
      ProductId trumpedId = ProductId.parse(trumpSummary.getLinks().get("product").get(0).toString());
      return trumpedId;
    } catch (Exception e) {
      return null;
    }
  }

  /**
   * Get a product summary object using its product id.
   *
   * @param id id to find.
   * @return matching product summary or null.
   * @throws Exception if error occurs
   */
  protected ProductSummary getProductSummaryById(final ProductId id) throws Exception {
    ProductIndexQuery query = new ProductIndexQuery();
    query.getProductIds().add(id);
    List<ProductSummary> summaries = productIndex.getProducts(query);
    if (summaries.size() > 0) {
      return summaries.get(0);
    }
    return null;
  }

  /**
   * Update a product summary weight
   *
   * @param event           the event.
   * @param summary         the summary.
   * @param preferredWeight the weight to set.
   * @return event with updated summary.
   * @throws Exception if error occurs
   */
  protected Event setSummaryWeight(Event event, ProductSummary summary, final Long preferredWeight) throws Exception {
    if (summary.getPreferredWeight() == preferredWeight) {
      // already set
      return event;
    }

    LOGGER.info("Setting product preferred weight " + summary.getId().toString() + ", weight "
        + summary.getPreferredWeight() + " (old) => " + preferredWeight + " (new)");

    // remove existing summary from event
    event = productIndex.removeAssociation(event, summary);
    productIndex.removeProductSummary(summary);
    // set custom weight
    summary.setPreferredWeight(preferredWeight);
    // add updated summary to event
    summary = productIndex.addProductSummary(summary);
    event = productIndex.addAssociation(event, summary);
    // return updated event
    return event;
  }

  /**
   * Resummarize a product within an event.
   *
   * @param event   the event.
   * @param summary the summary.
   * @return event with updated summary.
   * @throws Exception if error occurs
   */
  protected Event resummarizeProduct(final Event event, final ProductSummary summary) throws Exception {
    Event updatedEvent = null;
    ProductSummary updatedSummary = null;
    // remove existing summary from event
    updatedEvent = productIndex.removeAssociation(event, summary);
    productIndex.removeProductSummary(summary);
    // use module to summarize original product
    Product product = productStorage.getProduct(summary.getId());
    if (product == null) {
      throw new Exception("Unable to resummarize product, " + "product not in storage " + summary.getId().toString());
    }
    updatedSummary = getModule(product).getProductSummary(product);
    LOGGER.info("Resummarizing product " + summary.getId().toString() + ", weight " + summary.getPreferredWeight()
        + " (old) => " + updatedSummary.getPreferredWeight() + " (new)");
    // add updated summary to event
    updatedSummary = productIndex.addProductSummary(updatedSummary);
    updatedEvent = productIndex.addAssociation(updatedEvent, updatedSummary);
    // return updated event
    return updatedEvent;
  }

  /**
   * Check for event splits (and split them if needed).
   *
   * @param summary       the summary the indexer is currently processing.
   * @param originalEvent the event before the indexer made any changes.
   * @param updatedEvent  the event after the indexer made any changes.
   * @return List of changes made during this method.
   * @throws Exception if error occurs
   */
  protected synchronized List<IndexerChange> checkForEventSplits(final ProductSummary summary,
      final Event originalEvent, final Event updatedEvent) throws Exception {
    List<IndexerChange> changes = new ArrayList<IndexerChange>();

    // save reference so we can check later if this has changed
    Event splitEvent = updatedEvent;

    // ## 1) Split event into sub events
    Map<String, Event> subEvents = splitEvent.getSubEvents();
    if (subEvents.size() == 1) {
      // still only one event, cannot split
      return changes;
    }

    // the original eventid before the indexer started processing this
    // update (may have already changed in updatedEvent).
    String originalEventId = originalEvent.getEventId();

    // ## 2) See if sub events still associate
    // list of events that actually are split
    List<Event> alreadySplit = new ArrayList<Event>();

    // see how sub events associate compared to this event (since the
    // original event is the one that should be "UPDATED")
    Event originalSubEvent = subEvents.remove(originalEventId);
    if (originalSubEvent == null) {
      // this should never happen: the event id was returned by getEventId,
      // so the sub event should have at least one product. Log and bail out:
      LOGGER.warning("[" + getName() + "] originalSubEvent is null" + ", originalEventId=" + originalEventId);
      for (final String id : subEvents.keySet()) {
        final Event subEvent = subEvents.get(id);
        subEvent.log(LOGGER);
      }
      return changes;
    }

    Iterator<String> subEventsIter = new ArrayList<String>(subEvents.keySet()).iterator();
    while (subEventsIter.hasNext()) {
      String nextEventId = subEventsIter.next();
      Event nextEvent = subEvents.get(nextEventId);

      if (!originalSubEvent.isAssociated(nextEvent, associator)) {
        // not associated, so split
        splitEvent = splitEvents(splitEvent, nextEvent);

        // see if associated to any that already split
        Iterator<Event> alreadySplitIter = alreadySplit.iterator();
        while (alreadySplitIter.hasNext()) {
          Event alreadySplitEvent = alreadySplitIter.next();
          if (alreadySplitEvent.isAssociated(nextEvent, associator)) {
            // need to merge with event that already split

            // will need reference to alreadySplitEvent, so keep
            // reference to merged event in nextEvent
            nextEvent = mergeEvents(alreadySplitEvent, nextEvent);
            // remove original already split
            alreadySplit.remove(alreadySplitEvent);
            // add merged nextEvent
            alreadySplit.add(nextEvent);
            // signal that nextEvent was already added to
            // alreadySplit
            nextEvent = null;
            // associated, and one at a time, so stop checking
            break;
          }
        }

        if (nextEvent != null) {
          // wasn't merged with an already split event
          alreadySplit.add(nextEvent);
        }
      }
    }
    if (alreadySplit.size() == 0) {
      // didn't split any events...
      return changes;
    }

    // ## 3) Build list of Indexer changes that actually happened.
    String splitEventId = splitEvent.getEventId();

    if (!originalEventId.equalsIgnoreCase(splitEventId)) {
      LOGGER.warning("[" + getName() + "] eventid (" + splitEventId + ") no longer matches original (" + originalEventId
          + ") after split.");
    }

    // first notify about updated original event
    changes.add(new IndexerChange((splitEvent.isDeleted() ? IndexerChange.EVENT_DELETED : IndexerChange.EVENT_UPDATED),
        originalEvent, splitEvent));

    // now notify about all events that split from original event
    Iterator<Event> alreadySplitIter = alreadySplit.iterator();
    while (alreadySplitIter.hasNext()) {
      Event alreadySplitEvent = alreadySplitIter.next();
      changes.add(new IndexerChange(IndexerChange.EVENT_SPLIT, null, alreadySplitEvent));
    }

    // done
    return changes;
  }

  /**
   * Removes the leaf event (and all its products) from the root event. This
   * method modifies the runtime objects as well as updating the index DB.
   *
   * @param root The root event from which all leaf products will be removed
   * @param leaf The event (with products) that will be removed from the root
   * @return copy of root without the products that have been removed. The indexId
   *         property of leaf is updated to its new value.
   * @throws Exception if error occurs
   */
  protected synchronized Event splitEvents(final Event root, final Event leaf) throws Exception {
    Event updated = root;
    Iterator<ProductSummary> leafProducts = leaf.getProductList().iterator();

    // assign leaf indexId by reference
    Event insertedLeafEvent = productIndex.addEvent(leaf);
    leaf.setIndexId(insertedLeafEvent.getIndexId());

    while (leafProducts.hasNext()) {
      ProductSummary product = leafProducts.next();
      if (updated != null) {
        updated = productIndex.removeAssociation(updated, product);
      }
      // leaf already has the product in its list, so the returned event is not needed.
      productIndex.addAssociation(leaf, product);
    }

    return updated;
  }

  /**
   * Merges the child event (and all its products) into the target event. If the
   * child event attempts to merge in a product that is the same as one already
   * associated to the target event, the child version of the product takes
   * precedence. Note: This only applies when the target and child product have
   * the same type, code, source, and update time; i.e. the products are
   * duplicates. This method modifies the runtime objects as well as the index DB.
   * The child event is then deleted.
   *
   * @param target The target event into which the child is merged.
   * @param child  The child event to be merged into the target.
   * @return the updated event
   * @throws Exception if error occurs
   */
  protected synchronized Event mergeEvents(final Event target, final Event child) throws Exception {
    Iterator<ProductSummary> childProducts = child.getProductList().iterator();
    Event updatedEvent = target;
    Event updatedChild = child;

    while (childProducts.hasNext()) {
      ProductSummary product = childProducts.next();
      productIndex.removeAssociation(child, product);
      updatedChild = productIndex.removeAssociation(updatedChild, product);
      updatedEvent = productIndex.addAssociation(updatedEvent, product);
    }

    productIndex.removeEvent(updatedChild);

    return updatedEvent;
  }

  /**
   * Check and merge any nearby events or previously unassociated products that
   * now associate.
   *
   * @param summary       the summary currently being processed by the indexer.
   * @param originalEvent the event before any changes.
   * @param updatedEvent  the event after the summary was associated.
   * @return list of any merge type changes.
   * @throws Exception if error occurs
   */
  protected synchronized List<IndexerChange> checkForEventMerges(final ProductSummary summary,
      final Event originalEvent, final Event updatedEvent) throws Exception {
    List<IndexerChange> changes = new ArrayList<IndexerChange>();
    Event mergedEvent = updatedEvent;

    // ## 1) Check for nearby events
    if (originalEvent != null) {
      // only if the event was not just created, because otherwise this
      // product would have associated to an existing event

      // build the query
      EventSummary mergedSummary = mergedEvent.getEventSummary();
      ProductIndexQuery nearbyEvents = associator.getLocationQuery(mergedSummary.getTime(), mergedSummary.getLatitude(),
          mergedSummary.getLongitude());
      if (associateUsingCurrentProducts && nearbyEvents != null) {
        nearbyEvents.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
      }

      LOGGER.finer("[" + getName() + "] searching for nearby events");
      // do the search
      Iterator<Event> events = productIndex.getEvents(nearbyEvents).iterator();
      LOGGER.finer("[" + getName() + "] search for nearby events complete");
      while (events.hasNext()) {
        Event foundEvent = events.next();
        if (foundEvent.getIndexId().equals(mergedEvent.getIndexId())) {
          // found the event currently being checked for merges,
          // ignore
          continue;
        } else if (mergedEvent.isAssociated(foundEvent, associator)) {
          // event associates to another event, merge them
          mergedEvent = mergeEvents(mergedEvent, foundEvent);
          changes.add(new IndexerChange(IndexerChange.EVENT_MERGED, foundEvent, null));
        }
      }
    }

    // ## 2) Now look for products that were previously unassociated, but
    // that can now associate because of the event id of the incoming
    // product
    // (if the event already had this id, these products would already be
    // associated...)
    String source = summary.getEventSource();
    String sourceCode = summary.getEventSourceCode();
    // without this check, all unassociated products would be added...(BAD)
    if (source != null && sourceCode != null) {
      // build the query
      ProductIndexQuery unassociatedProducts = new ProductIndexQuery();
      unassociatedProducts.setEventSource(source);
      unassociatedProducts.setEventSourceCode(sourceCode);

      // run the query
      LOGGER.finer("[" + getName() + "] searching for unassociated products");
      Iterator<ProductSummary> summaries = productIndex.getUnassociatedProducts(unassociatedProducts).iterator();
      LOGGER.finer("[" + getName() + "] search for unassociated products complete");
      // add associations
      while (summaries.hasNext()) {
        mergedEvent = productIndex.addAssociation(mergedEvent, summaries.next());
      }
    }

    // ## 3) Check for merge by associate product
    // only need to check when associate product is first added
    // THIS IMPLEMENTATION ASSUMES: both events exist when associate product
    // is sent. Search for existing event (during getPrevEvent) does not
    // search associate products othereventsource or othereventsourcecode
    // properties.
    if (summary.getType().equals(Event.ASSOCIATE_PRODUCT_TYPE) && !summary.isDeleted()) {
      String otherEventSource = summary.getProperties().get(Event.OTHEREVENTSOURCE_PROPERTY);
      String otherEventSourceCode = summary.getProperties().get(Event.OTHEREVENTSOURCECODE_PROPERTY);

      if (otherEventSource == null || otherEventSourceCode == null) {
        LOGGER.warning(Event.ASSOCIATE_PRODUCT_TYPE + " product without " + Event.OTHEREVENTSOURCE_PROPERTY + " or "
            + Event.OTHEREVENTSOURCECODE_PROPERTY + " properties, ignoring");
      } else {
        // search for associated event
        ProductIndexQuery associateQuery = new ProductIndexQuery();
        associateQuery.setEventSource(otherEventSource);
        associateQuery.setEventSourceCode(otherEventSourceCode);

        if (associateUsingCurrentProducts) {
          associateQuery.setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
        }

        LOGGER.finer("[" + getName() + "] searching for associated event");
        // do the search
        Iterator<Event> events = productIndex.getEvents(associateQuery).iterator();
        LOGGER.finer("[" + getName() + "] search for associated event complete");
        while (events.hasNext()) {
          Event foundEvent = events.next();
          if (foundEvent.getIndexId().equals(mergedEvent.getIndexId())) {
            // found the event currently being checked for merges,
            // ignore
            continue;
          } else if (mergedEvent.isAssociated(foundEvent)) {
            // event associates to another event, merge them
            mergedEvent = mergeEvents(mergedEvent, foundEvent);
            changes.add(new IndexerChange(IndexerChange.EVENT_MERGED, foundEvent, null));
          }
        }
      }
    }

    // ## 4) Check if the event has changed during this method
    if (mergedEvent != updatedEvent) {
      // something has changed, so add an IndexerChange
      if (originalEvent == null) {
        // no previous event, it was added (although unassociated
        // products were associated)
        changes.add(new IndexerChange(IndexerChange.EVENT_ADDED, null, mergedEvent));
      } else {
        // may have merged with other events, or associated unassociated
        // products. Other changes represent the merges, so just
        // indicate update/delete.
        changes.add(
            new IndexerChange((mergedEvent.isDeleted() ? IndexerChange.EVENT_DELETED : IndexerChange.EVENT_UPDATED),
                originalEvent, mergedEvent));
      }
    }

    return changes;
  }

  /**
   * Find the previous version of a product summary in the index, if one exists.
   *
   * @param summary A product summary
   * @return The previous version of the summary, or null if none exists.
   * @throws Exception if error occurs
   */
  protected synchronized ProductSummary getPrevProductVersion(ProductSummary summary) throws Exception {
    ProductSummary prevSummary = null;
    List<ProductSummary> candidateSummaries = null;
    ProductIndexQuery query = new ProductIndexQuery();

    // Set type, code and source
    query.setProductType(summary.getType());
    query.setProductCode(summary.getCode());
    query.setProductSource(summary.getSource());

    // Query the index (first look for associated products)
    candidateSummaries = productIndex.getProducts(query);

    if (candidateSummaries == null || candidateSummaries.size() == 0) {
      // No summaries found associated to events, try unassociated.
      candidateSummaries = productIndex.getUnassociatedProducts(query);
    }

    if (candidateSummaries != null && candidateSummaries.size() > 0) {
      prevSummary = candidateSummaries.get(0);
      if (candidateSummaries.size() != 1) {
        LOGGER.warning("[" + getName() + "] " + summary.getId().toString()
            + ": More than one existing summary is claiming to be most recent.");
      }
    }
    return prevSummary;
  }

  /**
   * Find an existing event that the product summary should associate with.
   *
   * Associate products are processed during
   * {@link #checkForEventMerges(ProductSummary, Event, Event)} and are ignored
   * during this method.
   *
   * @see Associator#getSearchRequest(ProductSummary)
   * @see Associator#chooseEvent(List, ProductSummary)
   *
   * @param summary ProductSummary
   * @return Event to which a productSummary is associated, or null if not found.
   * @throws Exception if error occurs
   */
  protected synchronized Event getPrevEvent(ProductSummary summary) throws Exception {
    return getPrevEvent(summary, false);
  }

  /**
   * Find an existing event that summary should associate with.
   *
   * @param summary     the product summary being associated.
   * @param associating whether associating (vs archiving).
   * @return previous event, or null if none found.
   * @throws Exception if error occurs
   */
  protected synchronized Event getPrevEvent(ProductSummary summary, boolean associating) throws Exception {
    Event prevEvent = null;
    List<Event> candidateEvents = null;

    SearchRequest request = associator.getSearchRequest(summary);

    if (associating && associateUsingCurrentProducts) {
      for (SearchQuery query : request.getQueries()) {
        query.getProductIndexQuery().setResultType(ProductIndexQuery.RESULT_TYPE_CURRENT);
      }
    }

    SearchResponse response = search(request);
    if (response != null) {
      candidateEvents = response.getEvents();
    }

    if (candidateEvents != null && candidateEvents.size() > 0) {
      // Found some events. Find best match.
      prevEvent = associator.chooseEvent(candidateEvents, summary);
    }

    return prevEvent;
  }
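
  /*
   * Usage sketch: association flows through the Associator, mirroring the
   * method above. Calling getPrevEvent(summary, true) additionally restricts
   * each ProductIndexQuery to ProductIndexQuery.RESULT_TYPE_CURRENT so only
   * current products are considered when choosing a candidate event.
   *
   *   Event candidate = getPrevEvent(summary, true);
   *   if (candidate == null) {
   *     // no existing event matched; the summary either seeds a new event
   *     // (see createEvent) or remains unassociated
   *   }
   */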

  /*
   * protected IndexerEvent createIndexerEvent(ProductSummary prevSummary,
   *     Event prevEvent, ProductSummary summary, Event event) {
   *   IndexerType type = null;
   *   IndexerEvent indexerEvent = new IndexerEvent(this);
   *
   *   // ----------------------------------
   *   // Determine the type of IndexerEvent
   *   // ----------------------------------
   *
   *   if (summary.getStatus() == Product.STATUS_DELETE) {
   *     type = IndexerEvent.PRODUCT_DELETED;
   *     if (event != null) {
   *       // Since we have an event, this is now an EVENT_UPDATED type
   *       type = IndexerEvent.EVENT_UPDATED;
   *
   *       // Check if all products on event are deleted.
   *       if (event.getProductList().size() == 0) {
   *         type = IndexerEvent.EVENT_DELETED;
   *       }
   *     }
   *   } else {
   *     // Product was not a "DELETE" status. Must be an added or updated.
   *     if (prevEvent == null && event != null) {
   *       type = IndexerEvent.EVENT_ADDED;
   *     } else if (prevEvent != null && event != null) {
   *       type = IndexerEvent.EVENT_UPDATED;
   *     } else if (prevSummary == null && summary != null) {
   *       type = IndexerEvent.PRODUCT_ADDED;
   *     } else if (prevSummary != null && summary != null) {
   *       type = IndexerEvent.PRODUCT_UPDATED;
   *     }
   *
   *     if (summary == null) {
   *       // Not sure how this happens.
   *       LOGGER.warning("Trying to notify of a null summary.");
   *     }
   *   }
   *
   *   // Set parameters
   *   indexerEvent.setEventType(type);
   *   indexerEvent.setOldEvent(prevEvent);
   *   indexerEvent.setSummary(summary);
   *   indexerEvent.setEvent(event);
   *
   *   return indexerEvent;
   * }
   */

  /**
   * Loads parent, specific, and dependent configurations, in that order.
   *
   * @param config the Config to load from.
   * @throws Exception if configuration fails.
   */
  @Override
  public synchronized void configure(Config config) throws Exception {
    // -- Load parent configurations -- //
    super.configure(config);

    // reads properties from same config section
    defaultModule.getSignatureVerifier().configure(config);

    // -- Load specific configurations -- //
    String associatorName = config.getProperty(ASSOCIATOR_CONFIG_PROPERTY);
    if (associatorName != null) {
      associator = (Associator) Config.getConfig().getObject(associatorName);
    }

    String storageName = config.getProperty(STORAGE_CONFIG_PROPERTY);
    String storageDirectory = config.getProperty(STORAGE_DIRECTORY_CONFIG_PROPERTY);
    if (storageName != null) {
      LOGGER.config("[" + getName() + "] loading ProductStorage '" + storageName + "'");
      productStorage = (ProductStorage) Config.getConfig().getObject(storageName);
      if (productStorage == null) {
        throw new ConfigurationException(
            "[" + getName() + "] ProductStorage '" + storageName + "' is not properly configured");
      }
    } else if (storageDirectory != null) {
      LOGGER.config("[" + getName() + "] using storage directory '" + storageDirectory + "'");
      productStorage = new FileProductStorage(new File(storageDirectory));
    } else {
      productStorage.configure(config);
    }

    String indexName = config.getProperty(INDEX_CONFIG_PROPERTY);
    String indexFileName = config.getProperty(INDEXFILE_CONFIG_PROPERTY);
    if (indexName != null) {
      LOGGER.config("[" + getName() + "] loading ProductIndex '" + indexName + "'");
      productIndex = (ProductIndex) Config.getConfig().getObject(indexName);
      if (productIndex == null) {
        throw new ConfigurationException(
            "[" + getName() + "] ProductIndex '" + indexName + "' is not properly configured");
      }
    } else if (indexFileName != null) {
      LOGGER.config("[" + getName() + "] using sqlite product index '" + indexFileName + "'");
      productIndex = new JDBCProductIndex(indexFileName);
    } else {
      productIndex.configure(config);
    }

    // Archive policies determine which expired events/products to purge
    String archivePolicy = config.getProperty(INDEX_ARCHIVE_POLICY_PROPERTY);
    if (archivePolicy != null) {
      Iterator<String> iter = StringUtils.split(archivePolicy, ",").iterator();
      while (iter.hasNext()) {
        String policyName = iter.next();

        LOGGER.config("[" + getName() + "] loading ArchivePolicy '" + policyName + "'");
        ArchivePolicy policy = (ArchivePolicy) Config.getConfig().getObject(policyName);
        if (policy == null) {
          throw new ConfigurationException(
              "[" + getName() + "] ArchivePolicy '" + policyName + "' is not configured properly");
        }

        // Only use archive policies that are valid
        if (policy.isValidPolicy()) {
          archivePolicies.add(policy);
        } else {
          LOGGER.warning("[" + getName() + "] ArchivePolicy '" + policyName + "' is not valid");
        }
      }
    }

    // How often should the archive policies be run
    String buffer = config.getProperty(INDEX_ARCHIVE_INTERVAL_PROPERTY);
    if (buffer != null) {
      archiveInterval = Long.parseLong(buffer);
    } else {
      // Use default interval
      archiveInterval = INDEX_ARCHIVE_INTERVAL_DEFAULT;
    }
    LOGGER.config("[" + getName() + "] archive interval is '" + archiveInterval + "'");

    // Always use at least a default indexer module
    String moduleNames = config.getProperty(MODULES_CONFIG_PROPERTY);
    if (moduleNames != null) {
      Iterator<String> modules = StringUtils.split(moduleNames, ",").iterator();
      while (modules.hasNext()) {
        String moduleName = modules.next();
        if (moduleName.equals("")) {
          continue;
        }
        LOGGER.config("[" + getName() + "] loading indexer module '" + moduleName + "'");
        IndexerModule module = (IndexerModule) Config.getConfig().getObject(moduleName);
        if (module == null) {
          throw new ConfigurationException(
              "[" + getName() + "] indexer module '" + moduleName + "' is not configured properly");
        }
        addModule(module);
      }
    } else {
      LOGGER.config("[" + getName() + "] no indexer modules configured.");
    }

    String listenerNames = config.getProperty(LISTENERS_CONFIG_PROPERTY);
    if (listenerNames != null) {
      Iterator<String> listeners = StringUtils.split(listenerNames, ",").iterator();
      while (listeners.hasNext()) {
        String listenerName = listeners.next();
        if (listenerName.equals("")) {
          continue;
        }
        LOGGER.config("[" + getName() + "] loading indexer listener '" + listenerName + "'");
        IndexerListener listener = (IndexerListener) Config.getConfig().getObject(listenerName);
        if (listener == null) {
          throw new ConfigurationException(
              "[" + getName() + "] indexer listener '" + listenerName + "' is not configured properly");
        }
        addListener(listener);
      }
    } else {
      LOGGER.config("[" + getName() + "] no indexer listeners configured.");
    }

    this.geoserveLayersEndpointUrl = config.getProperty(GEOSERVE_ENDPOINT_URL_PROPERTY);

    String localRegions = config.getProperty(LOCAL_REGIONS_PROPERTY, DEFAULT_LOCAL_REGIONS);
    this.localRegionsFile = new File(localRegions);
    LOGGER.config("[" + getName() + "] Local regions file: " + this.localRegionsFile);

    String enableSearch = config.getProperty(ENABLE_SEARCH_PROPERTY, DEFAULT_ENABLE_SEARCH);
    if (Boolean.valueOf(enableSearch)) {
      searchSocket = new SearchServerSocket();
      searchSocket.setIndex(this);

      int searchPort = Integer.parseInt(config.getProperty(SEARCH_PORT_PROPERTY, DEFAULT_SEARCH_PORT));
      searchSocket.setPort(searchPort);

      int searchThreads = Integer.parseInt(config.getProperty(SEARCH_THREADS_PROPERTY, DEFAULT_SEARCH_THREADS));
      searchSocket.setThreads(searchThreads);

      LOGGER.config("[" + getName() + "] SearchServerSocket running at localhost:" + searchPort + ", with "
          + searchThreads + " threads");
    }
    // -- Load dependent configurations -- //

    associateUsingCurrentProducts = Boolean.valueOf(
        config.getProperty(ASSOCIATE_USING_CURRENT_PRODUCTS_PROPERTY, DEFAULT_ASSOCIATE_USING_CURRENT_PRODUCTS));
    LOGGER.config("[" + getName() + "] associateUsingCurrentProducts = " + associateUsingCurrentProducts);
  }
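
  /*
   * Configuration sketch: the properties read above could also be supplied
   * programmatically, assuming gov.usgs.util.Config exposes a Properties-style
   * setProperty method. The values shown (paths and section names) are
   * hypothetical.
   *
   *   Config config = new Config();
   *   // store product contents on disk
   *   config.setProperty(STORAGE_DIRECTORY_CONFIG_PROPERTY, "/data/indexer/storage");
   *   // use a sqlite product index file
   *   config.setProperty(INDEXFILE_CONFIG_PROPERTY, "/data/indexer/productIndex.db");
   *   // comma delimited lists of configured module/listener section names
   *   config.setProperty(MODULES_CONFIG_PROPERTY, "module_shakemap");
   *   config.setProperty(LISTENERS_CONFIG_PROPERTY, "listener_default");
   *   config.setProperty(ASSOCIATE_USING_CURRENT_PRODUCTS_PROPERTY, "true");
   *
   *   // assuming "indexer" is an Indexer instance:
   *   indexer.configure(config);
   *   indexer.startup();
   */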

  /**
   * Shuts down the Indexer. Dependent processes (indexes, storage, listener
   * executor services, listeners, modules, the archive timer, and the search
   * socket) are shut down first, then the parent shutdown method is called.
   */
  @Override
  public synchronized void shutdown() throws Exception {
    // -- Shut down dependent processes -- //
    try {
      if (readProductIndex != productIndex) {
        readProductIndex.shutdown();
      }
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception shutting down read product index", e);
    }
    try {
      productIndex.shutdown();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception shutting down product index", e);
    }
    productStorage.shutdown();

    // ExecutorServices tied to known listeners.
    Iterator<IndexerListener> iter = listeners.keySet().iterator();
    while (iter.hasNext()) {
      IndexerListener listener = iter.next();
      try {
        listeners.get(listener).shutdown();
      } catch (Exception e) {
        LOGGER.log(Level.WARNING, "[" + getName() + "] exception shutting down listener executor", e);
      }
      if (listener instanceof Configurable) {
        try {
          ((Configurable) listener).shutdown();
        } catch (Exception e) {
          LOGGER.log(Level.WARNING, "[" + getName() + "] exception shutting down listener", e);
        }
      }
    }

    Iterator<IndexerModule> modules = this.modules.iterator();
    while (modules.hasNext()) {
      IndexerModule module = modules.next();
      if (module instanceof Configurable) {
        try {
          ((Configurable) module).shutdown();
        } catch (Exception e) {
          LOGGER.log(Level.WARNING, "[" + getName() + "] exception shutting down module", e);
        }
      }
    }
    // -- Shut down our own specific processes -- //

    // Shut down our timers if they exist
    if (archiveTask != null) {
      archiveTask.cancel();
      archiveTask = null;
    }
    if (archiveTimer != null) {
      archiveTimer.cancel();
      archiveTimer = null;
    }

    if (searchSocket != null) {
      searchSocket.shutdown();
    }
    // -- Call parent shutdown method -- //
    super.shutdown();
  }

  /**
   * Starts up the necessary parent, specific, and dependent processes, in that
   * order.
   */
  @Override
  public synchronized void startup() throws Exception {
    // -- Call parent startup method -- //
    super.startup();

    // -- Start up our own specific processes -- //

    backgroundService = Executors.newCachedThreadPool();

    // -- Start dependent processes -- //
    // ExecutorServices tied to known listeners.
    Iterator<IndexerListener> iter = listeners.keySet().iterator();
    while (iter.hasNext()) {
      IndexerListener listener = iter.next();
      if (listener instanceof Configurable) {
        ((Configurable) listener).startup();
      }
    }

    // configure regions factory before modules
    ANSSRegionsFactory factory = ANSSRegionsFactory.getFactory(false);
    if (this.geoserveLayersEndpointUrl != null) {
      GeoserveLayersService layersService = factory.getGeoserveLayersService();
      layersService.setEndpointURL(geoserveLayersEndpointUrl);
    }
    factory.setLocalRegions(localRegionsFile);
    factory.startup();

    Iterator<IndexerModule> modules = this.modules.iterator();
    while (modules.hasNext()) {
      IndexerModule module = modules.next();
      if (module instanceof Configurable) {
        ((Configurable) module).startup();
      }
    }

    // ProductIndex
    productStorage.startup();
    productIndex.startup();

    // if using mysql product index, create separate read index
    readProductIndex = null;
    if (productIndex instanceof JDBCProductIndex) {
      JDBCProductIndex jdbcProductIndex = (JDBCProductIndex) productIndex;
      if (jdbcProductIndex.getDriver().contains("mysql")) {
        readProductIndex = new JDBCProductIndex();
        ((JDBCProductIndex) readProductIndex).setDriver(jdbcProductIndex.getDriver());
        ((JDBCProductIndex) readProductIndex).setUrl(jdbcProductIndex.getUrl());
        readProductIndex.startup();
      }
    }
    if (readProductIndex == null) {
      // otherwise use same index
      readProductIndex = productIndex;
    }

    // Cleanup thread to purge old products
    if (archivePolicies.size() > 0) {
      // Instantiate a timer object
      archiveTimer = new Timer();
      // Instantiate the task object
      archiveTask = new TimerTask() {
        public void run() {
          try {
            int[] counts = purgeExpiredProducts();
            LOGGER.info(
                String.format("[" + getName() + "] purged %d expired events and %d expired unassociated products.",
                    counts[0], counts[1]));
          } catch (Exception ex) {
            LOGGER.log(Level.WARNING, "[" + getName() + "] indexer cleanup thread threw exception", ex);
          }
        }
      };
      // Run archiver immediately at startup, then at regular intervals
      archiveTimer.schedule(archiveTask, 0L, archiveInterval);
    }

    if (searchSocket != null) {
      searchSocket.startup();
    }
  }

  /**
   * Checks the index for content that matches a configured archive policy.
   * Events are checked first; matched events are removed along with all of
   * their products, and listeners are notified of each archived event with an
   * EVENT_ARCHIVED change. Products matching a ProductArchivePolicy are
   * checked next (only unassociated products when the policy specifies);
   * matched products are archived and listeners are notified with a
   * PRODUCT_ARCHIVED change.
   *
   * Note: Product "age" is determined by when the earthquake for that product
   * occurred and does not reflect how long the product has actually been in
   * the index.
   *
   * @see #archivePolicies
   * @return int array of size 2: {number of archived events, number of
   *         archived products}
   * @throws Exception if error occurs
   */
  public synchronized int[] purgeExpiredProducts() throws Exception {
    int[] counts = { 0, 0 };
    ProductIndexQuery query = null;
    ArchivePolicy policy = null;

    if (isDisableArchive()) {
      LOGGER.info("Archiving disabled");
      return counts;
    }

    for (int i = 0; i < archivePolicies.size(); i++) {
      policy = archivePolicies.get(i);
      query = policy.getIndexQuery();

      if (!(policy instanceof ProductArchivePolicy)) {
        // -- Purge expired events for this policy -- //
        LOGGER.fine("[" + getName() + "] running event archive policy (" + policy.getName() + ")");
        try {
          // Get a list of those events
          List<Event> expiredEvents = productIndex.getEvents(query);

          // Loop over list of expired events and remove each one
          Iterator<Event> eventIter = expiredEvents.iterator();
          while (eventIter.hasNext()) {
            Event event = eventIter.next();

            LOGGER.info("[" + getName() + "] archiving event " + event.getEventId());
            event.log(LOGGER);

            productIndex.beginTransaction();
            try {
              removeEvent(event);

              // Notify of the event archived
              IndexerEvent notification = new IndexerEvent(this);
              notification.setSummary(null);
              notification.addIndexerChange(new IndexerChange(IndexerChange.EVENT_ARCHIVED, event, null));
              notifyListeners(notification);

              ++counts[0];
              productIndex.commitTransaction();
            } catch (Exception e) {
              LOGGER.log(Level.WARNING,
                  "[" + getName() + "] exception archiving event " + event.getEventId() + ", rolling back", e);
              productIndex.rollbackTransaction();
            }
          }
        } catch (Exception e) {
          LOGGER.log(Level.WARNING,
              "[" + getName() + "] exception running event archive policy (" + policy.getName() + ") ", e);
        }
      }

      if (policy instanceof ProductArchivePolicy) {
        ProductArchivePolicy productPolicy = (ProductArchivePolicy) policy;

        // -- Purge expired products for this policy -- //
        LOGGER.fine("[" + getName() + "] running product archive policy (" + policy.getName() + ")");

        try {
          // Get a list of those products
          List<ProductSummary> expiredProducts;

          if (productPolicy.isOnlyUnassociated()) {
            expiredProducts = productIndex.getUnassociatedProducts(query);
          } else {
            expiredProducts = productIndex.getProducts(query);
          }

          // Loop over list of expired products and remove each one
          Iterator<ProductSummary> productIter = expiredProducts.iterator();
          while (productIter.hasNext()) {
            ProductSummary product = productIter.next();

            LOGGER.info("[" + getName() + "] archiving product " + product.getId().toString());
            productIndex.beginTransaction();
            try {
              removeSummary(product);

              // Notify of the product archived
              IndexerEvent notification = new IndexerEvent(this);
              notification.setSummary(product);
              notification.addIndexerChange(new IndexerChange(IndexerChange.PRODUCT_ARCHIVED, null, null));
              notifyListeners(notification);

              ++counts[1];
              productIndex.commitTransaction();
            } catch (Exception e) {
              LOGGER.log(Level.WARNING,
                  "[" + getName() + "] exception archiving event " + product.getId().toString() + ", rolling back", e);
              productIndex.rollbackTransaction();
            }
          }
        } catch (Exception e) {
          LOGGER.log(Level.WARNING,
              "[" + getName() + "] exception running product archive policy (" + policy.getName() + ")", e);
        }

      }
    }

    return counts;
  }
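
  /*
   * Policy behavior sketch: as implemented above, an ArchivePolicy that is not
   * a ProductArchivePolicy expires whole events (and all of their products),
   * while a ProductArchivePolicy expires individual products, limited to
   * unassociated products when isOnlyUnassociated() returns true. The returned
   * counts summarize both passes:
   *
   *   int[] counts = purgeExpiredProducts();
   *   // counts[0] = number of archived events
   *   // counts[1] = number of archived products
   */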

  /**
   * Removes the given event from the Indexer ProductIndex and ProductStorage.
   *
   * @param event event to remove
   * @throws Exception If errors occur while removing the event
   */
  protected synchronized void removeEvent(Event event) throws Exception {
    // Removing an "event" from storage is really just removing all its
    // associated products
    List<ProductSummary> summaries = event.getAllProductList();
    Iterator<ProductSummary> summaryIter = summaries.iterator();
    while (summaryIter.hasNext()) {
      ProductSummary summary = summaryIter.next();
      // Remove product from storage
      productStorage.removeProduct(summary.getId());
      // Remove product summary from index
      productIndex.removeProductSummary(summary);
    }

    // Remove from index
    productIndex.removeEvent(event);
  }

  /**
   * Removes the given summary from the Indexer ProductIndex and ProductStorage.
   *
   * @param summary to remove
   * @throws Exception If errors occur while removing the summary
   */
  protected synchronized void removeSummary(ProductSummary summary) throws Exception {

    Event event = getPrevEvent(summary);
    if (event != null) {
      List<ProductSummary> eventProducts = event.getAllProductList();
      if (eventProducts != null && eventProducts.size() == 1 && eventProducts.get(0).getId().equals(summary.getId())) {
        // last product for the event
        removeEvent(event);
        // product is already removed by removeEvent
        return;
      }
    }

    // Remove product from storage
    productStorage.removeProduct(summary.getId());
    // Remove product summary from index
    productIndex.removeProductSummary(summary);

    // if product was associated to event need to update index
    if (event != null) {
      // remove the product from the event
      event.removeProduct(summary);

      // update event table
      ArrayList<Event> events = new ArrayList<Event>();
      events.add(event);
      productIndex.eventsUpdated(events);
    }
  }

  /**
   * Tries to create an event based on information in the given summary. If
   * successful, the summary is associated to the newly created event. Note: The
   * summary must have been externally added to the ProductIndex before this
   * method can be called.
   *
   * A product summary must have a non-null source and code (id), latitude and
   * longitude (location), and event time, in order to have the minimum
   * properties required to create a new event.
   *
   * @param summary The product summary serving as the basis for the new event.
   * @return The event that is created, added, and associated, or null if the
   *         given summary cannot be used to create a new event.
   * @throws Exception If the ProductIndex.addEvent throws an exception or if the
   *                   ProductIndex.addAssociation throws an exception. This may
   *                   happen if this method is called before the summary is added
   *                   to the ProductIndex.
   */
  private synchronized Event createEvent(ProductSummary summary) throws Exception {
    if (Event.productHasOriginProperties(summary)) {
      Event event = productIndex.addEvent(new Event());
      return productIndex.addAssociation(event, summary);
    } else {
      return null;
    }
  }
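
  /*
   * Sketch: the "origin properties" checked above are those listed in the
   * javadoc (source and code, latitude and longitude, and event time). The
   * setter names below are assumptions for illustration, and the values are
   * hypothetical.
   *
   *   // given a ProductSummary "summary":
   *   summary.setEventSource("us");
   *   summary.setEventSourceCode("7000abcd");
   *   summary.setEventLatitude(new BigDecimal("34.2"));
   *   summary.setEventLongitude(new BigDecimal("-118.5"));
   *   summary.setEventTime(new Date());
   *   // with these set, Event.productHasOriginProperties(summary) should be
   *   // true, and createEvent(summary) returns the new, associated event.
   */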

  /**
   * Search for products in this index.
   *
   * @param request the search request.
   * @return the search response.
   * @throws Exception if error occurs
   */
  public synchronized SearchResponse search(SearchRequest request) throws Exception {
    SearchResponse response = new SearchResponse();

    // Execute each query
    Iterator<SearchQuery> iter = request.getQueries().iterator();
    while (iter.hasNext()) {
      SearchQuery query = iter.next();

      if (query instanceof EventsSummaryQuery) {
        List<EventSummary> eventSummaries = new LinkedList<EventSummary>();
        Iterator<Event> events = productIndex.getEvents(query.getProductIndexQuery()).iterator();
        // convert events to event summaries
        while (events.hasNext()) {
          Event event = events.next();
          eventSummaries.add(event.getEventSummary());
        }
        ((EventsSummaryQuery) query).setResult(eventSummaries);
      }

      else if (query instanceof EventDetailQuery) {
        List<Event> events = productIndex.getEvents(query.getProductIndexQuery());
        ((EventDetailQuery) query).setResult(events);
      }

      else if (query instanceof ProductsSummaryQuery) {
        List<ProductSummary> products = productIndex.getProducts(query.getProductIndexQuery());
        ((ProductsSummaryQuery) query).setResult(products);
      }

      else if (query instanceof ProductDetailQuery) {
        List<Product> products = new LinkedList<Product>();
        Iterator<ProductId> ids = query.getProductIndexQuery().getProductIds().iterator();
        // fetch products from storage
        while (ids.hasNext()) {
          ProductId id = ids.next();
          Product product = productStorage.getProduct(id);
          if (product != null) {
            products.add(product);
          }
        }
        ((ProductDetailQuery) query).setResult(products);
      }

      response.addResult(query);
    }

    return response;
  }
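
  /*
   * Usage sketch for search(): build a SearchRequest containing one or more
   * SearchQuery objects, then read results back from each query. The names
   * used below (SearchRequest#addQuery, a ProductsSummaryQuery constructor
   * taking a ProductIndexQuery, and ProductsSummaryQuery#getResult) are
   * assumptions for illustration; the type and source values are hypothetical.
   *
   *   ProductIndexQuery piq = new ProductIndexQuery();
   *   piq.setProductType("origin");
   *   piq.setProductSource("us");
   *
   *   ProductsSummaryQuery query = new ProductsSummaryQuery(piq);
   *   SearchRequest request = new SearchRequest();
   *   request.addQuery(query);
   *
   *   indexer.search(request);
   *   List<ProductSummary> summaries = query.getResult();
   */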

  /** @return disableArchive */
  public boolean isDisableArchive() {
    return disableArchive;
  }

  /** @param disableArchive boolean to set */
  public void setDisableArchive(boolean disableArchive) {
    this.disableArchive = disableArchive;
  }

  /**
   * @return the archiveInterval
   */
  public long getArchiveInterval() {
    return archiveInterval;
  }

  /**
   * @param archiveInterval the archiveInterval to set
   */
  public void setArchiveInterval(long archiveInterval) {
    this.archiveInterval = archiveInterval;
  }

  /**
   * @return the archivePolicies
   */
  public List<ArchivePolicy> getArchivePolicies() {
    return archivePolicies;
  }

}