// EIDSInputWedge.java

package gov.usgs.earthquake.eids;

import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.xml.bind.JAXBException;

import org.xml.sax.SAXParseException;

import gov.usgs.earthquake.distribution.Bootstrappable;
import gov.usgs.earthquake.distribution.CLIProductBuilder;
import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.DebugProductSender;
import gov.usgs.earthquake.distribution.ProductBuilder;
import gov.usgs.earthquake.distribution.ProductCreator;
import gov.usgs.earthquake.distribution.ProductSender;
import gov.usgs.earthquake.distribution.QuakemlProductCreator;
import gov.usgs.earthquake.distribution.SocketProductSender;
import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.FileContent;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.quakeml.FileToQuakemlConverter;
import gov.usgs.util.Config;
import gov.usgs.util.CryptoUtils;
import gov.usgs.util.FileUtils;
import gov.usgs.util.Poller;
import gov.usgs.util.StringUtils;
import gov.usgs.util.CryptoUtils.Version;

/**
 * Read messages from files or a poll directory, and push products into PDL.
 *
 * This supports EIDS/QDDS style polling. The input messages are converted to
 * Quakeml using the FileToQuakemlConverter interface, then sent as Quakeml
 * based products.
 *
 * Much of the configuration can be supplied using either a configuration file,
 * or command line arguments.
 */
@Deprecated
public class EIDSInputWedge extends ProductBuilder implements Runnable, Bootstrappable {

  /**
   * Global reference to arguments array, when EIDSInputWedge is run via
   * Bootstrap. Assigned at the start of {@link #run(String[])}.
   */
  public static String[] ARGS = null;

  /** Logger for this class. */
  private static final Logger LOGGER = Logger.getLogger(EIDSInputWedge.class.getName());

  /** Property for parser class */
  public static final String PARSER_CLASS_PROPERTY = "parserClass";
  /** Default parser class */
  public static final String DEFAULT_PARSER_CLASS = "gov.usgs.earthquake.event.QuakemlToQuakemlConverter";

  /** Property for polldir */
  public static final String POLLDIR_PROPERTY = "directory";
  /** Default polldir */
  public static final String DEFAULT_POLLDIR = "polldir";
  /** Directory that is listed for input files by {@link #run()}. */
  private File polldir = new File(DEFAULT_POLLDIR);

  /** Property for storage directory */
  public static final String STORAGEDIR_PROPERTY = "oldinputdir";
  /** Default storage directory */
  public static final String DEFAULT_STORAGEDIR = "oldinput";
  /** Directory where files are moved after they have been processed. */
  private File storagedir = new File(DEFAULT_STORAGEDIR);

  /** Property for error directory */
  public static final String ERRORDIR_PROPERTY = "errordir";
  /** Default error directory */
  public static final String DEFAULT_ERRORDIR = "errordir";
  /** Directory where files are moved when processing fails. */
  private File errordir = new File(DEFAULT_ERRORDIR);

  /** Property for validate */
  public static final String VALIDATE_PROPERTY = "validate";
  /** Default status of validate */
  public static final String DEFAULT_VALIDATE = "false";

  /** Property for sendOriginWhenPhasesExist */
  public static final String SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY = "sendOriginWhenPhasesExist";
  /** Default status of sendOrigin... */
  public static final String DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST = "false";

  /** Property for sendMechanismWhenPhasesExist */
  public static final String SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY = "sendMechanismWhenPhasesExist";
  /** Default status of sendMechanism... */
  public static final String DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST = "false";

  /** Convert parsed quakeml to a product. */
  private ProductCreator productCreator = new QuakemlProductCreator();

  /** Whether created products should be converted to internal types. */
  public static final String CREATE_INTERNAL_PRODUCTS_PROPERTY = "createInternalProducts";
  /** Default status of CREATE_INTERNAL_PRODUCTS */
  public static final String DEFAULT_CREATE_INTERNAL_PRODUCTS = "false";
  /** When true, parseAndSend prefixes each product type with "internal-". */
  private boolean createInternalProducts = false;

  /** Whether created products should be converted to scenario types. */
  public static final String CREATE_SCENARIO_PRODUCTS_PROPERTY = "createScenarioProducts";
  /** Default status of CREATE_SCENARIO_PRODUCTS */
  public static final String DEFAULT_CREATE_SCENARIO_PRODUCTS = "false";
  /** When true, parseAndSend suffixes each product type with "-scenario". */
  private boolean createScenarioProducts = false;

  /**
   * Directory polling object. Only exposed through its getter and setter; the
   * polling loop in {@link #run()} does not use it.
   */
  private Poller directoryPoller;

  /** Poll interval property */
  public static final String POLLINTERVAL_PROPERTY = "interval";
  /** Default interval for POLLINTERVAL */
  public static final String DEFAULT_POLLINTERVAL = "1000";
  /** Milliseconds between the start of consecutive polls of polldir. */
  private long pollInterval = 1000L;

  /** Property for pollCarefully */
  public static final String POLL_CAREFULLY_PROPERTY = "pollCarefully";
  /** Default status of POLL_CAREFULLY */
  public static final String DEFAULT_POLL_CAREFULLY = "false";
  /**
   * When true, files modified within the last pollInterval milliseconds are
   * skipped, in case they are still being written.
   */
  private boolean pollCarefully = false;

