Wednesday, February 23, 2011

Size does matter, Crunching data for the cloud with a Google Protocol Buffer Serializer

So, most cloud services charge by some unit of size (Amazon charges by the Gig). If you’re working for a company that uses these services, then, well, duhh….send around the smallest bits of data you can, right?

The way I’ve got Ellemy stuff for serialization now is that I have one little interface.

   1:  public interface ISerializer
   2:      {
   3:          object Deserialize(string input, Type desiredType);
   4:          object DeserializeObject(string input);
   5:          string Serialize(object input);
   6:      }

I hate the DeserializeObject thing, but, well, that’s for another time.


I have one concrete implementation of it currently, it uses JSON. I won’t show it here, because that’s not the point of the post.


So, I’ve been looking at Google Protocol Buffers and the .Net project for it, so I figured I’d write a ISerializer that uses protocol buffers. It looks pretty nice, apparently about half the size of Json serialization and a bit quicker too. The not so nice part is that the data is not self descriptive, and you MUST know the type of the object being requested. Honestly though, that’s not a problem for most apps I write, Just send the type in the message, and we’re good right?


So taking a look at the Protobuf-net wiki, I see that we’ll need data contracts for our messages.


Given a class like so.


   1:   public class SomeThingToSerialize
   2:      {
   3:          public string SomeStringProperty { get; set; }
   4:          public Guid SomeGuidProperty { get; set; }
   5:          public int SomeIntProperty { get; set; }
   6:      }

We’d need a class like this so that Protocol Buffer can do its thing.


   1:  [ProtoContract]
   2:      public class SomeThingToSerialize
   3:      {
   4:          [ProtoMember(1)]
   5:          public string SomeStringProperty { get; set; }
   6:          [ProtoMember(2)]
   7:          public Guid SomeGuidProperty { get; set; }
   8:          [ProtoMember(3)]
   9:          public int SomeIntProperty { get; set; }
  10:      }

I don’t want to muddy my problem domains with sterilization specific attributes, so I think what I’ll do is actually generate the classes for the messages on the fly – let’s play in CodeDom a bit!


Test


   1:   [TestFixture]
   2:      public class ProtocolBufferGenerator_tests
   3:      {
   4:          private ProtocolBufferDataContractGenerator _generator;
   5:          [SetUp]
   6:          public void arrange()
   7:          {
   8:              _generator = new ProtocolBufferDataContractGenerator();
   9:          }
  10:          [Test]
  11:          public void message_is_decorated_with_ProtoContract_attribute()
  12:          {
  13:              var random = new Random();
  14:              var testThing = new SomeThingToSerialize
  15:                                  {
  16:                                      SomeGuidProperty = Guid.NewGuid(),
  17:                                      SomeIntProperty = random.Next(),
  18:                                      SomeStringProperty = "Blah"
  19:                                  };
  20:              var protoClass = _generator.GenerateProtoFor(testThing);
  21:              var foundProtoContractAttribute = protoClass.GetType()
  22:                  .GetCustomAttributes(false)
  23:                  .Any(attribute => attribute.GetType().Name == "ProtoContractAttribute");
  24:   
  25:              Assert.IsTrue(foundProtoContractAttribute);
  26:          }
  27:      }

Yeah, just what the test says, it just checks that the object this ProtocolBufferDataContractGenerator is decorated with the correct attribute. Lets make it so.


   1:  public class ProtocolBufferDataContractGenerator
   2:      {
   3:          private const string _codeContractsNamespace = "Ellemy.CQRS.Serializers.GoogleProtocolBuffers.Contracts";
   4:          
   5:   
   6:          public object GenerateProtoFor<T>(T thing)
   7:          {
   8:              var nameSpace = new CodeNamespace(_codeContractsNamespace);
   9:              nameSpace.Imports.Add(new CodeNamespaceImport("ProtoBuf"));
  10:              nameSpace.Imports.Add(new CodeNamespaceImport(thing.GetType().Namespace));
  11:              var @class = new CodeTypeDeclaration(thing.GetType().Name)
  12:                               {
  13:                                   IsClass = true,
  14:                                   Attributes = MemberAttributes.Public
  15:                               };
  16:              var protoContractAttribute = new CodeAttributeDeclaration("ProtoContract");
  17:              @class.CustomAttributes.Add(protoContractAttribute);
  18:              nameSpace.Types.Add(@class);
  19:              var compileUnit = new CodeCompileUnit();
  20:              compileUnit.Namespaces.Add(nameSpace);
  21:              compileUnit.ReferencedAssemblies.Add("protobuf-net.dll");
  22:              var thingAssembly = thing.GetType().Assembly;
  23:              var assemblyToAdd = thingAssembly.GetName().Name + ".dll";
  24:              compileUnit.ReferencedAssemblies.Add(assemblyToAdd);
  25:              var parameters = new CompilerParameters {GenerateInMemory = true};
  26:   
  27:              var provider = new CSharpCodeProvider();
  28:              var results = provider.CompileAssemblyFromDom(parameters,compileUnit);
  29:              if(results.Errors.Count != 0)
  30:              {
  31:                  throw new InvalidOperationException(results.Errors[0].ErrorText);
  32:              }
  33:             
  34:              return
  35:                  results.CompiledAssembly.CreateInstance(string.Format("{0}.{1}", _codeContractsNamespace,thing.GetType().Name));
  36:   
  37:   
  38:          }
  39:   
  40:       
  41:          }

Lots of code there, but it does work! If you’re not familiar with code dom, it’s actually not very complicated. We’re just using C# to gen c#, and the code is fairly self documenting.


Pretty neat that we’re leaving the assembly in memory, but I’m thinking that in the future, we might wanna save that assembly, and not pay the cost of genning that class multiple times, but we’ll get to that soon enough.


So far, we’re creating an empty class decorated with the ProtoContract attribute, which is perfectly worthless since we don’t have the properties. Let’s fix that.


