ReplicationStorageListener.java

package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.product.ProductId;
import gov.usgs.util.Config;
import gov.usgs.util.ProcessTimeoutException;
import gov.usgs.util.StringUtils;
import gov.usgs.util.TimeoutProcess;
import gov.usgs.util.TimeoutProcessBuilder;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Class to handle replication of storage */
/**
 * Storage listener that replicates a {@link FileProductStorage} to remote hosts
 * by running an external command (by default {@code rsync}).
 *
 * <p>
 * Each configured target host gets its own single-thread
 * {@link ExecutorService}. Replications to one host therefore run one at a
 * time, while different hosts are serviced independently. The storage
 * callbacks only queue work; the command itself runs later on the host's
 * executor.
 */
public class ReplicationStorageListener extends DefaultStorageListener {

  private static final Logger LOGGER = Logger.getLogger(ReplicationStorageListener.class.getName());

  /**
   * Name of the property specifying whether to use archive flag on the
   * replication.
   */
  public static final String ARCHIVE_FLAG_PROPERTY = "archiveSync";

  /**
   * Name of the property specifying the replication command on the host system.
   */
  public static final String REPL_CMD_PROPERTY = "rsync";

  /**
   * Name of property indicating how many times the replication should be
   * attempted before considering it a failure.
   */
  public static final String REPL_MAX_TRIES_PROPERTY = "maxTries";

  /**
   * Name of the property specifying how long to wait for the replication to
   * complete successfully.
   */
  public static final String REPL_TIMEOUT_PROPERTY = "timeout";

  /**
   * Name of property specifying to which hosts the storage should be replicated.
   */
  public static final String REPL_HOSTS_PROPERTY = "targetHosts";

  /** Default. Use archiving. */
  private static final boolean ARCHIVE_FLAG_DEFAULT = true;

  /** Default replication command */
  private static final String REPL_CMD_DEFAULT = "rsync";

  /** Default number of times to try replication. */
  private static final int REPL_MAX_TRIES_DEFAULT = 1;

  /** Default replication timeout (milliseconds). */
  private static final long REPL_TIMEOUT_DEFAULT = 30000L;

  /** Default replication hosts. None. */
  private static final Map<String, ExecutorService> REPL_HOSTS_DEFAULT = new HashMap<String, ExecutorService>();

  /** Whether "-a" is passed to the replication command. */
  private boolean archiveFlag = ARCHIVE_FLAG_DEFAULT;
  /** Executable used for replication. */
  private String replCmd = REPL_CMD_DEFAULT;
  /** Total number of attempts allowed when a replication times out. */
  private int replMaxTries = REPL_MAX_TRIES_DEFAULT;
  /** Per-attempt timeout in milliseconds. */
  private long replTimeout = REPL_TIMEOUT_DEFAULT;
  /** Target host string mapped to the executor that serializes work for it. */
  private Map<String, ExecutorService> replHosts = REPL_HOSTS_DEFAULT;

  /**
   * Default constructor used when this object is instantiated via configuration.
   */
  public ReplicationStorageListener() {
  }

  /**
   * Custom constructor. The maximum number of tries keeps its default value.
   *
   * @param archiveFlag whether to pass "-a" to the replication command
   * @param replCmd     Replication command on host system
   * @param replTimeout Replication timeout in ms
   * @param replHosts   List of Replication hosts
   */
  public ReplicationStorageListener(final boolean archiveFlag, String replCmd, final long replTimeout,
      final List<String> replHosts) {
    this.archiveFlag = archiveFlag;
    this.replCmd = replCmd;
    this.replTimeout = replTimeout;
    setReplHosts(replHosts);
  }

  /**
   * Replaces the set of replication target hosts and creates one single-thread
   * executor per distinct host.
   *
   * <p>
   * Executors belonging to the previous host set are shut down so their worker
   * threads are not leaked. {@link ExecutorService#shutdown()} still runs
   * already-submitted tasks.
   *
   * @param replHosts string list of new hosts ({@code user@host:path}); entries
   *                  are trimmed, and null, blank or duplicate entries are
   *                  skipped
   */
  protected void setReplHosts(List<String> replHosts) {
    // Build the new map completely before touching the current state.
    final Map<String, ExecutorService> hosts = new HashMap<String, ExecutorService>();
    for (final String host : replHosts) {
      final String target = (host == null) ? "" : host.trim();
      if (target.length() == 0 || hosts.containsKey(target)) {
        // A duplicate would otherwise replace (and leak) the first executor.
        continue;
      }
      hosts.put(target, Executors.newSingleThreadExecutor());
    }

    final Map<String, ExecutorService> previous = this.replHosts;
    this.replHosts = hosts;
    shutdownExecutors(previous);
  }

