DefaultIndexerListener.java

package gov.usgs.earthquake.indexer;

import gov.usgs.earthquake.indexer.IndexerChange.IndexerChangeType;
import gov.usgs.earthquake.product.AbstractListener;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.util.CompareUtil;
import gov.usgs.util.Config;

import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;

/**
 * DefaultIndexerListener provides a starting point from which all
 * IndexerListeners may extend.
 *
 * As a child-class of the AbstractListener, this may be configured with all of
 * the parent parameters and also accepts the following:
 *
 * <dl>
 * <dt>command</dt>
 * <dd>(Required) The command to execute. This must be an executable command and
 * may include arguments. Any product-specific arguments are appended at the end
 * of command.</dd>
 *
 * <dt>storage</dt>
 * <dd>(Required) A directory used to store all products. Each product is
 * extracted into a separate directory within this directory and is referenced
 * by the --directory=/path/to/directory argument when command is executed.</dd>
 *
 * <dt>processUnassociated</dt>
 * <dd>(Optional, Default = false) Whether or not to process unassociated
 * products. Valid values are "true" and "false".</dd>
 *
 * <dt>processPreferredOnly</dt>
 * <dd>(Optional, Default = false) Whether or not to process only preferred
 * products of the type accepted by this listener. Valid values are "true" and
 * "false".</dd>
 *
 * <dt>ignoreArchive</dt>
 * <dd>(Optional, Default = false) Whether or not to ignore EVENT_ARCHIVED and
 * PRODUCT_ARCHIVED indexer events. Value values are "true" and "false".</dd>
 *
 * </dl>
 */
public class DefaultIndexerListener extends AbstractListener implements IndexerListener {
  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(DefaultIndexerListener.class.getName());

  /** Property for process preferred only */
  public static final String PROCESS_PREFERRED_ONLY_PROPERTY = "processPreferredOnly";
  /** Default state of process preferred only */
  public static final String PROCESS_PREFERRED_ONLY_DEFAULT = "false";

  /** Property for process unassociated */
  public static final String PROCESS_UNASSOCIATED_PROPERTY = "processUnassociated";
  /** Default state of process unassociated */
  public static final String PROCESS_UNASSOCIATED_DEFAULT = "true";

  /** Property for process only when event change */
  public static final String PROCESS_ONLY_WHEN_EVENT_CHANGE_PROPERTY = "processOnlyWhenEventChanged";
  /** Default state of process only when event change */
  public static final String PROCESS_ONLY_WHEN_EVENT_CHANGE_DEFAULT = "false";

  /** Property for Ignore archive */
  public static final String IGNORE_ARCHIVE_PROPERTY = "ignoreArchive";
  /** Default state of ignore archive */
  public static final String IGNORE_ARCHIVE_DEFAULT = "true";

  /** Whether or not to process only preferred products. */
  private boolean processOnlyPreferredProducts = false;

  /** Whether or not to process unassociated products. */
  private boolean processUnassociatedProducts = true;

  /**
   * Whether or not to process updates that don't change preferred event
   * parameters.
   */
  private boolean processOnlyWhenEventChanged = false;

  /** Whether or not to process archive events. */
  private boolean ignoreArchive = false;

  @Override
  public void onIndexerEvent(IndexerEvent event) throws Exception {
    StringBuffer buf = new StringBuffer();
    Iterator<IndexerChange> changes = event.getIndexerChanges().iterator();
    while (changes.hasNext()) {
      IndexerChange change = changes.next();
      buf.append("\n").append(change.getType().toString()).append(" ");
      if (change.getOriginalEvent() == null) {
        buf.append("null");
      } else {
        buf.append(change.getOriginalEvent().getEventId());
      }
      buf.append(" => ");
      if (change.getNewEvent() == null) {
        buf.append("null");
      } else {
        buf.append(change.getNewEvent().getEventId());
      }
    }
    LOGGER.info(buf.toString());
  }

  /**
   * @param change the indexer event that has occurred
   * @return whether this external indexer listener handles this product type
   * @throws Exception if error occurs
   */
  public boolean accept(IndexerEvent change) throws Exception {
    String productType = null;

    if (change.getSummary() != null) {
      ProductId productId = change.getSummary().getId();

      productType = productId.getType();

      // use default notification listener first
      if (!super.accept(productId)) {
        return false;
      }
    }

    List<Event> events = change.getEvents();
    if (!processUnassociatedProducts && events.size() == 0) {
      LOGGER.fine("[" + getName() + "] product is unassociated");
      return false;
    }

    if (processOnlyPreferredProducts && events.size() > 0) {
      // check if preferred for any event
      boolean isPreferred = false;

      // can only be a preferred product if a summary associated
      if (productType != null) {
        Iterator<Event> iter = events.iterator();
        while (iter.hasNext()) {
          Event event = iter.next();
          ProductSummary preferred = event.getPreferredProduct(productType);
          if (preferred != null && preferred.getId().equals(change.getSummary().getId())) {
            // it is the most preferred product for this event
            isPreferred = true;
            break;
          }
        }
      }

      if (!isPreferred) {
        LOGGER.fine("[" + getName() + "] product is not preferred in any event");
        return false;
      }
    }

    // accept by default
    return true;
  }