  /** Property for doBufferFix */
  public static final String DO_BUFFER_FIX_PROPERTY = "doBufferFix";
  /** Default status of DO_BUFFER_FIX property */
  public static final String DEFAULT_DO_BUFFER_FIX = "true";
  /**
   * Passed to the QuakemlProductCreator constructor in configure.
   * NOTE(review): the effect of this flag is defined by QuakemlProductCreator.
   */
  private boolean doBufferFix = true;

  /** Thread running {@link #run()}; created by startup, interrupted by shutdown. */
  private Thread pollThread = null;

  /**
   * Create a wedge using the default directories, poll interval, and a default
   * QuakemlProductCreator; use configure or the setters to change them.
   *
   * @throws Exception if error occurs (presumably raised by the superclass
   *                   constructor or a field initializer; the body itself is
   *                   empty)
   */
  public EIDSInputWedge() throws Exception {
  }

  /**
   * Build products from a file using the configured product creator, and send
   * each of them.
   *
   * Before sending, the product type is prefixed with "internal-" when
   * createInternalProducts is set, and suffixed with "-scenario" when
   * createScenarioProducts is set. Any attachContent is added to the product
   * contents, which is only allowed when the file produced a single product.
   *
   * @param file          File containing products
   * @param attachContent Map of content path to Content to attach; may be null
   *                      or empty
   * @return Map from each sent product id to the result of sendProduct (senders
   *         that failed and their exceptions)
   * @throws Exception if the file cannot be converted, or if content is being
   *                   attached and more than one product was generated
   */
  public Map<ProductId, Map<ProductSender, Exception>> parseAndSend(final File file,
      final Map<String, Content> attachContent) throws Exception {

    final Map<ProductId, Map<ProductSender, Exception>> resultsById = new HashMap<>();
    final List<Product> generated = productCreator.getProducts(file);
    final boolean haveAttachments = attachContent != null && attachContent.size() > 0;

    for (final Product next : generated) {
      final ProductId productId = next.getId();

      // adjust product type when requested
      if (createInternalProducts) {
        productId.setType("internal-" + productId.getType());
      }
      if (createScenarioProducts) {
        productId.setType(productId.getType() + "-scenario");
      }

      if (haveAttachments) {
        // attachments are ambiguous unless exactly one product was generated
        if (generated.size() > 1) {
          throw new Exception("Trying to attach files, generated more than 1 product");
        }
        next.getContents().putAll(attachContent);
      }

      // send, remembering any per-sender exceptions
      resultsById.put(next.getId(), sendProduct(next));
    }

    return resultsById;
  }

  /**
   * Parse and send one file, log send statistics, then move the file out of the
   * way.
   *
   * When every product was sent by at least one sender, the file is moved into
   * the storage directory (or deleted when no storage directory is set). When
   * parsing fails, or any product could not be sent by any sender, the file is
   * moved into the error directory (or deleted when no error directory is set).
   * This method does not throw; all failures are logged.
   *
   * @param file to parse and look for errors
   */
  public void onFile(File file) {
    Date inputtime = new Date();
    LOGGER.info("Reading file " + file.getName());

    try {
      Map<ProductId, Map<ProductSender, Exception>> sendExceptions = parseAndSend(file, null);

      // check how send went
      int numSenders = getProductSenders().size();
      int total = sendExceptions.size();
      int successful = 0;
      int partialFailures = 0;
      int totalFailures = 0;

      for (Map.Entry<ProductId, Map<ProductSender, Exception>> entry : sendExceptions.entrySet()) {
        ProductId sentId = entry.getKey();
        int numFailed = entry.getValue().size();
        if (numFailed == numSenders) {
          totalFailures++;
          LOGGER.severe("Total failure sending product " + sentId.toString());
        } else {
          // output built product id because it was sent at least once
          System.out.println(sentId.toString());

          if (numFailed == 0) {
            successful++;
          } else {
            partialFailures++;
            LOGGER.warning("Partial failure sending product " + sentId.toString());
          }
        }
      }

      LOGGER.info("generated " + total + " products: " + successful + " sent, " + partialFailures + " partially sent, "
          + totalFailures + " failed to send");

      if (totalFailures > 0) {
        // consider this failure, even if some products sent
        throw new Exception(totalFailures + " of " + total + " generated products failed to send");
      }

      // succeeded, at least somewhat
      if (storagedir != null) {
        // move to oldinput directory, creating it on first use
        if (!storagedir.exists()) {
          storagedir.mkdirs();
        }
        File storagefile = new File(storagedir, file.getName() + "_" + inputtime.getTime());
        if (!file.renameTo(storagefile)) {
          LOGGER.warning("Unable to move processed file " + file.getName() + " to " + storagefile.getPath());
        }
      } else {
        // a null parent would resolve relative to the working directory,
        // so remove the file instead of moving it there
        FileUtils.deleteTree(file);
      }
    } catch (Exception e) {
      if (e instanceof JAXBException && ((JAXBException) e).getLinkedException() instanceof SAXParseException) {
        SAXParseException spe = (SAXParseException) ((JAXBException) e).getLinkedException();
        LOGGER.warning(
            "Parse error: " + spe.getMessage() + "; line=" + spe.getLineNumber() + ", column=" + spe.getColumnNumber());
      }

      if (errordir != null) {
        if (!errordir.exists()) {
          errordir.mkdirs();
        }
        File errorfile = new File(errordir, file.getName() + "_" + new Date().getTime());

        // resolving the canonical path may fail; still log the original error
        String errorpath;
        try {
          errorpath = errorfile.getCanonicalPath();
        } catch (Exception ignored) {
          errorpath = errorfile.getAbsolutePath();
        }
        LOGGER.log(Level.WARNING, "Errors processing file, moving to " + errorpath, e);

        // move to error directory
        if (!file.renameTo(errorfile)) {
          LOGGER.warning("Unable to move file " + file.getName() + " to " + errorpath);
        }
      } else {
        LOGGER.log(Level.WARNING,
            "Error processing file " + file.getName() + ", and no error directory configured", e);
        FileUtils.deleteTree(file);
      }
    }
  }

