Software Design Blog

Journey of Rinat Abdullin

Theory of CQRS Command Handlers: Sagas, ARs and Event Subscriptions

This is yet another one of these “working paper” theoretical posts about CQRS, where I try to understand it and make the underlying ideas logical for me. If you are reading this after the year 2010, make sure to check the section dedicated to Command and Query Responsibility Segregation and xLim4: CQRS in Cloud. There could be some more recent materials on that.

Meanwhile, here’s what we went through in the previous posts:

Let’s have a quick (but really important) note and move from here.

Dear Reader, CQRS/DDD/ES is not the silver bullet for every single system out there. I would suspect that it is applicable to less than 20% of the cases. Basically, if something like Lightswitch or XAF does the job for you without problems, then there is no need for CQRS.

CQRS works well in scenarios where you need simple scalability, cost-effective cloud deployments or ability to adapt to rapidly changing business requirements (and some ideas of almost-infinite scalability make sense to less than 5% of CQRS implementations). Yet, these articles might still help you by describing approaches to handle extreme cases when project complexity, performance or cost requirements try to jump by a few orders of magnitude.

Last week I was meeting with Jeremie, Romain and Julien. Somewhere along the way we had an interesting discussion on Sagas. This has finally helped to bring together CQRS, DDD and Cloud systems in a seemingly non-contradicting way.

Let’s start with Sagas.

What do we know about Sagas?

A Saga is a message handler running on the server that helps us implement long-running business transactions. It also helps to deal with eventual consistency, uncertainty and the constraint that prohibits transactions from going outside of the Aggregate Root (or entity in general). In addition to negotiating long-running transactions between entities, Sagas are aware of passing time and can implement functionality like timeouts.

Sagas can subscribe to events (InvoicePaid) and receive direct commands (StopProcess). They send commands.

One of the current practices (or at least theoretical ideas) is to model saga as a pure and finite state machine. State in this case could be stored as a mere integer. Although this is one of the possible implementations, I don’t think this to be the end of the road for Sagas. There should be more. After all, the major business value is in the sagas, while ARs are just for the “house-keeping” (thought belongs to Greg and Jeremie, I completely agree).

Here’s the simple logical explanation (an oversimplified theorem, if you like) of why a saga can’t always be a simple state machine. Sagas, just like other handlers, operate in an eventually consistent world. Messages might be duplicated or arrive out of order. Tough life, if you ask me.

In order to deal with that (in the extreme cases) we need more information than a simple integer. We need to persist details about the incoming messages and, if we want to avoid the problems of 2PC, persist outgoing messages along with the state. Hence a saga can’t always be modeled as a simple state machine. Jonathan Oliver seems to share the same beliefs (and he was actually the trigger for me to finally write this post).
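To make the argument concrete, here is a minimal sketch of saga state that carries more than an integer. It assumes messages have a Guid identity; the names SagaState, BillingSaga and the outgoing ShipOrder command are hypothetical and used only for illustration (InvoicePaid is the event mentioned above).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical saga state: illustrative names, not taken from any framework.
public sealed class SagaState
{
  // The "simple integer" of a pure state machine.
  public int Step;
  // Identities of messages already handled, needed to drop duplicates.
  public readonly HashSet<Guid> HandledMessages = new HashSet<Guid>();
  // Outgoing messages saved together with the state, dispatched later (no 2PC).
  public readonly List<string> Outbox = new List<string>();
}

public static class BillingSaga
{
  // Returns true if the message was handled, false if it was a duplicate.
  public static bool Handle(SagaState state, Guid messageId, string messageName)
  {
    // HashSet.Add returns false when the identity is already present.
    if (!state.HandledMessages.Add(messageId))
      return false;

    if (messageName == "InvoicePaid" && state.Step == 0)
    {
      state.Step = 1;
      state.Outbox.Add("ShipOrder"); // hypothetical outgoing command
    }
    return true;
  }
}
```

The point of the sketch is only that the step number, the handled message identities and the outbox would be persisted as one unit, so a redelivered message changes nothing.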

As it seems to me, technically Saga is really similar to any command handler (and in essence it can use both event sourcing and state persistence in the underlying store). The primary differences between saga and traditional AR hosted in command handler seem to be:

  • business purpose - ARs are more for the house-keeping, while sagas reflect highly specific business details on how to deal with eventual consistency and the probabilistic nature of the real world;
  • domain intent - AR manages bounded contexts, while saga orchestrates interactions between them, time and the unexpected;
  • technical constraints - sagas can subscribe to the events and get commands, while command handlers get only commands.

I agree with the community on the first two points, but the last one just does not make sense to me. The problem shows up when we are talking about the partitioned world (approach of Pat Helland again). Events might be published within multiple partitions. How does infrastructure know that certain events from partition A and C should be routed to partition D? Is it just because there could be a saga listening for them?

At the same time with commands everything is simple: there is always one recipient and partition could be located based on the identity. So we have a clear technical distinction that adds to the differences between commands and events in the DDD world.

Commands vs. Events

Let’s say we scaled our theoretical CQRS model to 1000 partitions. Whenever an aggregate publishes an event, should we send it to 999 other partitions, just because there might be an event handler interested? Even if there are no subscribers, this will result in a lot of traffic and some really complex deployment to manage (aside from bogging down the message bus).

In finance, folks seem to use AMQP servers and the capabilities of the protocol for that (with filtering rules to avoid sending everything everywhere).

With commands everything is rather straightforward: a command has a recipient uniquely identified by its identity. Given the partitioning scheme, we know exactly where the command should go.
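As a rough illustration of routing by identity, the sketch below maps an entity identity to one of N partitions with a stable hash. The PartitionRouter name and the choice of FNV-1a hashing are assumptions made for this example; a real partitioning scheme could just as well use ranges or a lookup table.

```csharp
using System;

// Hypothetical router: maps an entity identity to a partition index.
public static class PartitionRouter
{
  public static int PartitionFor(Guid entityId, int partitionCount)
  {
    if (partitionCount <= 0)
      throw new ArgumentOutOfRangeException(nameof(partitionCount));

    // FNV-1a over the 16 bytes of the Guid. Unlike Guid.GetHashCode, the
    // result here depends only on the bytes, so every node computes the same value.
    uint hash = 2166136261;
    unchecked
    {
      foreach (byte b in entityId.ToByteArray())
      {
        hash = (hash ^ b) * 16777619;
      }
    }
    return (int)(hash % (uint)partitionCount);
  }
}
```

Any sender that knows the partition count can compute the destination locally, which is exactly what is not possible for an event with an unknown set of subscribers.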

So it looks like commands and events (as types of messages) mean different things not only in the DDD/ES world, but also from the technical perspective of distributed solutions. Let’s see if we can stabilize a consistent abstraction here.

Command Message (D for Do in Marble diagrams) has the following features:

  • It is a message that instructs a specific entity (identified by its identity) to perform certain actions.
  • Messaging infrastructure is aware of this command’s identity and uses it to route command to the specific partition.
  • Command message is an entity itself - it has an identity. This allows the recipient to stay idempotent and filter out seemingly similar commands that were duplicated by the messaging infrastructure (this happens within the command handler in activity logic).
  • This message contains one or more domain commands. All domain commands in the same message are directed to the entity. They are expected to fail or succeed together (essentially this is just a construct to avoid generating composite commands).
  • Domain commands are named like DoSomething, they instruct the target entity to do something that might result in different outcomes or fail.
  • In the DDD world command message determines unit of work, while specific domain commands map to the methods called upon the target aggregate root (with the exception of cases where domain service is used).

Essentially command signature could be represented as:

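using System;

// Marker interface for domain commands carried inside a command message.
public interface ICommand {}

public sealed class CommandMessage
{
  public readonly Guid EntityId;        // identity of the target entity, used for routing
  public readonly ICommand[] Commands;  // domain commands that fail or succeed together
  public readonly long KnownVersion;    // optional, used for concurrency locking

  public CommandMessage(Guid entityId, ICommand[] commands, long knownVersion)
  {
    EntityId = entityId;
    Commands = commands;
    KnownVersion = knownVersion;
  }
}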

where ICommand could be anything:

public sealed class RenameProject : ICommand
{
  public readonly string Name;

  public RenameProject(string name)
  {
    Name = name;
  }
}

Known version is just an optional member used to implement concurrency locking, if needed.
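Here is a minimal sketch of how a handler might treat such a message as a unit of work. Project and ProjectHandler are hypothetical names, and the sketch assumes the known version is always checked (in practice it is optional). All commands are applied to a copy, so a failure in any of them leaves the current state untouched.

```csharp
using System;
using System.Collections.Generic;

public interface ICommand {}

public sealed class RenameProject : ICommand
{
  public readonly string Name;
  public RenameProject(string name) { Name = name; }
}

// Hypothetical aggregate root with only the fields this sketch needs.
public sealed class Project
{
  public string Name = "";
  public long Version;
}

public static class ProjectHandler
{
  // Returns the new state; throws without touching 'current' if anything fails.
  public static Project Handle(Project current, long knownVersion, IEnumerable<ICommand> commands)
  {
    // Concurrency lock: the sender must have seen the latest version.
    if (knownVersion != current.Version)
      throw new InvalidOperationException("Concurrency conflict");

    var next = new Project { Name = current.Name, Version = current.Version };
    foreach (var command in commands)
    {
      switch (command)
      {
        case RenameProject rename:
          if (string.IsNullOrWhiteSpace(rename.Name))
            throw new ArgumentException("Name is required");
          next.Name = rename.Name;
          break;
        default:
          throw new NotSupportedException(command.GetType().Name);
      }
    }
    next.Version++; // one version bump per command message (unit of work)
    return next;
  }
}
```

Each domain command maps to one operation on the aggregate, while the message as a whole decides whether anything is committed.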

Event Message (E on marble diagrams) is described by:

  • Event generally is something that happened in the past. It is the history that can’t be changed. This intent is expressed in its naming: SomethingChanged.
  • Event Message is a transport-level message travelling through the infrastructure. It is associated with the sender’s identity and its version. These two properties together uniquely identify each event and allow a subscriber to handle duplicates possibly generated by the scalable infrastructure.
  • Event does not have a recipient. It is merely published to the messaging infrastructure (to the world). The latter is responsible for routing the event to its subscribers (there could be zero or more). Routing is based on the subscriptions published by the entities and takes the form of gathering events and converting them into commands directed to the subscribers.
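The event message described above could be sketched like this. EventMessage and DuplicateFilter are hypothetical names; the only idea taken from the text is that the pair (sender identity, sender version) identifies an event and lets a subscriber drop duplicates.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical transport-level envelope for a domain event.
public sealed class EventMessage
{
  public readonly Guid SenderId;      // identity of the entity that published the event
  public readonly long SenderVersion; // version of the sender when the event was produced
  public readonly object Event;       // the domain event itself, e.g. ProjectRenamed

  public EventMessage(Guid senderId, long senderVersion, object @event)
  {
    SenderId = senderId;
    SenderVersion = senderVersion;
    Event = @event;
  }
}

// Subscriber-side filter keyed by (sender identity, sender version).
public sealed class DuplicateFilter
{
  private readonly HashSet<(Guid, long)> _seen = new HashSet<(Guid, long)>();

  // Returns true the first time a given event is seen, false for duplicates.
  public bool IsNew(EventMessage message)
  {
    return _seen.Add((message.SenderId, message.SenderVersion));
  }
}
```

Note that the envelope has no recipient field at all, in contrast to the command message.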

So it looks like we have a non-contradicting match between commands and events in the scalable architecture and in the Domain-Driven Design. In essence we map incoming command messages directly towards the aggregate commands (stripping away the routing and partitioning information and letting the command handler utilize version and identity information for managing entity). At the same time we can publish resulting events as event messages (supplying them with the sender info and the other contextual details for the subscriptions).

But what are these subscriptions that magically convert published events into the commands?

Event Subscriptions

The concept of an event subscription closely matches the concept of IQbservable from the Reactive Extensions for .NET (have I already mentioned how brilliant Erik Meijer and his team are?). If we map the theory behind this cloud programmability logic to CQRS, we’ll have something like this:

  • Subscriptions are published by entities interested in certain events. Entity is anything with a unique identity that resides in a command handler: Aggregate Root, Saga, Read Model. In the code entity is decoupled from the subscription code, but they come together.
  • Entity publishes its event query to the environment (similar to IQueryable but for events). Query handling could be an IQbservable executed on the AMQP server to build bindings, a query replicated to all partitions for massive scaling, or just subscriptions persisted and evaluated at the message bus.
  • Subscription query is responsible for filtering and aggregating events and converting them to commands that target specific recipients. Subscription is a way to say “I’m interested in these messages, send them to me (identity to send to provided)”.
  • Time ticks, timeout and domain events - are examples of the events that the subscription query might be interested in.
  • Command recipients are still responsible for ordering and de-duplicating, since transfer could happen across the consistency boundaries. Ordering is specific to the actual case and could come in simple retries (infrastructure will help to retry on transient failures) or something more complex like sequence numbers.
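The points above can be sketched without any Rx dependency. In this simplified, hypothetical model a subscription is a filter plus a conversion function, and the infrastructure (SubscriptionRouter here) turns each published event into zero or more commands addressed to subscribers. All type names are assumptions for illustration, and aggregation over several events is left out.

```csharp
using System;
using System.Collections.Generic;

public sealed class PublishedEvent
{
  public readonly Guid SenderId;
  public readonly string Name;
  public PublishedEvent(Guid senderId, string name) { SenderId = senderId; Name = name; }
}

public sealed class RoutedCommand
{
  public readonly Guid RecipientId;
  public readonly string Name;
  public RoutedCommand(Guid recipientId, string name) { RecipientId = recipientId; Name = name; }
}

// "I'm interested in these events, send them to me as this command."
public sealed class Subscription
{
  public readonly Guid SubscriberId;
  public readonly Func<PublishedEvent, bool> Filter;
  public readonly Func<PublishedEvent, string> ToCommandName;

  public Subscription(Guid subscriberId, Func<PublishedEvent, bool> filter,
                      Func<PublishedEvent, string> toCommandName)
  {
    SubscriberId = subscriberId;
    Filter = filter;
    ToCommandName = toCommandName;
  }
}

// Stand-in for the infrastructure that evaluates subscriptions.
public sealed class SubscriptionRouter
{
  private readonly List<Subscription> _subscriptions = new List<Subscription>();

  public void Register(Subscription subscription) { _subscriptions.Add(subscription); }

  // Converts one event into commands for every matching subscriber.
  public List<RoutedCommand> Route(PublishedEvent e)
  {
    var commands = new List<RoutedCommand>();
    foreach (var s in _subscriptions)
    {
      if (s.Filter(e))
        commands.Add(new RoutedCommand(s.SubscriberId, s.ToCommandName(e)));
    }
    return commands;
  }
}
```

Because every resulting command carries a recipient identity, it can be routed to a partition in the same way as any other command.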

By explicitly expressing event subscriptions we keep them decoupled from the infrastructure code (testable and portable), while still having the ability to express some complex time-based interactions. For example, below is an Rx query to have a responsive, yet throttled, event processor (from MSDN forum):

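// Subject<T>, Select, Delay, StartWith and Subscribe come from Rx.
// CombineVeryLatest is not a standard Rx operator; presumably it is a
// custom extension defined in the forum thread.
var fire = new Subject<T>();

// Signals once at the start and then 'delay' after each fired event.
var whenCanFire = fire
  .Select(u => new Unit())
  .Delay(delay)
  .StartWith(new Unit());

events
  .CombineVeryLatest(whenCanFire, (x, flag) => x)
  .Subscribe(fire);

return fire;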

At the same time, although the subscription intent comes with the saga implementation, it is the responsibility of the infrastructure to handle and implement it. And nobody (not the saga, at least) really knows how the implementation will work. It could even be some high-performance CEP engine or a distributed peer-to-peer network of partitions (if we want to build a self-tuning and self-healing system that is hard to take down and easy to scale). This is in essence similar to how the same LINQ query can be handled by various query providers (OData, SQL Server, MySQL etc).

Here’s what Saga and subscription behavior might look like with the Marble diagrams:

CQRS Saga life with Marble diagram

Note that we don’t have any specifics related to persistence or message sending. This is because we can do anything from MSMQ with MS SQL up to AMQP with event sourcing and a message dispatcher. Just like Aggregate Roots within Command Handlers, Sagas can work with various persistence and messaging implementations. In the case of event sourcing or state-based persistence we can avoid all the problems of two-phase commit and have as much reliability and redundancy as we want (just like with the AR and command handlers).

Idempotency and reordering for the distributed scenario could be guaranteed by the state machine implementation or activity-based handling (in this case Change persists all the required information). Retrying logic and exception notifications are handled the same way as well.
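As one possible illustration of reordering with sequence numbers, the sketch below buffers messages that arrive early and releases them only when the gap before them is filled. ReorderBuffer is a hypothetical helper; it assumes the sender numbers its messages consecutively.

```csharp
using System.Collections.Generic;

// Hypothetical helper: releases messages strictly in sequence order
// and silently drops duplicates.
public sealed class ReorderBuffer<T>
{
  private readonly Dictionary<long, T> _pending = new Dictionary<long, T>();
  private long _next;

  public ReorderBuffer(long firstSequence)
  {
    _next = firstSequence;
  }

  // Accepts one message and returns the messages that are now ready, in order.
  public List<T> Accept(long sequence, T message)
  {
    var ready = new List<T>();

    // Already delivered or already buffered: a duplicate.
    if (sequence < _next || _pending.ContainsKey(sequence))
      return ready;

    _pending[sequence] = message;

    // Drain everything that is now contiguous with the last delivered message.
    while (_pending.TryGetValue(_next, out var item))
    {
      ready.Add(item);
      _pending.Remove(_next);
      _next++;
    }
    return ready;
  }
}
```

In a real handler the pending messages and the next expected number would be persisted together with the entity state, for the same reasons as the outgoing messages.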

Command Handling Logic

This brings all together in logic like this, where Command Handler could host Aggregate Root or Saga:

Command Handlers in CQRS

Important notes:

  • This is a logical representation of subjects and their relations. Actual implementations will vary and will probably be a lot simpler. However, as long as the logical concepts are intact (and the constraints are fulfilled), it should be simple to make the system more scalable, rich and reliable just by implementing the proper logic. This includes but is not limited to:
    • event sourcing;
    • catching-up message dispatch, avoiding 2PC;
    • variable scalability and redundancy;
    • reordering and idempotency for the cloud scenarios.
  • KnownVersion field is needed for the concurrency locks (they are passed through the Views).
  • Domain Services are used somewhere in the “Do” method of the command handler.

If we think more about it, read models fit in the same logic as well. They might have a much simpler implementation (state-based persistence without any message dispatcher and with limited idempotency) since we don’t expect complex behaviors and are just interested in the reads. Syntax and implementation should be optimized for this scenario. The Duck project that Jeremie was tweeting about is a good sample of such an approach, where the infrastructure implementation is tailored to express the intent in the most efficient way in the given technical scenario.

The Big Picture

In short this boils down to the following schema, that describes technical constraints and capabilities of command handlers:

Categorization of CQRS Command Handlers

Again, it will be the responsibility of the implementor to pick whatever combinations are necessary in order to capture and express certain intent in the most efficient way.

Unfortunately at the moment we don’t seem to have established theoretical concepts and explicit implementations in the CQRS community (see CQRS reference page for the links). This results in a bit of confusion and general perception of the distributed CQRS architectures as something really complex and highly specific. Fortunately there are really interesting ideas and articles showing up every day. Community is really vibrant and helpful as well.

So far, what do you think?

PS: Sorry for the complex article. Eventually I will be able to understand the subject enough to explain it without a lot of words. For now, I’m still figuring everything out. You can check out xLim 4 reference page for any later articles in the series.