// SnsPublisher.java

package gov.usgs.earthquake.aws;

import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;

import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.EventPublisher;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
import software.amazon.awssdk.services.sns.model.SnsException;

/**
 * Publishes messages to an AWS SNS topic.
 *
 * <p>
 * The client and topic ARN are either supplied through the constructor or set
 * by {@link #configure(Config)}, which reads the {@code region} and
 * {@code topicArn} properties and builds its own {@link SnsClient}.
 */
public class SnsPublisher extends DefaultConfigurable implements EventPublisher {
  /** Logger to use for this class */
  public static final Logger LOGGER = Logger.getLogger(SnsPublisher.class.getName());
  /** Configuration property naming the AWS region of the topic. */
  public static final String REGION_PROPERTY = "region";
  /** Configuration property holding the ARN of the target topic. */
  public static final String TOPIC_ARN_PROPERTY = "topicArn";

  private SnsClient snsClient;
  private String topicArn;

  /**
   * Creates a publisher that uses an already built SNS client.
   *
   * @param region    AWS region where the SNS topic is. Currently unused: the
   *                  supplied client is stored as-is.
   * @param topicArn  The ARN of the target SNS topic.
   * @param snsClient The client used to publish messages.
   */
  public SnsPublisher(Region region, String topicArn,
      SnsClient snsClient) {
    this.snsClient = snsClient;
    this.topicArn = topicArn;
  }

  /**
   * Reads the region and topic ARN from the configuration and builds a new SNS
   * client for that region, replacing any client and topic ARN set earlier.
   *
   * @param config Configuration providing {@link #REGION_PROPERTY} and
   *               {@link #TOPIC_ARN_PROPERTY}.
   * @throws ConfigurationException if either property is missing or blank.
   */
  @Override
  public void configure(Config config) throws Exception {
    // Validate both properties before touching any state, so a failed
    // configure leaves the publisher as it was.
    final String region = requireProperty(config, REGION_PROPERTY);
    final String configuredTopicArn = requireProperty(config, TOPIC_ARN_PROPERTY);

    this.topicArn = configuredTopicArn;
    this.snsClient = SnsClient.builder().region(Region.of(region)).build();
    LOGGER.config(() -> String.format("[%s] region=%s", getName(), region));
    LOGGER.config(() -> String.format("[%s] TOPICARN=%s", getName(), configuredTopicArn));
  }

  /**
   * Publishes a message to the configured SNS topic. A failure reported as an
   * {@link SnsException} is logged as a warning and not rethrown; any other
   * exception propagates to the caller.
   *
   * @param message The message to publish.
   */
  @Override
  public void publishMessage(String message) {
    try {
      final PublishRequest request = PublishRequest.builder()
          .message(message)
          .topicArn(topicArn)
          .build();

      final PublishResponse result = snsClient.publish(request);
      // Values are passed as format arguments, never as the format string
      // itself, so a '%' in them cannot break formatting.
      LOGGER.info(() -> String.format(
          "Message published successfully. Message ID: %s", result.messageId()));
    } catch (SnsException e) {
      // Attach the exception so the cause and stack trace are not lost.
      LOGGER.log(Level.WARNING, e, () -> String.format(
          "Error publishing message to SNS topic %s: %s", topicArn, errorMessageOf(e)));
    }
  }

  /**
   * Closes the SNS client, if one has been set.
   */
  public void shutdown() {
    if (snsClient != null) {
      snsClient.close();
    }
  }

  /** Returns the named property, or throws if it is missing or blank. */
  private String requireProperty(Config config, String property)
      throws ConfigurationException {
    final String value = config.getProperty(property);
    if (Objects.isNull(value) || value.trim().isEmpty()) {
      throw new ConfigurationException(
          "[" + getName() + "] " + property + " is required");
    }
    return value;
  }

  /** Returns the AWS error message, or the exception message when no error details are attached. */
  private static String errorMessageOf(SnsException e) {
    return e.awsErrorDetails() != null
        ? e.awsErrorDetails().errorMessage()
        : e.getMessage();
  }
}