HeartbeatListener.java

package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.product.Product;
import gov.usgs.util.Config;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.Iterator;
import java.util.logging.Logger;

import javax.json.Json;
import javax.json.JsonObjectBuilder;

/**
 * Heartbeat Listener stores heartbeat messages and writes them to a heartbeat
 * file when a product is received
 *
 * @author tene
 *
 */
public class HeartbeatListener extends DefaultNotificationListener {

  private static final Logger LOGGER = Logger.getLogger(HeartbeatListener.class.getName());

  /** Storage for heartbeat components */
  private static Map<String, HeartbeatStatus> HASH_HEARTBEATS = new ConcurrentHashMap<>();

  /** Configurable property for heartbeat fully qualified directory+filename. */
  public static final String HEARTBEAT_FILENAME_PROPERTY = "heartbeatFilename";

  /** Default heartbeat directory. */
  public static final String DEFAULT_HEARTBEAT_FILENAME = "heartbeat.dat";

  /** Default timeout for HeartbeatStatus key/value pairs. Zero = disabled */
  public static final String DEFAULT_STORAGE_TIMEOUT = "0";

  /**
   * Default schedule interval for HeartbeatStatus key/value pairs cleanup. 30
   * minutes
   */
  public static final String DEFAULT_CLEANUP_INTERVAL = "1800000";

  /** Configurable property for heartbeat key/value expiration */
  public static final String HEARTBEAT_TIMEOUT_PROPERTY = "heartbeatTimeout";

  /** Flag listeners are listening */
  private static boolean LISTENING = false;

  /** Hearbeat registered file. */
  private File heartbeatFile;

  /** Timeout for expiration of key/value pairs */
  private long storageTimeout;

  /**
   * Create a new HeartbeatListener.
   *
   * Sets up the includeTypes list to contain "heartbeat".
   *
   * @throws Exception if error occurs
   */
  public HeartbeatListener() throws Exception {
    LISTENING = true;
    heartbeatFile = new File(DEFAULT_HEARTBEAT_FILENAME);
    storageTimeout = Long.valueOf(DEFAULT_STORAGE_TIMEOUT);
    this.getIncludeTypes().add("heartbeat");
  }

  /**
   * @return map of component and heartbeat status.
   */
  protected static Map<String, HeartbeatStatus> getHeartbeats() {
    return HASH_HEARTBEATS;
  }

  /**
   * heartbeat onProduct processing writes to heartbeat file
   */
  @Override
  public void onProduct(final Product product) throws Exception {

    // track product
    sendHeartbeatMessage(this.getName(), "lastHeartbeat", product.getId().toString());

    // track current memory usage
    MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
    MemoryUsage heapMemory = memoryMXBean.getHeapMemoryUsage();
    MemoryUsage nonHeapMemory = memoryMXBean.getNonHeapMemoryUsage();

    long heapUsed = heapMemory.getUsed();
    long nonHeapUsed = nonHeapMemory.getUsed();
    long heapCommitted = heapMemory.getCommitted();
    long nonHeapCommitted = nonHeapMemory.getCommitted();
    long heapMax = heapMemory.getMax();
    long nonHeapMax = nonHeapMemory.getMax();
    long totalUsed = heapUsed + nonHeapUsed;
    long totalCommitted = heapCommitted + nonHeapCommitted;
    long totalMax = heapMax + nonHeapMax;

    sendHeartbeatMessage(this.getName(), "totalUsed", Long.toString(totalUsed));
    sendHeartbeatMessage(this.getName(), "totalCommitted", Long.toString(totalCommitted));
    sendHeartbeatMessage(this.getName(), "totalMax", Long.toString(totalMax));

    // write heartbeat information to file
    this.writeHeartbeat();

  }