First, make sure we’re adding properties on the message.


   1:   [Test]
   2:          public void properties_are_added()
   3:          {
   4:              var expectedNumberOfProperties = typeof(SomeThingToSerialize).GetProperties().Count();
   5:              var actualNumberOfProperties = _protoClass.GetType().GetProperties().Count();
   6:              Assert.AreEqual(expectedNumberOfProperties,actualNumberOfProperties);
   7:          }

To make it pass I wrote this little method and called it on line 19 (actually doesn’t matter where) from the GenerateProto for method.


   1:   private void AddProperties<T>(T thing, CodeTypeDeclaration @class)
   2:          {
   3:              foreach (var propertyInfo in thing.GetType().GetProperties().OrderBy(p => p.Name))
   4:              {
   5:                  var field = new CodeMemberField
   6:                                  {
   7:                                      Type = new CodeTypeReference(propertyInfo.PropertyType.FullName),
   8:                                      Attributes = MemberAttributes.Private,
   9:                                      Name = "_" + propertyInfo.Name
  10:                                  };
  11:                  @class.Members.Add(field);
  12:                  var @property = new CodeMemberProperty
  13:                                      {
  14:                                          Name = propertyInfo.Name,
  15:                                          HasGet = true,
  16:                                          HasSet = true,
  17:                                          Type = new CodeTypeReference(propertyInfo.PropertyType.FullName),
  18:                                          Attributes = MemberAttributes.Public,
  19:                                      };
  20:                  var getter = new CodeSnippetStatement(String.Format("return _{0};",propertyInfo.Name));
  21:                  @property.GetStatements.Add(getter);
  22:                  var setter = new CodeSnippetStatement(String.Format("_{0} = value;", propertyInfo.Name));
  23:                  @property.SetStatements.Add(setter);
  24:                  @class.Members.Add(@property);
  25:              }
  26:          }







Ok, so now we need to decorate each property with a ProtoMember attribute. Here’s the test.


   1:  [Test]
   2:          public void every_property_on_the_message_is_decorated_with_a_ProtoMember_attribute()
   3:          {
   4:              
   5:              foreach(var propertyInfo in _protoClass.GetType().GetProperties())
   6:              {
   7:                  var foundProtoMemberAttribute = _protoClass.GetType().GetProperty(propertyInfo.Name)
   8:                 .GetCustomAttributes(false)
   9:                 .Any(attribute => attribute.GetType().Name == "ProtoMemberAttribute");
  10:                  Assert.IsTrue(foundProtoMemberAttribute);
  11:   
  12:              }
  13:          }


And now lets make it pass. I wrote this little method and added it as in that loop in AddProperties.


   1:   private void AddProtoMemberAttribute(CodeMemberProperty property, int memberNumber)
   2:          {
   3:              var protoBuffAttribute = new CodeAttributeDeclaration("ProtoMember");
   4:              var attributeArgument = new CodeAttributeArgument(new CodePrimitiveExpression(memberNumber));
   5:              protoBuffAttribute.Arguments.Add(attributeArgument);
   6:              @property.CustomAttributes.Add(protoBuffAttribute);
   7:          }

Sweet, now we’re generating the DataContracts! We’re still not actually serializing objects though. We’re just making stuff that Google Protocol Buffers can work with.


Lets add a new test.



   1:   [TestFixture]
   2:      public class using_the_GoogleProtocolBuffer_serializer
   3:      {
   4:          private Serializer _serializer;
   5:          [SetUp]
   6:          public void Arrange()
   7:          {
   8:              _serializer = new Serializer();
   9:          }
  10:          
  11:          [Test]
  12:          public void serialize_an_non_DataContract_class()
  13:          {
  14:              var testThing = new TestThing { Guid = Guid.NewGuid(), Int = 1, String = "Some String"};
  15:              var output = _serializer.Serialize(testThing);
  16:              Assert.IsNotNullOrEmpty(output);
  17:              Console.WriteLine(output);
  18:          }


Ok, so not much going on here, we’re just testing that the output is actually not null, and (by extension), that the Serializer class doesn’t throw an exception.


Here’s what I did to make it work.


 

   1:   public class Serializer : ISerializer 
   2:      {
   3:          private readonly ProtocolBufferDataContractGenerator _protocolBufferDataContractGenerator;
   4:   
   5:          public Serializer()
   6:          {
   7:              _protocolBufferDataContractGenerator = new ProtocolBufferDataContractGenerator();
   8:          }
   9:          public object Deserialize(string input, Type desiredType)
  10:          {throw new NotImplementedException("patience is a virtue"); }
  11:   
  12:          public object DeserializeObject(string input)
  13:          {
  14:              throw new NotSupportedException("nope, dis dont werk ");
  15:          }
  16:          public string Serialize(object input)
  17:          {
  18:              var t = _protocolBufferDataContractGenerator.GenerateProtoFor(input);
  19:              foreach (var property in input.GetType().GetProperties())
  20:              {
  21:                  var setterForT = t.GetType().GetProperty(property.Name);
  22:                  var value = property.GetValue(input, null);
  23:                  setterForT.SetValue(t, value,null);
  24:              }
  25:              string data;
  26:              using (var writer = new MemoryStream())
  27:              {
  28:                  ProtoBuf.Serializer.NonGeneric.Serialize(writer, t);
  29:                  writer.Position = 0;
  30:                  using (var reader = new StreamReader(writer,Encoding.ASCII))
  31:                  {
  32:                      data = reader.ReadToEnd();
  33:                  }
  34:              }
  35:              return data;
  36:          }
  37:      }
  38:  }

on line 18, I simply get an instance of the DataContract class (we just saw what’s in there). I then loop through all the types on the DataContract via reflection and set all the values appropriately.


On line 26-34, we’re just using Protobuff-net to serialize the object, and return the results.


Not horribly complicated, but we’re still not done, because we can’t yet deserialize.