  /**
   * Calls {@link ExecutorService#shutdown()} on every executor in the map.
   *
   * @param executors executors to stop; may be null
   */
  private static void shutdownExecutors(final Map<String, ExecutorService> executors) {
    if (executors == null) {
      return;
    }
    for (final ExecutorService service : executors.values()) {
      service.shutdown();
    }
  }

  /**
   * Reads the replication settings from {@code config}.
   *
   * <p>
   * A setting that is missing or blank silently uses its default. A setting
   * that is present but cannot be parsed logs a warning and uses its default.
   *
   * @param config configuration for this listener
   */
  @Override
  public void configure(Config config) {

    // -- Configure the archive flag property
    try {
      final String useArchive = config.getProperty(ARCHIVE_FLAG_PROPERTY);
      if (useArchive == null || useArchive.trim().length() == 0) {
        // Not configured: keep the documented default. rsync does not recurse
        // into directories without -a (or -r), so defaulting to false would
        // replicate nothing.
        archiveFlag = ARCHIVE_FLAG_DEFAULT;
      } else {
        archiveFlag = "TRUE".equalsIgnoreCase(useArchive.trim());
      }
    } catch (Exception ex) {
      LOGGER.warning(
          "[" + getName() + "] replicationStorageListener::Archive flag " + "" + "misconfigured. Using default.");
      archiveFlag = ARCHIVE_FLAG_DEFAULT;
    }

    // -- Configure the replication command property
    try {
      replCmd = config.getProperty(REPL_CMD_PROPERTY);
      if (replCmd == null || "".equals(replCmd)) {
        replCmd = REPL_CMD_DEFAULT;
      }
    } catch (Exception ex) {
      LOGGER.warning("[" + getName() + "] replicationStorageListener::Exception " + "configuring replication command. ("
          + ex.getMessage() + ")");
    }

    // -- Configure the replication max tries property
    try {
      final String maxTries = config.getProperty(REPL_MAX_TRIES_PROPERTY);
      if (maxTries == null || maxTries.trim().length() == 0) {
        // User didn't configure max tries. Just use default; no warning.
        replMaxTries = REPL_MAX_TRIES_DEFAULT;
      } else {
        replMaxTries = Integer.parseInt(maxTries.trim());
      }
    } catch (NumberFormatException nfx) {
      LOGGER.warning(
          "[" + getName() + "] replicationStorageListener::Bad value for " + "replication max tries. Using default.");
      replMaxTries = REPL_MAX_TRIES_DEFAULT;
    } catch (NullPointerException npx) {
      // No configuration object. Just use default; no warning.
      replMaxTries = REPL_MAX_TRIES_DEFAULT;
    }

    // -- Configure the replication timeout property
    try {
      final String timeout = config.getProperty(REPL_TIMEOUT_PROPERTY);
      if (timeout == null || timeout.trim().length() == 0) {
        // User didn't configure timeout. Just use default; no warning.
        replTimeout = REPL_TIMEOUT_DEFAULT;
      } else {
        replTimeout = Long.parseLong(timeout.trim());
      }
    } catch (NumberFormatException nfx) {
      LOGGER.warning(
          "[" + getName() + "] replicationStorageListener::Bad value for " + "replication timeout. Using default.");
      replTimeout = REPL_TIMEOUT_DEFAULT;
    } catch (NullPointerException npx) {
      // No configuration object. Just use default; no warning.
      replTimeout = REPL_TIMEOUT_DEFAULT;
    }

    // -- Configure the replication hosts property
    try {
      setReplHosts(StringUtils.split(config.getProperty(REPL_HOSTS_PROPERTY), ","));
    } catch (Exception ex) {
      LOGGER.warning("[" + getName() + "] replicationStorageListener::No replication hosts configured.");
      // Release executors of any previously configured hosts before dropping them.
      shutdownExecutors(replHosts);
      replHosts = REPL_HOSTS_DEFAULT;
    }
  }

