Tuesday, February 8, 2011

Subscribing, you know, because you care. Ellemy.CQRS and NServiceBus subscriptions

See, I keep my promises!

OK, in my last post I wrote an NServiceBusPublisher that pushes out messages using NServiceBus. I stopped writing after I showed the MSMQ admin screen and that the message actually did make it to the queue.

As a side note, I once pushed a product all the way to production with queue spelled “que”. It really pissed off our DBA, since we were using NHibernate Automapping and the table name came out misspelled. He really got his panties in a wad over it. It was pretty dramatic. Wow, how did I get on that?

Where were we? Oh yeah, so, I wanna write a quick little NServiceBusHost.exe app that’ll just print out the message to the console. I really don’t care what happens with it, I just wanna see that we can subscribe and react to the message. First, I set up the MsmqTransportConfig section in my App.config to read from the queue the client is sending messages to.

    <MsmqTransportConfig
      InputQueue="MyServerInputQueue"
      ErrorQueue="error"
      NumberOfWorkerThreads="1"
      MaxRetries="5"
    />

I’m a StructureMap guy, so I wire up my container in my Endpoint config like so.

    public class EndpointConfig : IConfigureThisEndpoint, AsA_Server, IWantCustomInitialization
    {
        public void Init()
        {
            ObjectFactory.Initialize(r => r.Scan(scan =>
            {
                scan.AssemblyContainingType<EndpointConfig>();
                scan.AddAllTypesOf(typeof(IDomainEventHandler<>));
            }));

            Configure.With().StructureMapBuilder().BinarySerializer().UnicastBus().LoadMessageHandlers();
        }
    }


This is kinda outside the scope of this post, but AddAllTypesOf is something that trips up everyone. They use ConnectImplementationsToTypesClosing instead. That’ll only give you one handler, which is exactly what you DO NOT want for handling events. It’s fine for command handlers, but an event might have multiple handlers, so yeah, just use AddAllTypesOf.


Anyway, yeah, I wire up SM to locate all domain event handlers, tell NServiceBus to use the StructureMapBuilder, and have it load the message handlers.


Note that in my last post I made a message that wraps up domain events and marked it with IMessage. So while we may have multiple handlers for a domain event, only one message type gets sent to the bus. Just a refresher, here it is.


    [Serializable]
    public class EventMessage<T> : IMessage
        where T : IDomainEvent
    {
        public EventMessage(T @event)
        {
            Payload = @event;
        }

        // Parameterless constructor for the serializer.
        public EventMessage() { }

        /// <summary>
        /// Gets or sets transported event.
        /// </summary>
        public T Payload { get; set; }
    }

So the idea is to have a handler for EventMessage<IDomainEvent> and have it locate all IDomainEventHandlers for the event in the message. Not horribly complicated, actually. But since the concrete event type is only known at runtime, the compiler can’t figure out the handler type for us, so we’ve gotta do a little reflection magic. Let’s take a look.

    public class MessageHandler : IHandleMessages<EventMessage<IDomainEvent>>
    {
        private readonly IContainer _container;

        public MessageHandler(IContainer container)
        {
            _container = container;
        }

        public void Handle(EventMessage<IDomainEvent> message)
        {
            // Close IDomainEventHandler<> over the payload's runtime type.
            var handlerInterface = typeof(IDomainEventHandler<>).MakeGenericType(message.Payload.GetType());

            // Look up Handle on the closed interface, so a class that handles
            // several event types does not cause an ambiguous match.
            var handlerMethod = handlerInterface.GetMethod("Handle");

            foreach (var handler in _container.BuildAll(handlerInterface))
            {
                handlerMethod.Invoke(handler, new object[] { message.Payload });
            }
        }
    }

So, we interrogate the message payload for its type and build the matching closed generic handler interface. Once we’ve got that, we use the container to get all the handlers. Oh, a little side note: that IContainer is the NServiceBus IoC container abstraction, NOT a StructureMap container. That way people can use the IoC container of their choice.

Once I’ve got all the handlers, I just use reflection to invoke the Handle method on each of them. Simple enough, right?
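
If you want to poke at the reflection dispatch without NServiceBus or StructureMap in the picture, here’s a minimal sketch. Everything in it is a stand-in I made up for illustration: the interfaces only mimic the Ellemy.CQRS shapes, HandlerRegistry is a hypothetical replacement for the container’s BuildAll, and PrintText and PrintId are throwaway handlers. It shows one event fanning out to two handlers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-ins for the Ellemy.CQRS types; their shapes are assumed for this sketch.
public interface IDomainEvent { }

public interface IDomainEventHandler<TEvent> where TEvent : IDomainEvent
{
    void Handle(TEvent @event);
}

public class MessageCreated : IDomainEvent
{
    public Guid MessageId { get; set; }
    public string Text { get; set; }
}

// Two throwaway handlers for the same event.
public class PrintText : IDomainEventHandler<MessageCreated>
{
    public void Handle(MessageCreated @event) { Console.WriteLine("Text:{0}", @event.Text); }
}

public class PrintId : IDomainEventHandler<MessageCreated>
{
    public void Handle(MessageCreated @event) { Console.WriteLine("Id:{0}", @event.MessageId); }
}

// Hypothetical replacement for the IoC container: a plain list of handler instances.
public class HandlerRegistry
{
    private readonly List<object> _handlers = new List<object>();

    public void Add(object handler) { _handlers.Add(handler); }

    // Mimics BuildAll(Type): every registered object that implements the closed interface.
    public IEnumerable<object> BuildAll(Type handlerInterface)
    {
        return _handlers.Where(handlerInterface.IsInstanceOfType);
    }
}

public static class Dispatcher
{
    // Returns the number of handlers invoked, so the fan-out is easy to check.
    public static int Dispatch(HandlerRegistry registry, IDomainEvent payload)
    {
        // Close IDomainEventHandler<> over the payload's runtime type.
        var handlerInterface = typeof(IDomainEventHandler<>).MakeGenericType(payload.GetType());
        var handle = handlerInterface.GetMethod("Handle");

        var count = 0;
        foreach (var handler in registry.BuildAll(handlerInterface))
        {
            handle.Invoke(handler, new object[] { payload });
            count++;
        }
        return count;
    }
}

public static class Program
{
    public static void Main()
    {
        var registry = new HandlerRegistry();
        registry.Add(new PrintText());
        registry.Add(new PrintId());

        // The static type is only IDomainEvent; the runtime type picks the handlers.
        IDomainEvent payload = new MessageCreated { MessageId = Guid.NewGuid(), Text = "hello" };
        var invoked = Dispatcher.Dispatch(registry, payload);
        Console.WriteLine("Handlers invoked: {0}", invoked);
    }
}
```

The sketch looks up Handle on the closed interface rather than on the concrete handler class. That’s a choice I made here so a class implementing more than one IDomainEventHandler<> doesn’t trip an ambiguous match.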

Next up, I’ll go build a quick little handler for my MessageCreated event (that’s the one raised from the website) that dumps stuff out to the console window. It looks like this.

    public class PrintStuffToConsole : IDomainEventHandler<MessageCreated>
    {
        public void Handle(MessageCreated @event)
        {
            Console.WriteLine("Received Message Created");
            Console.WriteLine("Text:{0}", @event.Text);
            Console.WriteLine("Id:{0}", @event.MessageId);
        }
    }

There we have it. Now let’s just go run NServiceBusHost.exe and see what we get.

<(null)> - MessageHandler Failed handling message.
StructureMap.StructureMapException: StructureMap Exception Code:  202
No Default Instance defined for PluginFamily NServiceBus.ObjectBuilder.Common.IContainer, NServiceBus.Core, Version=, Culture=neutral, PublicKeyToken=

OOPS! That’s not what I wanted. This surprised me. I kinda figured that StructureMapObjectBuilder would register itself as the answer when asked for an NServiceBus IContainer. Guess I was wrong. I’ll bug Andreas about that and see if we can get that fixed. Until then, let’s just go tell StructureMap that when we ask for an NSB IContainer, it should use the StructureMapObjectBuilder.

That’s pretty easy. We just change our Endpoint config a tiny bit, so we end up with this.

    using Ellemy.CQRS.Event;
    using NServiceBus;
    using NServiceBus.ObjectBuilder.StructureMap262;
    using StructureMap;
    using IContainer = NServiceBus.ObjectBuilder.Common.IContainer;

    namespace Ellemy.CQRS.NServiceBusSubscriber
    {
        public class EndpointConfig : IConfigureThisEndpoint, AsA_Server, IWantCustomInitialization
        {
            public void Init()
            {
                ObjectFactory.Initialize(r =>
                {
                    r.Scan(scan =>
                    {
                        scan.AssemblyContainingType<EndpointConfig>();
                        scan.AddAllTypesOf(typeof(IDomainEventHandler<>));
                    });
                    r.For<IContainer>().Use<StructureMapObjectBuilder>();
                });

                Configure.With().StructureMapBuilder().BinarySerializer().UnicastBus().LoadMessageHandlers();
            }
        }
    }

Now, give ‘er a spin and here’s what we get.


It’s a happy day in Ellemy.CQRS land!

Hope this helped, and have fun. Let me know if you’ve got any questions!