New test.


   1:   [Test]
   2:          public void serialize_then_deserialize()
   3:          {
   4:              var testThing = new TestThing { Guid = Guid.NewGuid(), Int = 1, String = "Some String" };
   5:              var output = _serializer.Serialize(testThing);
   6:              var result = (TestThing)_serializer.Deserialize(output, typeof(TestThing));
   7:              Assert.AreEqual(testThing.Guid, result.Guid);
   8:              Assert.AreEqual(testThing.String, result.String);
   9:              Assert.AreEqual(testThing.Int, result.Int);
  10:              Assert.AreEqual(testThing.Enum1, result.Enum1);
  11:          }


Yeah, now we’re testing that it actually works. Lets make this pass.


   1:   public object Deserialize(string input, Type desiredType)
   2:          {
   3:              var bytes = ASCIIEncoding.ASCII.GetBytes(input);
   4:              var @event = Activator.CreateInstance(desiredType);
   5:              using (var stream = new MemoryStream(bytes))
   6:              {
   7:                  var thisSucksINeedToFixIt = Activator.CreateInstance(desiredType);
   8:                  var protobufferType = _protocolBufferDataContractGenerator.GenerateProtoFor(thisSucksINeedToFixIt).GetType();
   9:                  var protobuffer = ProtoBuf.Serializer.NonGeneric.Deserialize(protobufferType, stream);
  10:                  
  11:                  foreach (var fieldInfo in protobuffer.GetType().GetProperties())
  12:                  {
  13:                      var setter = desiredType.GetProperty(fieldInfo.Name);
  14:                      var value = fieldInfo.GetValue(protobuffer,null);
  15:                      setter.SetValue(@event, value, null);
  16:                  }
  17:              }
  18:              return @event;
  19:          }

Not too bad, huh?


Except for that thisSucksINeedToFixIt variable. I should defer to some IOC container or something there, but honestly, I don’t think events should have dependencies – yeah, argument for a different day.


Ok, so the test passes except for the Guid assertion. Gunna go take a look at that now.


Ok, after a lot of googling I realized that the issue was my ASCII encoding. I doubt anyone reading this blog cares too much, but what I did to fix it was to use BitConverter.ToString() method when Serializing and some code I ripped off off stack overflow to deserialize. If you’re interested, yeah go git the code!


So there we go, I like it, but there’s still a lot of work to go before it’s ready for prime time.


Some things I have to get done.



  • Handle versioning well (the order is very important, if you added a new property with the current implementation in the middle of a class, it would break old stuff)

  • Maybe even output .proto files, and have the GoogleProtocolBufferGenerator use them if they exist, that way it’ll work by default with conventions, but allow customization, and it’ll save on the overhead of generating the contract.

Have fun with it! Thoughts?

Wednesday, February 16, 2011

What goes up must come down, Events from the cloud with the Amazon SQS Ellemy.CQRS subscriber

Ok, so I’ve got a nice little Amazon SNS publisher working for Ellemy.CQRS. So I figured I wanted to work some more with Amazon services and write a subscriber. Amazon makes a Simple Queue Service that was build to work seamlessly with their SNS, so I figured, why fight it?

Lets get to work. In keeping with the spirit of the other examples I’ve written for Ellemy.CQRS, I’m going to make this really simple. In fact I’m gunna write it to behave the same way as the NServiceBus Subscriber does. 

Here’s the user story…

As a weird person who looks at console windows, I want to see a display on my screen that shows the message text and message id every time a message is submitted on the website so that I can clap my hands and do the funky chicken.

For now, I’m just gunna put the subscription stuff in the Amazon Publishing assembly (just like I did for NService bus). I don’t really see a reason not to, and since I’m the boss of this product, what I say goes. 

Since I’ve done a lotta leg work in my last post, I’m familiar with what we’re gunna need a subscriber to do.

  1. Create a Queue if it doesn’t exist
  2. Set an Attribute on the new Queue that gives the Topic permission to send messages to it
  3. Subscribe to topics from the Topic ARN

So, lets get started. I wrote this little test.

   1:    [SetUp]
   2:          public void Arrange()
   3:          {
   4:              _config = Configure.With().AmazonPublisher();
   5:              _config
   6:                  .AwsAccessKeyId(awsKey)
   7:                  .AwsSecretKey(secret)
   8:                  .TopicArn(topicArn)
   9:                  .QueueName(tempQueueName);
  10:   
  11:              _amazonClient = AWSClientFactory.CreateAmazonSQSClient(awsKey, secret);
  12:          }
  13:          [TearDown]
  14:          public void CleanUp()
  15:          {
  16:              var listQueuesRequest = new ListQueuesRequest().WithQueueNamePrefix("TESTING");
  17:              var result = _amazonClient.ListQueues(listQueuesRequest);
  18:              foreach (var queueUrl in result.ListQueuesResult.QueueUrl)
  19:              {
  20:                  _amazonClient.DeleteQueue(new DeleteQueueRequest().WithQueueUrl(queueUrl));
  21:              }
  22:   
  23:   
  24:          }
  25:          [Test]
  26:          public void it_will_create_the_queue()
  27:          {
  28:              //lets just leave the logic in the ctor, it'll be singleton for consumers
  29:              var subscriber = new AmazonSqsSubscriber(_config);
  30:   
  31:              var listQueuesResponse = _amazonClient.ListQueues(new ListQueuesRequest());
  32:              var foundExpectedQueue = false;
  33:              foreach (var queueUrl in listQueuesResponse.ListQueuesResult.QueueUrl)
  34:              {
  35:                  if (queueUrl.Contains(tempQueueName))
  36:                      foundExpectedQueue = true;
  37:              }
  38:              Assert.IsTrue(foundExpectedQueue,"Queue was not found");
  39:   
  40:   
  41:   
  42:          }

