ExternalNotificationListener.java

/*
 * ExternalNotificationListener
 */
package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.XmlUtils;

import java.io.File;

import java.net.URI;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * An external process that is called when new products arrive.
 *
 * The ExternalNotificationListener implements the Configurable interface and
 * can use the following configuration parameters:
 *
 * <dl>
 * <dt>command</dt>
 * <dd>(Required) The command to execute. This must be an executable command and
 * may include arguments. Any product specific arguments are appended at the end
 * of command.</dd>
 *
 * <dt>storage</dt>
 * <dd>(Required) A directory used to store all products. Each product is
 * extracted into a separate directory within this directory and is referenced
 * by the --directory=/path/to/directory argument when command is executed.</dd>
 * </dl>
 *
 */
public class ExternalNotificationListener extends DefaultNotificationListener {

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(ExternalNotificationListener.class.getName());

  /** Configuration parameter for storage directory product. */
  public static final String STORAGE_NAME_PROPERTY = "storage";

  /** Configuration parameter for command. */
  public static final String COMMAND_PROPERTY = "command";

  /** Argument used to pass signature to external process. */
  public static final String SIGNATURE_ARGUMENT = "--signature=";

  private static final String STORAGE_DIRECTORY_PROPERTY = "storageDirectory";

  /** Where products are stored in extracted form. */
  private FileProductStorage storage;

  /** Command that is executed after a product is stored. */
  private String command;

  /**
   * Construct a new ExternalNotificationListener.
   *
   * The listener must be configured with a FileProductStorage and command to
   * function.
   */
  public ExternalNotificationListener() {
  }

  /**
   * Configure an ExternalNotificationListener using a Config object.
   *
   * @param config the config containing a
   */
  public void configure(Config config) throws Exception {
    super.configure(config);

    command = config.getProperty(COMMAND_PROPERTY);
    if (command == null) {
      throw new ConfigurationException("[" + getName() + "] 'command' is a required configuration property");
    }
    LOGGER.config("[" + getName() + "] command is '" + command + "'");

    // storage references an object in the global configuration
    String storageName = config.getProperty(STORAGE_NAME_PROPERTY);
    String storageDirectory = config.getProperty(STORAGE_DIRECTORY_PROPERTY);
    if (storageName == null && storageDirectory == null) {
      throw new ConfigurationException("[" + getName() + "] 'storage' is a required configuration property.");
    }
    if (storageName != null) {
      LOGGER.config("[" + getName() + "] loading FileProductStorage '" + storageName + "'");
      storage = (FileProductStorage) Config.getConfig().getObject(storageName);
      if (storage == null) {
        throw new ConfigurationException("[" + getName() + "] unable to load FileProductStorage '" + storageName + "'");
      }
    } else {
      LOGGER.config("[" + getName() + "] using storage directory '" + storageDirectory + "'");
      storage = new FileProductStorage(new File(storageDirectory));
    }
  }

  /**
   * Called when client is shutting down.
   */
  public void shutdown() throws Exception {
    super.shutdown();
    // maybe make current process a member and kill process?
    // or find way of detaching so client process can exit but product
    // process can complete?
    storage.shutdown();
  }

  /**
   * Called after client has been configured and should begin processing.
   */
  public void startup() throws Exception {
    // no background threads to start or objects to create
    storage.startup();
    super.startup();
  }

  /**
   * Append product arguments to the base command.
   *
   * @param product the product used to generate arguments.
   * @return command as a string.
   * @throws Exception if error occurs
   */
  public String getProductCommand(final Product product) throws Exception {
    StringBuffer buf = new StringBuffer(command);

    ProductId id = product.getId();

    // get path to product in storage, should be a directory
    File productDirectory = storage.getProductFile(id);

    buf.append(" ").append(CLIProductBuilder.DIRECTORY_ARGUMENT).append(productDirectory.getCanonicalPath());

    buf.append(" ").append(CLIProductBuilder.TYPE_ARGUMENT).append(id.getType());
    buf.append(" ").append(CLIProductBuilder.CODE_ARGUMENT).append(id.getCode());
    buf.append(" ").append(CLIProductBuilder.SOURCE_ARGUMENT).append(id.getSource());
    buf.append(" ").append(CLIProductBuilder.UPDATE_TIME_ARGUMENT).append(XmlUtils.formatDate(id.getUpdateTime()));
    buf.append(" ").append(CLIProductBuilder.STATUS_ARGUMENT).append(product.getStatus());
    if (product.isDeleted()) {
      buf.append(" ").append(CLIProductBuilder.DELETE_ARGUMENT);
    }

    Map<String, String> props = product.getProperties();
    Iterator<String> iter = props.keySet().iterator();
    while (iter.hasNext()) {
      String name = iter.next();
      buf.append(" \"").append(CLIProductBuilder.PROPERTY_ARGUMENT).append(name).append("=")
          .append(props.get(name).replace("\"", "\\\"")).append("\"");
    }

    Map<String, List<URI>> links = product.getLinks();
    iter = links.keySet().iterator();
    while (iter.hasNext()) {
      String relation = iter.next();
      Iterator<URI> iter2 = links.get(relation).iterator();
      while (iter2.hasNext()) {
        buf.append(" ").append(CLIProductBuilder.LINK_ARGUMENT).append(relation).append("=")
            .append(iter2.next().toString());
      }
    }

    Content content = product.getContents().get("");
    if (content != null) {
      buf.append(" ").append(CLIProductBuilder.CONTENT_ARGUMENT);
      buf.append(" ").append(CLIProductBuilder.CONTENT_TYPE_ARGUMENT).append(content.getContentType());
    }

    if (product.getSignature() != null) {
      buf.append(" ").append(SIGNATURE_ARGUMENT).append(product.getSignature());
    }

    return buf.toString();
  }

