DirectoryPoller.java

/*
 * DirectoryPoller
 *
 * $Id$
 * $HeadURL$
 */
package gov.usgs.util;

import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Logger;

/**
 * Monitor a directory for files, notifying FileListenerInterfaces.
 *
 * Implementers of the FileListenerInterface should process files before
 * returning, because these files may move or disappear.
 */
public class DirectoryPoller extends DefaultConfigurable implements Poller {

  private static final Logger LOGGER = Logger.getLogger(DirectoryPoller.class.getName());

  /** Property for pollCarefully */
  public static final String POLL_CAREFULLY_PROPERTY = "pollCarefully";
  /** Default status of POLL_CAREFULLY */
  public static final String DEFAULT_POLL_CAREFULLY = "false";

  /** Property for pollDirectory */
  public static final String POLL_DIRECTORY_PROPERTY = "pollDirectory";
  /** Default pollDirectory */
  public static final String DEFAULT_POLL_DIRECTORY = "pollDirectory";

  /** Property for errorDirectory */
  public static final String ERROR_DIRECTORY_PROPERTY = "errorDirectory";
  /** Default errorDirectory */
  public static final String DEFAULT_ERROR_DIRECTORY = "errorDirectory";

  /** Property for storageDirectory */
  public static final String STORAGE_DIRECTORY_PROPERTY = "oldInputDirectory";
  /** Default storageDirectory */
  public static final String DEFAULT_STORAGE_DIRECTORY = "oldInputDirectory";

  /** Poll interval property */
  public static final String POLLINTERVAL_PROPERTY = "interval";
  /** Default interval for POLLINTERVAL */
  public static final String DEFAULT_POLLINTERVAL = "1000";

  /** Polling interval */
  private Long pollInterval;

  /** Carful polling enabled */
  private Boolean pollCarefully;

  /** Directory to watch. */
  private File pollDirectory;

  /** Directory to store files in. */
  private File storageDirectory;

  /** Directory to store files that throw exceptions in. */
  private File errorDirectory;

  /** Timer schedules polling frequency. */
  private Timer timer;

  /** Notification of files. */
  private List<FileListenerInterface> listeners = new LinkedList<FileListenerInterface>();

  /**
   * Create a DirectoryPoller.
   *
   */
  public DirectoryPoller() {
  }

  @Override
  public void configure(Config config) throws Exception {

    this.pollDirectory = new File(config.getProperty(POLL_DIRECTORY_PROPERTY, DEFAULT_POLL_DIRECTORY));
    LOGGER.config("Using poll directory " + this.pollDirectory.getCanonicalPath());

    this.pollInterval = Long.valueOf(config.getProperty(POLLINTERVAL_PROPERTY, DEFAULT_POLLINTERVAL));
    LOGGER.config("Using poll interval " + pollInterval + "ms");

    this.pollCarefully = Boolean.valueOf(config.getProperty(POLL_CAREFULLY_PROPERTY, DEFAULT_POLL_CAREFULLY));
    LOGGER.config("Poll carefully = " + pollCarefully);

    this.storageDirectory = new File(config.getProperty(STORAGE_DIRECTORY_PROPERTY, DEFAULT_STORAGE_DIRECTORY));
    LOGGER.config("Using oldinput directory " + storageDirectory.getCanonicalPath());

    this.errorDirectory = new File(config.getProperty(ERROR_DIRECTORY_PROPERTY, DEFAULT_ERROR_DIRECTORY));
    LOGGER.config("Using error directory " + errorDirectory.getCanonicalPath());

  }

  /** @param listener FileListenerInterface to add */
  @Override
  public void addFileListener(final FileListenerInterface listener) {
    listeners.add(listener);
  }

  /** @param listener FileListenerInterface to remove */
  @Override
  public void removeFileListener(final FileListenerInterface listener) {
    listeners.remove(listener);
  }

  /**
   * Notify all listeners that files exist and need to be processed.
   *
   * @param file that needs to be processed
   */
  @Override
  public void notifyListeners(final File file) {
    for (FileListenerInterface listener : listeners) {
      try {
        listener.onFile(file);
      } catch (Exception e) {
        LOGGER.warning(
            String.format("Error processing file %s, moving to %s", file.getName(), getErrorDirectory().getName()));

        moveToDirectory(file, getErrorDirectory());
      }
    }
  }

  /**
   * Start polling in a background thread.
   *
   * Any previously scheduled polling is stopped before starting at this
   * frequency. This schedules using fixed-delay (time between complete polls) as
   * opposed to fixed-rate (how often to start polling).
   *
   */
  @Override
  public void start() {
    if (timer != null) {
      // already started
      stop();
    }

    if (!this.getPollDirectory().exists()) {
      this.getPollDirectory().mkdirs();
    }

    timer = new Timer();
    timer.schedule(new PollTask(), 0L, this.pollInterval);
  }

  /**
   * Stop any currently scheduled polling.
   */
  @Override
  public void stop() {
    if (timer != null) {
      timer.cancel();
      timer = null;
    }
  }

  /**
   * The Polling Task. Notifies all listeners then either deletes or moves the
   * file to storage.
   *
   * @author jmfee
   *
   */
  protected class PollTask extends TimerTask {
    public void run() {

      File[] files = pollDirectory.listFiles();
      for (File file : files) {
        if (isPollCarefully()) {
          // wait until file is at least pollInterval ms old,
          // in case it is still being written
          long age = new Date().getTime() - file.lastModified();
          if (age <= pollInterval) {
            continue;
          }
        }

        // send file to listeners
        notifyListeners(file);
        // move file to storage
        moveToDirectory(file, getStorageDirectory());

      }
    }
  }

  /**
   * Move a file from polldir to storage directory. Attempts to move file into
   * storage directory. The file is not moved if no storage directory was
   * specified, or if the file no longer exists.
   *
   * @param file file to move.
   */
  private void moveToDirectory(final File file, final File directory) {
    if (!file.exists()) {
      // was already removed, done
      return;
    }
    if (!directory.exists()) {
      directory.mkdirs();
    }

    // build a filename that doesn't exist
    String fileName = file.getName();
    File storageFile = new File(directory, fileName);
    if (storageFile.exists()) {
      fileName = new Date().getTime() + "_" + fileName;
      storageFile = new File(directory, fileName);
    }
    // rename file
    file.renameTo(storageFile);
  }

  /** @return pollDirectory file */
  public File getPollDirectory() {
    return this.pollDirectory;
  }

  /** @return storageDirectory file */
  public File getStorageDirectory() {
    return this.storageDirectory;
  }

  public void setStorageDirectory(File storageDirectory) {
    this.storageDirectory = storageDirectory;
  }

  /** @return errorDirectory */
  public File getErrorDirectory() {
    return this.errorDirectory;
  }

  /** @param errorDirectory File to send */
  public void setErrorDirectory(File errorDirectory) {
    this.errorDirectory = errorDirectory;
  }

  /** @return pollInterval long */
  public Long getPollInterval() {
    return pollInterval;
  }

  /** @param pollInterval long to set */
  public void setPollInterval(long pollInterval) {
    this.pollInterval = pollInterval;
  }

  /** @return pollCarefully boolean */
  public Boolean isPollCarefully() {
    return pollCarefully;
  }

  /** @param pollCarefully boolean to set */
  public void setPollCarefully(boolean pollCarefully) {
    this.pollCarefully = pollCarefully;
  }
}