// SearchSocket.java
package gov.usgs.earthquake.indexer;
import gov.usgs.earthquake.distribution.FileProductStorage;
import gov.usgs.util.StreamUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
/**
 * Client side of search socket interface.
 *
 * <p>Each call to a {@code search} method opens a new socket to the
 * configured host and port, sends the request as deflate-compressed XML,
 * and reads back a deflate-compressed response.
 */
public class SearchSocket {

  /** The remote host to connect. */
  private final InetAddress host;

  /** The remote port to connect. */
  private final int port;

  /**
   * Construct a new SearchSocket.
   *
   * @param host the remote host.
   * @param port the remote port.
   */
  public SearchSocket(final InetAddress host, final int port) {
    this.host = host;
    this.port = port;
  }

  /**
   * Send a search request, converting the response to a java object.
   *
   * <p>The raw response is piped to a background thread that parses it
   * while the response is still being received. This method does not
   * return (or throw) until that thread has finished.
   *
   * @param request the request to send.
   * @param storage where received products are stored.
   * @return the response.
   * @throws Exception if the search fails, if the response cannot be
   *     parsed, or if the parser produced neither a response nor an error.
   */
  public SearchResponse search(final SearchRequest request, final FileProductStorage storage) throws Exception {
    final PipedInputStream pipedIn = new PipedInputStream();
    final PipedOutputStream pipedOut = new PipedOutputStream(pipedIn);

    // parse response in background, while searching
    final ResponseParserThread thread = new ResponseParserThread(pipedIn, storage);
    thread.start();

    try {
      // start search, sending response xml to response parser thread
      search(request, pipedOut);
    } finally {
      // Always close the write end of the pipe, whether or not the
      // transfer already did so. Without an end-of-stream signal a parser
      // that has not yet received any data would block on the pipe
      // forever when the search fails early (e.g. connection refused).
      closeQuietly(pipedOut);
      // wait for parsing to complete, so no background work outlives
      // this call
      thread.join();
    }

    // either return parsed object, or raise parse exception
    final SearchResponse response = thread.getSearchResponse();
    if (response != null) {
      return response;
    }
    final Exception parseError = thread.getParseError();
    if (parseError != null) {
      throw parseError;
    }
    // "throw null" would surface as an unexplained NullPointerException
    throw new IllegalStateException(
        "Search response parser produced neither a response nor an error");
  }

  /**
   * Send a search request, writing the response to an outputstream.
   *
   * @param request the request to send.
   * @param responseOut the outputstream to write.
   * @throws Exception if error occurs
   */
  public void search(final SearchRequest request, final OutputStream responseOut) throws Exception {
    Socket socket = null;
    DeflaterOutputStream out = null;
    InputStream in = null;
    try {
      // connect to the configured endpoint
      socket = new Socket(host, port);

      // send the request as compressed xml
      out = new DeflaterOutputStream(new BufferedOutputStream(socket.getOutputStream()));
      SearchXML.toXML(request, new StreamUtils.UnclosableOutputStream(out));
      // must finish and flush to complete Deflater stream
      out.finish();
      out.flush();

      // now read response
      in = new InflaterInputStream(new BufferedInputStream(socket.getInputStream()));
      StreamUtils.transferStream(in, responseOut);
    } finally {
      // Close the compression streams so their Inflater/Deflater are
      // released promptly instead of waiting for garbage collection.
      closeQuietly(in);
      closeQuietly(out);
      // make sure socket is closed; it is null when the connection failed
      if (socket != null) {
        try {
          socket.close();
        } catch (IOException ignored) {
          // best-effort cleanup, nothing useful to do on close failure
        }
      }
    }
  }

  /**
   * Close a resource, ignoring a null argument and any close failure.
   *
   * @param resource the resource to close, may be null.
   */
  private static void closeQuietly(final Closeable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (IOException ignored) {
      // best-effort cleanup, nothing useful to do on close failure
    }
  }

  /**
   * Thread used for parsing search response in background.
   *
   * <p>After the thread has finished, exactly one of
   * {@link #getSearchResponse()} and {@link #getParseError()} is expected
   * to be non-null; both are null if the parser returned null without
   * throwing.
   */
  private static class ResponseParserThread extends Thread {

    /** Input stream being parsed. */
    private final InputStream in;

    /** Storage where received products are stored. */
    private final FileProductStorage storage;

    /** The parsed search response. */
    private SearchResponse searchResponse = null;

    /** Parse error, if one happened. */
    private Exception parseError = null;

    /**
     * Construct a new parser thread.
     *
     * @param in the stream to parse the response from.
     * @param storage where received products are stored.
     */
    public ResponseParserThread(final InputStream in, final FileProductStorage storage) {
      this.in = in;
      this.storage = storage;
    }

    /** Parse the response, recording either the result or the failure. */
    @Override
    public void run() {
      try {
        searchResponse = SearchXML.parseResponse(in, storage);
      } catch (Exception e) {
        searchResponse = null;
        parseError = e;
      }
    }

    /**
     * @return the parsed response, or null if parsing failed or has not
     *     completed.
     */
    public SearchResponse getSearchResponse() {
      return searchResponse;
    }

    /**
     * @return the exception raised while parsing, or null if none was
     *     raised.
     */
    public Exception getParseError() {
      return parseError;
    }
  }
}