Thursday, March 24, 2011

AWS: How do you subscribe an SQS queue to an SNS topic?

Image representing Amazon Web Services as depi...Image via CrunchBase
If there are any AWS (Amazon Web Services) gurus out there, I'm stumped on trying to use a Simple Queue Service queue to receive notifications sent out on a Simple Notification Service topic. Any pointers?

In the code below, I'm using the AWS SDK for Java (available here). The program does the following:


  1. Create an SNS topic.
  2. Set a policy on the topic to allow subscriptions.
  3. Create an SQS queue.
  4. Set a policy on the queue to allow message posting.
  5. Subscribe the queue to the topic.
  6. Send an SNS notification message.
  7. Listen for the message on the queue.
Steps 1-6 seem to work just fine, but on step 7, it listens in vain for messages. All ten iterations go by without picking up any messages on the queue. As an added note, if I subscribe an email address to the topic, messages are delivered. Only the queue appears to be bypassed when SNS sends its notifications.


public class SnsWithSqsDemo {
private static final String TOPIC_NAME = "TestTopic";
private static final String QUEUE_NAME = "TestQueue";

/**
* This program demonstrates how to hook up an AWS SQS queue to an AWS SNS
* topic to receive messages.
*
* 1. Create an SNS topic.
* 2. Set a policy on the topic to allow subscriptions.
* 3. Create an SQS queue.
* 4. Set a policy on the queue to allow message posting.
* 5. Subscribe the queue to the topic.
* 6. Send an SNS notification message.
* 7. Listen for the message on the queue.
*
* @param args not used
*/
public static void main(String[] args) {
AWSCredentials awsCredentials = null;
AmazonSNS sns = null;
AmazonSQS sqs = null;
String topicArn = null;
String queueUrl = null;
String queueArn = null;
String subscriptionArn = null;
try {
awsCredentials =
new PropertiesCredentials(
SnsWithSqsDemo.class.getResourceAsStream("AwsCredentials.properties"));
sns = new AmazonSNSClient(awsCredentials);
// 1. Create a topic
CreateTopicResult ctResult =
sns.createTopic(new CreateTopicRequest(TOPIC_NAME));
topicArn = ctResult.getTopicArn();
System.out.println(String.format("Created topic %s with ARN %s",
TOPIC_NAME, topicArn));
// 2. Set policy on topic to allow open subscriptions
Policy snsPolicy =
new Policy().withStatements(
new Statement(Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withActions(SNSActions.Subscribe));
// ??? Is there no Java SDK constant for "Policy"?
sns.setTopicAttributes(new SetTopicAttributesRequest(
topicArn, "Policy", snsPolicy.toJson()));
// 3. Create a queue
sqs = new AmazonSQSClient(awsCredentials);
CreateQueueResult cqResult =
sqs.createQueue(new CreateQueueRequest(QUEUE_NAME));
queueUrl = cqResult.getQueueUrl();
System.out.println(String.format("Created queue %s with URL %s",
QUEUE_NAME, queueUrl));
// 4. Set the queue policy to allow SNS to publish messages
Policy sqsPolicy =
new Policy().withStatements(
new Statement(Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withActions(SQSActions.SendMessage)
.withConditions(ConditionFactory.newSourceArnCondition(topicArn)));
Map queueAttributes = new HashMap();
queueAttributes.put("Policy", sqsPolicy.toJson());
sqs.setQueueAttributes(new SetQueueAttributesRequest(queueUrl, queueAttributes));
// ??? Why isn't the queue ARN in the queue attributes?
AmazonIdentityManagement idMgr = new AmazonIdentityManagementClient(awsCredentials);
String accountId = idMgr.getUser().getUser().getUserId();
queueArn = String.format("arn:aws:sqs:us-east-1:%s:%s", accountId, QUEUE_NAME);
// 5. Subscribe the queue to the topic
SubscribeResult sResult =
sns.subscribe(new SubscribeRequest(topicArn, "sqs", queueArn));
subscriptionArn = sResult.getSubscriptionArn();
System.out.println("Subscription ARN: " + subscriptionArn);
// 5.5. Wait a bit for AWS to get all synched-up
Thread.sleep(60000L);
// 6. Send a notification
PublishResult pResult =
sns.publish(new PublishRequest(topicArn,
"Mr Watson -- Come here -- I want to see you."));
System.out.println("Sent message ID = " + pResult.getMessageId());
// 7. Wait for message receipt in queue
for (int i = 0; i < 10; i++) { Thread.sleep(2000L); ReceiveMessageResult rmResult = sqs.receiveMessage(new ReceiveMessageRequest(queueUrl)); if (rmResult.getMessages().size() > 0) {
// A message has been received
for (Message message : rmResult.getMessages()) {
System.out.println(message.getBody());
sqs.deleteMessage(new DeleteMessageRequest(queueUrl,
message.getReceiptHandle()));
}
break;
} else {
System.out.println("No messages available, attempt " + (i+1));
}
}
} catch (Exception e) {
e.printStackTrace(System.err);
} finally {
// Unsubscribe the queue from the topic
if (sns != null && subscriptionArn != null) {
sns.unsubscribe(new UnsubscribeRequest(subscriptionArn));
System.out.println("Unsubscribed queue from topic.");
}
// Destroy queue
if (sqs != null && queueUrl != null) {
sqs.deleteQueue(new DeleteQueueRequest(queueUrl));
System.out.println("Deleted the queue.");
sqs.shutdown();
}
// Destroy topic
if (sns != null && topicArn != null) {
sns.deleteTopic(new DeleteTopicRequest(topicArn));
System.out.println("Deleted the topic.");
sns.shutdown();
}
}
}

}


Enhanced by Zemanta

6 comments:

Rajiv Mote said...

I saw what I was doing wrong with the queue properties. Unlike the SNS properties, SQS properties require you to specify exactly which properties to retrieve, and default to retrieving nothing. So instead of doing this:


sqs.getQueueAttributes(new GetQueueAttributesRequest(queueUrl));


One should do this:


sqs.getQueueAttributes(new GetQueueAttributesRequest(queueUrl) .withAttributeNames("Policy", "QueueArn", "ApproximateNumberOfMessages"));


It would be nice for the SDK to have some enums defined for this.

Rajiv Mote said...

OK, I figured it out! The issue is with the Policy object on the queue; it needs a resource set, and that resource is, well, itself rather than what's being allowed permission (that's in the conditions). It seems counter-intuitive, but it works. I'll make a separate post with the complete solution, but the relevant piece is:


Policy sqsPolicy =
new Policy().withStatements(
new Statement(Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withResources(new Resource(queueArn)) // Note: queue, not topic
.withActions(SQSActions.SendMessage)
.withConditions(
ConditionFactory.newSourceArnCondition(topicArn)));

matthew.luchak said...

Hi Rajiv,

Do you have any sources that helped you solve the policy question. I am still battling with setting a simple SQS policy (no SNS) and have implemented your resolution but am getting "AWS.SimpleQueueService.NonExistentQueue" error.

I agree that the docs on this are very poor and support from AWS (even paying for bronze support) is pitiful..

thanks,
Matthew

Rajiv Mote said...

Hi Matthew,

Unfortunately, I haven't found anything much better than the policy API documentation itself:

http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AccessPolicyLanguage.html

The only way I've been able to make headway on these issues is to do a simple program (like the one in the original post) that performs both producer and consumer operations, just to eliminate other sources of access problems. Then I keep tweaking the policy and trying goofy things until it works. Only then do I translate it to the distributed environment.

I wish I could offer more guidance. Good luck.

Anonymous coward said...

your code makes my eyes bleed

Rajiv Mote said...

AC: That means it's your job to do better -- and share.