SocketProductReceiverHandler.java

  1. package gov.usgs.earthquake.distribution;

  2. import java.io.BufferedInputStream;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.io.OutputStream;
  6. import java.net.Socket;
  7. import java.util.logging.Level;
  8. import java.util.logging.Logger;

  9. import gov.usgs.earthquake.product.ProductId;
  10. import gov.usgs.earthquake.product.io.BinaryIO;
  11. import gov.usgs.earthquake.product.io.IOUtil;
  12. import gov.usgs.earthquake.util.SizeLimitInputStream;
  13. import gov.usgs.util.ObjectLock;
  14. import gov.usgs.util.StreamUtils;

  15. /**
  16.  * Handles incoming socket including lock/unlock on storage
  17.  *
  18.  * @see SocketProductReceiver
  19.  *
  20.  */
  21. public class SocketProductReceiverHandler implements Runnable {

  22.   private static final Logger LOGGER = Logger.getLogger(SocketProductReceiverHandler.class.getName());

  23.   /** buffer for PDL protocol. Set to 1024 */
  24.   public static final int PDL_PROTOCOL_BUFFER = 1024;

  25.   /** Protected Variable for BinaryIO */
  26.   protected final BinaryIO io = new BinaryIO();
  27.   /** Protected Variable for SocketProductReceiver */
  28.   protected final SocketProductReceiver receiver;
  29.   /** Protected Variable for Socket */
  30.   protected final Socket socket;
  31.   /** Protected Variable for a string of protocolVersion */
  32.   protected String protocolVersion;

  33.   /**
  34.    * Constructor
  35.    *
  36.    * @param receiver SocketProductReceiver
  37.    * @param socket   Socket
  38.    */
  39.   public SocketProductReceiverHandler(final SocketProductReceiver receiver, final Socket socket) {
  40.     this.receiver = receiver;
  41.     this.socket = socket;
  42.   }

  43.   /**
  44.    * Acquire write lock in receiver storage.
  45.    *
  46.    * @param id product to lock.
  47.    */
  48.   public void acquireWriteLock(final ProductId id) {
  49.     try {
  50.       ObjectLock<ProductId> storageLocks = ((FileProductStorage) receiver.getProductStorage()).getStorageLocks();
  51.       storageLocks.acquireWriteLock(id);
  52.     } catch (Exception e) {
  53.       // ignore
  54.     }
  55.   }

  56.   /**
  57.    * Release write lock in receiver storeage.
  58.    *
  59.    * @param id product to unlock.
  60.    */
  61.   public void releaseWriteLock(final ProductId id) {
  62.     if (id == null) {
  63.       return;
  64.     }

  65.     try {
  66.       ObjectLock<ProductId> storageLocks = ((FileProductStorage) receiver.getProductStorage()).getStorageLocks();
  67.       storageLocks.releaseWriteLock(id);
  68.     } catch (Exception e) {
  69.       // ignore
  70.     }
  71.   }

  72.   /**
  73.    * Read PDL protocol version.
  74.    *
  75.    * @param in input stream to read
  76.    * @return version, or null if not the PDL protocol.
  77.    *
  78.    * @throws IOException if IO error occurs
  79.    */
  80.   public String readProtocolVersion(final InputStream in) throws IOException {
  81.     String version = null;
  82.     if (in.read() == 'P' && in.read() == 'D' && in.read() == 'L') {
  83.       try {
  84.         version = io.readString(in, PDL_PROTOCOL_BUFFER);
  85.       } catch (IOException e) {
  86.         if (e.getMessage().contains("maxLength")) {
  87.           throw new IOException("bad protocol version");
  88.         } else {
  89.           throw e;
  90.         }
  91.       }
  92.     }
  93.     return version;
  94.   }

  95.   /**
  96.    * Process incoming socket connection.
  97.    */
  98.   @Override
  99.   public void run() {
  100.     BufferedInputStream in = null;
  101.     InputStream productIn = null;
  102.     OutputStream out = null;
  103.     ProductId productId = null;

  104.     try {
  105.       socket.setSoTimeout(receiver.getReadTimeout());
  106.       in = new BufferedInputStream(socket.getInputStream());
  107.       out = socket.getOutputStream();

  108.       in.mark(PDL_PROTOCOL_BUFFER);

  109.       protocolVersion = readProtocolVersion(in);
  110.       if (protocolVersion == null) {
  111.         LOGGER.fine("[" + receiver.getName() + "] not using PDL protocol " + socket);
  112.         in.reset();
  113.       } else {
  114.         LOGGER.fine("[" + receiver.getName() + "] protocol version '" + protocolVersion + "' " + socket);

  115.         // got a version, see if it's supported
  116.         if (SocketProductSender.PROTOCOL_VERSION_0_1.equals(protocolVersion)) {
  117.           // product id is only message
  118.           String productIdString;
  119.           try {
  120.             productIdString = io.readString(in, 1024);
  121.           } catch (IOException e) {
  122.             if (e.getMessage().contains("maxLength")) {
  123.               throw new IOException("version too long");
  124.             } else {
  125.               throw e;
  126.             }
  127.           }
  128.           productId = ProductId.parse(productIdString);

  129.           acquireWriteLock(productId);
  130.           if (receiver.getProductStorage().hasProduct(productId)) {
  131.             // have product, don't send
  132.             sendString(out, SocketProductSender.ALREADY_HAVE_PRODUCT);
  133.             return;
  134.           } else {
  135.             // don't have product
  136.             sendString(out, SocketProductSender.UNKNOWN_PRODUCT);
  137.             out.flush();
  138.           }
  139.         } else {
  140.           throw new IOException("unsupported protocol version");
  141.         }
  142.       }

  143.       // read product
  144.       productIn = in;
  145.       if (receiver.getSizeLimit() > 0) {
  146.         productIn = new SizeLimitInputStream(in, receiver.getSizeLimit());
  147.       }
  148.       String status = receiver
  149.           .storeAndNotify(IOUtil.autoDetectProductSource(new StreamUtils.UnclosableInputStream(productIn)));
  150.       LOGGER.info(status + " from " + socket.toString());

  151.       try {
  152.         // tell sender "success"
  153.         if (protocolVersion == null) {
  154.           out.write(status.getBytes());
  155.         } else {
  156.           io.writeString(status, out);
  157.         }
  158.         out.flush();
  159.       } catch (Exception ex) {
  160.         LOGGER.log(Level.WARNING, "[" + receiver.getName() + "] unable to notify sender of success", ex);
  161.       }
  162.     } catch (Exception ex) {
  163.       sendException(out, ex);
  164.     } finally {
  165.       releaseWriteLock(productId);

  166.       StreamUtils.closeStream(in);
  167.       StreamUtils.closeStream(out);
  168.     }
  169.   }

  170.   /**
  171.    * Send an exception to the user.
  172.    *
  173.    * @param out output stream where exception message is written
  174.    * @param e   exception to write
  175.    */
  176.   public void sendException(final OutputStream out, final Exception e) {
  177.     try {
  178.       if (e instanceof ProductAlreadyInStorageException || e.getCause() instanceof ProductAlreadyInStorageException) {
  179.         LOGGER.info("[" + receiver.getName() + "] product from " + socket.toString() + " already in storage");
  180.         sendString(out, SocketProductSender.ALREADY_HAVE_PRODUCT);
  181.       } else {
  182.         // tell sender "exception"
  183.         LOGGER.log(Level.WARNING, "[" + receiver.getName() + "] exception while processing socket", e);
  184.         sendString(out, SocketProductSender.RECEIVE_ERROR + " '" + e.getMessage() + "'");
  185.       }
  186.     } catch (Exception e2) {
  187.       // ignore
  188.     }
  189.   }

  190.   /**
  191.    * Send a string to the user.
  192.    *
  193.    * @param out output stream where exception message is written
  194.    * @param str string to write
  195.    * @throws IOException if IO error occurs
  196.    */
  197.   public void sendString(final OutputStream out, final String str) throws IOException {
  198.     try {
  199.       if (protocolVersion == null) {
  200.         out.write(str.getBytes());
  201.       } else {
  202.         io.writeString(str, out);
  203.       }
  204.       out.flush();
  205.     } catch (IOException e) {
  206.       LOGGER.log(Level.WARNING,
  207.           "[" + receiver.getName() + "] unable to send message '" + str + "' to " + socket.toString(), e);
  208.       throw e;
  209.     }
  210.   }

  211. }