The Best Way to Learn CQRS, DDD and Event Sourcing

The best way to learn something by heart is to try to reinvent it yourself. You'll either give up or will learn all the problems and trade-offs really well. That's what I'm planning to do, while sharing my thoughts and practical experience with you.

To catch up with Command-Query Responsibility Segregation you can check out the Getting Started with CQRS (includes videos, sample project references and articles by brilliant people like Greg Young and Udi Dahan). Ultimately my own articles, thoughts and notes from the production will go into the xLim 4: CQRS in Cloud series.

Ok, let's get started.

So far the learn by doing approach had been working extremely well for me. At the moment I'm learning how to make CQRS solutions leverage their potential by going the path of the Domain-Driven Design and Event Sourcing and while keeping in mind constraints of the infinitely scalable systems by Pat Helland. I'm also trying CQRS outsourcing potential (or rather potential for efficiently distributing development effort in parallel) and attempt to figure out methodology of continuously integrating DDD/ES solutions with the legacy CRUD systems. Latter is really important since, that's what I've been doing a lot recently in Salescast.

All this goodness just requires getting at least one more developer, Mercurial repository and spending a few evenings building your spike project and evolving it.

This teaches a lot. Yet sometimes development just happens to deviate away from the common design patterns (for "common" - see Reference Implementations of Greg and Mark in the CQRS section). In a strange way.

That's how my command handlers look like:

public sealed class ProjectHandler : IHandleCommands
{
  readonly EventStorePartition _store;

  public ProjectHandler(EventStorePartition store)
  {
    _store = store;
  }

  public IDisposable Subscribe(IQbservable<DomainCommand> observable)
  {
    return observable
      .WherePartitionIs(_store.Partition)
      .Subscribe(Handle);
  }

  void Handle(DomainCommand d)
  {
    _store.Update<ProjectAggregateRoot>(
      d.AggregateId, 
      r => DomainInvoker.RouteToDo(r, d.Command), 
      d.Command is CreateSolution);
  }
}

Or if we need to ensure for this AR that commands should not be based on stale data, then Handle transforms to:

void HandleWithConcurrencyCheck(DomainCommand d)
{
  _store.Update<ProjectAggregateRoot>(
    d.AggregateId,
    r => {
        if (d.Version != r.Version)
          throw new InvalidOperationException(
             "Uhm. Root was changed since client last saw it");

        DomainInvoker.RouteToDo(r, d.Command);
      },
    d.Command is CreateSolution);
}

Event handlers suffer in the same way. Yet they potentially benefit from IQbservable even more, since we could theoretically filter interesting events at the server, by using .NET Observable Query Provider capable of instructing AMQP server to send us (this specific event handler running within this partition) only specific events for this partition.

Another interesting thing is how event store looks like, when developed in .NET 4.0 with the observable goodness:

// read events from file within partition
// ... skipped...
var subject = new Subject<Change>();
var aggregateRoot = (TAggregateRoot) factory(subject);

// apply events to domain
foreach (var change in history)
{
  aggregateRoot.Apply(change.Event);
}

// subscribe to any changes produced by our actions
var newChanges = new List<Change>();
using (subject.Subscribe(newChanges.Add))
{
  update(aggregateRoot);
}

// nothing changed
if (newChanges.Count == 0) return;

try
{
  // turn changes into domain events capable of crossing partition boundaries
  var events = newChanges
    .Select(c => new DomainEvent(id, c.Event, c.Version, DateTimeOffset.Now))
    .ToArray();
  // naive approach to persisting event history for now. Easy to improve
  var allEvents = history.Concat(events).ToArray();
  storageItem.Write(stream => _formatter.Serialize(stream, allEvents), condition);
}
catch (StorageConditionFailedException ex)
{
  // normally this should never happen, since commands are processed by a single
  // processor per partition, but just in case...
  var msg = string.Format("Record was modified: '{0}'; Id: '{1}'", type, id);
  throw new OptimisticConcurrencyException(msg, ex);
}

Yet one more interesting thing is how easy it turns out to turn CQRS solution with a message server (Erlang-powered RabbitMQ, for example) into a desktop application with in-memory event bus. You just need to swap messaging libraries with ConcurrentQueue from .NET 4.0, swap partitionable event and view storage for file-based and ask .NET 4.0 TPL to keep pumping events and commands, while the application is running:

var commands = new ConcurrentQueue<DomainCommand>();
var events = new ConcurrentQueue<DomainEvent>();

var data = new FileStorageContainer("data").Create();
var views = new FileStorageContainer("views").Create();

var cts = new CancellationTokenSource();

var sender = new DelegateSender(commands.Enqueue);
var viewStore = new ViewStore(views);

using (var form = new Form1(sender, viewStore))
{

  var tasks = new List<Task>
  {
    Task.Factory.StartNew(() => ProcessCommands(commands, events, data, cts.Token, form.Replay)),
    Task.Factory.StartNew(() => ProcessEvents(events, views, cts.Token))
  };

  Application.Run(form);

  cts.Cancel();
  Task.WaitAll(tasks.ToArray());
}

Essentially this proved to me that CQRS architecture (or some portions of it) can be bundled into a single Windows Forms Application (or whatever desktop UI you are using), scaling everything down to a single process and keeping event bus, event and command handlers inside. What's more important, all these processes still run in different threads (effectively leveraging multi-core capabilities to pre-render views), yet they are simple to understand and isolate (I didn't have a single UI threading issue, which tend to show up frequently whenever I start developing any async UI).

Theoretically CQRS for the desktop is the same MVC pattern with additional explicit constraints on organizing controllers and distributing their logic in async manner.

PS: for the latest articles in the CQRS/DDD series see xLim 4: CQRS in Cloud.

- by .