AdminSocketServer.java

package gov.usgs.earthquake.distribution;

import gov.usgs.util.DefaultConfigurable;
import gov.usgs.util.SocketAcceptor;
import gov.usgs.util.SocketListenerInterface;
import gov.usgs.util.StreamUtils;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Telnet to this socket to get a "command prompt".
 *
 * @author jmfee
 */
public class AdminSocketServer extends DefaultConfigurable implements SocketListenerInterface {

  private static final Logger LOGGER = Logger.getLogger(AdminSocketServer.class.getName());

  /** Variable for default thread pool size */
  private static final int DEFAULT_THREAD_POOL_SIZE = 10;
  /** Variable for default admin port */
  private static final int DEFAULT_ADMIN_PORT = 11111;

  private int port = -1;
  private int threads = -1;
  private SocketAcceptor acceptor = null;

  /** the client this server is providing stats for. */
  private ProductClient client = null;

  /** Initializes socket with default thread pool size and port */
  public AdminSocketServer() {
    this(DEFAULT_ADMIN_PORT, DEFAULT_THREAD_POOL_SIZE, null);
  }

  /**
   * Initializes socket with custom port, threads, and client
   *
   * @param port    Admind port
   * @param threads Thread pool size
   * @param client  Product Client
   */
  public AdminSocketServer(final int port, final int threads, final ProductClient client) {
    this.port = port;
    this.threads = threads;
    this.client = client;
  }

  public void startup() throws Exception {
    // call DefaultNotificationReceiver startup first
    super.startup();

    ServerSocket socket = new ServerSocket(port);
    socket.setReuseAddress(true);
    acceptor = new SocketAcceptor(socket, this, Executors.newFixedThreadPool(threads));
    // start accepting connections via socket
    acceptor.start();
  }

  public void shutdown() throws Exception {
    // stop accepting connections
    try {
      acceptor.stop();
    } finally {
      // shutdown no matter what
      // call DefaultNotificationReceiver shutdown last
      super.shutdown();
    }
  }

  /**
   * Process a line of input.
   *
   * @param line input
   * @param out  write generated output to stream
   * @throws Exception if misconfigured or the client quits.
   */
  protected void processLine(final String line, final OutputStream out) throws Exception {
    if (client == null) {
      throw new Exception("No product client configured");
    }

    String s = line.trim();
    if (s.equals("status")) {
      out.write(getStatus().getBytes());
    } else if (s.startsWith("reprocess")) {
      out.write(("Reprocess not yet supported").getBytes());
      // reprocess(out, s.replace("reprocess", "").split(" "));
    } else if (s.startsWith("search")) {
      out.write(("Search not yet supported").getBytes());
      // search(out, s.replace("search", "").split(" "));
    } else if (s.equals("quit")) {
      throw new Exception("Bye");
    } else {
      out.write(("Help:\n" + "status - show server status\n" + "SOON search [source=SOURCE] [type=TYPE] [code=CODE]\n"
          + "SOON reprocess listener=LISTENER id=PRODUCTID").getBytes());
    }
  }

  private String getStatus() {
    StringBuffer buf = new StringBuffer();
    // receiver queue status
    Iterator<NotificationReceiver> iter = client.getReceivers().iterator();
    while (iter.hasNext()) {
      NotificationReceiver receiver = iter.next();
      if (receiver instanceof DefaultNotificationReceiver) {
        Map<String, Integer> status = ((DefaultNotificationReceiver) receiver).getQueueStatus();
        if (status != null) {
          Iterator<String> queues = status.keySet().iterator();
          while (queues.hasNext()) {
            String queue = queues.next();
            buf.append(queue).append(" = ").append(status.get(queue)).append("\n");
          }
        }
      }
    }

    String status = buf.toString();
    if (status.equals("")) {
      status = "No queues to show";
    }
    return status;
  }

  public void onSocket(Socket socket) {
    LOGGER.info("[" + getName() + "] accepted connection " + socket.toString());

    InputStream in = null;
    OutputStream out = null;

    try {
      in = socket.getInputStream();
      out = socket.getOutputStream();

      BufferedReader br = new BufferedReader(new InputStreamReader(in));
      String line = null;
      while ((line = br.readLine()) != null) {
        processLine(line, out);
      }
    } catch (Exception ex) {
      LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing socket", ex);
      // tell sender "exception"
      try {
        out.write(("Error receiving product '" + ex.getMessage() + "'").getBytes());
      } catch (Exception ex2) {
        LOGGER.log(Level.WARNING, "[" + getName() + "] unable to notify sender of exception", ex2);
      }
    } finally {
      StreamUtils.closeStream(in);
      StreamUtils.closeStream(out);

      try {
        socket.shutdownInput();
      } catch (Exception e) {
        // ignore
      }
      try {
        socket.shutdownOutput();
      } catch (Exception e) {
        // ignore
      }

      try {
        socket.close();
      } catch (Exception e) {
        // ignore
      }
    }

    LOGGER.info("[" + getName() + "] closed connection " + socket.toString());
  }

  /** @return port */
  public int getPort() {
    return port;
  }

  /** @param port port number */
  public void setPort(int port) {
    this.port = port;
  }

  /** @return threads */
  public int getThreads() {
    return threads;
  }

  /** @param threads set number of threads */
  public void setThreads(int threads) {
    this.threads = threads;
  }

  /** @return product client */
  public ProductClient getClient() {
    return client;
  }

  /** @param client set product client */
  public void setClient(ProductClient client) {
    this.client = client;
  }

}