Building Blocks in CQRS world à la Lokad
Once you have broken your systems into separate subsystems (bounded contexts) and introduced Command-Query Responsibility Segregation inside each one, there are multiple paths to take from there.
Each path depends on your background, familiar technologies and available developers.
Two Primary View Points
I think that all views on building distributed systems in .NET world can be organized in two groups.
Developers from Udi Dahan/NServiceBus world tend to structure systems with CRUD aggregates (persisted in SQL or RavenDB) and SOA Services, implement business workflows with sagas that look almost like aggregates. Choice of persistence usually is restricted to transactional systems (MSMQ being the star here). There is a smart usage of caching and internet infrastructure to deal with scaling.
People with preference for Greg's ideas tend to express business behaviors with event-sourced aggregates, avoid use of "orchestration" term and structure flows with either state machines or stateless document sagas. Views are usually rebuilt by replaying events. Choice of persistence does not matter and transactions are much less important.
à la Lokad
I'm personally closer to Greg's side, but a lot more constrained (startup environment and hybrid hosting environments took their tall). Primary differences (for the latest projects) is that I do not use any sagas (there is no such term) and there are no future messages.
Let's highlight primary domain building blocks that are used in post-CQRS world according to Lokad:
- Aggregate Roots
- Workflows
- Projections
- Domain Services
- Processes
Aggregate Roots
Aggregate Roots with event sourcing (or AR+ES) - have complex behaviors, are easily tested and persistence - ignorant. They serve as consistency boundary and are partitionable by Id.
When command arrives to the server, it is passed to the command handler, which loads the aggregate root (and any relevant services) and executes an action against that aggregate root. That action results in events that both change state of the aggregate and are published as messages to all subscribers.
One of the simple mental patterns of implementing AR+ES entities revolves about splitting state and behaviors in two distinct classes: aggregate state and behavior.
Aggregate State class contains structural representation of AR+ES, that can be mutated only by applying events to it.
public class CustomerAggregateState
{
public CustomerAggregateState(IEnumerable<IEvent<IIdentity>> events)
{
foreach (var @event in events)
{
Apply(@event);
}
}
public int Version { get; private set; }
public bool ConsumptionLocked { get; private set; }
public void Apply(IEvent<IIdentity> e)
{
RedirectToWhen.InvokeEventOptional(this, e);
Version += 1;
}
public void When(CustomerLocked e)
{
ConsumptionLocked = true;
}
public void When(CustomerUnlocked e)
{
ConsumptionLocked = false;
}
// ETC...
}
Actual aggregate class contains behaviors that are usually executed in response to commands sent. In order to carry them out, aggregate uses it’s own state and any available domain services. Any changes are passed down to state as events. They will also be saved in unit of work, then - committed to event store (which will publish them afterwards).
public class CustomerAggregate
{
Action<IEvent<CustomerId>> _unitOfWork;
CustomerAggregateState _state;
public void LockCustomerForAccountOverdraft(IPricingModel pricing)
{
if (_state.ManualBilling)
return;
var balance = pricing.GetPaymentThreshold(_state.Currency).Convert(d => -d);
if (_state.Balance > balance)
{
Context.Explain("Balance {0} is above threshold of {1}. Don't lock",
_state.Balance, balance);
}
else
{
LockCustomer("Overdraft");
}
}
public void LockCustomer(string reason)
{
if (_state.ConsumptionLocked)
return;
Apply(new CustomerLocked(_state.Id, reason));
}
// ETC
void Apply(IEvent<CustomerId> e)
{
_state.Apply(e);
_unitOfWork(e);
}
}
Command handler deals with bringing all this together:
/// This command handler can be replaced by a set of lambdas
public class CustomerHandler
{
IAggregateStore<CustomerId, CustomerAggregate> _store;
IPricingModel _pricing;
public void When(LockCustomer c)
{
_store.Update(c.Id, a => a.LockCustomer(c.Reason));
}
public void When(LockCustomerForAccountOverdraft c)
{
_store.Update(c.Id, ar => ar.LockCustomerForAccountOverdraft(_pricing));
}
// ETC...
}
Domain Services
IPricingModel from the code above is actually a sample of domain service. It is acquired by command handler and then passed to aggregate root to provide all sorts of rich functionality. If Aggregate Roots (with their complex behaviors and advanced structural persistence) are brains, then domain services are actually the muscles. Here are some samples:
- index lookups;
- pricing calculators;
- mail messaging;
- integration with payment systems.
Workflows
Workflows are the corner stones for interactions in our bounded contexts (or sub-systems). They subscribe to all sorts of events that happen in the environment both outside and inside the bounded context and define immediate reactions to these in form of commands that will be sent to players inside.
public sealed class BillingWorkflow
{
readonly IFunctionalFlow _flow;
public BillingWorkflow(IFunctionalFlow flow)
{
_flow = flow;
}
public void When(CustomerBillChargeAdded e)
{
_flow.ToCustomer(new WriteCustomerInvoiceForBill(e.Id, e.BillId));
}
public void When(CustomerInvoiceWritten e)
{
_flow.ToCustomer(new RequestCustomerInvoicePayment(e.Id, e.InvoiceId));
}
public void When(InvoicePaymentReceived e)
{
_flow.ToCustomer(new AddCustomerInvoicePayment(e.CustomerId, e.InvoiceId,
e.GrossAmount, e.PaymentCode, e.PaymentId));
}
// etc
This code is extremely simple and serves one and one purpose alone - to explicitly define events that this bounded context reacts to. These reactions will then be carried out by our command handlers which will load appropriate aggregate roots (brains) and let them operate domain services (muscles), while remembering what happened for future generations.
Explicit difference from sagas is:
- Term "saga" is completely overloaded. Initially it was introduced for managing long-lived database transactions and then got hijacked by Udi Dahan and NServiceBus.
- Sagas usually allow complicated business logic structured around internal state. Workflows usually don't have any logic and serve merely as a way to explicitly define events which this Bounded Context subscribes and reacts to.
If we bring all this into one picture, that's what will show up:

Projections
OK, this picture is nice, but dealing with events outside of Aggregate Roots can be complicated (unless you enjoy querying event streams). That's where projections come into play. They allow to project event streams into any structural representation (view or persistent read model). This view is eventually consistent and persistence - ignorant (in other words, it can live in any key-value store with decent consistency guarantees).
Projections are usually way too simple to require any testing.
public class CustomerInvoicesProjection
{
IAtomicWriter<CustomerId, CustomerInvoicesView> _writer;
public CustomerInvoicesProjection(IAtomicWriter<CustomerId, CustomerInvoicesView> writer)
{
_writer = writer;
}
public void When(CustomerInvoiceWritten e)
{
_writer.UpdateEnforcingNew(e.Id, i => i.AddInvoice(
e.InvoiceId,
e.Totals.Total,
e.Header.CreatedUtc));
}
public void When(InvoicePaymentReceived e)
{
_writer.UpdateOrThrow(e.CustomerId, i => i.AddPayment(
e.InvoiceId, e.GrossAmount, e.PaymentId));
}
public void When(CustomerInvoiceClosed e)
{
_writer.UpdateOrThrow(e.Id, i => i.CloseInvoice(e.InvoiceId, e.Reason));
}
}
[DataContract]
public class CustomerInvoicesView
{
[DataMember(Order = 1)]
public IDictionary<long,CustomerInvoice> Invoices { get; set; }
public CustomerInvoicesView()
{
Invoices = new Dictionary<long, CustomerInvoice>();
}
public void AddInvoice(InvoiceId invoiceId, CurrencyAmount total, DateTime createdUtc)
{
Invoices.Add(invoiceId.Id, CustomerInvoice.Create(invoiceId, total, createdUtc));
}
public void AddPayment(InvoiceId invoiceId, CurrencyAmount grossAmount, string paymentId)
{
Invoices[invoiceId.Id].ApplyPayment(grossAmount, paymentId);
}
public void CloseInvoice(InvoiceId invoiceId, string reason)
{
Invoices[invoiceId.Id].Close();
}
}
On our diagram they would look like this:

Where the service (that provides query capabilities over the view) could be located in the client (Web UI) or inside server-side bounded context. For instance I could maintain an eventually consistent list of all outstanding invoices in my payment processing BC, so that when a payment comes, I could map it to the invoice by reference number.
Processes
These 4 building blocks can be combined in various ways to define a system that passively reacts to external events (which could be caused by user) and publishes responding events outside. How do we proactively do something to drive world around us?
Let's code us our virtual user that will be running loops over the projected data and carrying out actions, when he finds something interesting. These actions will come either as events published to everybody ("FraudTransactionDetected") or commands to a command handler within the same bounded context ("TryClosePendingInvoice").

In some other teams people would use "sagas with state machines and future messages" in order to implement this simple user. In our case, this is just a simple process that accesses some service in a loop. The service can be our own read model or some 3rd party system that we are polling.
while (!token.IsCancellationRequested)
{
try
{
var doc = _reader.GetOrNew();
foreach (var cmd in GetOverdraftSuspects(doc))
{
_endpoint.ToCustomer(cmd);
}
token.WaitHandle.WaitOne(waitPeriod);
}
catch (Exception ex)
{
Context.Debug(ex);
// to avoid overloads
token.WaitHandle.WaitOne(TimeSpan.FromMinutes(5));
}
}
But, unlike sagas, this process is not a pain in the neck, when matter comes to debugging, maintenance and upgrades in real world.
Bounded Context
All these 5 blocks from above can be combined together within a bounded context to express any kind of desired behavior. For instance, let's say we are defining a BC for managing access of users to some cloud service. This BC would be responsible for:
- telling service fabric, which users are granted access to to it (and with which keys)
- receiving fine-grained usage statistics from service fabric;
- aggregating these statistics on a periodical basis into consumption bills for actual billing process.
Such BC would have following components:
- Workflow that subscribes to outside account management events (from other BCs) and passes them as commands to internal command handlers (which will have aggregates for consumption tracking);
- these command handlers would also call domain service for service fabric management API to add/remove access keys for users;
- there will be a continuously running process to poll management API of service fabric for any new consumption details and pass them to consumption aggregates.
- there will be a projection that maintains a list of all accounts along with their respective billing periods.
- this projection will be scanned once in a while by a process that will look for accounts that should have their consumption bills settled. For each of these, it will send a command to internal aggregate, instructing it to close them.
Somewhere in bounded contexts far far away, there could also be:
- a billing process in some other bounded context, that is interested in events about closed cnosumption bills (using them to charge customer's balance)
- Web Admin UI that displays a view with last 500 resources consumed and top consumers (kindly populated by corresponding projections)
- Web client UI that displays for the customers detailed break-down of the resources they have consumed within the last billing period.
- etc...
But these are different bounded contexts, with different story, purpose, tech requirements and combination of these 5 core building blocks.
More Information: there is some practical code and tooling in Lokad.CQRS Sample Project. This project is backed up by CQRSGuide.com, providing a practical and focused overview on building systems in CQRS world. This article will be embedded into this guide in the next few weeks.
Update
This article has been improved upon with: Anatomy of Distributed System à la Lokad
Friday, March 9, 2012 at 12:09
Reader Comments (10)
Excellent and inspirational stuff.
Keep posting stuff like this - makes it easier for us CQRS/DDD newbies to get started.
Do you have sample code for this somewhere? :)
:)
Check out Lokad.CQRS Sample project. It has not been updated to the very latest yet, but is pretty thorough.
http://lokad.github.com/lokad-cqrs/
Hi Rinat,
Very nice explanation! Would you consider the UI, as a whole, as a separate bounded context or is it made up of parts that each belong to the bounded context they do presentation for?
Hi Magnus,
It depends. Usually my UI is a separate bounded context that holds: local projections (that subscribe to events from other bounded contexts and populate views used for display), and client UI that shows it's views and allows sending commands back to the originating bounded contexts. This simplifies development and deployment experience.
I think this is a great set of building blocks, especially for a CQRS system. With a bit more framework and tooling it would be tempting to call this a silver bullet!
I have some questions:
"That action result in events that both change state of the aggregate and are published as messages to all subscribers."
Who publishes event messages? The event store? Or is there a separate process polling the event store?
Why is there a distinction between "CustomerAggregateState" and "CustomerAggregate"? In a typical DDD application these would be combined into a single class and I'm not sure I see the value. Also, I see that the aggregate holds a reference to the unit of work. Can you elaborate on the value of that?
What is the difference between a workflow and a command handler other than that a workflow consumes events instead of commands?
"How do we proactively do something to drive world around us?"
Perhaps another way to look at this problem is that workflows are triggered by domain events, whereas processes are triggered by scheduled events (ie: every 5 minutes).
I really like the projection model of dealing with read models. Projections can be viewed as generalized database indices and it would seem a DSL could be beneficial. What are your thoughts?
Hi Lev,
There is a little bit more framework and tooling in Lokad.CQRS Sample Project: http://lokad.github.com/lokad-cqrs/ :)
Re publishing messages - it depends on implementation. In the proper scenario, event store is responsible for publishing events (via the separate background polling process that is notified of changes).
Re introduction of aggregate state: I introduce this separation into my projects as a way to simplify AR+ES implementation. Both classes can be easily merged as one. Updated blog post with a little bit more details.
Re aggregate holding reference to unit of work: just another development trick that just makes life a little bit easier in various scenarios. Nothing really important.
Re command handlers vs workflows: Technically these are both message handlers. Difference is mostly in their role and intent (just like difference between commands and events). I also prefer to keep workflows stateless, if possible.
Re workflows vs process. Yes. Although there is a subtle difference - processes usually trigger these events themselves. They do that by implementing their own timer (which could be more complicated than "every 5 minutes") and checking some projected view or by listening to notifications being sent by other systems.
Re projections: technically "projection" is a class that is responsible for denormalising incoming events into a persistent read model. (more detail). So it's more like persistent SQL View rather than index, although similarity is there (same principles). AFAIK Jeremie was working on some F# DSL for projections, yet I prefer to express them via code - more compact and flexible.
Your workflows are not dissimilar to sagas. What would you say are the distinction? For me it seems it's largely just semantics.
Jimit, difference from sagas is:
* Term "saga" is completely overloaded. Initially it was introduced for managing long-lived database transactions and then got hijacked by Udi Dahan and NServiceBus.
* Sagas usually allow complicated business logic structured around internal state. Workflows usually don't have any logic and serve merely as a way to explicitly define events which this Bounded Context subscribes and reacts to.
"Workflows usually don't have any logic" - I disagree; that's the very definition of what a workflow is. I would argue that workflow is the superset with sagas being specialized workflows that are event-driven (often implemented using state-machines) and processes being more sequential with well defined inputs and outputs.
Sorry for confusion. What I meant is that "Workflows in Lokad's approach usually don't have any logic" for a number of reasons.