  /**
   * Returns a boolean based on if the preferred event params have changed Returns
   * false if change is an archive indexer
   *
   * @param event  an IndexerEvent
   * @param change and IndexerChange
   * @return boolean
   * @throws Exception if error occurs
   */
  public boolean accept(IndexerEvent event, IndexerChange change) throws Exception {
    // check whether this is an archive indexer change
    if (ignoreArchive && (change.getType() == IndexerChangeType.PRODUCT_ARCHIVED
        || change.getType() == IndexerChangeType.EVENT_ARCHIVED)) {
      return false;
    }

    // see if preferred event parameters have changed
    if (processOnlyWhenEventChanged) {
      Event originalEvent = change.getOriginalEvent();
      Event newEvent = change.getNewEvent();
      if (originalEvent != null && newEvent != null) {
        EventSummary originalEventSummary = originalEvent.getEventSummary();
        EventSummary newEventSummary = newEvent.getEventSummary();
        if (CompareUtil.nullSafeCompare(originalEventSummary.getMagnitude(), newEventSummary.getMagnitude()) != 0) {
          // magnitude changed
        } else if (CompareUtil.nullSafeCompare(originalEventSummary.getLatitude(),
            newEventSummary.getLatitude()) != 0) {
          // latitude changed
        } else if (CompareUtil.nullSafeCompare(originalEventSummary.getLongitude(),
            newEventSummary.getLongitude()) != 0) {
          // longitude changed
        } else if (CompareUtil.nullSafeCompare(originalEventSummary.getDepth(), newEventSummary.getDepth()) != 0) {
          // depth changed
        } else if (CompareUtil.nullSafeCompare(originalEventSummary.getTime(), newEventSummary.getTime()) != 0) {
          // time changed
        } else if (originalEventSummary.isDeleted() != newEventSummary.isDeleted()) {
          // status changed
        } else {
          // preferred event parameters haven't changed
          return false;
        }
      }
    }

    // accept changes by default
    return true;
  }

  public void configure(Config config) throws Exception {
    super.configure(config);

    processOnlyPreferredProducts = Boolean
        .valueOf(config.getProperty(PROCESS_PREFERRED_ONLY_PROPERTY, PROCESS_PREFERRED_ONLY_DEFAULT));
    LOGGER.config("[" + getName() + "] process only preferred products = " + processOnlyPreferredProducts);

    processUnassociatedProducts = Boolean
        .valueOf(config.getProperty(PROCESS_UNASSOCIATED_PROPERTY, PROCESS_UNASSOCIATED_DEFAULT));
    LOGGER.config("[" + getName() + "] process unassociated products = " + processUnassociatedProducts);

    processOnlyWhenEventChanged = Boolean
        .valueOf(config.getProperty(PROCESS_ONLY_WHEN_EVENT_CHANGE_PROPERTY, PROCESS_ONLY_WHEN_EVENT_CHANGE_DEFAULT));
    LOGGER.config("[" + getName() + "] process only when event changed = " + processOnlyWhenEventChanged);

    ignoreArchive = Boolean.valueOf(config.getProperty(IGNORE_ARCHIVE_PROPERTY, IGNORE_ARCHIVE_DEFAULT));
    LOGGER.config("[" + getName() + "] ignore archive changes = " + ignoreArchive);
  }

  /**
   * @return whether only preferred products are processed
   */
  public boolean getProcessOnlyPreferredProducts() {
    return processOnlyPreferredProducts;
  }

  /**
   * @param processOnlyPreferredProducts whether to process ony preferred products
   */
  public void setProcessOnlyPreferredProducts(final boolean processOnlyPreferredProducts) {
    this.processOnlyPreferredProducts = processOnlyPreferredProducts;
  }

  /** @param processUnassociatedProducts to set */
  public void setProcessUnassociatedProducts(final boolean processUnassociatedProducts) {
    this.processUnassociatedProducts = processUnassociatedProducts;
  }

  /** @return boolean processUnassociatedProducts */
  public boolean getProcessUnassociatedProducts() {
    return processUnassociatedProducts;
  }

  /** @return boolean processOnlyWhenEventChanged */
  public boolean isProcessOnlyWhenEventChanged() {
    return processOnlyWhenEventChanged;
  }

  /** @param processOnlyWhenEventChanged to set */
  public void setProcessOnlyWhenEventChanged(boolean processOnlyWhenEventChanged) {
    this.processOnlyWhenEventChanged = processOnlyWhenEventChanged;
  }

  /** @return ignoreArchive */
  public boolean isIgnoreArchive() {
    return ignoreArchive;
  }

  /** @param ignoreArchive to set */
  public void setIgnoreArchive(boolean ignoreArchive) {
    this.ignoreArchive = ignoreArchive;
  }
}