Home © Rinat Abdullin 🌟 AI Research · Newsletter · ML Labs · About

Rx works nicely with DDD and Event Sourcing

Yesterday I finally got to the point of trying to build Command-Query Responsibility Segregation solution for Cloud Computing environment that uses Domain Driven Design and Event Sourcing (quick overview of these buzz words). The whole purpose of the project is to learn, so there's a freedom to experiment upon the accepted patterns and practices.

Mark Nijhof put together really thorough article on the Domain Events in CQRS. It has a lot of code and information packed in. Let's take it from there, but with a slightly different and highly experimental route.

Usually Aggregate root implementations have all sorts of interfaces and methods helping the surrounding infrastructure to pick them up and handle. This makes the domain code look a bit complex for me.

However it seems that these AggregateRoots are a native candidate for leveraging Reactive Extensions for .NET. AR might need to inherit and implement just a single interface (snapshots might require the second one, though):

public class ProjectAggregateRoot : ISubject<IEvent,Change>

Where ISubject is an interface from System.Reactive that merely says:

public interface ISubject<in T1, out T2> : 
  IObserver<T1>, IObservable<T2>

How does it affect our aggregate design? Not much, we just slightly extend the "Apply Event" method, naming it "OnNext" and adding the ability to accept IEvent and publish Change:

Subject<Change> _subject = new Subject<Change>();

public void OnNext(IEvent value)
{
  EventInvoker.Apply(this, value);
  Interlocked.Increment(ref _version);

  _subject.OnNext(new Change(_version, value));
}

public IDisposable Subscribe(IObserver<Change> observer)
{
  var subscribe = _subject.Subscribe(observer);
  return subscribe;
}

Note: As you know, Reactive Extensions were designed for the asynchronous operations (i.e.: cloud computing interactions or UI events). Thus they work tightly with PFX. However, since I don't know anything about this integration and side effects, I'm dispatching and executing everything in sync so far.

Change is just an event that was applied and thus has a version number:

public sealed class Change
{
  public readonly long Version;      
  public readonly IEvent Event;

  public Change(long version, IEvent @event)
  {
    Version = version;
    Event = @event;
  }

  public override string ToString()
  {
    return string.Format("r{0:####}: {1}", Version, Event.ToString());
  }
}

What does this give us? We can drop all event subscription and management interfaces and functionality, letting Linq-2-Events to do all the handling in rich and tested way:

  • LoadFromHistory methods are equivalents of IObserver[TEvent]
  • GetChanges functionality is fulfilled by subscribing to IObservable[Change]

What's more important, our aggregate can avoid referencing any custom interfaces, since both IObservable and IObserver are in .NET 4.0 BCL. This also provides a wide range of extension methods available.

Given that, it becomes rather simple task to write store that works like this:

// somewhere in IoC init
var store = new HybridStore(serializer, storage);
store.RegisterEventSource<ProjectAggregateRoot>();

// somewhere in the handler we perform atomic update
// if there is a concurrency problem, service bus will be
// responsible for reapplying changes later
store.Update<ProjectAggregateRoot>("project123", e =>
{
  e.AddTask("t1", "Satori Project");
  e.AddTask("t2", "Build Simple Domain");
  e.AddTask("t3", "Build Simple Sync reader");
  e.CreateTaskReference("t2", "t1");
});

For the sake of consistency, here's how the actual persistence (highly prototypical) looks like:

public void Write(Type type, object key, 
  AddEntityDelegate addEntityDelegate, 
  UpdateEntityDelegate updateEntityDelegate)
{
  var item = MapTypeAndIdentity(type, key);

  Func<object, ISubject<IEvent, Change>> factory;
  if (_factories.TryGetValue(type, out factory))
  {
    var condition = StorageCondition.None;
    var domain = factory(key);
    var changes = new List<Change>();

    using (domain.Subscribe(changes.Add))
    {
      try
      {
        item.ReadInto((props, stream) =>
        {
          var source = (Change[])_serializer
            .Deserialize(stream, typeof(Change[]));
          foreach (var change in source)
          {
            domain.OnNext(change.Event);
          }
          condition = StorageCondition.IfMatch(props.ETag);
        });
      }
      catch (StorageItemNotFoundException) { }
      var version = changes.Count;

      using (domain.Subscribe(_subject))
      {
        updateEntityDelegate(key, domain);
      }
      if (version == changes.Count)
        return;
    }

    try
    {
      item.Write(stream => _serializer.Serialize(changes.ToArray(), stream),
        condition);
    }
    catch (StorageConditionFailedException ex)
    {
      var msg = string.Format(
        "Record was modified concurrently: '{0}'; Id: '{1}'. Please, retry.",
        type, key);
      throw new OptimisticConcurrencyException(msg, ex);
    }
  }
}

Since we are forced to deal with the event streams, it's easy to subscribe and do things like:

using (store.Subscribe(Console.WriteLine))
{
  store.Update<ProjectAggregateRoot>("project123", e =>
  {
    e.CreateTaskReference("t2", "t3");
  });
}

The listener subscription above will reveal us that creating task reference in this case actually results in two events:

r5: Domain.TaskRemovedFromParentTask
r6: Domain.TaskAddedToParentTask

Since Reactive Extensions for .NET are built upon the event streams I think that doing all sorts of related operations operations (writing event denormalizers, merging streams, writing behavioral unit tests) might be simplified in Domain Driven Design with the Event Sourcing.

Such abstraction allows to separate different concerns rather clearly. For example the actual implementation of the underlying storage passed to HybridStore could be FileStorageContainer or BlobStorageContainer for Windows Azure from Lokad.CQRS since both allow atomic updates and optimistic concurrency locking (or any reasonable RDB or NoSQL).

It's interesting to note, that these storage container implementations were actually developed for the project without any hint of DDD or ES. Yet, since we have such a nice persistence ignorance in CQRS/ES architectures, they could be plugged in without problems.

Also, it is rather simple to implement partitioning in this scenario, since all access to the storage goes with the entity key (identity). If you add into the mix cloud infrastructure capable of doing dynamic repartitioning (i.e. Windows Azure storage) and providing computing capacities on demand - you'll get foundation for building almost-infinitely scalable solution (although message queues still need proper support for transactional sending and ACKs). CQRS will provide high-level approaches for shaping the architecture and evolving it in scalable and cost-effective manner; while DDD holds the methodology for designing and managing the business core of the solution.

All in all, future looks quite exciting, doesn't it?

Published: August 21, 2010.

🤗 Check out my newsletter! It is about building products with ChatGPT and LLMs: latest news, technical insights and my journey. Check out it out