  /**
   * Queues replication of a newly stored product to every target host.
   * The method returns once the work is queued, not when it completes.
   *
   * @param event storage event; ignored unless its storage is a
   *              {@link FileProductStorage}
   * @throws Exception if building the replication command fails
   */
  @Override
  public void onProductStored(StorageEvent event) throws Exception {
    if (!(event.getProductStorage() instanceof FileProductStorage)) {
      return; // Can't replicate a non-file product storage
    }
    LOGGER.info("[" + getName() + "] product stored. Replicating. (" + event.getProductId().toString() + ")");
    syncProductContents((FileProductStorage) event.getProductStorage(), event.getProductId(), false);
    // Replication runs asynchronously; at this point it has only been queued.
    LOGGER.info("[" + getName() + "] product replicated to remote. (" + event.getProductId().toString() + ")");
  }

  /**
   * Queues a deleting replication for a removed product to every target host.
   * The method returns once the work is queued, not when it completes.
   *
   * @param event storage event; ignored unless its storage is a
   *              {@link FileProductStorage}
   * @throws Exception if building the replication command fails
   */
  @Override
  public void onProductRemoved(StorageEvent event) throws Exception {
    if (!(event.getProductStorage() instanceof FileProductStorage)) {
      return; // Can't replicate a non-file product storage
    }

    LOGGER.info("[" + getName() + "] product removed. Replicating. (" + event.getProductId().toString() + ")");
    syncProductContents((FileProductStorage) event.getProductStorage(), event.getProductId(), true);
    // Replication runs asynchronously; at this point it has only been queued.
    LOGGER.info("[" + getName() + "] product removal replicated. (" + event.getProductId().toString() + ")");
  }

  /**
   * Submits one replication task per configured host. Each task runs from the
   * storage base directory.
   *
   * @param storage  FileProductStorage to use as the base directory
   * @param id       ID of product in storage
   * @param deleting Bool flag for deleting
   * @throws IOException if IO error occurs
   */
  protected void syncProductContents(FileProductStorage storage, ProductId id, boolean deleting) throws IOException {

    final File baseDir = storage.getBaseDirectory();
    final String path = storage.getProductPath(id);

    for (final Map.Entry<String, ExecutorService> entry : replHosts.entrySet()) {
      final String replHost = entry.getKey();
      final ExecutorService service = entry.getValue();
      final List<String> command = createReplicationCommand(baseDir, path, replHost, deleting);
      service.submit(new ReplicationTask(command, baseDir, replMaxTries, replTimeout, service));
    }
  }

  /**
   * Create the replication command.
   *
   * @param baseDir  The directory from which replication will be executed.
   * @param path     The path of the content to replicate
   * @param host     The host string to which content should be replicated. Format
   *                 = user@host:path
   * @param deleting Flag whether this should be a deleting replication or not
   *
   * @return The command and arguments as a list suitable for a
   *         <code>ProcessBuilder</code>.
   * @throws IOException if IO error occurs
   */
  protected List<String> createReplicationCommand(final File baseDir, final String path, final String host,
      final boolean deleting) throws IOException {

    // Make sure we are replicating a directory that actually exists: a removed
    // product no longer exists locally, so climb toward baseDir, stopping at
    // baseDir's immediate child. The null check stops the climb (instead of
    // throwing) when the path is not actually under baseDir.
    File source = new File(baseDir, path);
    File parent = source.getParentFile();
    while (!source.exists() && parent != null && !parent.equals(baseDir)) {
      source = parent;
      parent = source.getParentFile();
    }

    final List<String> command = new ArrayList<String>();
    command.add(replCmd);

    if (archiveFlag) {
      command.add("-a");
    }

    command.add("-vz");
    command.add("--relative");
    command.add("-e");
    // A single argument: rsync splits this remote-shell string itself.
    command.add("ssh -o ConnectTimeout=5");

    if (deleting) {
      // To do a delete we must sync the parent directory and then
      // explicitly include the original target directory and exclude
      // everything else.
      // NOTE(review): the command is run from an argument list, not through a
      // shell (see the single "-e" argument above), so nothing strips these
      // single quotes and rsync receives them as part of the patterns. Verify
      // against a live rsync before removing them, because doing so changes
      // which remote files --delete may touch.
      command.add("--delete");
      command.add("--include='" + source.getName() + "**'");
      command.add("--exclude='*'");
      source = source.getParentFile();
    }

    // Express the source as a "./"-prefixed path relative to baseDir, the
    // directory the command runs in. Only a leading baseDir prefix is stripped;
    // String.replace would also remove any later occurrence of it.
    final String basePath = baseDir.getCanonicalPath();
    final String sourcePath = source.getCanonicalPath();
    final String relativePath = sourcePath.startsWith(basePath) ? sourcePath.substring(basePath.length())
        : sourcePath;
    command.add("." + relativePath);

    command.add(host);

    return command;
  }

