Monday, June 21, 2010

Lokad.CQRS - Intro into App Engine Architecture, Multiple Message Subscribers

This is the next article in the Learning series for Lokad.CQRS Guidance. In the previous one we talked about CQRS message basics and the Message Handling Feature of Lokad.Engine. In this article we'll cover the extensible architecture of the Lokad.CQRS App Engine, focusing on scheduled tasks and multiple message handlers.

Lokad.CQRS App Engine Flexibility

The ICloudEngineHost instance that we configured and ran in the previous sample is actually an instance of the Lokad.CQRS App Engine for Windows Azure. Basically, it's an application server with the core functionality needed to build scalable and flexible solutions on top of the Command-Query Responsibility Segregation architecture principles.

What functionality does this App Engine actually need to provide?

  • Inversion of Control infrastructure to handle the components (see Component-Driven Development).
  • Enterprise Service Bus Functionality (message handling, pub/sub and delivery).
  • View Adapters to denormalize and publish events into the specific persistence.
  • Seamless access to the persistence engine of your choice.

Already at this point we discover that different projects have different requirements for the implementation of this functionality. For example, some would want to use the LINQ to SQL ORM, while others would prefer NHibernate or even event sourcing. Another frequent example is the serialization format of the messages: Binary, XML, Google Protocol Buffers and Data Contracts are just some of the options out there.

If we dig further, more requirements will show up later in the life cycle of the project. More than that, they will change continuously, just like the technologies, frameworks and patterns.

For example, you might need to:

  • Integrate with a slow external processing system (e.g. SEVIS) that takes many hours to handle a request.
  • Issue a heart-beat message or publish statistics for the enterprise monitoring system.
  • Integrate with your ORM, while automatically handling sessions, transactions and flushing.
  • Implement custom self-tuning heuristics that auto-provision more Azure Worker roles depending on the queue length, while dismissing them afterwards.

I'm sure there are many more scenarios in your domain. That's how it always happens.

To successfully deliver and evolve your solution in an ever-changing world, its infrastructure has to be robust, scalable and flexible.

That's how the Lokad.CQRS .NET App Engine and Application Blocks were designed. They are powered by Autofac IoC, which takes care of component configuration and management. This gives a flexible and extensible application core for building scalable solutions. Seamless integration with MEF provides some additional BCL-level extensibility.

The Fluent Configuration API is built upon Autofac Modules, providing a friendly syntax to enable and configure various features of the Lokad.CQRS App Engine. Sensible default values, syntax helpers and some sanity checks are implemented in these modules to make things simpler for developers using Lokad.CQRS.

Lokad.CQRS App Engine Configuration Fluent API

NB: Please keep in mind that the currently available features represent only a limited subset of the existing features that are planned to be ported to open source by Lokad.com.

In the previous sample we already covered the two most essential features and used them to implement a ping-pong sample. Let's take the App Engine a bit further by exploring the Task Scheduler and some more of the Message Handling functionality.

Sample 02 - Recurring Payments

Sources for this sample are included in the Lokad.CQRS source code.

Lokad CQRS App Engine Sample Application

In Sample 02 we'll use the Scheduling Feature to put multiple instances of SendPaymentMessage into our Azure Queue named "sample-02". Message Handling will process all messages from this queue, dispatching each of them to all available consumers at once.

You can go ahead and run Sample-02 in the Windows Azure Simulation Environment, or follow the explanations first.

Let's start with the implementation of the scheduled task. We'll implement IScheduledTask from the Lokad.Cqrs.Default namespace (although you could define your own interface to keep the code decoupled from the Lokad.Cqrs assemblies):

public sealed class SendPaymentsSometimes : IScheduledTask
{
  readonly IMessageClient _sender;

  public SendPaymentsSometimes(IMessageClient sender)
  {
    // sender will be injected by the IoC
    _sender = sender;
  }

  public TimeSpan Execute()
  {
    var amount = (Rand.NextDouble()*100).Round(1);
    // send new message
    _sender.Send(new SendPaymentMessage(amount));
    // ask to sleep for 3 seconds before the next run
    return 3.Seconds();
  }
}

As you can see, we ask for an IMessageClient implementation, which will be provided by the Lokad.CQRS App Engine infrastructure. The piece of configuration responsible for auto-wiring all available task definitions looks like this:

builder
  .RunTasks(m => { m.WithDefaultInterfaces().InCurrentAssembly(); })

This is a part of the larger configuration code for the Lokad.CQRS App Engine hosted in a Windows Azure Worker Role:

protected override ICloudEngineHost BuildHost()
{
  return new CloudEngineBuilder()
    // this tells the server about the domain
    .Domain(d =>
      {
        d.WithDefaultInterfaces();
        d.InCurrentAssembly();
      })
    // we'll handle all messages incoming to this queue
    .HandleMessages(mc =>
      {
        mc.ListenTo("sample-02");
        mc.WithMultipleConsumers();
      })
    // create IMessageClient that will send to sample-02 by default
    .SendMessages(m => m.DefaultToQueue("sample-02"))
    // enable and auto-wire scheduled tasks feature
    .RunTasks(m => { m.WithDefaultInterfaces().InCurrentAssembly(); })
    .Build();
}
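
With RunTasks enabled, the host is responsible for calling Execute on every discovered task and honoring the delay that the task returns. As a rough illustration of that contract, here is a minimal sketch of such a loop. It is not the actual Lokad.CQRS scheduler: the IScheduledTask interface is redeclared locally to keep the sample self-contained, and NaiveTaskRunner and CountingTask are hypothetical names introduced only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Local stand-in for the Lokad.CQRS interface (assumption: same shape as in the sample).
public interface IScheduledTask
{
    // Does one unit of work and returns how long to wait before the next run.
    TimeSpan Execute();
}

// Hypothetical task that only counts its runs and always asks for a 3 second pause.
public sealed class CountingTask : IScheduledTask
{
    public int Runs { get; private set; }

    public TimeSpan Execute()
    {
        Runs++;
        return TimeSpan.FromSeconds(3);
    }
}

// Hypothetical single-threaded runner; the real host is more elaborate.
public sealed class NaiveTaskRunner
{
    readonly IScheduledTask _task;
    readonly Action<TimeSpan> _sleep;

    // By default the runner blocks the current thread between runs.
    public NaiveTaskRunner(IScheduledTask task)
        : this(task, delay => Thread.Sleep(delay))
    {
    }

    // The sleep function is injectable, so the loop can be exercised without waiting.
    public NaiveTaskRunner(IScheduledTask task, Action<TimeSpan> sleep)
    {
        _task = task;
        _sleep = sleep;
    }

    // Runs the task the given number of times, pausing after each run
    // for exactly as long as the task requested.
    public void Run(int iterations)
    {
        for (var i = 0; i < iterations; i++)
        {
            var delay = _task.Execute();
            _sleep(delay);
        }
    }
}
```

Calling new NaiveTaskRunner(new CountingTask()).Run(2) would execute the task twice with a pause after each run. Returning the delay from Execute lets every task decide its own pace, which is presumably why SendPaymentsSometimes simply returns 3.Seconds().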

The actual message implementation is as simple as this:

[DataContract]
public sealed class SendPaymentMessage : IMessage
{
  [DataMember]
  public double Amount { get; private set; }

  public SendPaymentMessage(double amount)
  {
    Amount = amount;
  }
}
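
The message has no parameterless constructor and its setter is private, which is fine for data contract serialization. The sketch below round-trips such a message through the standard .NET DataContractSerializer to show this. It assumes that data contracts are the serialization format in use (the engine may be configured differently), it drops the IMessage marker interface to stay self-contained, and MessageRoundTrip is a hypothetical helper name.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

// Same shape as the message in the sample, minus the IMessage marker interface.
[DataContract]
public sealed class SendPaymentMessage
{
    [DataMember]
    public double Amount { get; private set; }

    public SendPaymentMessage(double amount)
    {
        Amount = amount;
    }
}

// Hypothetical helper: serializes a message to memory and reads it back.
public static class MessageRoundTrip
{
    public static SendPaymentMessage Clone(SendPaymentMessage message)
    {
        var serializer = new DataContractSerializer(typeof(SendPaymentMessage));
        using (var stream = new MemoryStream())
        {
            // write the message as XML into the in-memory stream
            serializer.WriteObject(stream, message);
            // rewind, so that the serializer reads from the beginning
            stream.Position = 0;
            return (SendPaymentMessage) serializer.ReadObject(stream);
        }
    }
}
```

MessageRoundTrip.Clone(new SendPaymentMessage(80.4)) returns a new instance carrying the same Amount, which is roughly what happens to a message on its way through a queue.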

The Message Handling Feature will listen to the "sample-02" queue, dispatching every message to all available consumers.

Let's create a consumer to handle SendPaymentMessage explicitly:

public sealed class SendPaymentHandler : IConsume<SendPaymentMessage>
{
  public void Consume(SendPaymentMessage message)
  {
    Trace.WriteLine("There is an incoming payment! " + message.Amount);
  }
}

Now, let's create another consumer that will deal with every single message showing up in the queue:

public sealed class ListenToEverythingHandler : IConsume<IMessage>
{
  public void Consume(IMessage message)
  {
    Trace.WriteLine("Got message of type: " + message.GetType());
  }
}

Note that we do not explicitly associate consumers with the messages. DomainModule will automatically derive all the required mappings by inspecting the available assemblies at application start-up.

After putting everything into the same project and running it in the Windows Azure Simulation Environment, we'll get something like this in our output:
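
To give an idea of how such mappings could be derived, here is a minimal reflection-based sketch. It is not the DomainModule implementation: IMessage and IConsume are redeclared locally, consumers are created with Activator instead of being resolved from the IoC container, and NaiveDispatcher, OtherMessage and Output are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Local stand-ins for the Lokad.CQRS interfaces (assumed shapes).
public interface IMessage { }

public interface IConsume<TMessage>
{
    void Consume(TMessage message);
}

public sealed class SendPaymentMessage : IMessage
{
    public double Amount { get; private set; }
    public SendPaymentMessage(double amount) { Amount = amount; }
}

public sealed class OtherMessage : IMessage { }

// Collects consumer output in memory instead of writing to Trace.
public static class Output
{
    public static readonly List<string> Lines = new List<string>();
}

public sealed class SendPaymentHandler : IConsume<SendPaymentMessage>
{
    public void Consume(SendPaymentMessage message)
    {
        Output.Lines.Add("payment");
    }
}

public sealed class ListenToEverythingHandler : IConsume<IMessage>
{
    public void Consume(IMessage message)
    {
        Output.Lines.Add("any: " + message.GetType().Name);
    }
}

public sealed class NaiveDispatcher
{
    // pairs of (consumed message type, consumer type) found in the assembly
    readonly List<KeyValuePair<Type, Type>> _map = new List<KeyValuePair<Type, Type>>();

    public NaiveDispatcher(Assembly assembly)
    {
        foreach (var type in assembly.GetTypes().Where(t => t.IsClass && !t.IsAbstract))
        {
            foreach (var contract in type.GetInterfaces())
            {
                // keep only closed IConsume<T> interfaces and remember their T
                if (contract.IsGenericType && contract.GetGenericTypeDefinition() == typeof(IConsume<>))
                {
                    _map.Add(new KeyValuePair<Type, Type>(contract.GetGenericArguments()[0], type));
                }
            }
        }
    }

    // Delivers the message to every consumer whose message type it matches
    // and returns the number of consumers that were invoked.
    public int Dispatch(IMessage message)
    {
        var delivered = 0;
        foreach (var pair in _map)
        {
            if (!pair.Key.IsInstanceOfType(message)) continue;

            var consumer = Activator.CreateInstance(pair.Value);
            var contract = typeof(IConsume<>).MakeGenericType(pair.Key);
            contract.GetMethod("Consume").Invoke(consumer, new object[] { message });
            delivered++;
        }
        return delivered;
    }
}
```

With a dispatcher built from typeof(SendPaymentHandler).Assembly, a SendPaymentMessage reaches both handlers, while any other IMessage reaches only ListenToEverythingHandler. The type check against the consumed message type is what allows a consumer of IMessage to see every message in the queue.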

DefaultCloudEngineHost: [Info] Starting host
ScheduledProcess: [Debug] Starting 1 tasks in 1 threads
ScheduledProcess: [Debug] Executing SendPaymentsSometimes
ConsumingProcess: [Debug] Starting consumption for Queue x 1 (sample-02)
AzureQueueTransport.Messages: [Debug] Starting 1 threads to handle messages on sample-02
Queue[sample-02]: [Debug] Auto-created queue http://.../sample-02
There is an incoming payment! 80,4
Got message of type: Sample_02.Worker.SendPaymentMessage
Queue[sample-02]: [Debug] SendPaymentMessage - a8c42013-b08b-4409-a584-15b033fd62e8
ScheduledProcess: [Debug] Executing SendPaymentsSometimes
There is an incoming payment! 89,4
Got message of type: Sample_02.Worker.SendPaymentMessage

There is a quick way to make our debug logs more readable when it comes to messages. We can just override the ToString method with our own implementation:

public override string ToString()
{
  return string.Format("Send Payment ({0})", Amount);
}

This will make messages of this type look like this in the log:

Queue[sample-02]: [Debug] Send Payment (24.8).


Summary

This article is a part of the Learning series for Lokad.CQRS Guidance. We've talked a little bit about the flexibility of the Lokad.CQRS App Engine, while exploring the Scheduled Tasks Feature and multiple message dispatching in Sample 02 - Recurring Payments.

In the next tutorial of the series we'll learn about the NHibernate Module of Lokad.CQRS, which makes it simpler to work with SQL Azure from our message handlers.

You can stay tuned to the updates by subscribing to this Journal.

So what do you think about this article and the project?
