Theory of CQRS Command Handlers: Sagas, ARs and Event Subscriptions
Sunday, September 26, 2010 at 17:47 in CQRS, Cloud Computing, DDD, Event Sourcing, Reactive Extensions, xLim
This is yet another one of these "working paper" theoretical posts about CQRS, where I try to understand it and make the underlying ideas logical for myself. If you are reading this after 2010, make sure to check the section dedicated to Command and Query Responsibility Segregation and xLim4: CQRS in Cloud. There may be more recent materials there.
Meanwhile, here's what we went through in the previous posts:
- The Best Way to Learn CQRS, DDD and Event Sourcing
- Scenario-based Unit Tests for DDD with Event Sourcing
- Domain-Driven Design, Event Sourcing, Rx and Marble Diagrams
- Command Handlers without 2PC and with Various Levels of Reliability
Let's make a quick (but really important) note before moving on.
Dear Reader, CQRS/DDD/ES is not a silver bullet for every single system out there. I suspect it is applicable to fewer than 20% of cases. Basically, if something like LightSwitch or XAF does the job for you without problems, then there is no need for CQRS.
CQRS works well in scenarios where you need simple scalability, cost-effective cloud deployments or the ability to adapt to rapidly changing business requirements (and the ideas of almost-infinite scalability make sense for fewer than 5% of CQRS implementations). Still, these articles might help you by describing approaches to extreme cases, when project complexity, performance or cost requirements jump by a few orders of magnitude.
Last week I met with Jeremie, Romain and Julien. Along the way we had an interesting discussion about Sagas. It finally helped me bring together CQRS, DDD and Cloud systems in a seemingly non-contradictory way.
Let's start with Sagas.
What do we know about Sagas?
A Saga is a message handler running on the server that helps us implement long-running business transactions. It also helps to deal with eventual consistency, uncertainty and the constraint that prohibits transactions from extending outside of the Aggregate Root (or entity in general). In addition to negotiating long-running transactions between entities, Sagas are aware of passing time and can implement functionality like timeouts.
Sagas can subscribe to events (InvoicePaid) and receive direct commands (StopProcess). In turn, they send commands.
One of the current practices (or at least theoretical ideas) is to model a saga as a pure finite state machine. The state in this case could be stored as a mere integer. Although this is one of the possible implementations, I don't think it is the end of the road for Sagas. There should be more. After all, the major business value is in the sagas, while ARs are just for the "house-keeping" (the thought belongs to Greg and Jeremie; I completely agree).
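A minimal sketch of the "state as a mere integer" idea, assuming a hypothetical order saga: the states, the transition table and the ShipOrder/CancelShipment commands are illustrative only, not taken from any framework.

```python
from enum import IntEnum

class State(IntEnum):
    # The whole saga state fits into a single integer.
    AWAITING_PAYMENT = 0
    SHIPPING = 1
    STOPPED = 2

# Hypothetical transitions: (current state, message type) -> (next state, commands to send)
TRANSITIONS = {
    (State.AWAITING_PAYMENT, "InvoicePaid"): (State.SHIPPING, ["ShipOrder"]),
    (State.AWAITING_PAYMENT, "StopProcess"): (State.STOPPED, []),
    (State.SHIPPING, "StopProcess"): (State.STOPPED, ["CancelShipment"]),
}

def handle(state: State, message_type: str) -> tuple:
    """Return (new_state, outgoing_commands); unknown transitions leave the state as is."""
    return TRANSITIONS.get((state, message_type), (state, []))
```

Note that the table silently drops any message that does not match a transition. That is exactly where duplicated and out-of-order messages start to hurt.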
Here's a simple logical explanation (an oversimplified theorem, if you like) of why a saga can't always be a simple state machine. Sagas, just like other handlers, operate in an eventually consistent world. Messages might be duplicated or arrive out of order. Tough life, if you ask me.
In order to deal with that (in the extreme cases) we need more information than a simple integer. We need to persist details about the incoming messages and, to avoid the problems of 2PC, persist outgoing messages along with the state. Hence a saga can't always be modeled as a simple state machine. Jonathan Oliver seems to share the same beliefs (and he was actually the trigger for me to finally write this post).
As it seems to me, technically a Saga is really similar to any command handler (and in essence it can rely on either event sourcing or state persistence in the underlying store). The primary differences between a saga and a traditional AR hosted in a command handler seem to be:
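The extra information can be sketched as a saga record that keeps processed message ids and an outbox of outgoing commands next to the state, all saved in a single write. This is a minimal sketch assuming a store whose save() is atomic for one record; SagaRecord, InMemoryStore, handle_message, dispatch and the ShipOrder command are hypothetical names.

```python
from dataclasses import dataclass, field

@dataclass
class SagaRecord:
    # Persisted together in one write, so no 2PC between store and bus is needed.
    state: int = 0
    processed_ids: set = field(default_factory=set)  # for de-duplication
    outbox: list = field(default_factory=list)       # commands not yet dispatched

class InMemoryStore:
    """Stand-in for a real store; save() is assumed to be one atomic write."""
    def __init__(self):
        self.records = {}

    def load(self, saga_id):
        return self.records.get(saga_id, SagaRecord())

    def save(self, saga_id, record):
        self.records[saga_id] = record

def handle_message(store, saga_id, message_id, message_type):
    record = store.load(saga_id)
    if message_id in record.processed_ids:
        return record  # duplicate delivery: state and outbox stay unchanged
    if record.state == 0 and message_type == "InvoicePaid":
        record.state = 1
        record.outbox.append(("ShipOrder", saga_id))
    record.processed_ids.add(message_id)
    store.save(saga_id, record)  # state, de-dup info and outbox in one write
    return record

def dispatch(store, saga_id, send):
    """Runs separately: send pending commands, then clear the outbox."""
    record = store.load(saga_id)
    for command in record.outbox:
        send(command)
    record.outbox = []
    store.save(saga_id, record)
```

If dispatch crashes halfway, the unsent and already-sent commands are sent again on the next run, so delivery is at-least-once; that is why recipients still need to de-duplicate.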
- business purpose - ARs are more for the house-keeping, while sagas reflect highly specific business details on how to deal with eventual consistency and the probabilistic nature of the real world;
- domain intent - ARs manage bounded contexts, while sagas orchestrate interactions between them, time and the unexpected;
- technical constraints - sagas can subscribe to events and receive commands, while command handlers receive only commands.
I agree with the community on the first two points, but the last one just does not make sense to me. The problem shows up when we talk about the partitioned world (Pat Helland's approach again). Events might be published within multiple partitions. How does the infrastructure know that certain events from partitions A and C should be routed to partition D? Is it just because there could be a saga listening for them?
With commands, on the other hand, everything is simple: there is always one recipient, and its partition can be located based on its identity. So we have a clear technical distinction that adds to the differences between commands and events in the DDD world.
Commands vs. Events
Let's say we scaled our theoretical CQRS model to 1000 partitions. Whenever an aggregate publishes an event, should we send it to 999 other partitions, just because there might be an event handler interested? Even if there are no subscribers, this will result in a lot of traffic and some really complex deployment to manage (aside from bogging down the message bus).
In finance, folks seem to use AMQP servers and the capabilities of the protocol for that (with filtering rules to avoid sending everything everywhere).
With commands, everything is rather straightforward: a command has a recipient uniquely identified by its identity. Given the partitioning scheme, we know exactly where the command should go.
So it looks like commands and events (as types of messages) mean different things not only in the DDD/ES world, but also from the technical perspective of distributed solutions. Let's see if we can settle on a consistent abstraction here.
A Command Message (D for Do in Marble diagrams) has the following features:
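The idea behind such filtering rules can be sketched with the matching semantics of an AMQP 0-9-1 topic exchange, where routing keys are dot-separated words, `*` in a binding matches exactly one word and `#` matches zero or more. This is a simplified in-process illustration, not a broker client; the routing-key layout and the queue names are hypothetical.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP-style topic matching: '*' = exactly one word, '#' = zero or more words."""
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' either matches nothing, or swallows one word and stays active
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])
    return match(pattern.split("."), routing_key.split("."))

def route(bindings, routing_key):
    """Return the queues whose binding pattern matches; unmatched events go nowhere."""
    return [queue for pattern, queue in bindings if topic_matches(pattern, routing_key)]
```

With hypothetical bindings such as `("billing.InvoicePaid.*", "saga-partition-D")`, an InvoicePaid event published from any partition reaches only the partition that subscribed to it, while events nobody bound to are simply dropped.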
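A minimal sketch of locating the partition from the recipient's identity, assuming entity ids are GUIDs and a fixed partition count; `partition_for` and `PARTITION_COUNT` are hypothetical names.

```python
import uuid

PARTITION_COUNT = 1000  # as in the 1000-partition thought experiment

def partition_for(entity_id: uuid.UUID, partitions: int = PARTITION_COUNT) -> int:
    """Map a recipient identity to a partition; every node computes the same answer."""
    return entity_id.int % partitions
```

The UUID's integer value is used because Python's built-in `hash()` is randomized per process for strings. Plain modulo reassigns almost every entity when the partition count changes; consistent hashing or an explicit partition map avoids that.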
- It is a message that instructs a specific entity (identified by its identity) to perform certain actions.
- The messaging infrastructure is aware of the recipient's identity and uses it to route the command to the specific partition.
- A command message is an entity itself: it has its own identity. This allows the recipient to stay idempotent and filter out seemingly similar commands that were duplicated by the messaging infrastructure (this happens within the command handler, in the activity logic).
- This message contains one or more domain commands. All domain commands in the same message are directed to the same entity. They are expected to fail or succeed together (essentially this is just a construct to avoid generating composite commands).
- Domain commands are named like DoSomething; they instruct the target entity to do something that might result in different outcomes or fail.
- In the DDD world, a command message determines the unit of work, while specific domain commands map to the methods called upon the target aggregate root (except for cases where a domain service is used).
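Essentially, the command message signature could be represented as:
using System;
public interface ICommand {}
public sealed class CommandMessage
{
    public readonly Guid EntityId;        // recipient identity, used for routing
    public readonly ICommand[] Commands;  // succeed or fail together
    public readonly long KnownVersion;    // optional, for concurrency checks
    public CommandMessage(Guid entityId, ICommand[] commands, long knownVersion)
    {
        EntityId = entityId;
        Commands = commands;
        KnownVersion = knownVersion;
    }
}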
where ICommand could be anything:
public sealed class RenameProject : ICommand
{
    public readonly string Name;
    public RenameProject(string name) { Name = name; }
}
KnownVersion is just an optional member used to implement concurrency locking, if needed.
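The handler side of such a message can be sketched as follows: skip duplicates, check the known version, and apply all domain commands as one unit of work. This is a minimal sketch mirroring the C# signature; the `message_id` field is an assumption based on the "command message is an entity" point (the signature above does not carry it), and the Project aggregate with its version-bumping rule is hypothetical.

```python
import uuid
from dataclasses import dataclass
from typing import Optional, Set, Tuple

@dataclass(frozen=True)
class RenameProject:
    name: str

@dataclass(frozen=True)
class CommandMessage:
    message_id: uuid.UUID                  # assumption: the message's own identity
    entity_id: uuid.UUID                   # recipient identity, used for routing
    commands: Tuple[RenameProject, ...]
    known_version: Optional[int] = None    # optional optimistic-concurrency check

class ConcurrencyError(Exception):
    pass

class Project:
    """Toy aggregate: every applied command bumps the version."""
    def __init__(self, name: str = "", version: int = 0):
        self.name, self.version = name, version

    def rename(self, name: str) -> None:
        if not name:
            raise ValueError("name must not be empty")
        self.name = name
        self.version += 1

def handle(project: Project, message: CommandMessage, seen: Set[uuid.UUID]) -> None:
    if message.message_id in seen:
        return  # duplicate delivered by the infrastructure: ignore it
    if message.known_version is not None and message.known_version != project.version:
        raise ConcurrencyError(f"expected v{message.known_version}, found v{project.version}")
    draft = Project(project.name, project.version)  # work on a copy...
    for command in message.commands:
        draft.rename(command.name)                  # ...so a failure leaves 'project' untouched
    project.name, project.version = draft.name, draft.version
    seen.add(message.message_id)
```

Working on a copy is one simple way to get "fail or succeed together"; with event sourcing the same effect comes from committing the produced events only after all commands succeed.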
An Event Message (E on marble diagrams) is described by:
- An event is generally something that happened in the past. It is history that can't be changed. This intent is expressed in its naming: SomethingChanged.
- An Event Message is a transport-level message traveling through the infrastructure. It is associated with the sender's identity and its version. These two properties together uniquely identify each event and allow subscribers to handle duplicates possibly generated by the scalable infrastructure.
- An event does not have a recipient. It is merely published to the messaging infrastructure (to the world). The latter is responsible for routing the event to its subscribers (there could be zero or more). Routing is based on the subscriptions published by the entities and works by gathering events and converting them into commands directed to the subscribers.
So it looks like we have a non-contradictory match between commands and events in the scalable architecture and in Domain-Driven Design. In essence, we map incoming command messages directly onto the aggregate commands (stripping away the routing and partitioning information and letting the command handler use the version and identity information to manage the entity). At the same time, we can publish the resulting events as event messages (supplying them with the sender info and other contextual details for the subscriptions).
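The (sender identity, version) pair can serve as a natural de-duplication key on the subscriber side. A minimal sketch; EventMessage and DeduplicatingSubscriber are hypothetical names, and a real subscriber would persist the seen keys together with its own state.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class EventMessage:
    sender_id: str  # identity of the publishing entity
    version: int    # sender's version after producing this event
    event: object   # the domain event itself

class DeduplicatingSubscriber:
    """Drops redeliveries: (sender_id, version) uniquely identifies an event."""
    def __init__(self, handler):
        self.handler = handler
        self.seen = set()

    def receive(self, message: EventMessage) -> bool:
        key = (message.sender_id, message.version)
        if key in self.seen:
            return False  # duplicate produced by the infrastructure
        self.seen.add(key)
        self.handler(message.event)
        return True
```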
But what are these subscriptions that magically convert published events into the commands?
Event Subscriptions
The concept of an event subscription maps precisely onto the concept of IQbservable from the Reactive Extensions for .NET (have I already mentioned how brilliant Erik Meijer and his team are?). If we map the theory behind this cloud programmability logic to CQRS, we'll have something like this:
- Subscriptions are published by entities interested in certain events. An entity is anything with a unique identity that resides in a command handler: Aggregate Root, Saga, Read Model. In code, the entity is decoupled from its subscription, but the two come together.
- An entity publishes its event query to the environment (similar to IQueryable, but for events). The query could be handled as an IQbservable executed on the AMQP server to build bindings, replicated to all partitions for massive scaling, or simply persisted as a subscription and evaluated at the message bus.
- The subscription query is responsible for filtering and aggregating events and converting them into commands that target specific recipients. A subscription is a way to say "I'm interested in these messages; send them to me (here's the identity to send them to)".
- Time ticks, timeouts and domain events are examples of events that a subscription query might be interested in.
- Command recipients are still responsible for ordering and de-duplication, since transfer could happen across consistency boundaries. Ordering is specific to the actual case and could take the form of simple retries (the infrastructure will help to retry on transient failures) or something more complex, like sequence numbers.
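A subscription of this kind can be sketched as a plain function from events to commands, which the infrastructure evaluates however it likes. This is a minimal in-memory sketch in plain Python rather than Rx; the event, command and function names are hypothetical, and `evaluate` stands in for just one possible "provider".

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional

@dataclass(frozen=True)
class InvoicePaid:
    invoice_id: str
    order_id: str

@dataclass(frozen=True)
class TimeTick:
    now: int  # e.g. seconds, emitted by an infrastructure scheduler

@dataclass(frozen=True)
class Command:
    recipient_id: str  # the infrastructure uses this to locate the partition
    name: str

# A subscription filters events and converts the interesting ones into commands.
Subscription = Callable[[object], Optional[Command]]

def payment_subscription(saga_id: str, order_id: str, deadline: int) -> Subscription:
    def query(event: object) -> Optional[Command]:
        if isinstance(event, InvoicePaid) and event.order_id == order_id:
            return Command(saga_id, "ConfirmPayment")
        if isinstance(event, TimeTick) and event.now >= deadline:
            return Command(saga_id, "PaymentTimedOut")
        return None
    return query

def evaluate(subscriptions: Iterable[Subscription], events: Iterable[object]) -> List[Command]:
    """One possible 'provider': evaluate every subscription in memory, event by event."""
    subs = list(subscriptions)
    commands = []
    for event in events:
        for query in subs:
            command = query(event)
            if command is not None:
                commands.append(command)
    return commands
```

Every tick past the deadline yields another PaymentTimedOut here, which is a small reminder that recipients must de-duplicate. Because the query is data in, commands out, it can be unit-tested without any bus.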
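By explicitly expressing event subscriptions, we keep them decoupled from the infrastructure code (testable and portable), while still being able to express some complex time-based interactions. For example, below is an Rx query for a responsive, yet throttled, event processor (from the MSDN forums):
// Requires Rx (System.Reactive). CombineVeryLatest is a custom operator from
// that forum thread, not a built-in Rx operator. ResponsiveThrottle is a made-up name.
IObservable<T> ResponsiveThrottle<T>(IObservable<T> events, TimeSpan delay)
{
    var fire = new Subject<T>();
    // gate signal: open at start, re-opens 'delay' after each emitted value
    var whenCanFire = fire
        .Select(u => Unit.Default)
        .Delay(delay)
        .StartWith(Unit.Default);
    // forward an event only when it is paired with a fresh gate signal
    events
        .CombineVeryLatest(whenCanFire, (x, flag) => x)
        .Subscribe(fire);
    return fire;
}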
At the same time, although the subscription intent comes with the saga implementation, it will be the responsibility of the infrastructure to handle and implement it. And nobody (at least not the saga) really knows how that implementation will work. It could even be some high-performance CEP engine or a distributed peer-to-peer network of partitions (if we want to build a self-tuning and self-healing system that is hard to take down and easy to scale). This is, in essence, similar to how the same LINQ query could be handled by various query providers (OData, SQL Server, MySQL etc).
Here's how Saga and subscription behavior might look with Marble diagrams:

Note that we don't have any specifics related to persistence or message sending. This is because we can use anything from MSMQ with MS SQL up to AMQP with event sourcing and a message dispatcher. Just like Aggregate Roots within Command Handlers, Sagas can work with various persistence and messaging implementations. In the case of event sourcing or state-based persistence, we can avoid all problems of two-phase commit and have as much reliability and redundancy as we want (just like with ARs and command handlers).
Idempotency and reordering for the distributed scenario could be guaranteed by the state machine implementation or by activity-based handling (in this case Change persists all the required information). Retrying logic and exception notifications are handled the same way.
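The "sequence numbers" option for reordering can be sketched as a small inbox in front of the entity. This assumes (hypothetically) that each sender numbers the messages for a given recipient consecutively, starting at 1; ReorderingInbox is an illustrative name.

```python
class ReorderingInbox:
    """Delivers messages strictly in sequence order, buffering gaps and dropping duplicates."""
    def __init__(self, handler, next_seq: int = 1):
        self.handler = handler
        self.next_seq = next_seq  # in practice persisted along with the entity state
        self.pending = {}         # seq -> message, waiting for earlier ones

    def receive(self, seq: int, message) -> None:
        if seq < self.next_seq or seq in self.pending:
            return  # duplicate: already delivered or already buffered
        self.pending[seq] = message
        while self.next_seq in self.pending:
            self.handler(self.pending.pop(self.next_seq))
            self.next_seq += 1
```

A real implementation would persist `next_seq` and the buffer with the entity state and would need a policy (timeout, re-request) for gaps that never fill.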
Command Handling Logic
This brings everything together in logic like this, where a Command Handler could host either an Aggregate Root or a Saga:

Important notes:
This is a logical representation of subjects and their relations. Actual implementations will vary and will probably be a lot simpler. However, as long as the logical concepts stay intact (and constraints are fulfilled), it should be simple to make the system more scalable, rich and reliable just by implementing the proper logic. This includes but is not limited to:
- event sourcing;
- catching-up message dispatch, avoiding 2PC;
- variable scalability and redundancy;
- reordering and idempotency for the cloud scenarios.
The KnownVersion field is needed for concurrency locks (versions are passed through the Views).
- Domain Services are used somewhere in the "Do" method of the command handler.
If we think more about it, read models fit into the same logic as well. They might have a much simpler implementation (state-based persistence without any message dispatcher and with limited idempotency), since we don't expect complex behaviors and are just interested in the reads. Syntax and implementation should be optimized for this scenario. The Duck project Jeremie was tweeting about is a good example of such an approach, where the infrastructure implementation is tailored to express the intent in the most efficient way for the given technical scenario.
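A read model in this spirit can be as small as a dictionary updated from events, with the sender's version doubling as a cheap idempotency check. A minimal sketch, assuming a hypothetical ProjectRenamed event that carries the project id, the new name and the sender's version; ProjectListView is an illustrative name.

```python
class ProjectListView:
    """Read model: plain state-based persistence, no outbox, minimal idempotency."""
    def __init__(self):
        self.rows = {}  # project_id -> {"name": ..., "version": ...}

    def on_project_renamed(self, project_id: str, version: int, name: str) -> None:
        row = self.rows.get(project_id)
        if row is not None and row["version"] >= version:
            return  # stale or duplicate event; the view already reflects it
        self.rows[project_id] = {"name": name, "version": version}
```

The version stored in each row is also what a UI could send back as KnownVersion with its next command.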
The Big Picture
In short, this boils down to the following schema, which describes the technical constraints and capabilities of command handlers:

Again, it will be the responsibility of the implementor to pick whatever combinations are necessary in order to capture and express certain intent in the most efficient way.
Unfortunately, at the moment we don't seem to have established theoretical concepts and explicit implementations in the CQRS community (see the CQRS reference page for links). This results in a bit of confusion and a general perception of distributed CQRS architectures as something really complex and highly specific. Fortunately, really interesting ideas and articles show up every day. The community is really vibrant and helpful as well.
So far, what do you think?
PS: Sorry for the complex article. Eventually I will understand the subject well enough to explain it without a lot of words. For now, I'm still figuring everything out. You can check out the xLim 4 reference page for any later articles in the series.
Reader Comments (16)
All very interesting, but I think I need to turn this into a small concrete example to fully grasp it.
The CQRS space seems to add more complexity every time I see an article on it. I'd first like to see the most basic problem addressed: users don't want async user interfaces. Web developers don't want to code this way either. It's very hard to sell this framework when something like updating a simple form via AJAX needs to traverse two levels of messaging (commands + events). Don't get me wrong, I love the idea of query/command separation, but it seems like we are now trying to make CQRS a solution for everything. Sagas? Isn't this what workflows are all about? Simplify.
@all, Sorry for the complex article. Eventually I will understand the subject well enough to explain it without a lot of words. For now, I'm still figuring everything out.
@bradrover, I'm sorry for not stressing this in the first place. CQRS is obviously not a silver bullet for every single scenario. It just works well for rather complex systems, where you need scalability, cost-effective cloud deployments or the ability to adapt to rapidly changing business requirements. And the extreme scalability things I've been writing about apply to fewer than 5% of such systems.
Keep posting!
Great article. A few days ago I had an insight that, in CQRS/ES, sagas are really just like ARs, but I hadn't thought about events and partitions the way you elaborated on them.
I really like your 'mental model' of CQRS. It puts everything in its place. I am looking forward to rearranging NCQRS abstractions to fit this model and seeing how it works.
@Szymon, thanks for sharing! I would love to hear your thoughts on how this model works out for NCQRS and developers using it.
While reading certain things popped into my mind:
- Who says time and out-of-order concepts cannot be modeled in a state machine? A state machine is about telling what state you are in and what the valid transitions are, not about how you handle invalid transitions (if they are invalid indeed); that's an implementation detail to me.
- A lot of prior art exists for the topics raised here. Peer-to-peer networks & efficient routing algorithms could solve the "getting the message delivered to the right recipient" part.
- CommandMessage sounds like a meta model of a command or a batch of commands. It's about identifying traits of a command that do not naturally fit what the command is trying to achieve. It's like a SOAP envelope (header/body) or an HTTP request (headers/body) without the transport notion.
Overall, a very insightful article for the more advanced and "pure" CQRS/ES/DDD practitioner.
@seagile, Thank you for the comments!
1. Indeed, ordering could be handled in the state machine (that's similar to what Jonathan is doing with Stateless). Logically, a state machine is just an entity, and it is responsible for handling ordering and duplication in the most convenient way. The same applies to transitions caused by time, with one caveat: time ticks (or timeout events) come from the scheduler, which is part of the infrastructure. I think it's easier to have these details at the subscription level (exposed and visible), rather than to reimplement reliable scheduling within the saga.
2. Yes. I agree that there is a lot of prior art. Actually, in essence this article is just a compilation of what already exists. The idea in raising these topics was to make sure there will not be any big logical problems with CQRS implementations, should we need to resort to such a level of scalability (probably fewer than 5% of real-world cases).
3. Yes, the command message is a meta-model (just like the concept of Change or Failure). In the real world, a command message is just a transport message that carries a batch of commands over to the designated recipient. I believe that's how it is done in all the buses that allow message batching.
I think you missed the core point of Sagas:
Sagas are a case of Simple Event Processors (http://en.wikipedia.org/wiki/Event-driven_architecture#Simple_event_processing)
"Simple event processing concerns events that are directly related to specific, measurable changes of condition. In simple event processing, a notable event happens which initiates downstream action(s). Simple event processing is commonly used to drive the real-time flow of work, thereby reducing lag time and cost[4]."
A Saga contract is made of a set of kinds of events (facts). It starts with a set of facts of a given kind and finishes with a set of facts of another kind (as any Event Processor does).
This is unlike an Aggregate Root (Domain Object), which represents the entity carrying out actions after a command or an event and finishes with a set of facts.
Both Sagas and Aggregate Roots are Event Processors.
Sagas make use of Aggregate Roots to end their process. But unlike Sagas, Aggregate Roots are cross-context artifacts, meaning that you might have two Sagas playing with the same AR at the same time (house keeping, eh :). You don't get Sagas playing with Sagas, at least I hope you don't.
On another note, what I'm personally trying to do is to figure out the role of the domain model in an EDA (the domain model is not about house keeping. I really don't understand how people can arrive at such a conclusion when they know nothing about the subject). This is the challenge, since without a domain model the subject is well studied! If not, I would advise you to study EDA instead of CQRS :).
As for SOA, it is just a set of open technologies for interop and development of APIs. SOA as a computer science is simply a big ball of mud, since the focus is not really on solving problems but on using technology for technology's sake (or selling products).
I strongly advise you to read: http://www.psgroup.com/detail.aspx?ID=681 (2006).
"Let's say we scaled our theoretical CQRS model to 1000 partitions. Whenever an aggregate publishes an event, should we send it to 999 other partitions, just because there might be an event handler interested? Even if there are no subscribers, this will result in a lot of traffic and some really complex deployment to manage (aside from bogging down the message bus)."
You need to define the subject of your partitioning.
My experience is that the single point of resource contention is changing the persistent memory (Repositories).
I would advise you to take care of partitioning at the Repository level if you are really tackling scalability.
If your logical partitions are replicas of each other, just send the change to one and let it propagate (well, 1000 replicas, I would say it's insane :).
If not, you need some kind of namespace that associates the Entity with a Repository.
Customer e = AllCustomers.get(customerId, customerNameSpace).
For this you need some kind of partition map between customerIds and customerNameSpaces. The customerNameSpace might be calculated based on a set of customer attributes. This is similar to sharding, yet far simpler since Aggregate Roots form a consistency boundary.
Persistent Sagas can be defined as Transient Domain Objects (or Transient Aggregate Roots).
Another kind of partitioning is that of the domain model (or logic partitioning). This is easier since it concerns code only, so it can scale horizontally almost infinitely. But it does raise some difficulties when it comes to deployment, since you may have two nodes using different versions of the same domain model yet sharing the same Repository.
You can colocate your domain model partitions and repository partitions, making a cluster. This makes sense if we design partitions based on geographical access.
A point of research would be adaptive partitioning of event stores based on access patterns. Say, a mechanism that would move Aggregate Roots among Repository partitions, correlating client-location accesses with partition locations.
With the above we can then tackle your problem of routing events. An event has a set of subscribers. These subscribers are Event Processors (both Sagas and Aggregate Roots are Event Processors). If you have partitioned your Repositories as explained above, the Event Router needs to take this into account to do it efficiently. There is no point in routing an event to a cluster whose underlying repository partition does not hold the memory of the Aggregate Root handling it.
This seems complex, but it is very simple. In most situations you don't need to partition the Repository this way.
Hope it helps,
Nuno
PS: By the way, Events also have identity (are unique), yet are immutable, so they can be transferred with no side effects. Think of facts.
Saying that Entities are about house keeping in DDD is worse than saying that Tables are about house keeping in a Relational Data Model. Ignorance is bliss.
By the way I liked your diagram of the Command Handling Logic.
I forgot to explicitly answer your core question, although I think it was implicit in my post:
"How does infrastructure know that certain events from partition A and C should be routed to partition D? Is it just because there could be a saga listening for them?"
No.
Technically, there are several solutions for routing events that come off the top of my head :) I've never implemented this, nor was it required by my current employer. But I'm open to offers :)
Let's look at an event signature:
OrderSubmitted(orderid, customerid, ....);
Now suppose that we have an Aggregate or Saga that subscribes to these events. Say a Customer AggregateRoot subscribes to them.
In a highly scalable environment the Customer Aggregate can be stored in one node (or cluster of nodes) today, and in another node the next day. Furthermore, the domain model can be deployed in multiple nodes.
So a brute-force approach, as you have said, would be to publish the event to all 1000 partitions. Very bad.
Now let's look at how the internet solved this problem. It uses routing tables.
So:
Repository Partition Map:
Customer name-space repository-address
Event Routing Map:
Customer<name-space> OrderSubmitted *.Customer$Id.* end-point-address (or set of end points)
.....
Basically what we have is a partition map for repositories and a routing map for events.
The above routing map states that entities in a given repository partition (customer namespace) subscribe to OrderSubmitted events whose second argument (Customer ID) matches one of its Entities.
Maintaining these routing tables is out of the scope of my post, but I would advise you to have a look at algorithms for routing messages on the internet (how do you guys think an HTTP request reaches its intended recipient?)
Nuno
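The two maps described in the comment above (a repository partition map and an event routing map) can be sketched as plain lookup tables. This is a minimal illustration under stated assumptions, not Nuno's implementation: the namespace rule (an id prefix), the repository addresses and the single routing entry are hypothetical, and the wildcard pattern is replaced by a function that extracts the customer id.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass(frozen=True)
class OrderSubmitted:
    order_id: str
    customer_id: str

# Repository partition map: customer namespace -> repository address (hypothetical values)
REPOSITORY_PARTITIONS: Dict[str, str] = {"eu": "tcp://repo-eu", "us": "tcp://repo-us"}

def namespace_for(customer_id: str) -> str:
    """Hypothetical partitioning rule: the namespace is the id prefix, e.g. 'eu-123' -> 'eu'."""
    return customer_id.split("-", 1)[0]

# Event routing map: event type -> how to find the subscribing Customer's id
EVENT_ROUTES: Dict[type, Callable[[object], str]] = {
    OrderSubmitted: lambda event: event.customer_id,
}

def route(event: object) -> List[str]:
    """Return the repository addresses that should receive this event."""
    key_of = EVENT_ROUTES.get(type(event))
    if key_of is None:
        return []  # no subscribers: the event is not sent anywhere
    return [REPOSITORY_PARTITIONS[namespace_for(key_of(event))]]
```

Keeping these tables consistent across the network, which the thread leaves open, is the hard part; the lookup itself is trivial.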
Nuno, I want to thank you again for such a detailed comment!
I believe that ARs are merely house-keeping, compared to cross-consistency processes, simply because:
* ARs are operating in their cosy reality where everything is consistent.
* We don't have race conditions, uncertainty or indeterministic scenarios.
Sagas, on the other hand are responsible for handling eventual consistency, timeouts, dropped transactions and negotiate ARs located within different "time lines". I think that dealing with inconsistent reality is more difficult than house-keeping within a single consistency boundary, where you can even have transactions!
I agree about command routing based on the id of the recipient entity (which is used for partitioning, BTW). Although I don't believe TCP/IP protocol concepts apply to event routing. They could work with commands (which, just like packets, have a recipient), but events are a slightly different story.
As for the adaptive partitioning and moving things around... I prefer to keep things implemented in the simplest way possible (even though the theory behind it might look elaborate). This helps to build scalable systems in such a hectic environment as the cloud))
Hi,
"Sagas, on the other hand are responsible for handling eventual consistency, timeouts, dropped transactions and negotiate ARs located within different "time lines"."
Ouch. That is quite a lot for our Sagas. What about Sagas being responsible for guaranteeing a response within a certain time frame to a set of events arriving asynchronously?
Handling "eventual consistency" IMHO should be done with an Audit concept. Basically, an event processor that correlates a set of already established events (facts) and checks if the premises used were valid within some time constraints.
"Although I don't believe TCP/IP protocol concepts apply to event routing. "
I'm sure you understand the duality of IEnumerable and IObserver. You have said already that observers publish their subscriptions. So it is just a matter of defining to where it is published and how we will manage the subscription tables across the network.
"As for the adaptive partitioning and moving things around... I prefer to keep things implemented in the simplest way possible"
I guess that Amazon does this to some degree.
@Rinat, @Nuno
I posted my opinion on your discussion at my blog here
Nuno, if you have a snippet to share on " just a matter of defining to where it is published and how we will manage the subscription tables across the network", then I'd love to see that. I'm not smart enough to visualize reliable IQbservable subscription replication across the network and applying it in a fault-tolerant way.
As for the sagas, I suspect we just build our production systems in different ways. Well, as long as they scale and evolve as needed, theoretical differences should not be a big issue, right?)
Szymon,
replied back with a comment. In short I think that the core theory should be the same for all contexts (just like the basic algebra or laws of logic). Implementations in specific contexts might differ and follow certain patterns. But they will still follow the logic.
// that's my personal opinion based on rather limited experience.