  /**
   * Send heartbeat data to heartbeat listener
   *
   * @param component String component
   * @param key       Heartbeat key
   * @param value     Heartbeat value
   */
  public static void sendHeartbeatMessage(final String component, final String key, final String value) {

    if (!LISTENING) {
      return;
    }
    HeartbeatStatus objHeartbeat;

    String heartbeatKey = component;
    if (heartbeatKey == null) {
      heartbeatKey = "<null>";
    }

    // register this heartbeat in temporary storage
    if (HASH_HEARTBEATS.containsKey(heartbeatKey)) {
      objHeartbeat = HASH_HEARTBEATS.get(heartbeatKey);
    } else {
      objHeartbeat = new HeartbeatStatus();
      HASH_HEARTBEATS.put(heartbeatKey, objHeartbeat);
    }

    // store the heartbeat key/value in temporary storage
    objHeartbeat.updateStatus(key, value);
  }

  /**
   * Write heartbeat data for all components to the heartbeat file
   *
   * @return true
   * @throws IOException if IO error occurs
   */
  public boolean writeHeartbeat() throws IOException {
    String tempFileName = heartbeatFile.getName() + "-temp";
    File tempFile = new File(tempFileName);

    gov.usgs.util.FileUtils.writeFileThenMove(tempFile, heartbeatFile, this.formatHeartbeatOutput().getBytes());

    return true;
  }

  /**
   * Self-configure HeartbeatListener object
   */
  @Override
  public void configure(Config config) throws Exception {
    // let default notification listener configure itself
    super.configure(config);

    heartbeatFile = new File(config.getProperty(HEARTBEAT_FILENAME_PROPERTY, DEFAULT_HEARTBEAT_FILENAME));
    LOGGER.config("[" + getName() + "] heartbeat file = " + heartbeatFile.getCanonicalPath());

    storageTimeout = Long.valueOf(config.getProperty(HEARTBEAT_TIMEOUT_PROPERTY, DEFAULT_STORAGE_TIMEOUT));
    LOGGER.config("[" + getName() + "] heartbeat timeout = " + storageTimeout + "ms");
  }

  /**
   * @return JSON-formatted output from the map of components and their values
   */
  public String formatHeartbeatOutput() {
    JsonObjectBuilder builder = Json.createObjectBuilder();
    for (String key : HASH_HEARTBEATS.keySet()) {
      HeartbeatStatus status = HASH_HEARTBEATS.get(key);
      builder.add(key, status == null ? null : status.toJsonObject());
    }
    return builder.build().toString();
  }

  /**
   * purge heartbeat key/values older than storageTimeout, also purging empty
   * heartbeats
   */
  @Override
  public void cleanup() throws Exception {
    super.cleanup();

    if (this.storageTimeout == 0) {
      return;
    }

    Map<String, HeartbeatStatus> hashHeartbeats = getHeartbeats();
    HeartbeatStatus objHeartbeat;
    String component;
    Iterator<String> itComponents = hashHeartbeats.keySet().iterator();
    Date purgeDate = new Date(new Date().getTime() - this.storageTimeout);

    // iterate through map of components
    while (itComponents.hasNext()) {
      component = itComponents.next();

      // purge old key/value entries within objHeartbeat
      objHeartbeat = hashHeartbeats.get(component);
      objHeartbeat.clearDataOlderThanDate(purgeDate);

      // if purge has left a objHeartheat with no remaining key/value
      // entries,
      // remove the objHeartbeat (i.e. the component)
      if (objHeartbeat.isEmpty()) {
        itComponents.remove();
      }

    } // END while more components

  }

  /** @return heartbeatFile */
  public File getHeartbeatFile() {
    return heartbeatFile;
  }

  /** @param heartbeatFile to set */
  public void setHeartbeatFile(File heartbeatFile) {
    this.heartbeatFile = heartbeatFile;
  }

  /** @return storageTimeout */
  public long getStorageTimeout() {
    return storageTimeout;
  }

  /** @param storageTimeout to set */
  public void setStorageTimeout(long storageTimeout) {
    this.storageTimeout = storageTimeout;
  }

}