The methodology is this:
- Create an SNS topic.
- Set a policy on the topic to allow subscriptions.
- Create an SQS queue.
- Set a policy on the queue to allow message posting.
- Subscribe the queue to the topic.
- Wait for AWS settings to propagate and "take."
- Send an SNS notification message.
- Listen for the message on the queue.
And here's the code:
public class SnsWithSqsDemo {
private static final Log LOG = LogFactory.getLog(SnsWithSqsDemo.class);
private static final String TOPIC_NAME = "TestTopic";
private static final String QUEUE_NAME = "TestQueue";
/**
* This program demonstrates how to hook up an AWS SQS queue to an AWS SNS
* topic to receive messages.
*
* 1. Create an SNS topic.
* 2. Set a policy on the topic to allow subscriptions.
* 3. Create an SQS queue.
* 4. Set a policy on the queue to allow message posting.
* 5. Subscribe the queue to the topic.
* 6. Wait for AWS settings to propagate and "take."
* 7. Send an SNS notification message.
* 8. Listen for the message on the queue.
*
* @param args not used
*/
public static void main(String[] args) {
AWSCredentials awsCredentials = null;
AmazonSNS sns = null;
AmazonSQS sqs = null;
String topicArn = null;
String queueUrl = null;
String queueArn = null;
String subscriptionArn = null;
LOG.debug("Beginning.");
try {
awsCredentials =
new PropertiesCredentials(
SnsWithSqsDemo.class.getResourceAsStream("AwsCredentials.properties"));
sns = new AmazonSNSClient(awsCredentials);
// 1. Create a topic
System.out.println("Step 1. Create an SNS topic.");
CreateTopicResult ctResult =
sns.createTopic(new CreateTopicRequest(TOPIC_NAME));
topicArn = ctResult.getTopicArn();
System.out.println(String.format("Created topic %s with ARN %s",
TOPIC_NAME, topicArn));
// 2. Set policy on topic to allow open subscriptions
System.out.println("Step 2. Set a policy on the topic to allow subscriptions.");
Policy snsPolicy =
new Policy().withStatements(
new Statement(Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withActions(SNSActions.Subscribe));
System.out.println("Set SNS policy: " + snsPolicy.toJson());
sns.setTopicAttributes(new SetTopicAttributesRequest(
topicArn, "Policy", snsPolicy.toJson()));
// 3. Create a queue
System.out.println("Step 3. Create an SQS queue.");
sqs = new AmazonSQSClient(awsCredentials);
CreateQueueResult cqResult =
sqs.createQueue(new CreateQueueRequest(QUEUE_NAME));
queueUrl = cqResult.getQueueUrl();
System.out.println(String.format("Created queue %s with URL %s",
QUEUE_NAME, queueUrl));
GetQueueAttributesResult queueArnResult =
sqs.getQueueAttributes(new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("QueueArn"));
queueArn = queueArnResult.getAttributes().get("QueueArn");
System.out.println("Queue ARN = " + queueArn);
// 4. Set the queue policy to allow SNS to publish messages
System.out.println("Step 4. Set a policy on the queue to allow message posting.");
Policy sqsPolicy =
new Policy().withStatements(
new Statement(Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withResources(new Resource(queueArn)) // Note: queue, not topic
.withActions(SQSActions.SendMessage)
.withConditions(
ConditionFactory.newSourceArnCondition(topicArn)));
Map queueAttributes = new HashMap();
queueAttributes.put("Policy", sqsPolicy.toJson());
sqs.setQueueAttributes(new SetQueueAttributesRequest(queueUrl, queueAttributes));
System.out.println("Set SQS policy to " + queueUrl + ": " + sqsPolicy.toJson());
// 5. Subscribe the queue to the topic
System.out.println("Step 5. Subscribe the queue to the topic.");
SubscribeResult sResult =
sns.subscribe(new SubscribeRequest(topicArn, "sqs", queueArn));
subscriptionArn = sResult.getSubscriptionArn();
System.out.println("Subscription ARN: " + subscriptionArn);
// 6. Wait a bit for AWS to get all synched-up
System.out.println("Step 6. Wait for AWS settings to propagate and \"take.\"");
Thread.sleep(60000L);
// 6.1. Verify queue attributes
GetQueueAttributesResult gqaResult =
sqs.getQueueAttributes(new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("Policy", "QueueArn", "ApproximateNumberOfMessages"));
if (gqaResult.getAttributes().size() == 0) {
System.out.println("Queue " + QUEUE_NAME + " has no attributes");
} else {
System.out.println("Attributes for " + QUEUE_NAME);
for (String key : gqaResult.getAttributes().keySet()) {
System.out.println(String.format("\t%s = %s",
key, gqaResult.getAttributes().get(key)));
}
}
// 6.2. Verify topic attributes
GetTopicAttributesResult gtaResult =
sns.getTopicAttributes(new GetTopicAttributesRequest(topicArn));
if (gtaResult.getAttributes().size() == 0) {
System.out.println("Topic " + TOPIC_NAME + " has no attributes");
} else {
System.out.println("Attributes for " + TOPIC_NAME);
for (String key : gtaResult.getAttributes().keySet()) {
System.out.println(String.format("\t%s = %s",
key, gtaResult.getAttributes().get(key)));
}
}
// 6.3. Verify subscription
ListSubscriptionsByTopicResult lsbtResult =
sns.listSubscriptionsByTopic(new ListSubscriptionsByTopicRequest(topicArn));
if (lsbtResult.getSubscriptions().size() == 0) {
System.out.println("Topic " + TOPIC_NAME + " has no subscriptions.");
} else {
System.out.println("Subscriptions for " + TOPIC_NAME);
for (Subscription subscription : lsbtResult.getSubscriptions()) {
System.out.println("\t" + subscription.getProtocol() + ": "
+ subscription.getEndpoint());
}
}
// 7. Send a notification
System.out.println("Step 7. Send an SNS notification message.");
PublishResult pResult =
sns.publish(new PublishRequest(topicArn,
"Mr Watson -- Come here -- I want to see you."));
System.out.println("Sent message ID = " + pResult.getMessageId());
// 8. Wait for message receipt in queue
System.out.println("Step 8. Listen for the message on the queue.");
for (int i = 0; i < 10; i++) { Thread.sleep(2000L); ReceiveMessageResult rmResult = sqs.receiveMessage(new ReceiveMessageRequest(queueUrl)); if (rmResult.getMessages().size() > 0) {
// A message has been received
for (Message message : rmResult.getMessages()) {
System.out.println(message.getBody());
sqs.deleteMessage(new DeleteMessageRequest(queueUrl,
message.getReceiptHandle()));
}
break;
} else {
// ??? Why aren't we receiving messages?
System.out.println("No messages available, attempt " + (i+1));
}
}
} catch (Exception e) {
e.printStackTrace(System.err);
} finally {
System.out.println("Shutting down...");
// Unsubscribe the queue from the topic
if (sns != null && subscriptionArn != null) {
sns.unsubscribe(new UnsubscribeRequest(subscriptionArn));
System.out.println("Unsubscribed queue from topic.");
}
// Destroy queue
if (sqs != null && queueUrl != null) {
sqs.deleteQueue(new DeleteQueueRequest(queueUrl));
System.out.println("Deleted the queue.");
sqs.shutdown();
}
// Destroy topic
if (sns != null && topicArn != null) {
sns.deleteTopic(new DeleteTopicRequest(topicArn));
System.out.println("Deleted the topic.");
sns.shutdown();
}
}
LOG.debug("Done.");
}
}