  /**
   * Configure this wedge from a Config object.
   *
   * Reads the parser class, validation and phase-related flags, poll directory,
   * poll interval, storage and error directories, and the internal/scenario
   * product flags. Every property falls back to its DEFAULT_* constant, except
   * the parser class: when it is not set the existing product creator is kept.
   *
   * @param config the configuration to read
   * @throws Exception if the superclass configuration fails, the parser class
   *                   cannot be instantiated, or it is neither a ProductCreator
   *                   nor a FileToQuakemlConverter
   */
  @Override
  public void configure(Config config) throws Exception {
    super.configure(config);

    // read first, because it is needed when constructing the product creator
    doBufferFix = Boolean.parseBoolean(config.getProperty(DO_BUFFER_FIX_PROPERTY, DEFAULT_DO_BUFFER_FIX));
    LOGGER.config("doBufferFix = " + doBufferFix);

    String parserClassName = config.getProperty(PARSER_CLASS_PROPERTY);
    if (parserClassName == null) {
      // NOTE(review): the existing product creator is kept, so doBufferFix is
      // not applied to it here
      LOGGER.config("Using QuakemlToProductConverter");
    } else {
      Object parserObj = Class.forName(parserClassName).getConstructor().newInstance();
      if (parserObj instanceof ProductCreator) {
        productCreator = (ProductCreator) parserObj;
      } else if (parserObj instanceof FileToQuakemlConverter) {
        QuakemlProductCreator quakemlCreator = new QuakemlProductCreator(doBufferFix);
        quakemlCreator.setConverter((FileToQuakemlConverter) parserObj);
        productCreator = quakemlCreator;
      } else {
        throw new ConfigurationException("configured parser class " + parserClassName + " does not implement "
            + ProductCreator.class.getName() + " or " + FileToQuakemlConverter.class.getName());
      }
      LOGGER.config("Using parser class " + parserClassName);
    }

    // parse the configured value; Boolean.getBoolean would instead look up a
    // JVM system property named by that value
    boolean validate = Boolean.parseBoolean(config.getProperty(VALIDATE_PROPERTY, DEFAULT_VALIDATE));
    productCreator.setValidate(validate);
    LOGGER.config("Validation " + (validate ? "enabled" : "disabled"));

    boolean sendOriginWhenPhasesExist = Boolean.parseBoolean(
        config.getProperty(SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY, DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST));
    if (productCreator instanceof EQMessageProductCreator) {
      ((EQMessageProductCreator) productCreator).setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
    } else if (productCreator instanceof QuakemlProductCreator) {
      ((QuakemlProductCreator) productCreator).setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
    } else if (sendOriginWhenPhasesExist) {
      // custom ProductCreator implementations have no such setting
      LOGGER.warning(SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY + " is not supported for "
          + productCreator.getClass().getName());
    }
    LOGGER.config("sendOriginWhenPhasesExist = " + sendOriginWhenPhasesExist);

    boolean sendMechanismWhenPhasesExist = Boolean.parseBoolean(
        config.getProperty(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY, DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST));
    if (productCreator instanceof EQMessageProductCreator) {
      if (sendMechanismWhenPhasesExist) {
        LOGGER.warning(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY + " is not supported for EQMessageProductCreator");
      }
    } else if (productCreator instanceof QuakemlProductCreator) {
      ((QuakemlProductCreator) productCreator).setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);
    } else if (sendMechanismWhenPhasesExist) {
      LOGGER.warning(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY + " is not supported for "
          + productCreator.getClass().getName());
    }
    LOGGER.config("sendMechanismWhenPhasesExist = " + sendMechanismWhenPhasesExist);

    polldir = new File(config.getProperty(POLLDIR_PROPERTY, DEFAULT_POLLDIR));
    LOGGER.config("Using poll directory " + polldir.getCanonicalPath());

    pollInterval = Long.parseLong(config.getProperty(POLLINTERVAL_PROPERTY, DEFAULT_POLLINTERVAL));
    LOGGER.config("Using poll interval " + pollInterval + "ms");

    pollCarefully = Boolean.parseBoolean(config.getProperty(POLL_CAREFULLY_PROPERTY, DEFAULT_POLL_CAREFULLY));
    LOGGER.config("Poll carefully = " + pollCarefully);

    storagedir = new File(config.getProperty(STORAGEDIR_PROPERTY, DEFAULT_STORAGEDIR));
    LOGGER.config("Using oldinput directory " + storagedir.getCanonicalPath());

