SearchSocket.java

package gov.usgs.earthquake.indexer;

import gov.usgs.earthquake.distribution.FileProductStorage;
import gov.usgs.util.StreamUtils;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/**
 * Client side of search socket interface.
 */
public class SearchSocket {

  /** The remote host to connect. */
  private final InetAddress host;

  /** The remote port to connect. */
  private final int port;

  /**
   * Construct a new SearchSocket.
   *
   * @param host the remote host.
   * @param port the remote port.
   */
  public SearchSocket(final InetAddress host, final int port) {
    this.host = host;
    this.port = port;
  }

  /**
   * Send a search request, converting the response to a java object.
   *
   * @param request the request to send.
   * @param storage where received products are stored.
   * @return the response.
   * @throws Exception if error occurs
   */
  public SearchResponse search(final SearchRequest request, final FileProductStorage storage) throws Exception {
    final PipedInputStream pipedIn = new PipedInputStream();
    final PipedOutputStream pipedOut = new PipedOutputStream(pipedIn);

    // parse response in background, while searching
    ResponseParserThread thread = new ResponseParserThread(pipedIn, storage);
    thread.start();

    // start search, sending response xml to response parser thread
    search(request, pipedOut);

    // wait for parsing to complete
    thread.join();

    // either return parsed object, or raise parse exception
    if (thread.getSearchResponse() != null) {
      return thread.getSearchResponse();
    } else {
      throw thread.getParseError();
    }
  }

  /**
   * Send a search request, writing the response to an outputstream.
   *
   * @param request     the request to send.
   * @param responseOut the outputstream to write.
   * @throws Exception if error occurs
   */
  public void search(final SearchRequest request, final OutputStream responseOut) throws Exception {
    Socket socket = null;
    DeflaterOutputStream out = null;
    InputStream in = null;

    try {
      // connect to the configured endpoint
      socket = new Socket(host, port);

      // send the request as compressed xml
      out = new DeflaterOutputStream(new BufferedOutputStream(socket.getOutputStream()));
      SearchXML.toXML(request, new StreamUtils.UnclosableOutputStream(out));

      // must finish and flush to complete Deflater stream
      out.finish();
      out.flush();

      // now read response
      in = new InflaterInputStream(new BufferedInputStream(socket.getInputStream()));
      StreamUtils.transferStream(in, responseOut);
    } finally {
      // make sure socket is closed
      try {
        socket.close();
      } catch (Exception e) {
        // ignore
      }
    }
  }

  /**
   * Thread used for parsing search response in background.
   */
  private static class ResponseParserThread extends Thread {

    /** Input stream being parsed. */
    private InputStream in = null;

    /** Storage where received products are stored. */
    private FileProductStorage storage = null;

    /** The parsed search response. */
    private SearchResponse searchResponse = null;

    /** Parse error, if one happened. */
    private Exception parseError = null;

    public ResponseParserThread(final InputStream in, final FileProductStorage storage) {
      this.in = in;
      this.storage = storage;
    }

    public void run() {
      try {
        searchResponse = SearchXML.parseResponse(in, storage);
      } catch (Exception e) {
        searchResponse = null;
        parseError = e;
      }
    }

    public SearchResponse getSearchResponse() {
      return searchResponse;
    }

    public Exception getParseError() {
      return parseError;
    }
  }

}