Ok, so I’ve got a nice little Amazon SNS publisher working for Ellemy.CQRS. So I figured I wanted to work some more with Amazon services and write a subscriber. Amazon makes a Simple Queue Service (SQS) that was built to work seamlessly with their SNS, so I figured, why fight it?
Let’s get to work. In keeping with the spirit of the other examples I’ve written for Ellemy.CQRS, I’m going to make this really simple. In fact I’m gunna write it to behave the same way as the NServiceBus Subscriber does.
Here’s the user story…
As a weird person who looks at console windows, I want to see a display on my screen that shows the message text and message id every time a message is submitted on the website so that I can clap my hands and do the funky chicken.
For now, I’m just gunna put the subscription stuff in the Amazon Publishing assembly (just like I did for NServiceBus). I don’t really see a reason not to, and since I’m the boss of this product, what I say goes.
Since I’ve done a lotta leg work in my last post, I’m familiar with what we’re gunna need a subscriber to do.
- Create a Queue if it doesn’t exist
- Set an Attribute on the new Queue that gives the Topic permission to send messages to it
- Subscribe to topics from the Topic ARN
So, let’s get started. I wrote this little test.
    [SetUp]
    public void Arrange()
    {
        _config = Configure.With().AmazonPublisher();
        _config
            .AwsAccessKeyId(awsKey)
            .AwsSecretKey(secret)
            .TopicArn(topicArn)
            .QueueName(tempQueueName);

        _amazonClient = AWSClientFactory.CreateAmazonSQSClient(awsKey, secret);
    }

    [TearDown]
    public void CleanUp()
    {
        // delete every queue the tests may have left behind
        var listQueuesRequest = new ListQueuesRequest().WithQueueNamePrefix("TESTING");
        var result = _amazonClient.ListQueues(listQueuesRequest);
        foreach (var queueUrl in result.ListQueuesResult.QueueUrl)
        {
            _amazonClient.DeleteQueue(new DeleteQueueRequest().WithQueueUrl(queueUrl));
        }
    }

    [Test]
    public void it_will_create_the_queue()
    {
        //lets just leave the logic in the ctor, it'll be singleton for consumers
        var subscriber = new AmazonSqsSubscriber(_config);

        var listQueuesResponse = _amazonClient.ListQueues(new ListQueuesRequest());
        var foundExpectedQueue = false;
        foreach (var queueUrl in listQueuesResponse.ListQueuesResult.QueueUrl)
        {
            if (queueUrl.Contains(tempQueueName))
                foundExpectedQueue = true;
        }
        Assert.IsTrue(foundExpectedQueue, "Queue was not found");
    }
Basically, we just spin up the Subscriber and use Amazon’s SDK to check that it did create the queue, then we kill the queue in a teardown. Note that Amazon gets all pissy if you try to create a queue with the same name as one you deleted within the last 60 seconds. I got around that by (you guessed it!) waiting 60 seconds before I ran the test again. I guess we could make a unique name for the queue and use that, maybe I’ll do that….
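For what it’s worth, here’s a rough sketch of that unique-name idea. TestQueueNames and Unique are names I made up for illustration, they’re not part of Ellemy.CQRS or the Amazon SDK. It assumes you keep a fixed prefix so the teardown’s prefix lookup still finds the queue.

```csharp
using System;

// Hypothetical helper, not part of Ellemy.CQRS or the Amazon SDK.
public static class TestQueueNames
{
    // Builds a name like "TESTING_<32 hex digits>". The fixed prefix keeps the
    // name findable by a prefix search, the Guid makes it different on every run.
    public static string Unique(string prefix)
    {
        return prefix + "_" + Guid.NewGuid().ToString("N");
    }
}
```

With a prefix of "TESTING" the name comes out at 40 characters, all letters, digits and an underscore, which fits within what SQS accepts for queue names (up to 80 characters of letters, digits, hyphens and underscores). In the setup that would look something like tempQueueName = TestQueueNames.Unique("TESTING").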
Let’s make this puppy pass. Here’s what I did.
    public class AmazonSqsSubscriber
    {
        private readonly AmazonConfig _config;
        private AmazonSQS _client;

        public AmazonSqsSubscriber(AmazonConfig config)
        {
            _config = config;
            SetupQueue();
        }

        private void SetupQueue()
        {
            // create the client first so it exists even when the queue URL is already configured
            _client = Amazon.AWSClientFactory.CreateAmazonSQSClient(_config.AccessKeyId, _config.SecretKey);
            if (!String.IsNullOrEmpty(_config.SqsQueueUrl)) return;

            // note: this is a prefix match, so a longer queue name with the same prefix also counts
            var listQueuesResponse = _client.ListQueues(new ListQueuesRequest { QueueNamePrefix = _config.SqsQueueName });
            string queueUrl;
            if (listQueuesResponse.ListQueuesResult.QueueUrl.Count == 0)
            {
                var response = _client.CreateQueue(new CreateQueueRequest { QueueName = _config.SqsQueueName });
                queueUrl = response.CreateQueueResult.QueueUrl;
            }
            else
            {
                queueUrl = listQueuesResponse.ListQueuesResult.QueueUrl[0];
            }
            _config.SqsQueueUrl = queueUrl;
        }
    }
Kinda ugly, but yeah, that just made the test pass. I can’t see much I can do to clean it up except a little ReSharper extract method stuff, so I won’t bore you with what I did.
Next up, let’s make that subscriber actually receive messages. Amazon gives you a “ReceiveMessage” call you can make, so we’ll just set up a little loop that’ll continually call that and dump out the results. Actually, let’s just write the “test”. It’s not really a test, since there’s no assertion, I just wanna see stuff get written to my console.
    [Test]
    public void it_will_parse_messages()
    {
        var subscriber = new AmazonSqsSubscriber(_config);

        var messageRequest = new SendMessageRequest()
            .WithMessageBody("This is a test")
            .WithQueueUrl(_config.SqsQueueUrl);
        _amazonClient.SendMessage(messageRequest);

        subscriber.Start();

        subscriber.Stop();
    }
When I run this, I just wanna see that “This is a test” dumped out in my console, I guess I could make it so we inject some interface for Console, but I’m too lazy to do that right now. Let’s go make this puppy pass.
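If I ever stop being lazy, injecting the console could be as small as the sketch below. MessagePrinter is a name I made up for illustration, it’s not in the Ellemy.CQRS code. The idea is just to write to a TextWriter that gets passed in, so a test can hand over a StringWriter and assert on what was written.

```csharp
using System;
using System.IO;

// Hypothetical class for illustration: writes message bodies to whatever
// TextWriter it is given instead of calling Console.WriteLine directly.
public class MessagePrinter
{
    private readonly TextWriter _output;

    public MessagePrinter(TextWriter output)
    {
        _output = output;
    }

    public void Print(string body)
    {
        _output.WriteLine(body);
    }
}
```

Real code would pass Console.Out, a test would pass a new StringWriter and check its ToString() afterwards.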
Here’s what I added.
        // volatile so the worker thread sees the change made by Stop();
        // [ThreadStatic] only applies to static fields and would give each thread its own copy
        private volatile bool _stopped;

        public void Stop()
        {
            Console.WriteLine("Stopping....");
            _stopped = true;
        }

        public void Start()
        {
            ThreadPool.QueueUserWorkItem(delegate { BeginWork(); });
        }

        private void BeginWork()
        {
            // poll on this one worker thread; queueing a new work item on every pass floods the thread pool
            while (!_stopped)
            {
                DoWork();
            }
        }

        private void DoWork()
        {
            var response = _client.ReceiveMessage(new ReceiveMessageRequest { QueueUrl = _config.SqsQueueUrl });
            if (!response.IsSetReceiveMessageResult())
            {
                return;
            }
            var messageResult = response.ReceiveMessageResult;
            foreach (var message in messageResult.Message)
            {
                Console.WriteLine(message.Body);
            }
        }
    }
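If you want Stop() to block until the worker has really finished, and a pause between polls, one option is a dedicated thread plus a ManualResetEvent. This is only a sketch under my own assumptions: Poller is a made-up name, the poll delegate stands in for the ReceiveMessage call, and the 50 millisecond interval is arbitrary.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of a stoppable poller; "poll" stands in for the ReceiveMessage call.
public class Poller
{
    private readonly Action _poll;
    private readonly ManualResetEvent _stopRequested = new ManualResetEvent(false);
    private Thread _worker;

    public Poller(Action poll)
    {
        _poll = poll;
    }

    public void Start()
    {
        _worker = new Thread(Run) { IsBackground = true };
        _worker.Start();
    }

    public void Stop()
    {
        _stopRequested.Set();                   // ask the loop to finish
        if (_worker != null) _worker.Join();    // wait until it actually has
    }

    private void Run()
    {
        // WaitOne returns true once Stop() has signalled and false when the
        // interval elapses, so the loop waits 50 ms and then polls, until stopped.
        while (!_stopRequested.WaitOne(TimeSpan.FromMilliseconds(50)))
        {
            _poll();
        }
    }
}
```

Because Stop() joins the worker thread, nothing polls after it returns, which makes tests that call Stop() and then look at results a bit less racy.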
Run the test, and here’s what I see.
Looks good, although, I KNOW there’s a bug, because Amazon leaves the message up there. I’ll write another test that demonstrates the bug, but since I was too lazy to inject some “Console writer” thing that I could do assertions on, I’ll just have to use my eyes to see that it’s “failing”.
    [Test]
    public void the_message_is_only_processed_once()
    {
        var subscriber = new AmazonSqsSubscriber(_config);

        var messageRequest = new SendMessageRequest()
            .WithMessageBody("This is a test")
            .WithQueueUrl(_config.SqsQueueUrl);
        _amazonClient.SendMessage(messageRequest);

        subscriber.Start();

        Thread.Sleep(15000);
        subscriber.Stop();
    }
Ok… so when I run that… For some reason, it does exactly what I expect. It turns out that’s SQS’s visibility timeout at work: once a message has been received, Amazon hides it from further receives for a while (30 seconds by default), so my 15 second test never sees it come back. Regardless, I know (from the Amazon documentation) that I need to manually delete the message after I’ve processed it. So, whatever, I’m gunna go do that. Here’s what I did.
    private void DoWork()
    {
        var response = _client.ReceiveMessage(new ReceiveMessageRequest { QueueUrl = _config.SqsQueueUrl });
        if (!response.IsSetReceiveMessageResult())
        {
            return;
        }
        var messageResult = response.ReceiveMessageResult;
        foreach (var message in messageResult.Message)
        {
            Console.WriteLine(message.Body);
            Delete(message);
        }
    }

    private void Delete(Message message)
    {
        // the receipt handle from this receive is what identifies the message to delete
        var deleteRequest = new DeleteMessageRequest()
            .WithReceiptHandle(message.ReceiptHandle)
            .WithQueueUrl(_config.SqsQueueUrl);
        _client.DeleteMessage(deleteRequest);
    }
Everything looks like it’s working.
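To make the receive-then-delete dance a bit more concrete, here’s a tiny in-memory model of it. FakeQueue is something I made up for illustration, it is not the Amazon SDK and it simplifies a lot (for one, it reuses the same handle on every receive). It only shows the idea: receiving hides a message for a while, and only deleting makes it go away for good.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a queue with a visibility timeout.
// It only models the receive/hide/delete cycle, nothing else.
public class FakeQueue
{
    private class Entry
    {
        public string Body;
        public DateTime VisibleAt;   // the message is hidden until this time
    }

    private readonly Dictionary<string, Entry> _entries = new Dictionary<string, Entry>();
    private readonly TimeSpan _visibilityTimeout;

    public FakeQueue(TimeSpan visibilityTimeout)
    {
        _visibilityTimeout = visibilityTimeout;
    }

    public void Send(string body, DateTime now)
    {
        _entries[Guid.NewGuid().ToString("N")] = new Entry { Body = body, VisibleAt = now };
    }

    // Returns the handle of one visible message and hides it, or null if nothing is visible.
    public string Receive(DateTime now)
    {
        foreach (var pair in _entries)
        {
            if (pair.Value.VisibleAt <= now)
            {
                pair.Value.VisibleAt = now + _visibilityTimeout;
                return pair.Key;
            }
        }
        return null;
    }

    // Removes the message for good; returns false if the handle is unknown.
    public bool Delete(string handle)
    {
        return _entries.Remove(handle);
    }
}
```

With a 30 second timeout, a receive at 15 seconds after the first one finds nothing, a receive after 30 seconds gets the same message back, and after a Delete it never shows up again. That matches what I saw in the 15 second test above.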
Ok, technically, we’ve met the requirements of the acceptance criteria, but we don’t wanna couple up that Console.WriteLine stuff. We wanna make this thing work so we can make Domain Event handlers handle the events that are sent in messages. I mean, that was the whole point of this, right?
First of all, we’re not yet subscribing to a topic, so let’s rip off the code from my last post to do that.
Wow… This made it ugly! But it’s working for now, we’ll clean up in a bit.
    private void SetupQueue()
    {
        if (TheQueueIsAlreadySet()) return;
        var queueUrl = GetOrCreateQueueUrl();
        _config.SqsQueueUrl = queueUrl;
        SubscribeToTopic();
    }

    private void SubscribeToTopic()
    {
        SetPermissions();
        // SNS subscribes the queue by its ARN, not by its URL
        var getArnRequest = new GetQueueAttributesRequest().WithQueueUrl(_config.SqsQueueUrl).WithAttributeName("QueueArn");
        var clientArn = _client.GetQueueAttributes(getArnRequest).GetQueueAttributesResult.Attribute[0].Value;

        var sns = Amazon.AWSClientFactory.CreateAmazonSNSClient(_config.AccessKeyId,
                                                                _config.SecretKey);

        var subscriptionRequest = new SubscribeRequest()
            .WithEndpoint(clientArn)
            .WithProtocol("sqs")
            .WithTopicArn(_config.TopicAccessResourceName);

        sns.Subscribe(subscriptionRequest);
    }

    private void SetPermissions()
    {
        // the queue policy is what lets the topic send messages to this queue
        var setQueueAttributeRequest = new SetQueueAttributesRequest()
            .WithQueueUrl(_config.SqsQueueUrl)
            .WithAttribute(new Attribute { Name = "Policy", Value = AllowSnsAttribute() });
        _client.SetQueueAttributes(setQueueAttributeRequest);
    }
You’ll recognize a lot of this code from my last blog post, so it’s not new. I really need to move the concept of subscribing out into its own little class, but I’ll get to that soon enough.
Changed my test to look like this.
    [Test]
    public void the_message_is_only_processed_once()
    {
        var subscriber = new AmazonSqsSubscriber(_config);

        var @event = new TestEvent { SomeGuid = Guid.NewGuid(), SomeInt = 1, SomeString = "Some String" };
        _publisher.Publish(@event);

        subscriber.Start();
        //Ugly, but I wanna make sure the message arrives
        Thread.Sleep(3000);

        subscriber.Stop();
    }

Here’s what I see.
Sweet! Now events published (the _publisher.Publish(@event) call in the test above) actually do make it to my subscriber. Next, we’re gunna actually deserialize that Message property to get the actual Domain Event instead of just doing a Console.WriteLine(message.Body).
Ok, first things first, we’ve gotta be able to load the type. I added a cute little method to my AmazonConfig class that lets consumers specify which assembly (or assemblies) the Events they’ll be publishing live in, like so.
    public AmazonConfig EventsAreInAssemblyContainingType<TEvent>()
    {
        if (EventAssemblies == null)
            EventAssemblies = new List<Assembly>();
        EventAssemblies.Add(typeof(TEvent).Assembly);
        return this;
    }
I just call that method in my test, and I changed my AmazonSqsSubscriber to look like this. Bear with the Ugly for now, I’ll be cleaning it. The point of this isn’t to show how clean I can make code, it’s to show that I can get it to work. Pull down the code from git and you’ll see that I clean my stuff up, darnit! Anyway, here’s the (very ugly) code.
    private void DoWork()
    {
        var response = _client.ReceiveMessage(new ReceiveMessageRequest { QueueUrl = _config.SqsQueueUrl });
        if (!response.IsSetReceiveMessageResult())
        {
            return;
        }
        var messageResult = response.ReceiveMessageResult;
        foreach (var message in messageResult.Message)
        {
            // the body is JSON; "Subject" carries the event type name and "Message" the serialized event
            var serializer = new JavaScriptSerializer();
            var thing = (Dictionary<String, Object>)serializer.DeserializeObject(message.Body);
            Type eventType = null;
            foreach (var eventAssembly in _config.EventAssemblies)
            {
                var t = eventAssembly.GetType((string)thing["Subject"]);
                if (t != null)
                {
                    eventType = t;
                    break;
                }
            }
            if (eventType == null)
                throw new ConfigurationException(
                    String.Format(
                        "Could not load type {0}, please make sure you call AmazonConfig.EventsAreInAssemblyContainingType<TEvent>() during bootstrap.",
                        thing["Subject"]));

            var @event = serializer.Deserialize((string)thing["Message"], eventType);
            Console.WriteLine(@event);
            Delete(message);
        }
    }

Still doing that Console.WriteLine, of course, but yeah, now when I run the test, I’m getting “AmazonTests.TestEvent” instead of that JSON string. Now, let’s just change that Console.WriteLine to instead locate all the event handlers for the event and execute them.
Once again, shut up about ugly, this is RED/GREEN/CLEAN land, clean is last, I wanna make it work before the code looks pretty.
I changed that Console.WriteLine(@event) line from above to this.
    var @event = serializer.Deserialize((string)thing["Message"], eventType);
    // build IDomainEventHandler<TheEventType> at runtime and ask the container for every handler of it
    var handlerInterface = typeof(IDomainEventHandler<>).MakeGenericType(eventType);
    foreach (var handler in _config.EllemyConfiguration.ObjectBuilder.BuildAll(handlerInterface))
    {
        var handlerMethod = handler.GetType().GetMethod("Handle");
        handlerMethod.Invoke(handler, new [] { @event });
    }
    Delete(message);
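If the MakeGenericType trick looks like magic, here’s the same idea boiled down to something you can run without Amazon or a container. Everything in it (IHandler, Ping, PingRecorder, Dispatcher) is made up for illustration. Ellemy.CQRS has its own IDomainEventHandler<> and object builder, and this sketch just filters a plain list of candidates instead of asking a container.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration only.
public interface IHandler<TEvent>
{
    void Handle(TEvent @event);
}

public class Ping
{
    public string Text { get; set; }
}

public class PingRecorder : IHandler<Ping>
{
    public readonly List<string> Seen = new List<string>();

    public void Handle(Ping @event)
    {
        Seen.Add(@event.Text);
    }
}

public static class Dispatcher
{
    // Calls Handle on every candidate that implements IHandler<T>,
    // where T is the runtime type of the event. Returns how many were called.
    public static int Dispatch(object @event, IEnumerable<object> candidates)
    {
        // Close the open generic interface over the event's runtime type.
        var handlerInterface = typeof(IHandler<>).MakeGenericType(@event.GetType());
        var handleMethod = handlerInterface.GetMethod("Handle");
        var invoked = 0;
        foreach (var candidate in candidates)
        {
            if (!handlerInterface.IsInstanceOfType(candidate)) continue;
            handleMethod.Invoke(candidate, new[] { @event });
            invoked++;
        }
        return invoked;
    }
}
```

One design choice worth pointing out: the sketch looks up Handle on the closed interface type rather than on the handler’s own type. GetMethod("Handle") on a class that handles more than one event type would find several methods with that name and throw an AmbiguousMatchException, while the interface only ever has the one.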
I’ll then go write a quick handler right in the test assembly that’ll do the Console.WriteLine, just so I can still see all that pretty stuff. Like this.
    public class TestEvent : IDomainEvent
    {
        public Guid SomeGuid { get; set; }
        public String SomeString { get; set; }
        public int SomeInt { get; set; }
    }

    public class ConsoleWriter : IDomainEventHandler<TestEvent>
    {
        public void Handle(TestEvent @event)
        {
            Console.WriteLine("SomeGuid: \t{0}", @event.SomeGuid);
            Console.WriteLine("SomeInt: \t {0}", @event.SomeInt);
            Console.WriteLine("SomeString: \t{0}", @event.SomeString);
        }
    }
To make it all work, I changed the setup method in my test to look like this.
    [SetUp]
    public void Arrange()
    {
        tempQueueName = "Test_QUEUE";
        _config = Configure.With()
            .StructureMapBuilder()
            .HandlersAreInAssemblyContainingType<TestEvent>()
            .AmazonPublisher();

        _config
            .AwsAccessKeyId(awsKey)
            .AwsSecretKey(secret)
            .TopicArn(topicArn)
            .QueueName(tempQueueName)
            .EventsAreInAssemblyContainingType<TestEvent>();

        _publisher = new AmazonPublisher(_config);

        _amazonClient = AWSClientFactory.CreateAmazonSQSClient(awsKey, secret);
    }

So, note that we’re telling it to use my StructureMapBuilder (the .StructureMapBuilder() call) and telling it where the DomainEventHandlers are (the .HandlersAreInAssemblyContainingType<TestEvent>() call).
Let’s give the test a run.
NICE!
Lots of stuff going on there, and I’ve got a lot of cleaning to do, but it’s working! Feel free to pull down the code from GitHub. If you’d like to contribute, I’m game too!
Have fun with it all, and I hope this is helpful.