    errordir = new File(config.getProperty(ERRORDIR_PROPERTY, DEFAULT_ERRORDIR));
    LOGGER.config("Using error directory " + errordir.getCanonicalPath());

    createInternalProducts = Boolean.parseBoolean(
        config.getProperty(CREATE_INTERNAL_PRODUCTS_PROPERTY, DEFAULT_CREATE_INTERNAL_PRODUCTS));
    LOGGER.config("createInternalProducts = " + createInternalProducts);

    createScenarioProducts = Boolean.parseBoolean(
        config.getProperty(CREATE_SCENARIO_PRODUCTS_PROPERTY, DEFAULT_CREATE_SCENARIO_PRODUCTS));
    LOGGER.config("createScenarioProducts = " + createScenarioProducts);
  }

  /**
   * Interrupt the poll thread, if one is running, then shut down the
   * superclass.
   *
   * @throws Exception if the superclass shutdown fails
   */
  @Override
  public void shutdown() throws Exception {
    final Thread active = pollThread;
    if (active != null) {
      // the polling loop treats interruption as a request to stop
      active.interrupt();
      pollThread = null;
    }

    super.shutdown();
  }

  /**
   * Start the superclass, then start the poll thread unless one already
   * exists.
   *
   * @throws Exception if the superclass startup fails
   */
  @Override
  public void startup() throws Exception {
    super.startup();

    if (pollThread != null) {
      // already polling
      return;
    }

    final Thread poller = new Thread(this);
    pollThread = poller;
    poller.setName("poll thread");
    poller.start();
  }

  /** @return directory that is polled for input files */
  public File getPolldir() {
    return polldir;
  }

  /** @param polldir directory to poll for input files */
  public void setPolldir(File polldir) {
    this.polldir = polldir;
  }

  /** @return directory where processed files are moved */
  public File getStoragedir() {
    return storagedir;
  }

  /** @param storagedir directory where processed files are moved */
  public void setStoragedir(File storagedir) {
    this.storagedir = storagedir;
  }

  /** @return directory where files that failed processing are moved */
  public File getErrordir() {
    return errordir;
  }

  /** @param errordir directory where files that failed processing are moved */
  public void setErrordir(File errordir) {
    this.errordir = errordir;
  }

  /** @return productCreator used to convert files into products */
  public ProductCreator getProductCreator() {
    return productCreator;
  }

  /** @param productCreator used to convert files into products */
  public void setProductCreator(ProductCreator productCreator) {
    this.productCreator = productCreator;
  }

  /** @return directoryPoller */
  public Poller getDirectoryPoller() {
    return directoryPoller;
  }

  /** @param directoryPoller to set */
  public void setDirectoryPoller(Poller directoryPoller) {
    this.directoryPoller = directoryPoller;
  }

  /** @return pollInterval in milliseconds */
  public long getPollInterval() {
    return pollInterval;
  }

  /** @param pollInterval milliseconds between polls */
  public void setPollInterval(long pollInterval) {
    this.pollInterval = pollInterval;
  }

  /** @return whether recently modified files are skipped while polling */
  public boolean isPollCarefully() {
    return pollCarefully;
  }

  /** @param pollCarefully whether to skip recently modified files while polling */
  public void setPollCarefully(boolean pollCarefully) {
    this.pollCarefully = pollCarefully;
  }

  /**
   * @return whether product types are prefixed with "internal-"
   */
  public boolean isCreateInternalProducts() {
    return createInternalProducts;
  }

  /**
   * @param createInternalProducts whether to prefix product types with
   *                               "internal-"
   */
  public void setCreateInternalProducts(boolean createInternalProducts) {
    this.createInternalProducts = createInternalProducts;
  }

  /**
   * @return whether product types are suffixed with "-scenario"
   */
  public boolean isCreateScenarioProducts() {
    return createScenarioProducts;
  }

  /**
   * @param createScenarioProducts whether to suffix product types with
   *                               "-scenario"
   */
  public void setCreateScenarioProducts(boolean createScenarioProducts) {
    this.createScenarioProducts = createScenarioProducts;
  }

  /**
   * Parses a comma separated list of "host:port" servers into
   * SocketProductSenders.
   *
   * Whitespace around entries, hosts and ports is ignored, and blank entries
   * (for example from a trailing comma) are skipped.
   *
   * @param servers        String of servers, split by commas
   * @param connectTimeout int timeout
   * @param binaryFormat   boolean if binary format
   * @param enableDeflate  boolean if Deflate should be enabled
   * @return List of product senders, one per server, in the order listed
   * @throws IllegalArgumentException if an entry is not in "host:port" form
   * @throws NumberFormatException    if a port is not an integer
   */
  public static List<ProductSender> parseServers(final String servers, final Integer connectTimeout,
      final boolean binaryFormat, final boolean enableDeflate) {
    List<ProductSender> senders = new ArrayList<ProductSender>();

    for (String entry : StringUtils.split(servers, ",")) {
      String server = entry.trim();
      if (server.isEmpty()) {
        // tolerate "a:1,,b:2" and trailing commas
        continue;
      }

      String[] parts = server.split(":");
      if (parts.length < 2 || parts[0].trim().isEmpty()) {
        // report a readable error instead of an ArrayIndexOutOfBoundsException
        throw new IllegalArgumentException("Invalid server \"" + server + "\", expected host:port");
      }

      SocketProductSender sender = new SocketProductSender(parts[0].trim(), Integer.parseInt(parts[1].trim()),
          connectTimeout);
      sender.setBinaryFormat(binaryFormat);
      sender.setEnableDeflate(enableDeflate);
      senders.add(sender);
    }

    return senders;
  }

  /** Argument for help */
  public static final String HELP_ARGUMENT = "--help";
  /** Argument for poll */
  public static final String POLL_ARGUMENT = "--poll";

  /** Argument for pollCarefully */
  public static final String POLL_CAREFULLY_ARGUMENT = "--pollCarefully";
  /** Argument for polldir */
  public static final String POLLDIR_ARGUMENT = "--polldir=";
  /** Argument for errordir */
  public static final String ERRORDIR_ARGUMENT = "--errordir=";
  /** Argument for storagedir */
  public static final String STORAGEDIR_ARGUMENT = "--oldinputdir=";
  /** Argument for poll interval */
  public static final String POLL_INTERVAL_ARGUMENT = "--pollInterval=";

  /** Argument for file */
  public static final String FILE_ARGUMENT = "--file=";

  /** Argument for parser */
  public static final String PARSER_ARGUMENT = "--parser=";
  /** Argument for validate */
  public static final String VALIDATE_ARGUMENT = "--validate";
  /** Argument for privateKey */
  public static final String PRIVATE_KEY_ARGUMENT = "--privateKey=";
  /** Argument for signatureVersion */
  public static final String SIGNATURE_VERSION_ARGUMENT = "--signatureVersion=";

  /** Argument for servers */
  public static final String SERVERS_ARGUMENT = "--servers=";
  /** Default server for server argument */
  public static final String SERVERS_DEFAULT = "prod01-pdl01.cr.usgs.gov:11235,prod02-pdl01.cr.usgs.gov:11235";
  /** Argument for connection timeout */
  public static final String CONNECT_TIMEOUT_ARGUMENT = "--connectTimeout=";
  /** Default timeout for connection, in milliseconds */
  public static final Integer DEFAULT_CONNECT_TIMEOUT = 15000;
  /** Argument for binaryFormat */
  public static final String BINARY_FORMAT_ARGUMENT = "--binaryFormat";
  /** Argument for disableDeflate */
  public static final String DISABLE_DEFLATE_ARGUMENT = "--disableDeflate";
  /** Argument for attach */
  public static final String ATTACH_ARGUMENT = "--attach=";
  /** Argument for sending origin with phases */
  public static final String SEND_ORIGINS_WITH_PHASES = "--sendOriginWhenPhasesExist";
  /** Argument for sending mechanisms with phases */
  public static final String SEND_MECHANISMS_WITH_PHASES = "--sendMechanismWhenPhasesExist";

  /** Argument for creating internal products */
  public static final String CREATE_INTERNAL_PRODUCTS = "--internal";
  /** Argument for creating scenario products */
  public static final String CREATE_SCENARIO_PRODUCTS = "--scenario";

  /** Argument for testing */
  public static final String TEST_ARGUMENT = "--test";

  /**
   * Bootstrappable interface.
   *
   * Parses command line arguments on top of any existing configuration, then
   * either prints usage (and exits), starts polling, or parses and sends one
   * file. When sending one file, the process exits with
   * CLIProductBuilder.EXIT_UNABLE_TO_SEND or
   * CLIProductBuilder.EXIT_PARTIALLY_SENT if products were not fully sent.
   *
   * Server senders are created after all arguments are read, so the connect
   * timeout, binary format, and deflate arguments apply regardless of their
   * position relative to the servers argument.
   *
   * @param args command line arguments
   * @throws Exception if an argument cannot be processed
   */
  @Override
  public void run(final String[] args) throws Exception {
    // save arguments in global for access by FileToQuakemlParser objects.
    EIDSInputWedge.ARGS = args;

    boolean test = false;
    boolean help = false;
    boolean poll = false;
    boolean validate = this.productCreator.isValidate();
    boolean sendOriginWhenPhasesExist = false;
    boolean sendMechanismWhenPhasesExist = false;

    // preserve any existing settings from config file
    if (productCreator instanceof QuakemlProductCreator) {
      sendOriginWhenPhasesExist = ((QuakemlProductCreator) productCreator).isSendOriginWhenPhasesExist();
      sendMechanismWhenPhasesExist = ((QuakemlProductCreator) productCreator).isSendMechanismWhenPhasesExist();
    } else if (productCreator instanceof EQMessageProductCreator) {
      sendOriginWhenPhasesExist = ((EQMessageProductCreator) productCreator).isSendOriginWhenPhasesExist();
    }

    File file = null;
    // when sending 1 product, allow extra files to be attached.
    HashMap<String, Content> attachContent = new HashMap<String, Content>();
    Integer connectTimeout = DEFAULT_CONNECT_TIMEOUT;
    boolean binaryFormat = false;
    boolean enableDeflate = true;
    // last servers argument seen; senders are created after the loop
    String servers = null;

    for (String arg : args) {
      if (arg.equals(HELP_ARGUMENT)) {
        help = true;
      } else if (arg.equals(POLL_ARGUMENT)) {
        poll = true;
      } else if (arg.equals(POLL_CAREFULLY_ARGUMENT)) {
        pollCarefully = true;
      } else if (arg.equals(SEND_ORIGINS_WITH_PHASES)) {
        sendOriginWhenPhasesExist = true;
      } else if (arg.equals(SEND_MECHANISMS_WITH_PHASES)) {
        sendMechanismWhenPhasesExist = true;
      } else if (arg.startsWith(POLLDIR_ARGUMENT)) {
        // substring strips only the leading prefix, unlike String.replace
        setPolldir(new File(arg.substring(POLLDIR_ARGUMENT.length())));
      } else if (arg.startsWith(ERRORDIR_ARGUMENT)) {
        setErrordir(new File(arg.substring(ERRORDIR_ARGUMENT.length())));
      } else if (arg.startsWith(STORAGEDIR_ARGUMENT)) {
        setStoragedir(new File(arg.substring(STORAGEDIR_ARGUMENT.length())));
      } else if (arg.startsWith(FILE_ARGUMENT)) {
        file = new File(arg.substring(FILE_ARGUMENT.length()));
      } else if (arg.startsWith(ATTACH_ARGUMENT)) {
        File attach = new File(arg.substring(ATTACH_ARGUMENT.length()));
        if (attach.isDirectory()) {
          attachContent.putAll(FileContent.getDirectoryContents(attach));
        } else {
          attachContent.put(attach.getName(), new FileContent(attach));
        }
      } else if (arg.startsWith(PARSER_ARGUMENT)) {
        String parserClassName = arg.substring(PARSER_ARGUMENT.length());
        Object parser = Class.forName(parserClassName).getConstructor().newInstance();
        if (parser instanceof ProductCreator) {
          setProductCreator((ProductCreator) parser);
        } else if (parser instanceof FileToQuakemlConverter) {
          // same construction as configure, so doBufferFix is honored
          QuakemlProductCreator quakemlCreator = new QuakemlProductCreator(doBufferFix);
          quakemlCreator.setConverter((FileToQuakemlConverter) parser);
          setProductCreator(quakemlCreator);
        } else {
          throw new ConfigurationException("parser class " + parserClassName + " does not implement "
              + ProductCreator.class.getName() + " or " + FileToQuakemlConverter.class.getName());
        }
      } else if (arg.startsWith(VALIDATE_ARGUMENT)) {
        validate = true;
      } else if (arg.startsWith(SERVERS_ARGUMENT)) {
        servers = arg.substring(SERVERS_ARGUMENT.length());
      } else if (arg.startsWith(TEST_ARGUMENT)) {
        test = true;
      } else if (arg.startsWith(PRIVATE_KEY_ARGUMENT)) {
        setPrivateKey(CryptoUtils.readOpenSSHPrivateKey(
            FileUtils.readFile(new File(arg.substring(PRIVATE_KEY_ARGUMENT.length()))), null));
      } else if (arg.startsWith(SIGNATURE_VERSION_ARGUMENT)) {
        setSignatureVersion(Version.fromString(arg.substring(SIGNATURE_VERSION_ARGUMENT.length())));
      } else if (arg.startsWith(CONNECT_TIMEOUT_ARGUMENT)) {
        connectTimeout = Integer.valueOf(arg.substring(CONNECT_TIMEOUT_ARGUMENT.length()));
      } else if (arg.equals(BINARY_FORMAT_ARGUMENT)) {
        binaryFormat = true;
      } else if (arg.equals(DISABLE_DEFLATE_ARGUMENT)) {
        enableDeflate = false;
      } else if (arg.startsWith(POLL_INTERVAL_ARGUMENT)) {
        setPollInterval(Long.parseLong(arg.substring(POLL_INTERVAL_ARGUMENT.length())));
      } else if (arg.equals(CREATE_INTERNAL_PRODUCTS)) {
        createInternalProducts = true;
      } else if (arg.equals(CREATE_SCENARIO_PRODUCTS)) {
        createScenarioProducts = true;
      } else if (arg.equals(CLIProductBuilder.DISABLE_PARALLEL_SEND)) {
        parallelSend = false;
      } else if (arg.startsWith(CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT)) {
        parallelSendTimeout = Long.parseLong(arg.substring(CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT.length()));
      }
    }

    if (test) {
      // test mode prints products instead of sending; ignores servers argument
      getProductSenders().clear();
      getProductSenders().add(new DebugProductSender());
    } else if (servers != null) {
      getProductSenders().clear();
      getProductSenders().addAll(parseServers(servers, connectTimeout, binaryFormat, enableDeflate));
    }

    ProductCreator creator = getProductCreator();
    creator.setValidate(validate);
    if (creator instanceof EQMessageProductCreator) {
      ((EQMessageProductCreator) creator).setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
    } else if (creator instanceof QuakemlProductCreator) {
      QuakemlProductCreator quakemlCreator = ((QuakemlProductCreator) creator);
      quakemlCreator.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
      quakemlCreator.setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);
    }

    if (
    // want usage, or didn't provide arguments
    (help || args.length == 0)
        // or didn't provide correct arguments
        || (!poll && file == null)) {
      printUsage();
    }

    // run continuously
    else if (poll) {
      startup();
    }

    // send, then shutdown
    else {
      // file != null
      try {
        // send
        Map<ProductId, Map<ProductSender, Exception>> sendExceptions = parseAndSend(file, attachContent);

        // check how send went
        int numSenders = getProductSenders().size();
        int total = sendExceptions.size();
        int successful = 0;
        int partialFailures = 0;
        int totalFailures = 0;

        for (Map.Entry<ProductId, Map<ProductSender, Exception>> entry : sendExceptions.entrySet()) {
          ProductId sentId = entry.getKey();
          int numFailed = entry.getValue().size();
          if (numFailed == numSenders) {
            totalFailures++;
            LOGGER.severe("Total failure sending product " + sentId.toString());
          } else {
            // output built product id because it was sent at least
            // once
            System.out.println(sentId.toString());

            if (numFailed == 0) {
              successful++;
            } else {
              partialFailures++;
              LOGGER.warning("Partial failure sending product " + sentId.toString());
            }
          }
        }

        LOGGER.info("Generated " + total + " products: " + successful + " sent, " + partialFailures
            + " partially sent, " + totalFailures + " failed to send");

        // notify of failures using exit code
        if (totalFailures > 0) {
          System.exit(CLIProductBuilder.EXIT_UNABLE_TO_SEND);
        }
        if (partialFailures > 0) {
          System.exit(CLIProductBuilder.EXIT_PARTIALLY_SENT);
        }
      } catch (Exception e) {
        if (e instanceof JAXBException && ((JAXBException) e).getLinkedException() instanceof SAXParseException) {
          SAXParseException spe = (SAXParseException) ((JAXBException) e).getLinkedException();
          LOGGER.severe("Parse error: " + spe.getMessage() + "; line=" + spe.getLineNumber() + ", column="
              + spe.getColumnNumber());
        } else {
          LOGGER.log(Level.SEVERE, "Exception while sending", e);
        }
        System.exit(CLIProductBuilder.EXIT_UNABLE_TO_SEND);
      }
    }

  }

  /**
   * Print command line usage to standard error, then exit the JVM with status
   * 1. This method does not return.
   */
  public static void printUsage() {
    System.err.println("\nUsage:\n\n" + "java -cp ProductClient.jar gov.usgs.earthquake.eids.EIDSInputWedge" + " ("
        + HELP_ARGUMENT + "|" + POLL_ARGUMENT + "|" + FILE_ARGUMENT + "FILE) [" + PRIVATE_KEY_ARGUMENT + "KEYFILE] ["
        + SIGNATURE_VERSION_ARGUMENT + "VERSION] [" + SERVERS_ARGUMENT + "SERVERS] [" + TEST_ARGUMENT + "] ["
        + CONNECT_TIMEOUT_ARGUMENT + "TIMEOUT] [" + PARSER_ARGUMENT + "PARSER] [" + VALIDATE_ARGUMENT + "] ["
        + POLLDIR_ARGUMENT + "POLLDIR] [" + POLL_INTERVAL_ARGUMENT + "INTERVAL] [" + POLL_CAREFULLY_ARGUMENT + "] ["
        + STORAGEDIR_ARGUMENT + "STORAGEDIR] [" + ERRORDIR_ARGUMENT
        + "ERRORDIR] [" + ATTACH_ARGUMENT + "ATTACH] [" + SEND_ORIGINS_WITH_PHASES + "] [" + SEND_MECHANISMS_WITH_PHASES
        + "] [" + CREATE_INTERNAL_PRODUCTS + "] [" + CREATE_SCENARIO_PRODUCTS + "] [" + BINARY_FORMAT_ARGUMENT + "] ["
        + DISABLE_DEFLATE_ARGUMENT + "] [" + CLIProductBuilder.DISABLE_PARALLEL_SEND + "] ["
        + CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT + "300]");

    System.err.println();

    System.err.println("\t" + HELP_ARGUMENT);
    System.err.println("\t\tdisplay this message");
    System.err.println("\t" + FILE_ARGUMENT + "FILE");
    System.err.println("\t\tparse and send one file");
    System.err.println("\t" + POLL_ARGUMENT);
    System.err.println("\t\trun continuously, checking POLLDIR for files");

    System.err.println();

    System.err.println("\t" + PRIVATE_KEY_ARGUMENT + "KEYFILE");
    System.err.println("\t\topenssh private key used to sign products");
    System.err.println("\t" + SIGNATURE_VERSION_ARGUMENT + "VERSION");
    System.err.println("\t\t'v1' is default, 'v2' is other option.");

    System.err.println("\t" + CONNECT_TIMEOUT_ARGUMENT + "TIMEOUT");
    System.err.println("\t\tmilliseconds before timeout while connecting");
    System.err.println("\t\tdefault is \"" + DEFAULT_CONNECT_TIMEOUT + "\"ms");
    System.err.println("\t\t(must appear before " + SERVERS_ARGUMENT + ")");

    System.err.println("\t" + SERVERS_ARGUMENT + "SERVERS");
    System.err.println("\t\tcomma delimited list of servers(host:port) where products are sent");
    System.err.println("\t\tdefault is \"" + SERVERS_DEFAULT + "\"");
    System.err.println("\t" + TEST_ARGUMENT);
    System.err.println("\t\tPrint generated products to console for testing, ignores " + SERVERS_ARGUMENT);
    System.err.println("\t" + PARSER_ARGUMENT + "PARSER");
    System.err.println("\t\tclass that implements " + "gov.usgs.earthquake.quakeml.FileToQuakemlConverter");
    System.err.println("\t\tdefault is \"" + DEFAULT_PARSER_CLASS + "\"");
    // accepted by run(String[]) but previously missing from the usage text
    System.err.println("\t" + VALIDATE_ARGUMENT);
    System.err.println("\t\tenable validation when parsing input");

    System.err.println();

    System.err.println("\t" + POLLDIR_ARGUMENT + "POLLDIR");
    System.err.println("\t\tdirectory to poll for messages");

    System.err.println("\t" + POLL_INTERVAL_ARGUMENT + "INTERVAL");
    System.err.println("\t\tmilliseconds between polling");
    System.err.println("\t\tdefault is \"" + DEFAULT_POLLINTERVAL + "\"ms");

    // accepted by run(String[]) but previously missing from the usage text
    System.err.println("\t" + POLL_CAREFULLY_ARGUMENT);
    System.err.println("\t\twhen polling, skip files modified within the last INTERVAL milliseconds,");
    System.err.println("\t\tin case they are still being written");

    System.err.println("\t" + STORAGEDIR_ARGUMENT + "STORAGEDIR");
    System.err.println("\t\tdirectory for files that were processed");

    System.err.println("\t" + ERRORDIR_ARGUMENT + "ERRORDIR");
    System.err.println("\t\tdirectory for files that weren't processed");

    System.err.println("\t" + ATTACH_ARGUMENT + "ATTACH");
    System.err.println("\t\tattach a file or directory to one generated product, repeatable");
    System.err.println("\t\tdirectory trees are preserved, each path must be unique");
    System.err.println("\t\tif more than one product is generated, an exception will be thrown");
    System.err.println("\t" + SEND_ORIGINS_WITH_PHASES);
    System.err
        .println("\t\tWhen a phase-data product is generated, also send an origin product without the phase data");
    System.err.println("\t" + SEND_MECHANISMS_WITH_PHASES);
    System.err.println(
        "\t\tWhen an phase-data product is generated, also send focal mechanism products without the phase data");
    System.err.println();

    System.err.println("\t" + CREATE_INTERNAL_PRODUCTS);
    System.err.println("\t\tuse the product type prefix 'internal-' for all generated products");
    System.err.println("\t" + CREATE_SCENARIO_PRODUCTS);
    System.err.println("\t\tuse the product type suffix '-scenario' for all generated products");

    System.err.println("\t" + BINARY_FORMAT_ARGUMENT);
    System.err.println("\t\tsend to hub using binary format");

    System.err.println("\t" + DISABLE_DEFLATE_ARGUMENT);
    System.err.println("\t\tdisable deflate compression when sending to hubs");

    System.err.println("\t" + CLIProductBuilder.DISABLE_PARALLEL_SEND);
    System.err.println("\t\tsend to servers sequentially");

    System.err.println("\t" + CLIProductBuilder.PARALLEL_SEND_TIMEOUT_ARGUMENT);
    System.err.println("\t\ttimeout for parallel sends in seconds");

    System.exit(1);
  }

  /**
   * Polling loop, run on the poll thread created by startup.
   *
   * Repeatedly lists the poll directory and passes each file to onFile, then
   * sleeps for whatever remains of the poll interval. Files still present
   * after onFile returns are moved to the storage directory (or deleted when
   * none is set). The loop ends when the thread is interrupted.
   */
  @Override
  public void run() {
    if (!polldir.exists()) {
      polldir.mkdirs();
    }

    // avoid repeating the same warning every poll interval
    boolean listFailureLogged = false;

    while (!Thread.currentThread().isInterrupted()) {
      try {
        long pollStart = System.currentTimeMillis();

        // File.list returns null when polldir is not a readable directory
        String[] polldirFiles = polldir.list();
        if (polldirFiles == null) {
          if (!listFailureLogged) {
            LOGGER.warning("Unable to list poll directory " + polldir.getPath());
            listFailureLogged = true;
          }
          polldirFiles = new String[0];
        } else {
          listFailureLogged = false;
        }

        if (polldirFiles.length > 0) {
          LOGGER.fine("Polldir contains " + polldirFiles.length + " files");
        }

        for (String polldirFile : polldirFiles) {
          File file = new File(polldir, polldirFile);
          try {
            if (pollCarefully) {
              // wait until file is at least pollInterval ms old,
              // in case it is still being written
              long age = System.currentTimeMillis() - file.lastModified();
              if (age <= pollInterval) {
                continue;
              }
            }

            onFile(file);

            // onFile normally moves or removes the file itself;
            // only clean up when it was left behind
            if (file.exists()) {
              if (storagedir != null) {
                if (!storagedir.exists()) {
                  storagedir.mkdirs();
                }
                file.renameTo(new File(storagedir, pollStart + "_" + file.getName()));
              } else {
                FileUtils.deleteTree(file);
              }
            }
          } catch (Exception e) {
            if (errordir != null) {
              if (!errordir.exists()) {
                errordir.mkdirs();
              }
              file.renameTo(new File(errordir, pollStart + "_" + file.getName()));
            } else {
              LOGGER.warning("Error processing file " + file.getName() + ", and no error directory configured");
              FileUtils.deleteTree(file);
            }
          }
        }

        long pollTime = System.currentTimeMillis() - pollStart;
        if (pollTime < pollInterval) {
          Thread.sleep(pollInterval - pollTime);
        }
      } catch (InterruptedException ie) {
        // interrupted means shutdown; restore the flag for any caller
        Thread.currentThread().interrupt();
        return;
      }
    }

  }

}