
Event Sourcing a la Lokad

I've seen quite a few ways of doing Aggregate Roots and event sourcing. Here's the dead simple approach that works for me.

For the sake of simplicity, we logically separate the aggregate root (which generates events from commands, using its internal state to make decisions) from the aggregate state (which builds that state from the events).

public interface IEvent {}
public interface ICommand {}

public interface IAggregateState
{
    void Apply(IEvent e);
}
public interface IAggregateRoot
{
    void Execute(ICommand c);
}

Given that, the aggregate state looks like a view handler, while the aggregate root itself resembles a command handler (as in scenarios where event sourcing is not employed at all). That's actually the point.

Let's define a simple aggregate that tracks open bills for a customer. First, we define contracts for our commands and events (I'm using a VS T4+ANTLR combo here to avoid writing message contract classes by hand or bloating this post; a sketch of what the generated classes might look like follows the definitions below):

let customerId = CustomerId customerId;

CreateBill? (customerId, DateTime startDateUtc)
BillCreated! (customerId, DateTime startDateUtc)

CloseBill? (DateTime closeDateUtc)
BillClosed! (customerId, DateTime closeDateUtc)

AddServicesToBill? (int serviceCount)
ServicesAddedToBill! (int serviceCount)
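
To give a rough idea of what the T4+ANTLR combo produces, here is a hypothetical rendering of one generated contract. The exact shape (serialization attributes, equality members and so on) depends on the template, and CustomerId is assumed to be a simple identity value type:

// Hypothetical generated contract; the real T4 output differs in details
// (serialization attributes, equality members, etc.).
public sealed class BillCreated : IEvent
{
    public CustomerId CustomerId { get; private set; }
    public DateTime StartDateUtc { get; private set; }

    public BillCreated(CustomerId customerId, DateTime startDateUtc)
    {
        CustomerId = customerId;
        StartDateUtc = startDateUtc;
    }
}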

Then we proceed to write the aggregate state. It should know how to build itself from the events.

public sealed class BillAggregateState : IAggregateState
{
    public CustomerId Customer { get; private set; }
    public DateTime Opened { get; private set; }
    public DateTime Closed { get; private set; }
    public int Services { get; private set; }

    public void Apply(IEvent @event)
    {
        RedirectToWhen.InvokeEvent(this, @event);
    }
    public void When(BillClosed e)
    {
        Closed = e.CloseDateUtc;
    }
    public void When(BillCreated e)
    {
        Customer = e.CustomerId;
        Opened = e.StartDateUtc;
    }
    public void When(ServicesAddedToBill e)
    {
        Services += e.ServiceCount;
    }
}

The only "magical" place here is the RedirectToWhen helper, which is actually quite simple (see gist).

Given the state, we can define our aggregate as:

public class BillAggregate : IAggregateRoot
{
    readonly BillAggregateState _state;
    readonly Action<IEvent> _observer;

    public BillAggregate(Action<IEvent> observer, BillAggregateState state)
    {
        _state = state;
        _observer = observer;
    }
    void Apply(IEvent e)
    {
        _state.Apply(e);
        _observer(e);
    }
    public void Execute(ICommand c)
    {
        RedirectToWhen.InvokeCommand(this, c);
    }
    public void When(CreateBill bill)
    {
        Apply(new BillCreated(bill.CustomerId, bill.StartDateUtc));
    }
    public void When(AddServicesToBill c)
    {
        Apply(new ServicesAddedToBill(c.ServiceCount));
    }
    public void When(CloseBill c)
    {
        Apply(new BillClosed(_state.Customer, c.CloseDateUtc));
    }
}

Having said all that, here's how the "event sourcing magic" actually works:

IEnumerable<IEvent> givenEvents = ...;
IEnumerable<ICommand> whenCommands = ...;

// load state from the event history
// or, if you have snapshot - load it here first
// we will not do the latter here
var state = new BillAggregateState();
foreach (var e in givenEvents)
{
    state.Apply(e);
}    
var thenEvents = new List<IEvent>();
var ar = new BillAggregate(thenEvents.Add, state);
foreach (var c in whenCommands)
{
    ar.Execute(c);
}
// do something with the events that were produced.
// for example - append them to the history and then publish in async
// or do both at once and face 2PC
return thenEvents;
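
To make the flow concrete, here is a hypothetical given/when/then scenario built on the contracts above; the constructor shapes of the messages and of CustomerId are assumptions, not the actual generated code:

// Hypothetical scenario; constructor shapes are assumed for illustration.
var customer = new CustomerId(42);

IEnumerable<IEvent> givenEvents = new IEvent[]
{
    new BillCreated(customer, new DateTime(2011, 6, 1, 0, 0, 0, DateTimeKind.Utc))
};
IEnumerable<ICommand> whenCommands = new ICommand[]
{
    new AddServicesToBill(3),
    new CloseBill(new DateTime(2011, 6, 26, 0, 0, 0, DateTimeKind.Utc))
};

// running the snippet above with these inputs would produce two events:
//   ServicesAddedToBill(3)
//   BillClosed(customer, 2011-06-26)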

That's basically it. Note that we are not relying on any frameworks, code, or interface definitions outside the scope of this article. A few caveats:

  • aggregate identities are carried outside of the commands/events and passed by the message bus via strongly-typed message contexts (see the Lokad CQRS PDF for details). Aggregates don't care about their own identity.
  • versioning and version checks are not within the scope of this article, yet they could be added to the snippet above as needed (see the sketch after this list).
  • all incoming commands are joined by a logical transaction; obviously, your message bus must support command batching for this to work.
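
Regarding the versioning caveat above: a minimal version check could reuse the count of replayed events as the expected version when appending. The IEventStore interface below is purely hypothetical, not part of this article or of Lokad.CQRS:

// Illustrative sketch of optimistic concurrency; IEventStore, store, billId
// and command are hypothetical names assumed to be in scope.
public interface IEventStore
{
    IList<IEvent> Load(string id);                                      // full event history
    void Append(string id, long expectedVersion, IList<IEvent> events); // throws if expectedVersion is stale
}

// usage, mirroring the snippet above:
var history = store.Load(billId);
var state = new BillAggregateState();
foreach (var e in history)
{
    state.Apply(e);
}
var thenEvents = new List<IEvent>();
var ar = new BillAggregate(thenEvents.Add, state);
ar.Execute(command);

// the number of replayed events doubles as the expected version;
// Append fails if another writer appended to this bill in the meantime.
store.Append(billId, history.Count, thenEvents);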

This is the current approach of Lokad to Event Sourcing in the distributed world. It's likely to evolve a bit further if we find ways to make it even simpler and more straightforward.

BTW, the situation gets even more interesting if we assume that:

  • all messages (commands and events alike) carry a unique identifier that is used at every step of message processing to enforce deduplication (required for repartitioning, or in cloud environments in general).
  • entity identities (i.e., aggregate root identifiers) carried in the transport headers not only simplify our contracts (while still being exposed to the domain code in a decoupled way), but also provide a simple foundation for message routing and aggregate re-partitioning.
  • keeping track of message causality (event X was caused by command batch Y) in the transport headers, along with client vectors, gives us a foundation for partial message ordering (for cases where stressed cloud environments tend to mess up order a lot); a rough sketch of such an envelope follows this list.
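
As an illustration only (this is not the actual Lokad.CQRS envelope), such transport headers could be modeled roughly like this:

// Hypothetical envelope; names and types are assumptions for illustration.
public sealed class MessageEnvelope
{
    public Guid MessageId { get; set; }         // unique id, used to deduplicate at every processing step
    public string EntityId { get; set; }        // aggregate identity, drives routing and re-partitioning
    public Guid CausedByMessageId { get; set; } // causality: the command (batch) that produced this message
    public long ClientVersion { get; set; }     // client vector component, enables partial ordering
    public object Payload { get; set; }         // the command or event itself
}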

This topic is continued in the post on Tape Storage, which serves as the persistence foundation for event sourcing and also enables fully portable systems to be developed and deployed with Lokad.CQRS.

Published: June 26, 2011.

Next post in the Event Sourcing story: Lokad.CQRS Retrospective
