InputWedge.java

package gov.usgs.earthquake.distribution;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.logging.Logger;

import javax.xml.bind.JAXBException;

import org.xml.sax.SAXParseException;

import gov.usgs.earthquake.product.Content;
import gov.usgs.earthquake.product.FileContent;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.DirectoryProductSource;
import gov.usgs.earthquake.product.io.ObjectProductHandler;
import gov.usgs.util.Config;
import gov.usgs.util.DirectoryPoller;
import gov.usgs.util.Poller;
import gov.usgs.util.StringUtils;
import gov.usgs.util.FileListenerInterface;

/**
 * Read messages from files or a poll directory, and push products into PDL.
 *
 * The input messages are converted to Quakeml using the FileToQuakemlConverter
 * interface, then sent as Quakeml based products.
 *
 * Much of the configuration can be supplied using either a configuration file,
 * or command line arguments.
 */
public class InputWedge extends ProductBuilder implements Runnable, Bootstrappable, FileListenerInterface {

  private static final Logger LOGGER = Logger.getLogger(InputWedge.class.getName());

  /** Property for parser class */
  public static final String PARSER_CLASS_PROPERTY = "parserClass";
  /** Default parser class */
  public static final String DEFAULT_PARSER_CLASS = "gov.usgs.earthquake.distribution.QuakemlProductCreator";
  /** Convert parsed quakeml to a product. */
  private ProductCreator productCreator = new QuakemlProductCreator();

  /** Property for validate */
  public static final String VALIDATE_PROPERTY = "validate";
  /** Default status of validate */
  public static final String DEFAULT_VALIDATE = "false";

  /** Property for sendOriginWhenPhasesExist */
  public static final String SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY = "sendOriginWhenPhasesExist";
  /** Default status of sendOrigin... */
  public static final String DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST = "false";

  /** Property for sendMechanismWhenPhasesExist */
  public static final String SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY = "sendMechanismWhenPhasesExist";
  /** Default status of sendMechanism... */
  public static final String DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST = "false";

  /** Whether created products should be converted to internal types. */
  public static final String CREATE_INTERNAL_PRODUCTS_PROPERTY = "createInternalProducts";
  /** Default status of CREATE_INTERNAL_PRODUCTS */
  public static final String DEFAULT_CREATE_INTERNAL_PRODUCTS = "false";
  private boolean createInternalProducts = false;

  /** Whether created products should be converted to scenario types. */
  public static final String CREATE_SCENARIO_PRODUCTS_PROPERTY = "createScenarioProducts";
  /** Default status of CREATE_SCENARIO_PRODUCTS */
  public static final String DEFAULT_CREATE_SCENARIO_PRODUCTS = "false";
  private boolean createScenarioProducts = false;

  public static final String POLLER_CLASS_PROPERTY = "poller";
  /** Polling object. */
  private Poller poller;

  /** Property for doBufferFix */
  public static final String DO_BUFFER_FIX_PROPERTY = "doBufferFix";
  /** Default status of DO_BUFFER_FIX property */
  public static final String DEFAULT_DO_BUFFER_FIX = "true";
  private boolean doBufferFix = true;

  private Thread pollThread = null;

  /**
   * Empty constructor
   *
   * @throws Exception if error occurs
   */
  public InputWedge() throws Exception {
  }

  @Override
  public void configure(Config config) throws Exception {
    super.configure(config);

    String parserClassName = config.getProperty(PARSER_CLASS_PROPERTY, DEFAULT_PARSER_CLASS);
    Object parserObj = Class.forName(parserClassName).getConstructor().newInstance();
    if (parserObj instanceof ProductCreator) {
      productCreator = (ProductCreator) parserObj;
    } else {
      throw new ConfigurationException(String.format("configured parser class %s does not implement %s",
          parserClassName, ProductCreator.class.getName()));
    }
    LOGGER.config("Using parser class " + parserClassName);

    boolean validate = Boolean.parseBoolean(config.getProperty(VALIDATE_PROPERTY, DEFAULT_VALIDATE));
    productCreator.setValidate(validate);
    LOGGER.config("Validation " + (validate ? "enabled" : "disabled"));

    boolean sendOriginWhenPhasesExist = Boolean
        .valueOf(config.getProperty(SEND_ORIGIN_WHEN_PHASES_EXIST_PROPERTY, DEFAULT_SEND_ORIGIN_WHEN_PHASES_EXIST));
    productCreator.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);

