Saturday, February 5, 2011

Publishing Events, cause sometimes everyone justs gotta know–Ellemy.CQRS


Ok, so my examples working pretty good. Ellemy.CQRS is doing it’s job pretty well. In fact (even though they don’t realize it), a lot of people are using it and liking it – in spite of the arguments about CQRS being “too complex”. I consider this a victory. But hey, whatever!

The way things currently work though is that all event and command handling is done in process. When I wrote the first pass, this was a strategic decision – in fact it was one of my stated goals. But, lets move this puppy to real world. Like, in a scenario where we actually want durable messaging. 

So, I created a simple interface.

   1:  public interface IEventPublisher
   2:      {
   3:          void Publish<TDomainEvent>(TDomainEvent @event) where TDomainEvent : IDomainEvent;
   4:      }

and in my DomainEvents class, I made a small change in the Publish method. It now looks like this.

   1:  public static void Publish()
   2:          {
   3:              if (handlerActions != null)
   4:              {
   5:                  handlerActions.ForEach(a => a());
   6:              }
   7:              var publisher = Configure.CurrentConfig.EventPublisher;
   8:              if(unpublishedEvents != null){
   9:              foreach (var unpublishedEvent in unpublishedEvents)
  10:              {
  11:                  publisher.Publish(unpublishedEvent);
  12:              }
  13:              unpublishedEvents.Clear();
  14:              }
  15:          }

I just added lines 7-12. I can clean this up a lot (and I will), but I wanted to leave this method all inline so you guys can see what it looks like. Line 7 grabs the currently configured instance of IEventPublisher. I wrote this one really quick to use if you don’t care to publish to remote systems.

   1:   public class NoOpPublisher : IEventPublisher
   2:      {
   3:          public void Publish<TDomainEvent>(TDomainEvent @event) where TDomainEvent:IDomainEvent
   4:          {
   6:          }
   7:      }

And made that the default by just newing one up in my Configuration class.

Lets get to the good stuff now. I’m a big NServiceBus fan, so I wanna make a implementation that pushes out a NSB message for the events. NSB requires us to mark Messages with IMessage. I don’t want to bleed that into my domain, so we’re gunna make a quick class that is an IMessage, but contains the DomainEvent so that subscribers can get to it.

It looks like this.

   1:    [Serializable]
   2:      public class EventMessage<T> : IMessage
   3:         where T : IDomainEvent
   4:      {
   5:          public EventMessage(T @event)
   6:          {
   7:              Payload = @event;
   8:          }
   9:          public EventMessage(){}
  10:          /// <summary>
  11:          /// Gets or sets transported event.
  12:          /// </summary>
  13:          public T Payload { get; set; }
  14:      }

So, next we’re gunna create an implementation of IEventPublisher that sends this puppy off to the bus, it’s really really simple.

   1:  public class NServiceBusPublisher : IEventPublisher
   2:      {
   3:          private readonly IBus _bus;
   5:          public NServiceBusPublisher(IBus bus)
   6:          {
   7:              _bus = bus;
   8:          }
  10:          public void Publish<TDomainEvent>(TDomainEvent @event) where TDomainEvent:IDomainEvent
  11:          {
  12:              var message = new EventMessage<TDomainEvent>(@event);
  13:              _bus.Send(message);
  14:          }
  15:      }

I just did a Send through NSB for now. I’ll move that to a publish once I get commands sent off, so that I won’t be publishing from a website. See here for why that’s a bad idea.

Next up, how to we tell Ellemy.CQRS to use this puppy? In my fluent Configure class I simply added this.

   1:   public Configuration PublishEventsWith(IEventPublisher publisher)
   2:          {
   3:              EventPublisher = publisher;
   4:              return this;
   5:          }
   6:          internal IEventPublisher EventPublisher { get; private set; }

And in the assembly that contains the NSB stuff, I wrote an extension method for the configure class like so.

   1:  public static class ConfigurationExtensions
   2:      {
   3:          public static Configuration NServiceBusPublisher(this Configuration configuration,IBus bus)
   4:          {
   5:              configuration.PublishEventsWith(new NServiceBusPublisher(bus));
   6:              return configuration;
   7:          }  
   8:      }

Back to my website, I change my bootstrap code to look like this.

   1:    Configure.With()
   2:                  .StructureMapBuilder(ObjectFactory.Container)
   3:                  .CommandExecutorsAreInAssemblyContainingType<CreateMessage>()
   4:                  .HandlersAreInAssemblyContainingType<MessageReadModel>()
   5:                  .NServiceBusPublisher(ObjectFactory.Container.GetInstance<IBus>());

That’s really all we’ve gotta do to get Ellemy.CQRS to work.

But we’ve gotta now make NServiceBus play nice. Since we’re only sending one type of message I set my config for the client like this.

   1:  <UnicastBusConfig>
   2:      <MessageEndpointMappings>
   3:        <add Messages="Ellemy.CQRS.Publishing.NServiceBus.EventMessage`1[[Ellemy.CQRS.Event.IDomainEvent, Ellemy.CQRS]],Ellemy.CQRS.Publishing.NServiceBus" Endpoint="myserverinputqueue" />
   4:      </MessageEndpointMappings>
   5:    </UnicastBusConfig>


Now, when we add a message through the example website, we see this in MSMQ


Ok, we know we’re now sending off the message! Next blog, I’ll write a subscriber that’ll print stuff out to the console that it received the message. Or you can just git the code…

Have fun with it!

No comments:

Post a Comment