Basically, we just spin up the Subscriber and use Amazon’s SDK to check that it did create the queue, then we kill the queue in a teardown. Note that Amazon gets all pissy if you try and create a queue with the same name as one you just deleted within 60 seconds, I got around that by (you guessed it!) waiting 60 seconds before I ran the test again. I guess we can make a unique name for the queue and using that, maybe I’ll do that….


Lets make this puppy pass. Here’s what I did.


   1:  public class AmazonSqsSubscriber
   2:      {
   3:          private readonly AmazonConfig _config;
   4:          private AmazonSQS _client;
   5:   
   6:          public AmazonSqsSubscriber(AmazonConfig config)
   7:          {
   8:              _config = config;
   9:              SetupQueue();
  10:          }
  11:   
  12:          private void SetupQueue()
  13:          {
  14:              if (!String.IsNullOrEmpty(_config.SqsQueueUrl)) return;
  15:              _client = Amazon.AWSClientFactory.CreateAmazonSQSClient(_config.AccessKeyId, _config.SecretKey);
  16:              var listQueuesResponse = _client.ListQueues(new ListQueuesRequest {QueueNamePrefix = _config.SqsQueueName});
  17:              string queueUrl;
  18:              if(listQueuesResponse.ListQueuesResult.QueueUrl.Count == 0)
  19:              {
  20:                  var response = _client.CreateQueue(new CreateQueueRequest{QueueName = _config.SqsQueueName});
  21:                  queueUrl = response.CreateQueueResult.QueueUrl;
  22:              }
  23:              else
  24:              {
  25:                  queueUrl = listQueuesResponse.ListQueuesResult.QueueUrl[0];
  26:              }
  27:              _config.SqsQueueUrl = queueUrl;
  28:          }
  29:  }

 


Kinda ugly, but yeah, that just made the test pass. I can’t see much I can do to clean it up except a little resharper extract method stuff, I won’t bore you with what I did.


Next up, lets make that subscriber actually receive messages. Amazon gives you a “RecieveMessage” call you can make, so we’ll just set up a little loop that’ll just continually call that, and dump out the results. Actually, lets just write the “test”, it’s not really a test, since there’s no assertion, I just wanna see stuff get written to my console.


   1:   [Test]
   2:          public void it_will_parse_messages()
   3:          {
   4:              var subscriber = new AmazonSqsSubscriber(_config);
   5:              
   6:              
   7:              var messageRequest = new SendMessageRequest()
   8:                  .WithMessageBody("This is a test")
   9:                  .WithQueueUrl(_config.SqsQueueUrl);
  10:              _amazonClient.SendMessage(messageRequest);
  11:   
  12:              subscriber.Start();
  13:              
  14:              subscriber.Stop();
  15:   
  16:          }

When I run this, I just wanna see that “This is a test” dumped out in my console, I guess I could make it so we inject some interface for Console, but I’m too lazy to do that right now. Let’s go make this puppy pass.


Here’s what I added.


   1:   [ThreadStatic]
   2:          private bool _stopped;
   3:          public void Stop()
   4:          {
   5:              Console.WriteLine("Stopping....");
   6:              _stopped = true;
   7:          }
   8:          public void Start()
   9:          {
  10:              ThreadPool.QueueUserWorkItem(delegate { BeginWork(); });
  11:          }
  12:          private void BeginWork()
  13:          {
  14:              while(!_stopped)
  15:              {
  16:                  ThreadPool.QueueUserWorkItem(delegate { DoWork(); });
  17:              }
  18:          }
  19:          private void DoWork()
  20:          {
  21:              var response = _client.ReceiveMessage(new ReceiveMessageRequest { QueueUrl = _config.SqsQueueUrl });
  22:              if (!response.IsSetReceiveMessageResult())
  23:              {
  24:                  return;
  25:              }
  26:              var messageResult = response.ReceiveMessageResult;
  27:              foreach (var message in messageResult.Message)
  28:              {
  29:                  Console.WriteLine(message.Body);
  30:              }
  31:          }
  32:      }





Run the test, and here’s what I see.


subscribertest


Looks good, although, I KNOW there’s a bug, because Amazon leaves the message up there. I’ll write another test that demonstrates the bug, but since I was too lazy to inject some “Console writer” thing that I could do assertions on, I’ll just have to use my eyes to see that it’s “failing”.


   1:    [Test]
   2:          public void the_message_is_only_processed_once()
   3:          {
   4:              var subscriber = new AmazonSqsSubscriber(_config);
   5:   
   6:   
   7:              var messageRequest = new SendMessageRequest()
   8:                  .WithMessageBody("This is a test")
   9:                  .WithQueueUrl(_config.SqsQueueUrl);
  10:              _amazonClient.SendMessage(messageRequest);
  11:   
  12:              subscriber.Start();
  13:   
  14:              Thread.Sleep(15000);
  15:              subscriber.Stop();
  16:          }


Ok… so when I run that… For some reason, it does exactly what I expect. I think it’s got something to do with Amazon putting a lock on the message or something. Regardless I know (from Amazon Documentation) that I need to manually delete the message after I’ve processed it. So, whatever, I’m gunna go do that. Here’s what I did.


   1:  private void DoWork()
   2:          {
   3:              var response = _client.ReceiveMessage(new ReceiveMessageRequest { QueueUrl = _config.SqsQueueUrl });
   4:              if (!response.IsSetReceiveMessageResult())
   5:              {
   6:                  return;
   7:              }
   8:              var messageResult = response.ReceiveMessageResult;
   9:              foreach (var message in messageResult.Message)
  10:              {
  11:                  Console.WriteLine(message.Body);
  12:                  Delete(message);
  13:              }
  14:          }
  15:   
  16:          private void Delete(Message message)
  17:          {
  18:              var deleteRequest = new DeleteMessageRequest()
  19:                  .WithReceiptHandle(message.ReceiptHandle)
  20:                  .WithQueueUrl(_config.SqsQueueUrl);
  21:              _client.DeleteMessage(deleteRequest);
  22:          }

 


Everything looks like it’s working.


