DirectoryPoller.java
/*
* DirectoryPoller
*
* $Id$
* $HeadURL$
*/
package gov.usgs.util;
import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Logger;
/**
* Monitor a directory for files, notifying FileListenerInterfaces.
*
* Implementers of the FileListenerInterface should process files before
* returning, because these files may move or disappear.
*/
public class DirectoryPoller extends DefaultConfigurable implements Poller {
private static final Logger LOGGER = Logger.getLogger(DirectoryPoller.class.getName());
/** Property for pollCarefully */
public static final String POLL_CAREFULLY_PROPERTY = "pollCarefully";
/** Default status of POLL_CAREFULLY */
public static final String DEFAULT_POLL_CAREFULLY = "false";
/** Property for pollDirectory */
public static final String POLL_DIRECTORY_PROPERTY = "pollDirectory";
/** Default pollDirectory */
public static final String DEFAULT_POLL_DIRECTORY = "pollDirectory";
/** Property for errorDirectory */
public static final String ERROR_DIRECTORY_PROPERTY = "errorDirectory";
/** Default errorDirectory */
public static final String DEFAULT_ERROR_DIRECTORY = "errorDirectory";
/** Property for storageDirectory */
public static final String STORAGE_DIRECTORY_PROPERTY = "oldInputDirectory";
/** Default storageDirectory */
public static final String DEFAULT_STORAGE_DIRECTORY = "oldInputDirectory";
/** Poll interval property */
public static final String POLLINTERVAL_PROPERTY = "interval";
/** Default interval for POLLINTERVAL */
public static final String DEFAULT_POLLINTERVAL = "1000";
/** Polling interval */
private Long pollInterval;
/** Carful polling enabled */
private Boolean pollCarefully;
/** Directory to watch. */
private File pollDirectory;
/** Directory to store files in. */
private File storageDirectory;
/** Directory to store files that throw exceptions in. */
private File errorDirectory;
/** Timer schedules polling frequency. */
private Timer timer;
/** Notification of files. */
private List<FileListenerInterface> listeners = new LinkedList<FileListenerInterface>();
/**
* Create a DirectoryPoller.
*
*/
public DirectoryPoller() {
}
@Override
public void configure(Config config) throws Exception {
this.pollDirectory = new File(config.getProperty(POLL_DIRECTORY_PROPERTY, DEFAULT_POLL_DIRECTORY));
LOGGER.config("Using poll directory " + this.pollDirectory.getCanonicalPath());
this.pollInterval = Long.valueOf(config.getProperty(POLLINTERVAL_PROPERTY, DEFAULT_POLLINTERVAL));
LOGGER.config("Using poll interval " + pollInterval + "ms");
this.pollCarefully = Boolean.valueOf(config.getProperty(POLL_CAREFULLY_PROPERTY, DEFAULT_POLL_CAREFULLY));
LOGGER.config("Poll carefully = " + pollCarefully);
this.storageDirectory = new File(config.getProperty(STORAGE_DIRECTORY_PROPERTY, DEFAULT_STORAGE_DIRECTORY));
LOGGER.config("Using oldinput directory " + storageDirectory.getCanonicalPath());
this.errorDirectory = new File(config.getProperty(ERROR_DIRECTORY_PROPERTY, DEFAULT_ERROR_DIRECTORY));
LOGGER.config("Using error directory " + errorDirectory.getCanonicalPath());
}
/** @param listener FileListenerInterface to add */
@Override
public void addFileListener(final FileListenerInterface listener) {
listeners.add(listener);
}
/** @param listener FileListenerInterface to remove */
@Override
public void removeFileListener(final FileListenerInterface listener) {
listeners.remove(listener);
}
/**
* Notify all listeners that files exist and need to be processed.
*
* @param file that needs to be processed
*/
@Override
public void notifyListeners(final File file) {
for (FileListenerInterface listener : listeners) {
try {
listener.onFile(file);
} catch (Exception e) {
LOGGER.warning(
String.format("Error processing file %s, moving to %s", file.getName(), getErrorDirectory().getName()));
moveToDirectory(file, getErrorDirectory());
}
}
}
/**
* Start polling in a background thread.
*
* Any previously scheduled polling is stopped before starting at this
* frequency. This schedules using fixed-delay (time between complete polls) as
* opposed to fixed-rate (how often to start polling).
*
*/
@Override
public void start() {
if (timer != null) {
// already started
stop();
}
if (!this.getPollDirectory().exists()) {
this.getPollDirectory().mkdirs();
}
timer = new Timer();
timer.schedule(new PollTask(), 0L, this.pollInterval);
}
/**
* Stop any currently scheduled polling.
*/
@Override
public void stop() {
if (timer != null) {
timer.cancel();
timer = null;
}
}
/**
* The Polling Task. Notifies all listeners then either deletes or moves the
* file to storage.
*
* @author jmfee
*
*/
protected class PollTask extends TimerTask {
public void run() {
File[] files = pollDirectory.listFiles();
for (File file : files) {
if (isPollCarefully()) {
// wait until file is at least pollInterval ms old,
// in case it is still being written
long age = new Date().getTime() - file.lastModified();
if (age <= pollInterval) {
continue;
}
}
// send file to listeners
notifyListeners(file);
// move file to storage
moveToDirectory(file, getStorageDirectory());
}
}
}
/**
* Move a file from polldir to storage directory. Attempts to move file into
* storage directory. The file is not moved if no storage directory was
* specified, or if the file no longer exists.
*
* @param file file to move.
*/
private void moveToDirectory(final File file, final File directory) {
if (!file.exists()) {
// was already removed, done
return;
}
if (!directory.exists()) {
directory.mkdirs();
}
// build a filename that doesn't exist
String fileName = file.getName();
File storageFile = new File(directory, fileName);
if (storageFile.exists()) {
fileName = new Date().getTime() + "_" + fileName;
storageFile = new File(directory, fileName);
}
// rename file
file.renameTo(storageFile);
}
/** @return pollDirectory file */
public File getPollDirectory() {
return this.pollDirectory;
}
/** @return storageDirectory file */
public File getStorageDirectory() {
return this.storageDirectory;
}
public void setStorageDirectory(File storageDirectory) {
this.storageDirectory = storageDirectory;
}
/** @return errorDirectory */
public File getErrorDirectory() {
return this.errorDirectory;
}
/** @param errorDirectory File to send */
public void setErrorDirectory(File errorDirectory) {
this.errorDirectory = errorDirectory;
}
/** @return pollInterval long */
public Long getPollInterval() {
return pollInterval;
}
/** @param pollInterval long to set */
public void setPollInterval(long pollInterval) {
this.pollInterval = pollInterval;
}
/** @return pollCarefully boolean */
public Boolean isPollCarefully() {
return pollCarefully;
}
/** @param pollCarefully boolean to set */
public void setPollCarefully(boolean pollCarefully) {
this.pollCarefully = pollCarefully;
}
}