  /**
   * One replication attempt for a single host.
   *
   * <p>
   * Although it extends {@link Thread}, this class submits it to the host's
   * {@link ExecutorService} as a {@code Runnable} rather than starting it.
   * When an attempt times out, the task resubmits itself to the same executor
   * until its allowed number of tries is used up.
   */
  protected class ReplicationTask extends Thread {

    // Command to execute
    private final List<String> command;
    // String representation of command
    private final String cmdStr;
    // Working directory from where to execute the command
    private final File cwd;
    // Attempts remaining, including the one about to run
    private int numTries = 1;
    // How long to let the command try for
    private final long timeout;
    // Executor service to repeat this task if appropriate
    private final ExecutorService service;

    /**
     * Constructor of a replication task
     *
     * @param command  command to execute
     * @param cwd      Directory to execute the command
     * @param numTries How many times to try the replication
     * @param timeout  in ms
     * @param service  Executor service
     */
    public ReplicationTask(final List<String> command, final File cwd, final int numTries, final long timeout,
        final ExecutorService service) {
      this.command = command;
      this.cwd = cwd;
      this.timeout = timeout;
      this.numTries = numTries;
      this.service = service;

      // Command string for easier viewing
      final StringBuilder buf = new StringBuilder();
      for (final String part : command) {
        buf.append(part).append(" ");
      }
      this.cmdStr = buf.toString().trim();
    }

    /**
     * Runs the replication command once and logs its outcome. On timeout it
     * resubmits itself while attempts remain.
     */
    @Override
    public void run() {
      // Qualified on purpose: an unqualified getName() here resolves to
      // Thread.getName() and would log "Thread-N" instead of the listener name.
      final String name = ReplicationStorageListener.this.getName();
      try {
        final TimeoutProcessBuilder builder = new TimeoutProcessBuilder(timeout, command);
        builder.directory(cwd);
        final TimeoutProcess process = builder.start();
        final int exitStatus = process.waitFor();

        LOGGER.info("[" + name + "] command \"" + cmdStr + "\" exited with status [" + exitStatus + "]");
        if (exitStatus != 0) {
          LOGGER.info(
              "[" + name + "] command \"" + cmdStr + "\" error output: " + new String(process.errorOutput()));
        }
      } catch (ProcessTimeoutException cex) {

        final StringBuilder message = new StringBuilder();
        message.append("[").append(name).append("] command \"").append(cmdStr).append("\" timed out.");

        // This attempt is used up; without the decrement a persistently
        // timing-out command would be retried forever.
        numTries--;
        if (numTries > 0) {
          try {
            service.submit(this);
            message.append(" Trying again.");
          } catch (RejectedExecutionException rex) {
            // The host's executor was shut down (e.g. hosts reconfigured).
            message.append(" Not retrying; executor is shut down.");
          }
        } else {
          message.append(" Not retrying.");
        }
        LOGGER.warning(message.toString());
      } catch (IOException iox) {
        LOGGER.log(Level.WARNING, iox.getMessage(), iox);
      } catch (InterruptedException iex) {
        // Restore the interrupt status so the executor can observe it.
        Thread.currentThread().interrupt();
        LOGGER.log(Level.WARNING, "[" + name + "] command \"" + cmdStr + "\" interrupted.", iex);
      }
    }
  }
}