Ok, technically, we’ve met the requirements of the AC, but we don’t wanna couple up that Console.WriteLine stuff, we wanna make this thing work so we can make Domain Event handlers handle the events that are sent in messages, I mean that was the whole point of this, right?


First of all, we’re not yet subscribing to a topic, lets rip off the code from my last post to do that.


Wow…. This made it ugly! But it’s working for now, we’ll clean up in a bit.


   1:    private void SetupQueue()
   2:          {
   3:              if (TheQueueIsAlreadySet()) return;
   4:              var queueUrl = GetOrCreateQueueUrl();
   5:              _config.SqsQueueUrl = queueUrl;
   6:              SubscribeToTopic();
   7:          }
   8:   
   9:          private void SubscribeToTopic()
  10:          {
  11:              SetPermissions();
  12:              var getArnRequest = new GetQueueAttributesRequest().WithQueueUrl(_config.SqsQueueUrl).WithAttributeName("QueueArn");
  13:              var clientArn = _client.GetQueueAttributes(getArnRequest).GetQueueAttributesResult.Attribute[0].Value;
  14:   
  15:              var sns = Amazon.AWSClientFactory.CreateAmazonSNSClient(_config.AccessKeyId,
  16:                                                                      _config.SecretKey);
  17:   
  18:              var subscriptionRequest = new SubscribeRequest()
  19:                  .WithEndpoint(clientArn)
  20:                  .WithProtocol("sqs")
  21:                  .WithTopicArn(_config.TopicAccessResourceName);
  22:   
  23:              sns.Subscribe(subscriptionRequest);
  24:          }
  25:   
  26:          private void SetPermissions()
  27:          {
  28:              var setQueueAttributeRequest = new SetQueueAttributesRequest()
  29:                 .WithQueueUrl(_config.SqsQueueUrl)
  30:                 .WithAttribute(new Attribute { Name = "Policy", Value = AllowSnsAttribute() });
  31:              _client.SetQueueAttributes(setQueueAttributeRequest);
  32:          }

You’ll recognize a lot of this code from my last blog, so it’s not new. I really need to move out the concept of subscribing in it’s own little class, but I’ll get to that soon enough.


Changed my test to look like this.


   1:  [Test]
   2:          public void the_message_is_only_processed_once()
   3:          {
   4:              var subscriber = new AmazonSqsSubscriber(_config);
   5:   
   6:              var @event = new TestEvent {SomeGuid = Guid.NewGuid(), SomeInt = 1, SomeString = "Some String"};
   7:              _publisher.Publish(@event);
   8:             
   9:              subscriber.Start();
  10:             //Ugly, but I wanna make sure the message arrives
  11:              Thread.Sleep(3000);
  12:              
  13:              subscriber.Stop();
  14:             
  15:          }
  16:          

 


here’s what I see.


publishedevent






Sweet! Now events published (line 7 in the above code) actually do make it to my subscriber. Next, we’re gunna actually deserialize that Message property to get the actual Domain Event instead of just doing a Console.Write(message.Body).


Ok, first things first, we’ve gotta be able to load the type. I added a cute little method to allow consumers to specify what assembly (or assemblies) the Events they’ll be publishing live in to my AmazonConfig class like so.


   1:   public AmazonConfig EventsAreInAssemblyContainingType<TEvent>()
   2:          {
   3:              if (EventAssemblies == null)
   4:                  EventAssemblies = new List<Assembly>();
   5:              EventAssemblies.Add(typeof(TEvent).Assembly);
   6:              return this;
   7:          }

 


Just call that method in my test, and changed my AmazonSubscriber to look like this. Bear with the Ugly for now, I’ll be cleaning it, the point of this isn’t to show how clean I can make code, it’s to show that I can get it to work. Pull down the code from git, you’ll see that I clean my stuff up darnit! Anyway, here’s the (very ugly) code.


   1:  private void DoWork()
   2:          {
   3:              var response = _client.ReceiveMessage(new ReceiveMessageRequest { QueueUrl = _config.SqsQueueUrl });
   4:              if (!response.IsSetReceiveMessageResult())
   5:              {
   6:                  return;
   7:              }
   8:              var messageResult = response.ReceiveMessageResult;
   9:              foreach (var message in messageResult.Message)
  10:              {
  11:                  var serializer = new JavaScriptSerializer();
  12:                  var thing = (Dictionary<String,Object>)serializer.DeserializeObject(message.Body);
  13:                  Type eventType = null;
  14:                  foreach (var eventAssembly in _config.EventAssemblies)
  15:                  {
  16:                      var t = eventAssembly.GetType((string)thing["Subject"]);
  17:                      if(t!=null)
  18:                      {
  19:                          eventType = t;
  20:                          break;
  21:                      }
  22:                  }
  23:                  if (eventType == null)
  24:                      throw new ConfigurationException(
  25:                          String.Format(
  26:                              "Could not load type {0}, please make sure you call AmazonConfig.EventsAreInAssemblyContainingType<TEvent>() during bootstrap.",
  27:                              thing["Subject"]));
  28:   
  29:                  var @event = serializer.Deserialize((string)thing["Message"], eventType);
  30:                  Console.WriteLine(@event);
  31:                  Delete(message);
  32:              }
  33:          }

 


Still doing that Console.WriteLine, of course, but yeah, now when I run the test, I’m getting “AmazonTests.TestEvent” instead of that Json string. Now, lets just change that Console.WriteLine to instead locate all the event handlers for the event and execute them.


Once again, shut up about ugly, this is RED/GREEN/CLEAN land, clean is last, I wanna make it work before the code looks pretty.


I changed that line 30 from above to this.


   1:    var @event = serializer.Deserialize((string)thing["Message"], eventType);
   2:                  var handlerInterface = typeof(IDomainEventHandler<>).MakeGenericType(eventType);
   3:                  foreach (var handler in _config.EllemyConfiguration.ObjectBuilder.BuildAll(handlerInterface))
   4:                  {
   5:                      var handlerMethod = handler.GetType().GetMethod("Handle");
   6:                      handlerMethod.Invoke(handler, new [] { @event });
   7:                  }
   8:                  Delete(message);

 


