Implementing C# projections for Event Store
In one of my previous posts I mentioned the migration of some Lokad systems to a dedicated Event Store. One of the steps in this migration involves switching from legacy Lokad.CQRS view projections to new C# projections.
The new version of view projections does not have smart in-memory replay managed by the system, but it is inherently faster because it processes events in batches. The design is really simple: we subscribe to events from the storage (starting from the last known checkpoint) and pass them to the projection, batching events together for better performance. After each batch, we update the checkpoint.
Here's what one projection implementation (maintaining a list of comments per account) might look like:
    public sealed class AccountViewProjection : BatchProjectionFor<AccountView>
    {
        public AccountViewProjection(IKeyValueStore store) : base(store)
        {
            RequestEventsInBatchesOf(100);
        }

        public override void HandleEventsIdempotently(ICollection<object> events)
        {
            var comments = events
                .OfType<AccountCommentAdded>()
                .ToLookup(c => c.AccountId);

            if (!comments.Any()) return;

            // This batch change is fast, but it must be idempotent
            // for this specific projection type, since underlying
            // storage does not support transactions spanning multiple keys
            // Probably we could make this async...
            comments
                .AsParallel()
                .ForAll(g => Store.UpdateEnforcingNew(g.Key, view =>
                {
                    foreach (var added in g)
                    {
                        view.AddComment(added.Comment, added.Manager.FullName);
                    }
                }));
        }
    }
This design is shaped by the constraint that we need to work efficiently with dead-simple key-value storage like Azure blob storage (while still allowing a simple migration to any database engine). The constraint exists because existing Lokad.CQRS projections run on this storage.
Here is what we actually do for each projection:
- Start a projection manager as a separate runtime task (it can be implemented as a Task that is retried on failure).
- Calculate the projection version from the codebase. If the projection code has changed, kill all the cached views and reset the checkpoint.
- Subscribe to the specific event stream, starting from the last known checkpoint.
- Feed the retrieved events to the projection in batches.
- After processing each batch, update the checkpoint.
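The steps above can be sketched as a small catch-up loop. This is only an illustration under stated assumptions, not the actual Lokad code: `IBatchProjection`, `ProjectionState`, `ProjectionManager` and `CountingProjection` are hypothetical names, the event stream is modelled as an in-memory list instead of an event store subscription, and the projection version is passed in as a plain string instead of being computed from the codebase.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins: none of these types come from Lokad.CQRS
// or from any event store client library.
public interface IBatchProjection
{
    int BatchSize { get; }
    void HandleEventsIdempotently(ICollection<object> events);
}

public sealed class ProjectionState
{
    public string Version = "";            // version of the code that built the views
    public long Checkpoint;                // number of events already projected
    public Action ResetViews = () => { };  // wipes the cached views
}

public static class ProjectionManager
{
    // Projects everything currently in the log and returns the new checkpoint.
    public static long CatchUp(
        IReadOnlyList<object> eventLog,
        IBatchProjection projection,
        ProjectionState state,
        string currentVersion)
    {
        // Projection code changed: drop the cached views and start over.
        if (state.Version != currentVersion)
        {
            state.ResetViews();
            state.Checkpoint = 0;
            state.Version = currentVersion;
        }

        while (state.Checkpoint < eventLog.Count)
        {
            var batch = eventLog
                .Skip((int)state.Checkpoint)
                .Take(projection.BatchSize)
                .ToList();

            projection.HandleEventsIdempotently(batch);

            // The checkpoint moves only after the whole batch succeeded.
            // If the handler throws, the same batch is delivered again,
            // which is why the handler has to be idempotent.
            state.Checkpoint += batch.Count;
        }
        return state.Checkpoint;
    }
}

// Tiny projection used only to exercise the loop.
public sealed class CountingProjection : IBatchProjection
{
    public int BatchSize { get; }
    public int Batches;
    public int Events;

    public CountingProjection(int batchSize) { BatchSize = batchSize; }

    public void HandleEventsIdempotently(ICollection<object> events)
    {
        Batches++;
        Events += events.Count;
    }
}
```

With a log of 250 events and a batch size of 100, this loop calls the projection three times (100, 100 and 50 events). Calling `CatchUp` again with a different version string resets the checkpoint and replays the whole log.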
Please keep in mind: this is the very first version, and it is not optimised. I'm following the mantra: first make it work, then make it beautiful, then make it fast.
Additional notes:
- A projection can specify the batch size it is willing to accept.
- Detection of changes in a projection's codebase is done using the same code as in the projection rebuilder of Lokad.CQRS (via Mono.Cecil).
- Since each projection subscribes to the event store individually, projections are completely independent. However, this could mean a lot of traffic going through. A simple solution would be to have an event store cache per node, shared by all projections on that node. This cache could be either in memory or on disk. See the mantra, though.
- If a projection fails, it will blow up its manager. The runtime will restart this task and the projection will resume from the last known checkpoint. If we have too many repeated failures, a circuit breaker will kick in to let the system cool down.
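The restart-and-cool-down behaviour from the last note could look roughly like the sketch below. It is an assumption about one possible shape, not the runtime used at Lokad: `CircuitBreaker` and `ProjectionRunner` are hypothetical names, the failure threshold and cool-down period are arbitrary parameters, and the current time is passed in explicitly to keep the example deterministic.

```csharp
using System;

// Hypothetical circuit breaker: opens after a number of consecutive
// failures and stays open for a fixed cool-down period.
public sealed class CircuitBreaker
{
    readonly int _maxFailures;
    readonly TimeSpan _coolDown;
    int _failures;
    DateTime _openUntil = DateTime.MinValue;

    public CircuitBreaker(int maxFailures, TimeSpan coolDown)
    {
        _maxFailures = maxFailures;
        _coolDown = coolDown;
    }

    public bool IsOpen(DateTime now)
    {
        return now < _openUntil;
    }

    public void RecordSuccess()
    {
        _failures = 0;
    }

    public void RecordFailure(DateTime now)
    {
        _failures++;
        if (_failures >= _maxFailures)
        {
            // Too many failures in a row: let the system cool down.
            _openUntil = now + _coolDown;
            _failures = 0;
        }
    }
}

public static class ProjectionRunner
{
    // Makes one attempt to run the projection manager.
    // Returns true only if the attempt was made and succeeded.
    public static bool TryRunOnce(Action runProjection, CircuitBreaker breaker, DateTime now)
    {
        if (breaker.IsOpen(now)) return false; // still cooling down, skip the attempt

        try
        {
            // Expected to resume from the last known checkpoint.
            runProjection();
            breaker.RecordSuccess();
            return true;
        }
        catch (Exception)
        {
            breaker.RecordFailure(now);
            return false;
        }
    }
}
```

A host would call `TryRunOnce` in a loop with some delay between attempts. Because the checkpoint is only advanced after a successful batch, each restart picks up where the previous attempt left off.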
What do you think? How do you implement your view projections in C#?
Published: August 16, 2013.