  /**
   * Split a command string into a command array.
   *
   * This version uses a StringTokenizer to split arguments. Quoted arguments are
   * supported (single or double), with quotes removed before passing to runtime.
   * Double quoting arguments will preserve quotes when passing to runtime.
   *
   * @param command command to run.
   * @return Array of arguments suitable for passing to Runtime.exec(String[]).
   */
  protected static String[] splitCommand(final String command) {
    List<String> arguments = new LinkedList<String>();
    String currentArgument = null;

    // use a tokenizer because that's how Runtime.exec does it currently...
    StringTokenizer tokens = new StringTokenizer(command);
    while (tokens.hasMoreTokens()) {
      String token = tokens.nextToken();

      if (currentArgument == null) {
        currentArgument = token;
      } else {
        // continuing previous argument, that was split on whitespace
        currentArgument = currentArgument + " " + token;
      }

      if (currentArgument.startsWith("\"")) {
        // double quoted argument
        if (currentArgument.endsWith("\"")) {
          // that has balanced quotes
          // remove quotes and add argument
          currentArgument = currentArgument.substring(1, currentArgument.length() - 1);
        } else {
          // unbalanced quotes, keep going
          continue;
        }
      } else if (currentArgument.startsWith("'")) {
        // single quoted argument
        if (currentArgument.endsWith("'")) {
          // that has balanced quotes
          // remove quotes and add argument
          currentArgument = currentArgument.substring(1, currentArgument.length() - 1);
        } else {
          // unbalanced quotes, keep going
          continue;
        }
      }

      arguments.add(currentArgument);
      currentArgument = null;
    }

    if (currentArgument != null) {
      // weird, but add argument anyways
      arguments.add(currentArgument);
    }

    return arguments.toArray(new String[] {});
  }

  /**
   * Call the external process for this product.
   *
   * @param product Product
   * @throws Exception if error occurs
   */
  public void onProduct(final Product product) throws Exception {
    // store product
    try {
      storage.storeProduct(product);
    } catch (ProductAlreadyInStorageException e) {
      LOGGER.info("[" + getName() + "] product already in storage " + product.getId().toString());
    }

    // now run command
    String productCommand = null;
    Process process = null;
    int exitValue = -1;

    try {
      productCommand = getProductCommand(product);
      LOGGER.info("[" + getName() + "] running command " + productCommand);
      process = Runtime.getRuntime().exec(productCommand);

      // inline product content, may or may not be null
      Content content = product.getContents().get("");
      if (content != null) {
        StreamUtils.transferStream(content.getInputStream(), process.getOutputStream());
      } else {
        // need to close process stdin either way
        StreamUtils.closeStream(process.getOutputStream());
      }

      // maybe log/capture process input/error streams
      // or switch to "Command"

      exitValue = process.waitFor();
    } catch (Exception e) {
      if (process != null) {
        // make sure to kill zombies
        process.destroy();
      }

      // signal that process did not exit normally
      exitValue = -1;

      // give subclasses chance to handle exception
      commandException(product, productCommand, e);
    }

    // if process exited normally
    if (exitValue != -1) {
      // give subclasses chance to handle exitValue, which may be non-zero
      commandComplete(product, productCommand, exitValue);
    }
  }

  /**
   * Called when the command finishes executing normally.
   *
   * This implementation throws a NotificationListenerException if the exitValue
   * is non-zero.
   *
   * @param product   the product being processed.
   * @param command   the generated command, as a string.
   * @param exitValue the exit status of the process.
   * @throws Exception When re-notification should occur, based on maxTries, or
   *                   none if done.
   */
  public void commandComplete(final Product product, final String command, final int exitValue) throws Exception {
    LOGGER.info("[" + getName() + "] command '" + command + "' exited with status '" + exitValue + "'");

    // send heartbeat info
    HeartbeatListener.sendHeartbeatMessage(getName(), "command", command);
    HeartbeatListener.sendHeartbeatMessage(getName(), "exit value", Integer.toString(exitValue));

    if (exitValue != 0) {
      throw new NotificationListenerException("[" + getName() + "] command exited with status " + exitValue);
    }
  }

  /**
   * Called when an exception occurs while running command.
   *
   * This implementation throws a NotificationListenerException with exception as
   * the cause.
   *
   * @param product        product being processed
   * @param productCommand command that was built
   * @param exception      exception that was thrown during execution. This will
   *                       be an InterruptedException if the process timed out.
   * @throws Exception When re-notification should occur, based on maxTries, or
   *                   none if done.
   */
  public void commandException(final Product product, final String productCommand, final Exception exception)
      throws Exception {
    if (exception instanceof InterruptedException) {
      LOGGER.warning("[" + getName() + "] command '" + productCommand + "' timed out");
    } else {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception running command '" + productCommand + "'", exception);
    }

    // send heartbeat info
    HeartbeatListener.sendHeartbeatMessage(getName(), "exception", productCommand);
    HeartbeatListener.sendHeartbeatMessage(getName(), "exception class", exception.getClass().getName());

    throw new NotificationListenerException(exception);
  }

  /**
   * @return the storage
   */
  public FileProductStorage getStorage() {
    return storage;
  }

  /**
   * @param storage the storage to set
   */
  public void setStorage(FileProductStorage storage) {
    this.storage = storage;
  }

  /**
   * @return the command
   */
  public String getCommand() {
    return command;
  }

  /**
   * @param command the command to set
   */
  public void setCommand(String command) {
    this.command = command;
  }

}