// SearchServerSocket.java

package gov.usgs.earthquake.indexer;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.SocketAcceptor;
import gov.usgs.util.SocketListenerInterface;
import gov.usgs.util.StreamUtils;

/**
 * Server side of socket search interface.
 *
 * <p>
 * Listens on a TCP port and, for each accepted connection, reads one
 * deflate-compressed request via {@link SearchXML#parseRequest}, runs it
 * through {@link #search(SearchRequest)}, and writes one deflate-compressed
 * response via {@link SearchXML#toXML} before closing the connection.
 * </p>
 *
 * <p>
 * Lifecycle: {@link #configure(Config)} (or the setters), then
 * {@link #startup()}, then {@link #shutdown()}.
 * </p>
 */
public class SearchServerSocket extends DefaultConfigurable implements SocketListenerInterface {

  /** Logging object. */
  private static final Logger LOGGER = Logger.getLogger(SearchServerSocket.class.getName());

  /** The configuration property used for listen port. */
  public static final String SEARCH_PORT_PROPERTY = "port";

  /** The default listen port, as a string. */
  public static final String DEFAULT_SEARCH_PORT = "11236";

  /** The configuration property used for listen thread count. */
  public static final String THREAD_POOL_SIZE_PROPERTY = "threads";

  /** The default number of threads, as a string. */
  public static final String DEFAULT_THREAD_POOL_SIZE = "10";

  /** The configuration property used to reference a ProductIndex. */
  public static final String PRODUCT_INDEXER_PROPERTY = "indexer";

  /** The configuration property used to reference a URLProductStorage. */
  public static final String PRODUCT_STORAGE_PROPERTY = "storage";

  /** The port to bind. */
  private int port = -1;

  /** The number of threads to use. */
  private int threads = -1;

  /** The server socket accept thread; null while not started. */
  private SocketAcceptor acceptor;

  /** The worker pool handed to the acceptor; null while not started. */
  private ExecutorService executor;

  /** The indexer that will be searched. */
  private Indexer indexer;

  /**
   * Construct a new SearchServerSocket using defaults.
   *
   * Port and thread count are taken from {@link #DEFAULT_SEARCH_PORT} and
   * {@link #DEFAULT_THREAD_POOL_SIZE}; no indexer is set.
   */
  public SearchServerSocket() {
    this.port = Integer.parseInt(DEFAULT_SEARCH_PORT);
    this.threads = Integer.parseInt(DEFAULT_THREAD_POOL_SIZE);
  }

  /**
   * Method to perform search.
   *
   * Calls Indexer.search(SearchRequest). Simplifies testing.
   *
   * @param request the search to execute.
   * @return the search response.
   * @throws Exception if error occurs
   */
  protected SearchResponse search(final SearchRequest request) throws Exception {
    return indexer.search(request);
  }

  /**
   * This method is called each time a SearchSocket connects.
   *
   * Reads one compressed request, executes it, writes one compressed response,
   * then closes the socket. Any exception while processing is logged at
   * WARNING and not propagated; the socket is closed regardless.
   *
   * @param socket the accepted client connection; closed before this method
   *               returns.
   */
  @Override
  public void onSocket(Socket socket) {
    LOGGER.info("[" + getName() + "] accepted search connection " + socket.toString());

    InputStream in = null;
    DeflaterOutputStream out = null;

    try {
      in = socket.getInputStream();
      // NOTE(review): the Unclosable wrappers presumably keep close() calls on
      // the decorators/parser from closing the underlying socket stream; the
      // socket itself is closed explicitly in the finally block.
      in = new InflaterInputStream(new BufferedInputStream(new StreamUtils.UnclosableInputStream(in)));
      // read request
      SearchRequest request = SearchXML.parseRequest(new StreamUtils.UnclosableInputStream(in));

      // do search
      SearchResponse response = this.search(request);

      // send response
      out = new DeflaterOutputStream(new BufferedOutputStream(socket.getOutputStream()));
      SearchXML.toXML(response, new StreamUtils.UnclosableOutputStream(out));

      // finish compression, then push the buffered bytes to the socket
      out.finish();
      out.flush();
    } catch (Exception ex) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing search", ex);
    } finally {
      StreamUtils.closeStream(in);
      StreamUtils.closeStream(out);

      // best-effort teardown: each step may fail if the peer already
      // disconnected or an earlier step closed the socket
      try {
        socket.shutdownInput();
      } catch (Exception ignored) {
        // ignore
      }
      try {
        socket.shutdownOutput();
      } catch (Exception ignored) {
        // ignore
      }

      try {
        socket.close();
      } catch (Exception ignored) {
        // ignore
      }
    }

