ProductBuilder.java

package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.product.Product;
import gov.usgs.util.Config;
import gov.usgs.util.CryptoUtils;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.StreamUtils;
import gov.usgs.util.StringUtils;
import gov.usgs.util.CryptoUtils.Version;

import java.io.File;
import java.security.PrivateKey;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Essentials for building/sending products.
 *
 * This is the base class for other builders.
 *
 * Supported configurable properties:
 * <dl>
 * <dt>senders</dt>
 * <dd>A comma delimited list of product senders to use when sending
 * products.</dd>
 * <dt>privateKeyFile</dt>
 * <dd>Path to a private key that can be used to sign products.</dd>
 * <dt>signatureVersion</dt>
 * <dd>Signature version used when signing products. Defaults to
 * {@link Version#SIGNATURE_V2}.</dd>
 * <dt>parallelSend</dt>
 * <dd>Whether products are sent to all senders concurrently. Defaults to
 * {@value #DEFAULT_PARALLEL_SEND}.</dd>
 * <dt>parallelSendTimeout</dt>
 * <dd>Number of seconds to wait for a parallel send before giving up. Defaults
 * to {@value #DEFAULT_PARALLEL_SEND_TIMEOUT}.</dd>
 * </dl>
 */
public class ProductBuilder extends DefaultConfigurable {

  private static final Logger LOGGER = Logger.getLogger(ProductBuilder.class.getSimpleName());

  /** Configurable property for senders. */
  public static final String SENDERS_PROPERTY = "senders";

  /** Private key filename configuration property. */
  public static final String PRIVATE_KEY_FILE_PROPERTY = "privateKeyFile"; // rename

  /** Signature version property. */
  public static final String SIGNATURE_VERSION_PROPERTY = "signatureVersion";

  /** Configurable property controlling whether products are sent in parallel. */
  public static final String PARALLEL_SEND_PROPERTY = "parallelSend";
  /** Default value for {@link #PARALLEL_SEND_PROPERTY}. */
  public static final String DEFAULT_PARALLEL_SEND = "true";

  /** Timeout in seconds for parallel send. */
  public static final String PARALLEL_SEND_TIMEOUT_PROPERTY = "parallelSendTimeout";
  /** Default value for {@link #PARALLEL_SEND_TIMEOUT_PROPERTY}, in seconds. */
  public static final String DEFAULT_PARALLEL_SEND_TIMEOUT = "300";

  /** List of senders where built products are sent. */
  private List<ProductSender> senders = new LinkedList<ProductSender>();

  /** Key used to sign sent products. */
  private PrivateKey privateKey;

  /** Signature version. */
  private Version signatureVersion = Version.SIGNATURE_V2;

  /** Whether to send in parallel. */
  protected boolean parallelSend = true;

  /** How long to wait (in seconds) before parallel send timeout. */
  protected long parallelSendTimeout = 300L;

  /**
   * Send a product. If the product has not yet been signed, and a privateKey is
   * configured, signs the product before sending.
   *
   * The product is stamped with the client release version before it is signed
   * or sent. A {@link ProductAlreadyInStorageException} raised by a sender is
   * logged and is not reported as an error, in both parallel and sequential
   * modes.
   *
   * @param product the product to send.
   * @return map of all exceptions thrown, from Sender to corresponding Exception.
   *         Empty when every sender succeeded (or when there are no senders).
   * @throws Exception if an error occurs while signing product.
   */
  public Map<ProductSender, Exception> sendProduct(final Product product) throws Exception {

    // Mark which version of client was used to create product
    product.getProperties().put(ProductClient.PDL_CLIENT_VERSION_PROPERTY, ProductClient.RELEASE_VERSION);

    // Doesn't already have a signature
    if (privateKey != null && product.getSignature() == null) {
      product.sign(privateKey, signatureVersion);
    }

    // send product using all product senders.
    if (parallelSend) {
      return parallelSendProduct(senders, product, parallelSendTimeout);
    }

    // send sequentially if not parallel
    final Map<ProductSender, Exception> errors = new HashMap<ProductSender, Exception>();
    for (final ProductSender sender : senders) {
      sendAndRecord(sender, product, errors);
    }

    return errors;
  }

  /**
   * @return list of product senders. This is the live internal list, not a copy.
   */
  public List<ProductSender> getProductSenders() {
    return senders;
  }

  /**
   * Add a ProductSender.
   *
   * @param sender to add
   */
  public void addProductSender(final ProductSender sender) {
    senders.add(sender);
  }

  /**
   * Remove a previously added ProductSender.
   *
   * @param sender to remove
   */
  public void removeProductSender(final ProductSender sender) {
    senders.remove(sender);
  }

  /** @return privateKey */
  public PrivateKey getPrivateKey() {
    return privateKey;
  }

  /** @param privateKey to set */
  public void setPrivateKey(PrivateKey privateKey) {
    this.privateKey = privateKey;
  }

  /** @return signatureVersion */
  public Version getSignatureVersion() {
    return signatureVersion;
  }

  /** @param signatureVersion to set */
  public void setSignatureVersion(Version signatureVersion) {
    this.signatureVersion = signatureVersion;
  }

  /**
   * Configure this builder: load the named senders, the optional private key,
   * the optional signature version, and the parallel send settings.
   *
   * @param config the configuration to read properties from.
   * @throws Exception if a named sender cannot be loaded, or if reading the
   *                   private key or parsing a property value fails.
   */
  @Override
  public void configure(final Config config) throws Exception {
    Iterator<String> senderNames = StringUtils.split(config.getProperty(SENDERS_PROPERTY), ",").iterator();
    while (senderNames.hasNext()) {
      String name = senderNames.next();
      LOGGER.config("Loading sender " + name);

      // senders are looked up by name in the global configuration
      ProductSender sender = (ProductSender) Config.getConfig().getObject(name);
      if (sender == null) {
        throw new ConfigurationException("Unable to load sender '" + name + "', make sure it is properly configured.");
      }
      addProductSender(sender);
    }

    String keyFilename = config.getProperty(PRIVATE_KEY_FILE_PROPERTY);
    if (keyFilename != null) {
      LOGGER.config("[" + getName() + "] Loading private key file '" + keyFilename + "'");
      // NOTE(review): second argument is passed as null; presumably the key
      // password - confirm against CryptoUtils.
      privateKey = CryptoUtils.readOpenSSHPrivateKey(StreamUtils.readStream(new File(keyFilename)), null);
    }

    String version = config.getProperty(SIGNATURE_VERSION_PROPERTY);
    if (version != null) {
      signatureVersion = Version.fromString(version);
    }
    LOGGER.config("[" + getName() + "] signature version = " + signatureVersion);

    parallelSend = Boolean.parseBoolean(config.getProperty(PARALLEL_SEND_PROPERTY, DEFAULT_PARALLEL_SEND));
    parallelSendTimeout = Long
        .parseLong(config.getProperty(PARALLEL_SEND_TIMEOUT_PROPERTY, DEFAULT_PARALLEL_SEND_TIMEOUT));
    LOGGER.config("[" + getName() + "] parallel send enabled=" + parallelSend + ", timeout=" + parallelSendTimeout);
  }

  /**
   * Shut down every configured sender, in list order.
   *
   * @throws Exception if a sender fails to shut down; remaining senders are not
   *                   shut down in that case.
   */
  @Override
  public void shutdown() throws Exception {
    for (final ProductSender sender : senders) {
      sender.shutdown();
    }
  }

  /**
   * Start every configured sender, in list order.
   *
   * @throws Exception if a sender fails to start; remaining senders are not
   *                   started in that case.
   */
  @Override
  public void startup() throws Exception {
    for (final ProductSender sender : senders) {
      sender.startup();
    }
  }

  /**
   * Send a product to all ProductSenders concurrently.
   *
   * One task is submitted per sender and the calling thread blocks until all
   * tasks finish or the timeout elapses. A sender whose task neither finished
   * nor recorded an exception is reported with an {@link InterruptedException}.
   * A sender that reports the product is already in storage is treated as
   * successful, matching sequential sending.
   *
   * If the calling thread is interrupted while waiting, its interrupt status is
   * restored before this method returns.
   *
   * @param senders        the senders to receive product.
   * @param product        the product to send.
   * @param timeoutSeconds number of seconds before timing out, interrupting any
   *                       pending send.
   * @return exceptions that occured while sending. If map is empty, there were no
   *         exceptions.
   */
  public static Map<ProductSender, Exception> parallelSendProduct(final List<ProductSender> senders,
      final Product product, final long timeoutSeconds) {
    final Map<ProductSender, Boolean> sendComplete = Collections.synchronizedMap(new HashMap<ProductSender, Boolean>());
    final Map<ProductSender, Exception> sendExceptions = Collections
        .synchronizedMap(new HashMap<ProductSender, Exception>());

    // work from a snapshot so the pool size, tasks, and status check all agree
    final List<ProductSender> targets = new ArrayList<ProductSender>(senders);
    if (targets.isEmpty()) {
      // nothing to send; also avoids newFixedThreadPool(0), which throws
      return sendExceptions;
    }

    final List<Callable<Void>> sendTasks = new ArrayList<Callable<Void>>(targets.size());
    for (final ProductSender sender : targets) {
      sendComplete.put(sender, false);
      sendTasks.add(() -> {
        sendAndRecord(sender, product, sendExceptions);
        // reached whenever the send attempt ran to an outcome, including
        // "already in storage" and recorded failures
        sendComplete.put(sender, true);
        return null;
      });
    }

    // run in parallel
    final ExecutorService sendExecutor = Executors.newFixedThreadPool(targets.size());
    try {
      sendExecutor.invokeAll(sendTasks, timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // preserve interrupt status for the caller, then report per-sender status
      Thread.currentThread().interrupt();
      LOGGER.log(Level.FINE, "Interrupted while waiting for parallel send", e);
    } catch (Exception e) {
      // e.g. RejectedExecution; this part is done, move on to checking status
      LOGGER.log(Level.FINE, "Exception while running parallel send", e);
    } finally {
      sendExecutor.shutdown();
    }

    // check whether send completed or was interrupted
    for (final ProductSender sender : targets) {
      if (!Boolean.TRUE.equals(sendComplete.get(sender)) && sendExceptions.get(sender) == null) {
        sendExceptions.put(sender, new InterruptedException());
      }
    }

    return sendExceptions;
  }

  /**
   * Send a product using one sender, recording any failure.
   *
   * A {@link ProductAlreadyInStorageException} is logged at INFO and not
   * recorded; any other exception is logged at WARNING and stored in
   * {@code errors} under the sender.
   *
   * @param sender  the sender to use.
   * @param product the product to send.
   * @param errors  map that receives the exception if the send fails.
   */
  private static void sendAndRecord(final ProductSender sender, final Product product,
      final Map<ProductSender, Exception> errors) {
    try {
      sender.sendProduct(product);
    } catch (Exception e) {
      if (e instanceof ProductAlreadyInStorageException) {
        LOGGER.info("Product already in storage, id=" + product.getId().toString());
      } else {
        LOGGER.log(Level.WARNING, "[" + sender.getName() + "] error sending product", e);
        errors.put(sender, e);
      }
    }
  }

}