EmbeddedPDLClient.java

package gov.usgs.earthquake.distribution;

import gov.usgs.earthquake.cube.CubeMessage;
import gov.usgs.earthquake.eids.LegacyConverter;
import gov.usgs.earthquake.eidsutil.EIDSClient;
import gov.usgs.earthquake.product.Product;

import java.io.File;

/**
 * An example of an embedded PDL client.
 *
 * Creates a notification receiver, which stores its information in a specified
 * directory. Listeners can be added to this receiver before its startup()
 * method is called, which starts the distribution process.
 */
public class EmbeddedPDLClient {

  /** name for embedded receiver, appears in log files. */
  public static final String EMBEDDED_NAME = "embeddedPDL";
  /** name for eids tracking file, in data directory. */
  public static final String EMBEDDED_TRACKING_FILE = "receiver_tracking.dat";
  /** name for notification index file, in data directory. */
  public static final String EMBEDDED_INDEX_FILE = "receiver_index.db";
  /** name for receiver storage directory, in data directory. */
  public static final String EMBEDDED_STORAGE_DIRECTORY = "receiver_storage";

  /** The notification receiver; created once in the constructor and never replaced. */
  private final EIDSNotificationReceiver eidsReceiver;

  /**
   * Construct an embedded PDL client.
   *
   * Builds an EIDSClient pointed at the given hub, and an
   * EIDSNotificationReceiver whose tracking file, notification index and
   * product storage all live inside {@code dataDirectory}. The receiver is
   * registered as a listener on the client, but nothing is started here; call
   * {@code getReceiver().startup()} to begin receiving.
   *
   * @param dataDirectory        directory where receiver files are stored.
   * @param serverHost           PDL hub hostname.
   * @param serverPort           PDL hub port.
   * @param alternateServersList comma separated list of "hostname:port" alternate
   *                             pdl hubs.
   * @throws Exception if error occurs
   */
  public EmbeddedPDLClient(final File dataDirectory, final String serverHost, final Integer serverPort,
      final String alternateServersList) throws Exception {
    // resolve every file location up front, relative to the data directory
    final File trackingFile = new File(dataDirectory, EMBEDDED_TRACKING_FILE);
    final File indexFile = new File(dataDirectory, EMBEDDED_INDEX_FILE);
    final File storageDirectory = new File(dataDirectory, EMBEDDED_STORAGE_DIRECTORY);

    final EIDSClient client = new EIDSClient();
    client.setServerHost(serverHost);
    client.setServerPort(serverPort);
    client.setAlternateServersList(alternateServersList);
    client.setTrackingFileName(trackingFile.getCanonicalPath());

    eidsReceiver = new EIDSNotificationReceiver();
    eidsReceiver.setName(EMBEDDED_NAME);
    eidsReceiver.setNotificationIndex(new JDBCNotificationIndex(indexFile.getCanonicalPath()));
    eidsReceiver.setProductStorage(new FileProductStorage(storageDirectory));
    // default these to 15 minutes
    eidsReceiver.setProductStorageMaxAge(900000L);
    eidsReceiver.setReceiverCleanupInterval(900000L);
    // wire receiver and client together in both directions
    eidsReceiver.setClient(client);
    client.addListener(eidsReceiver);
  }

  /**
   * Get the embedded EIDSNotificationReceiver object for further configuration,
   * adding/removing listeners, and starting/stopping distribution.
   *
   * @return the embedded EIDSNotificationReceiver object.
   */
  public EIDSNotificationReceiver getReceiver() {
    return eidsReceiver;
  }

  /**
   * Example main method that uses the EmbeddedPDLClient.
   *
   * Creates a client for the production hub, attaches a listener that prints
   * "origin" products as CUBE messages to standard error, starts both the
   * listener and the receiver, and registers a JVM shutdown hook that shuts
   * them down again.
   *
   * @param args not used.
   * @throws Exception if error occurs
   */
  public static void main(final String[] args) throws Exception {
    // client for production hub
    final File dataDirectory = new File("embeddedStorage");
    final String hostname = "prod01-pdl01.cr.usgs.gov";
    final Integer port = 39977;
    final String alternateServers = "prod02-pdl01.wr.usgs.gov:39977";

    // create embedded client
    final EmbeddedPDLClient client = new EmbeddedPDLClient(dataDirectory, hostname, port, alternateServers);

    // create a listener that tries to convert messages to cube
    final DefaultNotificationListener listener = new DefaultNotificationListener() {
      // convert a product to a cube message, if possible
      private final LegacyConverter converter = LegacyConverter.cubeConverter();

      @Override
      public void onProduct(final Product product) {
        System.err.println("Processing product " + product.getId().toString());
        try {
          final byte[] cubeBytes = converter.convert(product);
          if (cubeBytes != null) {
            // NOTE(review): decodes using the platform default charset; confirm
            // this matches the encoding LegacyConverter uses to produce bytes.
            final CubeMessage cubeMessage = CubeMessage.parse(new String(cubeBytes));
            // CubeMessage instanceof CubeEvent
            // or CubeMessage instanceof CubeDelete
            System.err.println(cubeMessage.toCUBE());
          }
        } catch (Exception e) {
          // conversion is best effort: report the failure instead of silently
          // dropping it, but do not propagate so later products still process
          System.err.println("Unable to convert product " + product.getId().toString() + " to cube message");
          e.printStackTrace();
        }
      }
    };
    // only listen for origin messages
    listener.getIncludeTypes().add("origin");
    // name appears in log messages
    listener.setName("embeddedListener");
    // add listener index for more reliable notification across restarts
    listener.setNotificationIndex(
        new JDBCNotificationIndex(new File(dataDirectory, "embedded_listener_index.db").getCanonicalPath()));

    // add listener to receiver
    client.getReceiver().addNotificationListener(listener);

    // start
    listener.startup();
    client.getReceiver().startup();

    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        // each component is shut down in its own try block, so a failure in
        // one does not prevent the other from being shut down
        try {
          client.getReceiver().shutdown();
        } catch (Exception e) {
          e.printStackTrace();
        }
        try {
          listener.shutdown();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    });
  }
}