    LOGGER.info("[" + getName() + "] closed search connection " + socket.toString());
  }

  /**
   * Read port, thread count and indexer from configuration.
   *
   * @param config the section to read {@link #SEARCH_PORT_PROPERTY},
   *               {@link #THREAD_POOL_SIZE_PROPERTY} and
   *               {@link #PRODUCT_INDEXER_PROPERTY} from.
   * @throws ConfigurationException if the indexer property is missing or the
   *                                named indexer cannot be loaded.
   * @throws NumberFormatException  if port or threads is not an integer.
   */
  @Override
  public void configure(Config config) throws Exception {
    port = Integer.parseInt(config.getProperty(SEARCH_PORT_PROPERTY, DEFAULT_SEARCH_PORT));
    LOGGER.config("[" + getName() + "] search port is " + port);

    threads = Integer.parseInt(config.getProperty(THREAD_POOL_SIZE_PROPERTY, DEFAULT_THREAD_POOL_SIZE));
    LOGGER.config("[" + getName() + "] number of threads is " + threads);

    String indexerName = config.getProperty(PRODUCT_INDEXER_PROPERTY);
    if (indexerName == null) {
      throw new ConfigurationException(
          "[" + getName() + "] '" + PRODUCT_INDEXER_PROPERTY + "' is a required configuration property");
    }
    LOGGER.config("[" + getName() + "] loading indexer '" + indexerName + "'");
    // the indexer is looked up in the global configuration, not the local section
    indexer = (Indexer) Config.getConfig().getObject(indexerName);
    if (indexer == null) {
      throw new ConfigurationException("[" + getName() + "] indexer '" + indexerName + "' is not configured properly");
    }
  }

  /**
   * Stop accepting connections and release the worker pool.
   *
   * Safe to call when the server was never started, or more than once.
   *
   * @throws Exception if stopping the acceptor fails.
   */
  @Override
  public void shutdown() throws Exception {
    // detach state first so a failure below still leaves this object stopped
    final SocketAcceptor currentAcceptor = acceptor;
    final ExecutorService currentExecutor = executor;
    acceptor = null;
    executor = null;

    try {
      // stop accepting connections
      if (currentAcceptor != null) {
        currentAcceptor.stop();
      }
    } finally {
      if (currentExecutor != null) {
        // Lets in-flight searches finish. ExecutorService.shutdown() has no
        // additional effect if the pool is already shut down, so this is safe
        // whether or not SocketAcceptor.stop() also shuts it down.
        currentExecutor.shutdown();
      }
    }
  }

  /**
   * Bind the listen port and start accepting connections.
   *
   * @throws Exception if the port cannot be bound or the acceptor cannot be
   *                   started; in that case the socket and worker pool created
   *                   here are released before the exception propagates.
   */
  @Override
  public void startup() throws Exception {
    // SO_REUSEADDR must be set before bind; its effect on an already-bound
    // socket is undefined, so create the socket unbound and bind explicitly.
    final ServerSocket socket = new ServerSocket();
    ExecutorService pool = null;
    try {
      socket.setReuseAddress(true);
      socket.bind(new InetSocketAddress(port));

      pool = Executors.newFixedThreadPool(threads);
      final SocketAcceptor newAcceptor = new SocketAcceptor(socket, this, pool);
      // start accepting connections via socket
      newAcceptor.start();

      acceptor = newAcceptor;
      executor = pool;
    } catch (Exception e) {
      // do not leak the pool or the (possibly bound) socket on failure
      if (pool != null) {
        pool.shutdown();
      }
      try {
        socket.close();
      } catch (Exception ignored) {
        // ignore; the original failure is the one worth reporting
      }
      throw e;
    }
  }

  /** @return int port */
  public int getPort() {
    return port;
  }

  /** @param port int to set */
  public void setPort(int port) {
    this.port = port;
  }

  /** @return int threads */
  public int getThreads() {
    return threads;
  }

  /** @param threads int to set */
  public void setThreads(int threads) {
    this.threads = threads;
  }

  /** @return indexer */
  public Indexer getIndexer() {
    return indexer;
  }

  /**
   * Set the indexer to search; named to match {@link #getIndexer()}.
   *
   * @param indexer to set
   */
  public void setIndexer(Indexer indexer) {
    this.indexer = indexer;
  }

  /**
   * Set the indexer to search. Retained for existing callers; equivalent to
   * {@link #setIndexer(Indexer)}.
   *
   * @param indexer to set
   */
  public void setIndex(Indexer indexer) {
    setIndexer(indexer);
  }

}