SocketProductReceiverHandler.java

package gov.usgs.earthquake.distribution;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;

import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.BinaryIO;
import gov.usgs.earthquake.product.io.IOUtil;
import gov.usgs.earthquake.util.SizeLimitInputStream;
import gov.usgs.util.ObjectLock;
import gov.usgs.util.StreamUtils;

/**
 * Handles incoming socket including lock/unlock on storage
 *
 * @see SocketProductReceiver
 *
 */
public class SocketProductReceiverHandler implements Runnable {

  private static final Logger LOGGER = Logger.getLogger(SocketProductReceiverHandler.class.getName());

  /** buffer for PDL protocol. Set to 1024 */
  public static final int PDL_PROTOCOL_BUFFER = 1024;

  /** Protected Variable for BinaryIO */
  protected final BinaryIO io = new BinaryIO();
  /** Protected Variable for SocketProductReceiver */
  protected final SocketProductReceiver receiver;
  /** Protected Variable for Socket */
  protected final Socket socket;
  /** Protected Variable for a string of protocolVersion */
  protected String protocolVersion;

  /**
   * Constructor
   *
   * @param receiver SocketProductReceiver
   * @param socket   Socket
   */
  public SocketProductReceiverHandler(final SocketProductReceiver receiver, final Socket socket) {
    this.receiver = receiver;
    this.socket = socket;
  }

  /**
   * Acquire write lock in receiver storage.
   *
   * @param id product to lock.
   */
  public void acquireWriteLock(final ProductId id) {
    try {
      ObjectLock<ProductId> storageLocks = ((FileProductStorage) receiver.getProductStorage()).getStorageLocks();
      storageLocks.acquireWriteLock(id);
    } catch (Exception e) {
      // ignore
    }
  }

  /**
   * Release write lock in receiver storeage.
   *
   * @param id product to unlock.
   */
  public void releaseWriteLock(final ProductId id) {
    if (id == null) {
      return;
    }

    try {
      ObjectLock<ProductId> storageLocks = ((FileProductStorage) receiver.getProductStorage()).getStorageLocks();
      storageLocks.releaseWriteLock(id);
    } catch (Exception e) {
      // ignore
    }
  }

  /**
   * Read PDL protocol version.
   *
   * @param in input stream to read
   * @return version, or null if not the PDL protocol.
   *
   * @throws IOException if IO error occurs
   */
  public String readProtocolVersion(final InputStream in) throws IOException {
    String version = null;
    if (in.read() == 'P' && in.read() == 'D' && in.read() == 'L') {
      try {
        version = io.readString(in, PDL_PROTOCOL_BUFFER);
      } catch (IOException e) {
        if (e.getMessage().contains("maxLength")) {
          throw new IOException("bad protocol version");
        } else {
          throw e;
        }
      }
    }
    return version;
  }

  /**
   * Process incoming socket connection.
   */
  @Override
  public void run() {
    BufferedInputStream in = null;
    InputStream productIn = null;
    OutputStream out = null;
    ProductId productId = null;

    try {
      socket.setSoTimeout(receiver.getReadTimeout());
      in = new BufferedInputStream(socket.getInputStream());
      out = socket.getOutputStream();

      in.mark(PDL_PROTOCOL_BUFFER);

      protocolVersion = readProtocolVersion(in);
      if (protocolVersion == null) {
        LOGGER.fine("[" + receiver.getName() + "] not using PDL protocol " + socket);
        in.reset();
      } else {
        LOGGER.fine("[" + receiver.getName() + "] protocol version '" + protocolVersion + "' " + socket);

        // got a version, see if it's supported
        if (SocketProductSender.PROTOCOL_VERSION_0_1.equals(protocolVersion)) {
          // product id is only message
          String productIdString;
          try {
            productIdString = io.readString(in, 1024);
          } catch (IOException e) {
            if (e.getMessage().contains("maxLength")) {
              throw new IOException("version too long");
            } else {
              throw e;
            }
          }
          productId = ProductId.parse(productIdString);

          acquireWriteLock(productId);
          if (receiver.getProductStorage().hasProduct(productId)) {
            // have product, don't send
            sendString(out, SocketProductSender.ALREADY_HAVE_PRODUCT);
            return;
          } else {
            // don't have product
            sendString(out, SocketProductSender.UNKNOWN_PRODUCT);
            out.flush();
          }
        } else {
          throw new IOException("unsupported protocol version");
        }
      }

      // read product
      productIn = in;
      if (receiver.getSizeLimit() > 0) {
        productIn = new SizeLimitInputStream(in, receiver.getSizeLimit());
      }
      String status = receiver
          .storeAndNotify(IOUtil.autoDetectProductSource(new StreamUtils.UnclosableInputStream(productIn)));
      LOGGER.info(status + " from " + socket.toString());

      try {
        // tell sender "success"
        if (protocolVersion == null) {
          out.write(status.getBytes());
        } else {
          io.writeString(status, out);
        }
        out.flush();
      } catch (Exception ex) {
        LOGGER.log(Level.WARNING, "[" + receiver.getName() + "] unable to notify sender of success", ex);
      }
    } catch (Exception ex) {
      sendException(out, ex);
    } finally {
      releaseWriteLock(productId);

      StreamUtils.closeStream(in);
      StreamUtils.closeStream(out);
    }
  }

  /**
   * Send an exception to the user.
   *
   * @param out output stream where exception message is written
   * @param e   exception to write
   */
  public void sendException(final OutputStream out, final Exception e) {
    try {
      if (e instanceof ProductAlreadyInStorageException || e.getCause() instanceof ProductAlreadyInStorageException) {
        LOGGER.info("[" + receiver.getName() + "] product from " + socket.toString() + " already in storage");
        sendString(out, SocketProductSender.ALREADY_HAVE_PRODUCT);
      } else {
        // tell sender "exception"
        LOGGER.log(Level.WARNING, "[" + receiver.getName() + "] exception while processing socket", e);
        sendString(out, SocketProductSender.RECEIVE_ERROR + " '" + e.getMessage() + "'");
      }
    } catch (Exception e2) {
      // ignore
    }
  }

  /**
   * Send a string to the user.
   *
   * @param out output stream where exception message is written
   * @param str string to write
   * @throws IOException if IO error occurs
   */
  public void sendString(final OutputStream out, final String str) throws IOException {
    try {
      if (protocolVersion == null) {
        out.write(str.getBytes());
      } else {
        io.writeString(str, out);
      }
      out.flush();
    } catch (IOException e) {
      LOGGER.log(Level.WARNING,
          "[" + receiver.getName() + "] unable to send message '" + str + "' to " + socket.toString(), e);
      throw e;
    }
  }

}