In the code below, I'm using the AWS SDK for Java (available here). The program does the following:
1. Create an SNS topic.
2. Set a policy on the topic to allow subscriptions.
3. Create an SQS queue.
4. Set a policy on the queue to allow message posting.
5. Subscribe the queue to the topic.
6. Send an SNS notification message.
7. Listen for the message on the queue.
Steps 1-6 seem to work just fine, but on step 7, it listens in vain for messages. All ten iterations go by without picking up any messages on the queue. As an added note, if I subscribe an email address to the topic, messages are delivered. Only the queue appears to be bypassed when SNS sends its notifications.
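import java.util.HashMap;
import java.util.Map;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.auth.policy.Policy;
import com.amazonaws.auth.policy.Principal;
import com.amazonaws.auth.policy.Statement;
import com.amazonaws.auth.policy.Statement.Effect;
import com.amazonaws.auth.policy.actions.SNSActions;
import com.amazonaws.auth.policy.actions.SQSActions;
import com.amazonaws.auth.policy.conditions.ConditionFactory;
import com.amazonaws.services.identitymanagement.AmazonIdentityManagement;
import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.*;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.*;

public class SnsWithSqsDemo {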
private static final String TOPIC_NAME = "TestTopic";
private static final String QUEUE_NAME = "TestQueue";
/**
* This program demonstrates how to hook up an AWS SQS queue to an AWS SNS
* topic to receive messages.
*
* 1. Create an SNS topic.
* 2. Set a policy on the topic to allow subscriptions.
* 3. Create an SQS queue.
* 4. Set a policy on the queue to allow message posting.
* 5. Subscribe the queue to the topic.
* 6. Send an SNS notification message.
* 7. Listen for the message on the queue.
*
* @param args not used
*/
public static void main(String[] args) {
AWSCredentials awsCredentials = null;
AmazonSNS sns = null;
AmazonSQS sqs = null;
String topicArn = null;
String queueUrl = null;
String queueArn = null;
String subscriptionArn = null;
try {
awsCredentials =
new PropertiesCredentials(
SnsWithSqsDemo.class.getResourceAsStream("AwsCredentials.properties"));
sns = new AmazonSNSClient(awsCredentials);
// 1. Create a topic
CreateTopicResult ctResult =
sns.createTopic(new CreateTopicRequest(TOPIC_NAME));
topicArn = ctResult.getTopicArn();
System.out.println(String.format("Created topic %s with ARN %s",
TOPIC_NAME, topicArn));
// 2. Set policy on topic to allow open subscriptions
Policy snsPolicy =
new Policy().withStatements(
new Statement(Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withActions(SNSActions.Subscribe));
// ??? Is there no Java SDK constant for "Policy"?
sns.setTopicAttributes(new SetTopicAttributesRequest(
topicArn, "Policy", snsPolicy.toJson()));
// 3. Create a queue
sqs = new AmazonSQSClient(awsCredentials);
CreateQueueResult cqResult =
sqs.createQueue(new CreateQueueRequest(QUEUE_NAME));
queueUrl = cqResult.getQueueUrl();
System.out.println(String.format("Created queue %s with URL %s",
QUEUE_NAME, queueUrl));
// 4. Set the queue policy to allow SNS to publish messages
Policy sqsPolicy =
new Policy().withStatements(
new Statement(Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withActions(SQSActions.SendMessage)
.withConditions(ConditionFactory.newSourceArnCondition(topicArn)));
Map<String, String> queueAttributes = new HashMap<String, String>();
queueAttributes.put("Policy", sqsPolicy.toJson());
sqs.setQueueAttributes(new SetQueueAttributesRequest(queueUrl, queueAttributes));
// ??? Why isn't the queue ARN in the queue attributes?
AmazonIdentityManagement idMgr = new AmazonIdentityManagementClient(awsCredentials);
String accountId = idMgr.getUser().getUser().getUserId();
queueArn = String.format("arn:aws:sqs:us-east-1:%s:%s", accountId, QUEUE_NAME);
// 5. Subscribe the queue to the topic
SubscribeResult sResult =
sns.subscribe(new SubscribeRequest(topicArn, "sqs", queueArn));
subscriptionArn = sResult.getSubscriptionArn();
System.out.println("Subscription ARN: " + subscriptionArn);
// 5.5. Wait a bit for AWS to get all synched-up
Thread.sleep(60000L);
// 6. Send a notification
PublishResult pResult =
sns.publish(new PublishRequest(topicArn,
"Mr Watson -- Come here -- I want to see you."));
System.out.println("Sent message ID = " + pResult.getMessageId());
// 7. Wait for message receipt in queue
for (int i = 0; i < 10; i++) {
Thread.sleep(2000L);
ReceiveMessageResult rmResult =
sqs.receiveMessage(new ReceiveMessageRequest(queueUrl));
if (rmResult.getMessages().size() > 0) {
// A message has been received
for (Message message : rmResult.getMessages()) {
System.out.println(message.getBody());
sqs.deleteMessage(new DeleteMessageRequest(queueUrl,
message.getReceiptHandle()));
}
break;
} else {
System.out.println("No messages available, attempt " + (i+1));
}
}
} catch (Exception e) {
e.printStackTrace(System.err);
} finally {
// Unsubscribe the queue from the topic
if (sns != null && subscriptionArn != null) {
sns.unsubscribe(new UnsubscribeRequest(subscriptionArn));
System.out.println("Unsubscribed queue from topic.");
}
// Destroy queue
if (sqs != null && queueUrl != null) {
sqs.deleteQueue(new DeleteQueueRequest(queueUrl));
System.out.println("Deleted the queue.");
sqs.shutdown();
}
// Destroy topic
if (sns != null && topicArn != null) {
sns.deleteTopic(new DeleteTopicRequest(topicArn));
System.out.println("Deleted the topic.");
sns.shutdown();
}
}
}
}
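
Because the program assembles the queue ARN by hand, one thing worth ruling out is a malformed ARN. The value returned by getUserId() may be an IAM user identifier rather than the 12-digit account number, depending on which credentials are used. The sketch below is only a shape check, assuming the usual arn:aws:sqs:region:account-id:queue-name layout in the standard "aws" partition; the class and method names are made up for illustration and are not part of the SDK.

```java
import java.util.regex.Pattern;

class QueueArnCheck {
    // Assumed shape: arn:aws:sqs:<region>:<12-digit account ID>:<queue name>.
    // Queue names are taken to be 1-80 letters, digits, hyphens, or underscores.
    private static final Pattern SQS_ARN =
        Pattern.compile("arn:aws:sqs:[a-z0-9-]+:\\d{12}:[A-Za-z0-9_-]{1,80}");

    // Returns true only if the string has the expected SQS ARN shape.
    static boolean looksLikeQueueArn(String arn) {
        return arn != null && SQS_ARN.matcher(arn).matches();
    }

    public static void main(String[] args) {
        String accountId = "123456789012"; // placeholder, not a real account
        String arn = String.format("arn:aws:sqs:us-east-1:%s:%s", accountId, "TestQueue");
        System.out.println(arn + " looks valid: " + looksLikeQueueArn(arn));
    }
}
```

A check like this says nothing about whether the queue exists; it only catches an identifier of the wrong kind ending up in the account-ID slot.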

2 comments:
I saw what I was doing wrong with the queue properties. Unlike the SNS properties, SQS properties require you to specify exactly which properties to retrieve, and default to retrieving nothing. So instead of doing this:
sqs.getQueueAttributes(new GetQueueAttributesRequest(queueUrl));
One should do this:
sqs.getQueueAttributes(new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("Policy", "QueueArn", "ApproximateNumberOfMessages"));
It would be nice for the SDK to have some enums defined for this.
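
In the meantime, a small local enum can stand in for the string literals. This is a minimal sketch, not SDK code: QueueAttribute, attributeName() and namesOf() are hypothetical names, and only the three attribute strings used above are covered. Newer SDK releases appear to ship a QueueAttributeName enum of their own, so check your version before rolling your own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local enum wrapping the attribute-name strings used above.
enum QueueAttribute {
    POLICY("Policy"),
    QUEUE_ARN("QueueArn"),
    APPROXIMATE_NUMBER_OF_MESSAGES("ApproximateNumberOfMessages");

    private final String attributeName;

    QueueAttribute(String attributeName) {
        this.attributeName = attributeName;
    }

    // The exact string the SQS API expects for this attribute.
    String attributeName() {
        return attributeName;
    }

    // Converts enum constants to the list of strings to request, in order.
    static List<String> namesOf(QueueAttribute... attributes) {
        List<String> names = new ArrayList<String>();
        for (QueueAttribute attribute : attributes) {
            names.add(attribute.attributeName());
        }
        return names;
    }
}

class QueueAttributeDemo {
    public static void main(String[] args) {
        System.out.println(
            QueueAttribute.namesOf(QueueAttribute.QUEUE_ARN, QueueAttribute.POLICY));
    }
}
```

The resulting list can be passed to withAttributeNames(...) in place of the literals, which should turn a misspelled attribute name into a compile error rather than a silently empty result.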
OK, I figured it out! The issue is with the Policy object on the queue; it needs a resource set, and that resource is, well, itself rather than what's being allowed permission (that's in the conditions). It seems counter-intuitive, but it works. I'll make a separate post with the complete solution, but the relevant piece is:
Policy sqsPolicy =
new Policy().withStatements(
new Statement(Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withResources(new Resource(queueArn)) // Note: queue, not topic
.withActions(SQSActions.SendMessage)
.withConditions(
ConditionFactory.newSourceArnCondition(topicArn)));
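
To make the Resource/Condition split easier to see, here is roughly the policy document that statement describes, built by hand with no SDK involved. Treat it as a sketch: the class name, the ARNs in main, the "Version" value and the ArnLike operator are my assumptions, and the JSON that toJson() actually emits may differ in details such as added Id or Sid fields.

```java
class QueuePolicyJsonSketch {
    // Builds a queue policy: Resource is the queue itself, while the topic
    // allowed to send to it appears only in the aws:SourceArn condition.
    static String build(String queueArn, String topicArn) {
        return String.format(
            "{\"Version\":\"2012-10-17\",\"Statement\":[{"
            + "\"Effect\":\"Allow\","
            + "\"Principal\":{\"AWS\":\"*\"},"
            + "\"Action\":\"sqs:SendMessage\","
            + "\"Resource\":\"%s\","
            + "\"Condition\":{\"ArnLike\":{\"aws:SourceArn\":\"%s\"}}"
            + "}]}",
            queueArn, topicArn);
    }

    public static void main(String[] args) {
        // Placeholder ARNs for illustration only.
        System.out.println(build(
            "arn:aws:sqs:us-east-1:123456789012:TestQueue",
            "arn:aws:sns:us-east-1:123456789012:TestTopic"));
    }
}
```

Read this way, the statement says "anyone may send to this queue, provided the request originates from that topic", which is why the queue ARN belongs in Resource.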