© Rinat Abdullin

Query Multiple Aggregates from Event Stream

If you use event sourcing and keep a domain log (a record of all events and commands of some context), it is really easy to load any aggregate, or any group of them, and run a query against it.

Last night I ran into a problem with one of the systems running on staging in the Rackspace cloud. This system performs some big data processing and analytics for OSA analysis. One of the required steps is to assemble data and then run a massive MapReduce operation on top of it, involving some third-party services. Roughly 8 reduce batches (out of 300) failed with timeouts caused by an external service. This morning I have to pin down the exact data problem and find a way to 'scavenge' the results without rerunning the long operation.

Luckily, I'm using good old CQRS/ES to manage all behaviors and integrations (actual data is never referenced or managed by AR+ES entities). Also, to facilitate debugging (and view rebuilds), the systems record all passing messages into a separate event stream called the domain log. So the task reduces to getting the latest version of this stream (which is just an append-only file) and loading it in a snippet:

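using System;
using System.Linq;

// build the serializers needed to decode the stored message envelopes
var dataSerializer = ProtoBufDataSerializer.LoadAndScanDomains();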
var envelopeSerializer = new ProtoBufEnvelopeSerializer();
var envelopeStreamer = new EnvelopeStreamer(envelopeSerializer, dataSerializer);

// load all messages
var stores = new FileTapeStream(@"C:\temp\domain.tmd")
    .ReadRecords(0, int.MaxValue)
    .Select(b => envelopeStreamer.ReadAsEnvelopeData(b.Data))
    .SelectMany(b => b.Items.Select(i => i.Content))
    // pick only events for an aggregate bound to StoreId
    .OfType<IEvent<StoreId>>()
    // group them and load into the state objects
    .GroupBy(b => b.Id.Id)
    .Select(events => new StoreAggregateState().With(events));

foreach (var store in stores)
{
    var missingForecasts = store.Forecasts.Values.Where(f => !f.Delivered);
    foreach (var forecast in missingForecasts)
    {
        Console.WriteLine("Missing {0} with key {1}", forecast.Dataset, forecast.ApiKey);
    }
}

That gives me enough data to check out the missing batches and deal with the situation.
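The `With(events)` call in the snippet folds a group of events into a state object. The real `StoreAggregateState` is not shown in this post, so here is only a minimal sketch of the idea (C# 9 or later). The event types `ForecastRequested` and `ForecastDelivered`, the `Forecast` class and the `StoreState` class are hypothetical stand-ins that I made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Usage: replay three events for one store and list what is still undelivered.
var state = new StoreState().With(new object[]
{
    new ForecastRequested("sales", "key-a"),
    new ForecastRequested("stock", "key-b"),
    new ForecastDelivered("sales"),
});

foreach (var forecast in state.Forecasts.Values.Where(f => !f.Delivered))
{
    Console.WriteLine("Missing {0} with key {1}", forecast.Dataset, forecast.ApiKey);
}

// Hypothetical event contracts (assumptions, not the real system's events).
public sealed record ForecastRequested(string Dataset, string ApiKey);
public sealed record ForecastDelivered(string Dataset);

public sealed class Forecast
{
    public string Dataset { get; init; } = "";
    public string ApiKey { get; init; } = "";
    public bool Delivered { get; set; }
}

// Hypothetical state object: it has no behavior, it only remembers what happened.
public sealed class StoreState
{
    public Dictionary<string, Forecast> Forecasts { get; } = new();

    // Apply every event in order and return the state for fluent use.
    public StoreState With(IEnumerable<object> events)
    {
        foreach (var e in events) Apply(e);
        return this;
    }

    private void Apply(object e)
    {
        switch (e)
        {
            case ForecastRequested r:
                Forecasts[r.Dataset] = new Forecast { Dataset = r.Dataset, ApiKey = r.ApiKey };
                break;
            case ForecastDelivered d when Forecasts.TryGetValue(d.Dataset, out var known):
                known.Delivered = true;
                break;
            // events this state does not care about are ignored
        }
    }
}
```

Because the state is rebuilt purely from events, the same fold works for one aggregate or for every group produced by `GroupBy`.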

Important: this snippet is meant for debugging only. In production you would almost never run such queries across the entire domain log. Projections are a better fit here.
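For contrast, a projection keeps a small read model up to date as each event arrives, so answering the question does not require replaying the whole log. The following is a rough sketch under my own assumptions (C# 9 or later): the events `ForecastRequested` and `ForecastDelivered` and the class `MissingForecastsProjection` are hypothetical names, not part of the system described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Usage: feed events one by one, then ask the read model directly.
var projection = new MissingForecastsProjection();
projection.When(new ForecastRequested("store-1", "sales", "key-a"));
projection.When(new ForecastRequested("store-1", "stock", "key-b"));
projection.When(new ForecastDelivered("store-1", "sales"));

foreach (var m in projection.Missing())
{
    Console.WriteLine("Missing {0} with key {1}", m.Dataset, m.ApiKey);
}

// Hypothetical event contracts (assumptions for this sketch).
public sealed record ForecastRequested(string StoreId, string Dataset, string ApiKey);
public sealed record ForecastDelivered(string StoreId, string Dataset);

// Read model that tracks only forecasts requested but not yet delivered.
public sealed class MissingForecastsProjection
{
    // (store, dataset) -> API key of the pending forecast
    private readonly Dictionary<(string StoreId, string Dataset), string> _pending = new();

    public void When(ForecastRequested e) => _pending[(e.StoreId, e.Dataset)] = e.ApiKey;

    public void When(ForecastDelivered e) => _pending.Remove((e.StoreId, e.Dataset));

    // Query is a cheap lookup over the current read model, sorted for stable output.
    public IReadOnlyList<(string StoreId, string Dataset, string ApiKey)> Missing() =>
        _pending
            .Select(p => (StoreId: p.Key.StoreId, Dataset: p.Key.Dataset, ApiKey: p.Value))
            .OrderBy(x => x.StoreId)
            .ThenBy(x => x.Dataset)
            .ToList();
}
```

In this sketch the read model lives in memory; a real projection would normally persist it and be rebuildable from the domain log.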

This approach relies on strongly-typed identities and their binding to aggregate roots to simplify querying of the event stream. However, this is not essential.
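To show what that binding buys you, here is a small sketch (C# 9 or later) of how a typed identity lets `OfType<IEvent<StoreId>>()` pick out the events of one aggregate kind from a mixed log. The interface shapes and the event names `StoreCreated`, `StoreRenamed` and `CustomerRegistered` are assumptions for illustration; only `IEvent<StoreId>` and the `Id.Id` access pattern come from the snippet above.

```csharp
using System;
using System.Linq;

// Usage: a mixed log with events of two different aggregate kinds.
object[] log =
{
    new StoreCreated(new StoreId(1)),
    new CustomerRegistered(new CustomerId(7)),
    new StoreRenamed(new StoreId(1), "Downtown"),
    new StoreCreated(new StoreId(2)),
};

// The identity type alone selects store events; no per-event-type checks needed.
var eventsPerStore = log
    .OfType<IEvent<StoreId>>()
    .GroupBy(e => e.Id.Id)
    .ToDictionary(g => g.Key, g => g.Count());

foreach (var pair in eventsPerStore.OrderBy(p => p.Key))
{
    Console.WriteLine("Store {0} has {1} event(s)", pair.Key, pair.Value);
}

// Assumed shapes of the identity and event contracts.
public interface IIdentity { }

public interface IEvent<TIdentity> where TIdentity : IIdentity
{
    TIdentity Id { get; }
}

public sealed record StoreId(long Id) : IIdentity;
public sealed record CustomerId(long Id) : IIdentity;

// Hypothetical events, each bound to its aggregate through the identity type.
public sealed record StoreCreated(StoreId Id) : IEvent<StoreId>;
public sealed record StoreRenamed(StoreId Id, string Name) : IEvent<StoreId>;
public sealed record CustomerRegistered(CustomerId Id) : IEvent<CustomerId>;
```

Without typed identities, the same selection could be done with an explicit predicate, for example by matching a known set of event types or a naming convention on a plain string id. It is just more code to write and keep in sync.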

Published: November 16, 2011.