I’ll then go write a quick hander write in the test assembly that’ll do the Console.WriteLine, just so I can still see all that pretty stuff. Like this.


   1:   public class TestEvent : IDomainEvent
   2:      {
   3:          public Guid SomeGuid { get; set; }
   4:          public String SomeString { get; set; }
   5:          public int SomeInt { get; set; }
   6:      }
   7:      public class ConsoleWriter : IDomainEventHandler<TestEvent>{
   8:          public void Handle(TestEvent @event)
   9:          {
  10:              Console.WriteLine("SomeGuid: \t{0}",@event.SomeGuid);
  11:              Console.WriteLine("SomeInt: \t {0}",@event.SomeInt);
  12:              Console.WriteLine("SomeString: \t{0}",@event.SomeString);
  13:          }
  14:      }




 


To make it all work, I changed the setup method in my test to look like this.


   1:    [SetUp]
   2:          public void Arrange()
   3:          {
   4:   
   5:              tempQueueName = "Test_QUEUE";
   6:              _config = Configure.With()
   7:                  .StructureMapBuilder()
   8:                  .HandlersAreInAssemblyContainingType<TestEvent>()
   9:                  .AmazonPublisher();
  10:              
  11:              _config
  12:                  .AwsAccessKeyId(awsKey)
  13:                  .AwsSecretKey(secret)
  14:                  .TopicArn(topicArn)
  15:                  .QueueName(tempQueueName)
  16:                  .EventsAreInAssemblyContainingType<TestEvent>();
  17:   
  18:              _publisher = new AmazonPublisher(_config);
  19:   
  20:              _amazonClient = AWSClientFactory.CreateAmazonSQSClient(awsKey, secret);
  21:   
  22:          }


So, note that we’re (in line 6) telling it to use my StructureMapBuilder, and telling it where the DomainEventHandlers are (line 8).


Lets give the test a run.


finished


NICE!


Lots of stuff going on there, and I’ve got a lot of cleaning to do, but it’s working! Freel free to pull down the code from github. If you’d like to contribute, I’m game too!


Have fun with it all, and I hope this is helpful.

Tuesday, February 15, 2011

Playing around in the Cloud, Some exploratory testing before I can write a Amazon SQS subscriber

So, in my quest to send everything to the cloud, I wanna build a subscriber that uses Amazon SQS to get messages.  What is Amazon SQS you ask? Here’s what they say….

Amazon Simple Queue Service (Amazon SQS) offers a reliable, highly scalable, hosted queue for storing messages as they travel between computers. By using Amazon SQS, developers can simply move data between distributed components of their applications that perform different tasks, without losing messages or requiring each component to be always available. Amazon SQS makes it easy to build an automated workflow, working in close conjunction with the Amazon Elastic Compute Cloud (Amazon EC2) and the other AWS infrastructure web services.
Amazon SQS works by exposing Amazon’s web-scale messaging infrastructure as a web service. Any computer on the Internet can add or read messages without any installed software or special firewall configurations. Components of applications using Amazon SQS can run independently, and do not need to be on the same network, developed with the same technologies, or running at the same time.

Yeah, it’s just a Queuing service. I’m gunna link my Amazon SNS Event publisher up with SQS and be all cool Amazonish. Unfortunately,  SQS doesn’t have a cute little console like SNS did, so I’m gunna spend a little time doing some exploratory testing before I go wire all this up to Ellemy.CQRS. I really want my Amazon stuff to be easy and friendly to use with Ellemy.CQRS. I figure that if I get all my hacking out of the way with this testing, I’ll have my head wrapped around everything by the time I’m ready to write my subscriber.

Ok, lets get to it.

First of all, we’ve gotta create a Queue. The Amazon Documentation says it’s pretty simple. Let’s give that a spin, but I’ll use NUnit as my runner (instead of a Console app).

I wrote this.

   1:  [TestFixture]
   2:      public class check_message_tests
   3:      {
   4:          private AmazonSQSClient _client;
   5:          [SetUp]
   6:          public void create_client()
   7:          {
   8:              _client = new AmazonSQSClient("MyAccountId", "MySecret");
   9:          }
  10:          [Test]
  11:          public void create_queue()
  12:          {
  13:              var request = new CreateQueueRequest().WithQueueName("Test_for_blog");
  14:              var response = _client.CreateQueue(request);
  15:              Console.WriteLine(response.CreateQueueResult.QueueUrl);
  16:   
  17:          }
  18:  }


Here’s the results….


Capture


Sweet… I guess that means I created a Queue!


Let’s go check from Amazon what Queues I have… That’s simple enough. Here’s what I did.


   1:  [Test]
   2:          public void list_queues()
   3:          {
   4:              var response = _client.ListQueues(new ListQueuesRequest());
   5:              foreach (var result in response.ListQueuesResult.QueueUrl)
   6:              {
   7:                  Console.WriteLine(result);
   8:              }
   9:          }

 


I won’t do another screen capture…  Just take my word for it, the test passed and I saw that Queue show up in the console. This is nice, because I can write a little utility that’ll check if the requested Queue exists for my subscriber, if not, we’ll use the code from that first test to create one.


Lets go send and receive a message it, just to get familiar with this stuff. I wrote this.


   1:  [Test]
   2:          public void send_and_receive_messages()
   3:          {
   4:              var url = "https://queue.amazonaws.com/xxxxxxxxx/Test_for_blog";
   5:   
   6:              Console.WriteLine("Sending Message");
   7:              var sendMessageRequest = new SendMessageRequest()
   8:                  .WithQueueUrl(url)
   9:                  .WithMessageBody("Hello from the cloud");
  10:              
  11:              var sendResult = _client.SendMessage(sendMessageRequest);
  12:              Console.WriteLine(sendResult.ToXML());
  13:   
  14:              Console.WriteLine("Receiving Message");
  15:              var request =
  16:                  new ReceiveMessageRequest().
  17:                  WithQueueUrl(url);
  18:   
  19:              var result = _client.ReceiveMessage(request);
  20:              foreach (var message in result.ReceiveMessageResult.Message)
  21:              {
  22:                  Console.WriteLine(message.Body);
  23:              }
  24:          }



