// SnsPublisher.java
package gov.usgs.earthquake.aws;
import java.util.Objects;
import java.util.logging.Logger;
import gov.usgs.earthquake.distribution.ConfigurationException;
import gov.usgs.earthquake.distribution.EventPublisher;
import gov.usgs.util.Config;
import gov.usgs.util.DefaultConfigurable;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
import software.amazon.awssdk.services.sns.model.SnsException;
/**
 * Publishes messages to an AWS SNS topic.
 *
 * <p>
 * An instance can be set up in two ways: by passing an already-built
 * {@link SnsClient} and topic ARN to the constructor, or by calling
 * {@link #configure(Config)}, which reads the {@value #REGION_PROPERTY} and
 * {@value #TOPIC_ARN_PROPERTY} properties and builds the client itself.
 */
public class SnsPublisher extends DefaultConfigurable implements EventPublisher {

  /** Logger to use for this class */
  public static final Logger LOGGER = Logger.getLogger(SnsPublisher.class.getName());

  /** Configuration property naming the AWS region of the SNS topic. */
  public static final String REGION_PROPERTY = "region";
  /** Configuration property naming the ARN of the target SNS topic. */
  public static final String TOPIC_ARN_PROPERTY = "topicArn";

  /** Client used to publish; null until set by a constructor or configure. */
  private SnsClient snsClient;
  /** ARN of the topic messages are published to. */
  private String topicArn;

  /**
   * Creates an unconfigured publisher.
   *
   * <p>
   * {@link #configure(Config)} must be called before publishing.
   * NOTE(review): presumably required so the configuration loader can
   * instantiate this class reflectively - confirm against the loader.
   */
  public SnsPublisher() {
    // client and topic ARN are assigned in configure()
  }

  /**
   * Creates a publisher around an existing SNS client.
   *
   * @param region    AWS region where the SNS topic is. Not used by this
   *                  constructor: the supplied client is used as-is.
   * @param topicArn  The ARN of the target SNS topic.
   * @param snsClient The client used to publish messages.
   */
  public SnsPublisher(Region region, String topicArn,
      SnsClient snsClient) {
    this.snsClient = snsClient;
    this.topicArn = topicArn;
  }

  /**
   * Reads the region and topic ARN from configuration and builds the SNS
   * client for that region.
   *
   * @param config configuration holding the {@value #REGION_PROPERTY} and
   *               {@value #TOPIC_ARN_PROPERTY} properties.
   * @throws ConfigurationException if either property is missing.
   * @throws Exception              if the client cannot be built.
   */
  @Override
  public void configure(Config config) throws Exception {
    final String region = config.getProperty(REGION_PROPERTY);
    if (Objects.isNull(region)) {
      throw new ConfigurationException(
          "[" + getName() + "] " + REGION_PROPERTY + " is required");
    }

    final String topicArn = config.getProperty(TOPIC_ARN_PROPERTY);
    if (Objects.isNull(topicArn)) {
      throw new ConfigurationException(
          "[" + getName() + "] " + TOPIC_ARN_PROPERTY + " is required");
    }

    // Assign state only after both properties validated, so a failed
    // configure leaves the previous topic/client pairing untouched.
    this.topicArn = topicArn;
    this.snsClient = SnsClient.builder().region(Region.of(region)).build();

    LOGGER.config(() -> String.format("[%s] region=%s", getName(), region));
    LOGGER.config(() -> String.format("[%s] TOPICARN=%s", getName(), topicArn));
  }

  /**
   * Publishes a message to the configured SNS topic.
   *
   * <p>
   * A failure reported as an {@link SnsException} is logged at warning level
   * and not rethrown; any other exception propagates to the caller.
   *
   * @param message The message to publish.
   */
  @Override
  public void publishMessage(String message) {
    try {
      PublishRequest request = PublishRequest.builder()
          .message(message)
          .topicArn(topicArn)
          .build();
      PublishResponse result = snsClient.publish(request);
      // Values are passed as format arguments (not concatenated into the
      // format string) so a '%' in them cannot break formatting.
      LOGGER.info(() -> String.format(
          "Message published successfully. Message ID: %s", result.messageId()));
    } catch (SnsException e) {
      final String reason = describeFailure(e);
      LOGGER.warning(() -> String.format(
          "Error publishing message to SNS topic %s: %s", topicArn, reason));
    }
  }

  /**
   * Closes the SNS client, if one has been created.
   */
  public void shutdown() {
    // The client is null when the no-arg constructor was used and configure
    // was never called (or failed); there is nothing to close in that case.
    if (snsClient != null) {
      snsClient.close();
    }
  }

  /**
   * Extracts a human-readable failure reason from an SNS exception.
   *
   * @param e the exception raised by the SNS client.
   * @return the AWS error message when error details are present, otherwise
   *         the exception's own message.
   */
  private static String describeFailure(SnsException e) {
    // awsErrorDetails() may be absent; fall back instead of dereferencing it.
    if (e.awsErrorDetails() != null && e.awsErrorDetails().errorMessage() != null) {
      return e.awsErrorDetails().errorMessage();
    }
    return e.getMessage();
  }
}