AdminSocketServer.java

  1. package gov.usgs.earthquake.distribution;

  2. import gov.usgs.util.DefaultConfigurable;
  3. import gov.usgs.util.SocketAcceptor;
  4. import gov.usgs.util.SocketListenerInterface;
  5. import gov.usgs.util.StreamUtils;

  6. import java.io.BufferedReader;
  7. import java.io.InputStream;
  8. import java.io.InputStreamReader;
  9. import java.io.OutputStream;
  10. import java.net.ServerSocket;
  11. import java.net.Socket;
  12. import java.util.Iterator;
  13. import java.util.Map;
  14. import java.util.concurrent.Executors;
  15. import java.util.logging.Level;
  16. import java.util.logging.Logger;

  17. /**
  18.  * Telnet to this socket to get a "command prompt".
  19.  *
  20.  * @author jmfee
  21.  */
  22. public class AdminSocketServer extends DefaultConfigurable implements SocketListenerInterface {

  23.   private static final Logger LOGGER = Logger.getLogger(AdminSocketServer.class.getName());

  24.   /** Variable for default thread pool size */
  25.   private static final int DEFAULT_THREAD_POOL_SIZE = 10;
  26.   /** Variable for default admin port */
  27.   private static final int DEFAULT_ADMIN_PORT = 11111;

  28.   private int port = -1;
  29.   private int threads = -1;
  30.   private SocketAcceptor acceptor = null;

  31.   /** the client this server is providing stats for. */
  32.   private ProductClient client = null;

  33.   /** Initializes socket with default thread pool size and port */
  34.   public AdminSocketServer() {
  35.     this(DEFAULT_ADMIN_PORT, DEFAULT_THREAD_POOL_SIZE, null);
  36.   }

  37.   /**
  38.    * Initializes socket with custom port, threads, and client
  39.    *
  40.    * @param port    Admind port
  41.    * @param threads Thread pool size
  42.    * @param client  Product Client
  43.    */
  44.   public AdminSocketServer(final int port, final int threads, final ProductClient client) {
  45.     this.port = port;
  46.     this.threads = threads;
  47.     this.client = client;
  48.   }

  49.   public void startup() throws Exception {
  50.     // call DefaultNotificationReceiver startup first
  51.     super.startup();

  52.     ServerSocket socket = new ServerSocket(port);
  53.     socket.setReuseAddress(true);
  54.     acceptor = new SocketAcceptor(socket, this, Executors.newFixedThreadPool(threads));
  55.     // start accepting connections via socket
  56.     acceptor.start();
  57.   }

  58.   public void shutdown() throws Exception {
  59.     // stop accepting connections
  60.     try {
  61.       acceptor.stop();
  62.     } finally {
  63.       // shutdown no matter what
  64.       // call DefaultNotificationReceiver shutdown last
  65.       super.shutdown();
  66.     }
  67.   }

  68.   /**
  69.    * Process a line of input.
  70.    *
  71.    * @param line input
  72.    * @param out  write generated output to stream
  73.    * @throws Exception if misconfigured or the client quits.
  74.    */
  75.   protected void processLine(final String line, final OutputStream out) throws Exception {
  76.     if (client == null) {
  77.       throw new Exception("No product client configured");
  78.     }

  79.     String s = line.trim();
  80.     if (s.equals("status")) {
  81.       out.write(getStatus().getBytes());
  82.     } else if (s.startsWith("reprocess")) {
  83.       out.write(("Reprocess not yet supported").getBytes());
  84.       // reprocess(out, s.replace("reprocess", "").split(" "));
  85.     } else if (s.startsWith("search")) {
  86.       out.write(("Search not yet supported").getBytes());
  87.       // search(out, s.replace("search", "").split(" "));
  88.     } else if (s.equals("quit")) {
  89.       throw new Exception("Bye");
  90.     } else {
  91.       out.write(("Help:\n" + "status - show server status\n" + "SOON search [source=SOURCE] [type=TYPE] [code=CODE]\n"
  92.           + "SOON reprocess listener=LISTENER id=PRODUCTID").getBytes());
  93.     }
  94.   }

  95.   private String getStatus() {
  96.     StringBuffer buf = new StringBuffer();
  97.     // receiver queue status
  98.     Iterator<NotificationReceiver> iter = client.getReceivers().iterator();
  99.     while (iter.hasNext()) {
  100.       NotificationReceiver receiver = iter.next();
  101.       if (receiver instanceof DefaultNotificationReceiver) {
  102.         Map<String, Integer> status = ((DefaultNotificationReceiver) receiver).getQueueStatus();
  103.         if (status != null) {
  104.           Iterator<String> queues = status.keySet().iterator();
  105.           while (queues.hasNext()) {
  106.             String queue = queues.next();
  107.             buf.append(queue).append(" = ").append(status.get(queue)).append("\n");
  108.           }
  109.         }
  110.       }
  111.     }

  112.     String status = buf.toString();
  113.     if (status.equals("")) {
  114.       status = "No queues to show";
  115.     }
  116.     return status;
  117.   }

  118.   public void onSocket(Socket socket) {
  119.     LOGGER.info("[" + getName() + "] accepted connection " + socket.toString());

  120.     InputStream in = null;
  121.     OutputStream out = null;

  122.     try {
  123.       in = socket.getInputStream();
  124.       out = socket.getOutputStream();

  125.       BufferedReader br = new BufferedReader(new InputStreamReader(in));
  126.       String line = null;
  127.       while ((line = br.readLine()) != null) {
  128.         processLine(line, out);
  129.       }
  130.     } catch (Exception ex) {
  131.       LOGGER.log(Level.WARNING, "[" + getName() + "] exception while processing socket", ex);
  132.       // tell sender "exception"
  133.       try {
  134.         out.write(("Error receiving product '" + ex.getMessage() + "'").getBytes());
  135.       } catch (Exception ex2) {
  136.         LOGGER.log(Level.WARNING, "[" + getName() + "] unable to notify sender of exception", ex2);
  137.       }
  138.     } finally {
  139.       StreamUtils.closeStream(in);
  140.       StreamUtils.closeStream(out);

  141.       try {
  142.         socket.shutdownInput();
  143.       } catch (Exception e) {
  144.         // ignore
  145.       }
  146.       try {
  147.         socket.shutdownOutput();
  148.       } catch (Exception e) {
  149.         // ignore
  150.       }

  151.       try {
  152.         socket.close();
  153.       } catch (Exception e) {
  154.         // ignore
  155.       }
  156.     }

  157.     LOGGER.info("[" + getName() + "] closed connection " + socket.toString());
  158.   }

  159.   /** @return port */
  160.   public int getPort() {
  161.     return port;
  162.   }

  163.   /** @param port port number */
  164.   public void setPort(int port) {
  165.     this.port = port;
  166.   }

  167.   /** @return threads */
  168.   public int getThreads() {
  169.     return threads;
  170.   }

  171.   /** @param threads set number of threads */
  172.   public void setThreads(int threads) {
  173.     this.threads = threads;
  174.   }

  175.   /** @return product client */
  176.   public ProductClient getClient() {
  177.     return client;
  178.   }

  179.   /** @param client set product client */
  180.   public void setClient(ProductClient client) {
  181.     this.client = client;
  182.   }

  183. }