Ok, so after screwing around for a while, I had some help from a buddy, on what I needed to do. Basically, if we’re gunna allow SNS to Send messages to SQS, we have to send a Queue Attribute up to SQS. I did it like this (this is a “trust me here” kinda thing).


 


   1:  [Test]
   2:          public void set_permissions()
   3:          {
   4:              var setQueueAttributeRequest = new SetQueueAttributesRequest()
   5:                  .WithQueueUrl(_queueUrl)
   6:                  .WithAttribute(new Attribute { Name = "Policy", Value = AllowSnsAttribute() });
   7:              var result = _client.SetQueueAttributes(setQueueAttributeRequest);
   8:              Console.WriteLine(result.ToXML());
   9:   
  10:              var getQueueAttributesResponse = _client.GetQueueAttributes(
  11:                  new GetQueueAttributesRequest().WithAttributeName("Policy").WithQueueUrl(_queueUrl));
  12:              foreach (var attribute in getQueueAttributesResponse.GetQueueAttributesResult.Attribute)
  13:              {
  14:                  Console.WriteLine("{0} : {1}",attribute.Name,attribute.Value);
  15:              }
  16:          }
  17:          private string AllowSnsAttribute()
  18:          {
  19:              return @"{""Version"": ""2008-10-17"",""Statement"": [{""Resource"": ""arn:aws:sqs:us-east-1:XXXXX:Test_for_blog"", ""Effect"": ""Allow"", ""Sid"": ""1"", ""Action"": ""sqs:*"", ""Condition"": {""StringEquals"": {""aws:SourceArn"": ""arn:aws:sns:us-east-1:XXXXX:EventMessage""}}, ""Principal"": {""AWS"": ""*""}}]}";
  20:          }

Yeah, so it’s pretty nifty actually, but for now, just trust me that we HAVE TO do that to make stuff work. I’ll save this code, cause I’ll be using it later when I start automagically wiring stuff up.


Next up I think I wanna publish from Amazon SNS and receive the resulting message. To do that, I’m gunna need to subscribe to a Topic ARN. Here’s what I did.


   1:   [Test]
   2:          public void subscribe_to_topic()
   3:          {
   4:              var getArnRequest = new GetQueueAttributesRequest().WithQueueUrl(_queueUrl).WithAttributeName("QueueArn");
   5:              var clientArn = _client.GetQueueAttributes(getArnRequest).GetQueueAttributesResult.Attribute[0].Value;
   6:   
   7:              var sns = Amazon.AWSClientFactory.CreateAmazonSNSClient(awsKeyId,
   8:                                                                      secret);
   9:   
  10:              var subscriptionRequest = new SubscribeRequest()
  11:                  .WithEndpoint(clientArn)
  12:                  .WithProtocol("sqs")
  13:                  .WithTopicArn("arn:aws:sns:us-east-1:xxxxx:EventMessage");
  14:              
  15:              var response = sns.Subscribe(subscriptionRequest);
  16:              Console.WriteLine(response.SubscribeResult.SubscriptionArn);
  17:          }


Works like a charm, cause when I go to my AWS console, I see the new subscription. Cool!


Next up, we’re gunna manually publish a message via the AWS console, and rip out the code from that send and receive test (just the receive part) and make sure that the message we published shows. First I fire up the AWS Console and  hit the little publish to topic button. It looks like this….


publishtotopic


I’ll type in some mumbo jumbo and run a test that looks like this.


 


   1:   [Test]
   2:          public void receive_message()
   3:          {
   4:              var request =
   5:                new ReceiveMessageRequest().
   6:                WithQueueUrl(_queueUrl);
   7:              var result = _client.ReceiveMessage(request);
   8:              foreach (var message in result.ReceiveMessageResult.Message)
   9:              {
  10:                  Console.WriteLine(message.Body);
  11:              }
  12:          }


 


And the results?


successfulpublish


Wow! Done… So, now I’ve proven that I can indeed get all this technology to work together… Next up, is doing it!


Stay tuned!

Thursday, February 10, 2011

Ellemy.CQRS goes to the cloud, The Amazon SNS Event Publisher

So, long story short, I left my old job. Starting a new one with a very exciting company. The architect there apparently LOVES Amazon services. So, I figured I’d go write a quick Event publisher for Ellemy.CQRS using Amazon Simple Notification Service. Won’t hurt to be familiar with it, and sounds kinda cool.

First of all, I signed up for the service,  it’s essentially free. Yes, they want a CC card, but they charge nothing as long as you don’t exceed a gig a month, and it’s $0.19 per gig after that. I’m not gunna even approach a gig, so I’m fine.

First of all, I went and configured a Topic, If you’re an NServiceBus guy (like me), a Topic is essentially a Queue – someone feel free to correct me there, but that’s how I think of it.

You can make API calls all day to administer topics, but that’s not the point of this blog, the point of this blog is to send a message out to the cloud. So I’m gunna get a topic created as quick as possible.  Amazon made a nice little web interface to do that.

I logged in with my Amazon Account and added a topic, really simple.

createtopic

I named my topic, and got redirected to a nice little screen that gave me the Topic ARN. We’ll need that along with the Key Id, and Secret. You can add subscribers from the next page, and one of the options is Email (JSON). That’s pretty cool. I just select that. Easy way to check if the message is publishing ok.