    LOGGER.config("sendOriginWhenPhasesExist = " + sendOriginWhenPhasesExist);

    boolean sendMechanismWhenPhasesExist = Boolean.valueOf(
        config.getProperty(SEND_MECHANISM_WHEN_PHASES_EXIST_PROPERTY, DEFAULT_SEND_MECHANISM_WHEN_PHASES_EXIST));
    productCreator.setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);

    LOGGER.config("sendMechanismWhenPhasesExist = " + sendMechanismWhenPhasesExist);

    String pollerClass = config.getProperty(POLLER_CLASS_PROPERTY);
    if (Objects.isNull(pollerClass)) {
      LOGGER.config(String.format("pollerClass not defined, using default %s", DirectoryPoller.class.getName()));
      DirectoryPoller directoryPoller = new DirectoryPoller();
      directoryPoller.configure(new Config());

      poller = directoryPoller;
    } else {
      poller = (Poller) Config.getConfig().getObject(pollerClass);
      LOGGER.config("Using pollerClass class " + pollerClass);
    }

    createInternalProducts = Boolean
        .valueOf(config.getProperty(CREATE_INTERNAL_PRODUCTS_PROPERTY, DEFAULT_CREATE_INTERNAL_PRODUCTS));
    LOGGER.config("createInternalProducts = " + createInternalProducts);

    createScenarioProducts = Boolean
        .valueOf(config.getProperty(CREATE_SCENARIO_PRODUCTS_PROPERTY, DEFAULT_CREATE_SCENARIO_PRODUCTS));
    LOGGER.config("createScenarioProducts = " + createScenarioProducts);

    doBufferFix = Boolean.valueOf(config.getProperty(DO_BUFFER_FIX_PROPERTY, DEFAULT_DO_BUFFER_FIX));
    LOGGER.config("doBufferFix = " + doBufferFix);
  }

  @Override
  public void startup() throws Exception {
    super.startup();

    if (pollThread == null) {
      pollThread = new Thread(this);
      pollThread.setName("poll thread");
      pollThread.start();
    }
  }

  @Override
  public void shutdown() throws Exception {
    if (pollThread != null) {
      pollThread.interrupt();
      pollThread = null;
    }

    super.shutdown();
  }

  /**
   * Gets products from file and iterates through each product During iteration,
   * sets type to internal/scenario if createInternalProducts or
   * createScenarioProducts is true. Attaches Content files to product, Sends
   * product
   *
   * @param file          File containing products
   * @param attachContent Map of String and Content
   * @return Map of product IDs and sent products
   * @throws Exception if error occurs
   */
  public Map<ProductId, Map<ProductSender, Exception>> parseAndSend(final File file,
      final Map<String, Content> attachContent) throws Exception {

    Map<ProductId, Map<ProductSender, Exception>> sendProductResults = new HashMap<ProductId, Map<ProductSender, Exception>>();

    List<Product> products;
    // If a directory is given, treat it as a single product, otherwise parse it out
    // using the productCreator
    if (file.isDirectory()) {
      DirectoryProductSource directoryProductSource = new DirectoryProductSource(file);

      final ObjectProductHandler handler = new ObjectProductHandler();
      directoryProductSource.streamTo(handler);
      handler.close();

      products = List.of(handler.getProduct());
    } else {
      products = productCreator.getProducts(file);
    }

    for (Product product : products) {
      ProductId id = product.getId();

      if (createInternalProducts) {
        id.setType("internal-" + id.getType());
      }
      if (createScenarioProducts) {
        id.setType(id.getType() + "-scenario");
      }

      // attach files to generated product
      if (attachContent != null && attachContent.size() > 0) {
        if (products.size() > 1) {
          throw new Exception("Trying to attach files," + " generated more than 1 product");
        }
        product.getContents().putAll(attachContent);
      }

      // send product, save any exceptions
      sendProductResults.put(product.getId(), sendProduct(product));
    }

    return sendProductResults;
  }

  /**
   * Parses given file, looking for send exceptions and reports statistics
   *
   * @param file to parse and look for errors
   * @throws Exception
   */
  public void onFile(File file) throws Exception {
    onFile(file, null);
  }

  public void onFile(File file, final Map<String, Content> attachContent) throws Exception {
    LOGGER.info("Reading file " + file.getName());

    try {
      Map<ProductId, Map<ProductSender, Exception>> sendExceptions = parseAndSend(file, null);

      // check how send went
      int numSenders = getProductSenders().size();
      int total = sendExceptions.size();
      int successful = 0;
      int partialFailures = 0;
      int totalFailures = 0;

      Iterator<ProductId> sentIds = sendExceptions.keySet().iterator();
      while (sentIds.hasNext()) {
        ProductId sentId = sentIds.next();
        if (sendExceptions.get(sentId).size() == numSenders) {
          totalFailures++;
          LOGGER.severe("Total failure sending product " + sentId.toString());
        } else {
          // output built product id because it was sent at least once
          System.out.println(sentId.toString());

          if (sendExceptions.get(sentId).size() == 0) {
            successful++;
          } else {
            partialFailures++;
            LOGGER.warning("Partial failure sending product " + sentId.toString());
          }
        }
      }

      LOGGER.info("generated " + total + " products: " + successful + " sent, " + partialFailures + " partially sent, "
          + totalFailures + " failed to send");

      // notify of failures using exit code
      if (totalFailures > 0) {
        // consider this failure, event if some products sent
        throw new Exception();
      }
    } catch (Exception e) {
      if (e instanceof JAXBException && ((JAXBException) e).getLinkedException() instanceof SAXParseException) {
        SAXParseException spe = (SAXParseException) ((JAXBException) e).getLinkedException();
        LOGGER.warning(
            "Parse error: " + spe.getMessage() + "; line=" + spe.getLineNumber() + ", column=" + spe.getColumnNumber());
      }

      throw e;
    }
  }

  /** @return productCreator */
  public ProductCreator getProductCreator() {
    return productCreator;
  }

  /** @param productCreator to set */
  public void setProductCreator(ProductCreator productCreator) {
    this.productCreator = productCreator;
  }

  /** @return poller */
  public Poller getPoller() {
    return poller;
  }

  /** @param poller to set */
  public void setPoller(Poller poller) {
    this.poller = poller;
  }

  /**
   * @return the createInternalProducts
   */
  public boolean isCreateInternalProducts() {
    return createInternalProducts;
  }

  /**
   * @param createInternalProducts the createInternalProducts to set
   */
  public void setCreateInternalProducts(boolean createInternalProducts) {
    this.createInternalProducts = createInternalProducts;
  }

  /**
   * @return the createScenarioProducts
   */
  public boolean isCreateScenarioProducts() {
    return createScenarioProducts;
  }

  /**
   * @param createScenarioProducts the createScenarioProducts to set
   */
  public void setCreateScenarioProducts(boolean createScenarioProducts) {
    this.createScenarioProducts = createScenarioProducts;
  }

  /**
   * Parses a string of senders into configured ProductSenders, all put into a
   * list of product senders
   *
   * @param sendersString String of senders in config file, split by commas
   * @return List of product senders
   * @throws Exception
   */
  private List<ProductSender> parseSenders(final String sendersString) throws Exception {
    List<ProductSender> productSenders = new ArrayList<>();

    List<String> senders = StringUtils.split(sendersString, ",");
    for (String sender : senders) {
      ProductSender productSender = (ProductSender) Config.getConfig().getObject(sender);
      if (sender == null) {
        throw new ConfigurationException(
            "Unable to load sender '" + sender + "', make sure it is properly configured.");
      }
      productSenders.add(productSender);

    }

    return productSenders;
  }

  /** Argument for help */
  public static final String HELP_ARGUMENT = "--help";
  /** Argument for poll */
  public static final String POLL_ARGUMENT = "--poll";
  /** Argument for testing */
  public static final String TEST_ARGUMENT = "--test";

  /** Argument for file */
  public static final String FILE_ARGUMENT = "--file=";

  /** Argument for servers */
  public static final String SENDERS_ARGUMENT = "--senders=";
  /** Argument for attach */
  public static final String ATTACH_ARGUMENT = "--attach=";

  /**
   * Bootstrappable interface.
   */
  @Override
  public void run(final String[] args) throws Exception {
    boolean help = findArg(args, HELP_ARGUMENT).isPresent() ? true : false;
    boolean test = findArg(args, TEST_ARGUMENT).isPresent() ? true : false;
    boolean poll = findArg(args, POLL_ARGUMENT).isPresent() ? true : false;

    boolean validate = this.productCreator.isValidate();
    boolean sendOriginWhenPhasesExist = false;
    boolean sendMechanismWhenPhasesExist = false;

    // preserve any existing settings from config file
    sendOriginWhenPhasesExist = productCreator.isSendOriginWhenPhasesExist();

    File file = null;
    // when sending 1 product, allow extra files to be attached.
    HashMap<String, Content> attachContent = new HashMap<String, Content>();

    if (test) {
      getProductSenders().clear();
      getProductSenders().add(new DebugProductSender());
    }

    Optional<String> fileArg = findArg(args, FILE_ARGUMENT);
    file = fileArg.isPresent() ? new File(fileArg.get().replace(FILE_ARGUMENT, "")) : null;

    Optional<String> attachArg = findArg(args, ATTACH_ARGUMENT);
    File attach = attachArg.isPresent() ? new File(attachArg.get().replace(FILE_ARGUMENT, "")) : null;
    if (Objects.nonNull(attach)) {
      if (attach.isDirectory()) {
        attachContent.putAll(FileContent.getDirectoryContents(attach));
      } else {
        attachContent.put(attach.getName(), new FileContent(attach));
      }
    }

    Optional<String> sendersArg = findArg(args, SENDERS_ARGUMENT);
    // ignore senders argument when in test mode
    if (!test && sendersArg.isPresent()) {
      getProductSenders().clear();
      getProductSenders().addAll(parseSenders(sendersArg.get().replace(SENDERS_ARGUMENT, "")));
    }

    ProductCreator creator = getProductCreator();
    creator.setValidate(validate);
    creator.setSendOriginWhenPhasesExist(sendOriginWhenPhasesExist);
    creator.setSendMechanismWhenPhasesExist(sendMechanismWhenPhasesExist);

    if (
    // want usage, or didn't provide arguments
    (help || args.length == 0)
        // or didn't provide correct arguments
        || (!poll && file == null)) {
      printUsage();
    }

    // run continuously
    else if (poll) {
      startup();
    }

    // send, then shutdown
    else {
      // file != null
      try {
        onFile(file, attachContent);
      } catch (Exception ignored) {
        System.exit(CLIProductBuilder.EXIT_UNABLE_TO_SEND);
      }
    }
  }

  private Optional<String> findArg(final String[] args, final String arg) {
    return Arrays.stream(args)
        .filter(
            keyword -> keyword.startsWith(arg))
        .findFirst();
  }

  /** Usage for interface */
  public static void printUsage() {

    String usageMessage = new StringBuilder()
        .append("\nUsage:\n\n")
        .append("java -cp ProductClient.jar gov.usgs.earthquake.distribution.InputWedge ")
        .append("(" + HELP_ARGUMENT + "|" + POLL_ARGUMENT + "|" + FILE_ARGUMENT + "FILE) ")
        .append("[" + SENDERS_ARGUMENT + "SERVERS] ")
        .append("[" + TEST_ARGUMENT + "] ")
        .toString();

    System.err.println(usageMessage);

    String detailUsageMessage = new StringBuilder()
        .append("\n\t" + HELP_ARGUMENT)
        .append("\n\t\tdisplay this message")
        .append("\n\t" + FILE_ARGUMENT + "FILE")
        .append("\n\t\tparse and send one file")
        .append("\n\t" + POLL_ARGUMENT)
        .append("\n\t\trun continuously, checking POLLDIR for files")
        .append("\n\t" + TEST_ARGUMENT)
        .append("\n\t\tPrint generated products to console for testing, ignores " + SENDERS_ARGUMENT)
        .append("\n\n")
        .append("\n\t" + SENDERS_ARGUMENT + "SENDERS")
        .append("\n\t\tcomma delimited list of senders from the config file where products are sent")
        .append("\n\t\tthis will overwrite senders configured in the config file")
        .append("\n\t\tthis is to be used when you want to send to a subset of senders in the config file")
        .append("\n\t\t")
        .append("\n\t" + ATTACH_ARGUMENT + "ATTACH")
        .append("\n\t\tattach a file or directory to one generated product, repeatable")
        .append("\n\t\tdirectory trees are preserved, each path must be unique")
        .append("\n\t\tif more than one product is generated, an exception will be thrown")
        .append("\n\t\tthis is only supported when the " + FILE_ARGUMENT + " argument is given (not in poll mode)")
        .toString();

    System.err.println(detailUsageMessage);
    System.exit(1);
  }

  @Override
  public void run() {

    poller.addFileListener(this);
    try {
      poller.start();
    } catch (Exception e) {
      return;
    }
  }

}