SocketAcceptor.java

/*
 * SocketAcceptor
 *
 * $Id$
 * $HeadURL$
 */
package gov.usgs.util;

import java.net.ServerSocket;
import java.net.Socket;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Accept socket connections from a ServerSocket, and notify a listener using a
 * separate thread.
 *
 * @author jmfee
 *
 */
public class SocketAcceptor implements Runnable {

  /** Track active sockets. */
  private LinkedList<Future<?>> activeSockets = new LinkedList<Future<?>>();

  /** Socket used to accept connections. */
  private ServerSocket listener;

  /** Handler used to process accepted connections. */
  private SocketListenerInterface callback;

  /** Sockets are processed by this executor. */
  private ExecutorService socketExecutor;

  /** Whether to keep accepting connections. */
  private boolean listening = true;

  /**
   * Create a new SocketAcceptor object that uses a single thread executor.
   *
   * @param listener the server socket to accept connections from.
   * @param callback the object that processes accepted connections.
   */
  public SocketAcceptor(final ServerSocket listener, SocketListenerInterface callback) {
    this(listener, callback, Executors.newSingleThreadExecutor());
  }

  /**
   *
   * @param listener the server socket to accept connections from.
   * @param callback the object that processes accepted connections.
   * @param executor the executor used to invoke callback.
   */
  public SocketAcceptor(final ServerSocket listener, SocketListenerInterface callback, ExecutorService executor) {
    this.listener = listener;
    this.callback = callback;
    this.socketExecutor = executor;
  }

  /**
   * Start accepting connections in a background thread.
   */
  public void start() {
    new Thread(this).start();
  }

  /**
   * Stop accepting connections.
   */
  public void stop() {
    // tell accept thread to stop accepting
    listening = false;
    try {
      // close the server socket, will cause exception in blocking accept
      // call
      listener.close();
    } catch (Exception e) {
      // ignore
    }

    // process any queued sockets
    socketExecutor.shutdown();
  }

  /**
   * Accept connections until the shutdown method is called.
   */
  public void run() {
    Socket socket = null;
    while (listening) {
      try {
        socket = listener.accept();
      } catch (Exception e) {
        socket = null;
        if (!listening) {
          // exception was thrown because socket was closed
          break;
        }
        System.err.println("Error accepting connection");
        e.printStackTrace();
      }

      // check if this is a valid socket
      if (socket != null) {
        final Socket threadSocket = socket;
        final SocketListenerInterface threadCallback = callback;

        // schedule processing
        Future<?> socketThread = socketExecutor.submit(new Runnable() {
          public void run() {
            try {
              threadCallback.onSocket(threadSocket);
            } catch (Exception e) {
              System.err.println("SocketListener callback threw exception:");
              e.printStackTrace();
            }
          }
        });
        // track this to see if a socket is blocking...
        activeSockets.add(socketThread);
      } else {
        System.err.println("Socket is null while accepting connection.");
      }

      // see how many sockets are still in the queue
      // but first remove any completed sockets
      Iterator<Future<?>> iter = activeSockets.iterator();
      while (iter.hasNext()) {
        Future<?> next = iter.next();
        try {
          if ((next.get(0, TimeUnit.MILLISECONDS)) == null) {
            iter.remove();
          }
        } catch (Exception e) {
          // ignore
        }
      }
      System.err.println("There are " + activeSockets.size() + " active/queued socket connections");
    }
  }

}