Let’s get in some code now….. We need to implement a new version of IEventPublisher.  Here’s what it looks like.

   1:  namespace Ellemy.CQRS.Event
   2:  {
   3:      public interface IEventPublisher
   4:      {
   5:          void Publish<TDomainEvent>(TDomainEvent @event) where TDomainEvent : IDomainEvent;
   6:      }
   7:  }

We’re just gunna send the event off to the cloud there.


I can hand roll GET/POST/PUT all day, but I really don’t feel like doing all the plumbing so I downloaded the Amazon AWS .NET SDK.  It’s got a nice wrapper around all that stuff. Spun up the object browser and looked around and saw this puppy.


objectbrowser


I think that’s exactly what we need.  Let’s just do the simpliest thing first and new one of those up in the publisher. I’ll clean up in a second.


   1:  public class AmazonPublisher : IEventPublisher
   2:      {
   3:          public AmazonPublisher()
   4:          {}
   5:   
   6:          public void Publish<TDomainEvent>(TDomainEvent @event) where TDomainEvent : IDomainEvent
   7:          {
   8:              var serializer = new JavaScriptSerializer();
   9:              var payload = serializer.Serialize(@event);
  10:              var client = new AmazonSimpleNotificationServiceClient("myAccessId",
  11:                                                                     "mysecretKey");
  12:   
  13:              var request = new PublishRequest
  14:                                {
  15:                                    Message = payload,
  16:                                    Subject = @event.GetType().AssemblyQualifiedName,
  17:                                    TopicArn = "arn:aws:sns:us-east-1:451419498740:EventMessage"
  18:                                };
  19:              client.Publish(request);
  20:   
  21:   
  22:   
  23:          }
  24:   
  25:          
  26:      }

Lets go tell Ellemy.CQRS to use it. We’ll make a pretty Extension method in a bit, for now, lets do it ugly.


   1:  Configure.With()
   2:                  .StructureMapBuilder(ObjectFactory.Container)
   3:                  .CommandExecutorsAreInAssemblyContainingType<CreateMessage>()
   4:                  .HandlersAreInAssemblyContainingType<MessageReadModel>()
   5:                  //.NServiceBusPublisher(ObjectFactory.Container.GetInstance<IBus>());
   6:                  .PublishEventsWith(new AmazonPublisher());

Let’s go fire it ok, here goes nuthin!


BAM!


message


How freekin awesome is that?


A little cleanup now…. First lets make a extensions class to make it so people don’t have to new up that AmazonPublisher object, and let them send in the config stuff. I’ve kinda blogged on how I like doing this before, but what the heck? I’ll do it again…


First of all, I’ll create a little fluent config to allow users to configure their amazon KeyId, Secret and TopicArn. In a future release I’m gunna create a Factory to resolve TopicArn by domain event type, but for now, let’s stick to a single TopicArn.


Here’s what I cooked up.


   1:   public class AmazonPublisherConfig
   2:      {
   3:          private readonly Configuration _configuration;
   4:   
   5:          public AmazonPublisherConfig(Configuration configuration)
   6:          {
   7:              _configuration = configuration;
   8:          }
   9:   
  10:   
  11:          internal string AccessKeyId { get; private set; }
  12:          internal string SecretKey { get; private set; }
  13:          internal string TopicAccessResourceName { get; private set; }
  14:   
  15:          public AmazonPublisherConfig AwsAccessKeyId(string accessKeyId)
  16:          {
  17:              AccessKeyId = accessKeyId;
  18:              return this;
  19:          }
  20:   
  21:          public AmazonPublisherConfig AwsSecretKey(string awsSecretKey)
  22:          {
  23:              SecretKey = awsSecretKey;
  24:              return this;
  25:          }
  26:   
  27:          public AmazonPublisherConfig TopicArn(string value)
  28:          {
  29:              TopicAccessResourceName = value;
  30:              return this;
  31:          }
  32:          public Configuration CreatePublisher()
  33:          {
  34:              _configuration.PublishEventsWith(new AmazonPublisher(this));
  35:              return _configuration;
  36:          }
  37:      }

At some point I’ll let config files do this, but for now, lets keep it simple.  Basically, we just build up the config values fluently, and call CreatePublisher and that’ll set the IDomainEventPublisher to the AmazonPublisher. Now lets just clean up the AmazonPublisher.


   1:   public class AmazonPublisher : IEventPublisher
   2:      {
   3:          private readonly AmazonSimpleNotificationServiceClient _client;
   4:          private readonly string _topicArn;
   5:   
   6:          internal AmazonPublisher(AmazonPublisherConfig config)
   7:          {
   8:              _client = new AmazonSimpleNotificationServiceClient(config.AccessKeyId, config.SecretKey);
   9:              _topicArn = config.TopicAccessResourceName;
  10:          }
  11:         public void Publish<TDomainEvent>(TDomainEvent @event) where TDomainEvent : IDomainEvent
  12:          {
  13:              var serializer = new JavaScriptSerializer();
  14:              var payload = serializer.Serialize(@event);
  15:              
  16:              var request = new PublishRequest
  17:                                {
  18:                                    Message = payload,
  19:                                    Subject = @event.GetType().Name,
  20:                                    TopicArn = _topicArn
  21:                                };
  22:              _client.Publish(request);
  23:          }
  24:      }

Nice and simple, huh? I’m setting the Subject to the event type name right now, and we’ll create a subscriber soon that’ll use that to go find the IDomainEventHandlers for type with the name that matches the subject. Not sure if that’ll work or not, but we’ll see soon enough huh? I’m just JSON serializing the Event. I think that’ll work great.


Lets fire her up again, and see. Hit the UI, fire off a message, and here’s what I get (well, after I took an eraser to the signature so you guys don’t see how many terabytes of messages you can send up to the cloud and charge up my credit card).


hellofromthecloud


I’m having a blast working on this stuff, and I see that I’m getting a few watches on github, Folks, please leave comments with what you’d like to see added to it, or any other blog requests.


Have fun with it!