Tape Storage in Lokad.CQRS for Event Sourcing
OK, it looks like we are heading towards more stable interfaces for the tape storage in Lokad.CQRS. Tape storage, just like any other block in the scaffolding, is an extremely simple piece on its own. Here's the definition:
Tape Storage in the Lokad.CQRS framework is a set of interfaces (and their implementations) designed around append-only storage. You can define named streams (tapes) and then append blocks of data (plain byte[]) to these streams, with optimistic concurrency supported. You can also read the data back rather efficiently.
This weird-looking storage concept is actually the only thing you need to support event sourcing with custom replication and redundancy that could be plugged on top (a bit more in an old post of mine). Obviously, just like it is with any other concept in Lokad.CQRS, this design considers cloud environments and volatility from the start.
Another usage for the tape storage is persisting domain logs which represent a filtered subset of messages (events and/or commands) that went through some partition. They are normally used for audits, replicating data or rebuilding views (either locally or in the cloud).
Tape storage interfaces (bound to change a little bit) currently look like this:
    /// <summary>
    /// Factory for storing blocks of data into append-only storage
    /// that is easy to scale and replicate. This is the foundation
    /// for event sourcing.
    /// </summary>
    public interface ITapeStorageFactory
    {
        /// <summary>
        /// Gets or creates a new named stream.
        /// </summary>
        /// <param name="name">The name of the stream.</param>
        /// <returns>new stream instance</returns>
        ITapeStream GetOrCreateStream(string name);

        /// <summary>
        /// Initializes this storage for writing.
        /// </summary>
        void InitializeForWriting();
    }
Where each stream is:
    /// <summary>
    /// Named tape stream that usually corresponds to an aggregate instance.
    /// </summary>
    public interface ITapeStream
    {
        /// <summary>
        /// Reads up to <paramref name="maxCount"/> records, skipping the
        /// first <paramref name="offset"/> records.
        /// </summary>
        /// <param name="offset">The number of records to skip.</param>
        /// <param name="maxCount">The max number of records to load.</param>
        /// <returns>collection of blocks</returns>
        IEnumerable<TapeRecord> ReadRecords(long offset, int maxCount);

        /// <summary>
        /// Returns the current storage version.
        /// </summary>
        /// <returns>current version of the storage</returns>
        long GetCurrentVersion();

        /// <summary>
        /// Tries to append data to the tape storage, ensuring that
        /// the version condition is met (if the condition is specified).
        /// </summary>
        /// <param name="buffer">The data to append.</param>
        /// <param name="appendCondition">The append condition.</param>
        /// <returns>whether the data was appended</returns>
        bool TryAppend(byte[] buffer, TapeAppendCondition appendCondition);
    }
The current trunk supports the following implementations of the tape storage:
- In Memory - part of portable core, for in-memory testing;
- File - part of portable core, for out-of-cloud deployments;
- MS SQL - part of Azure extension;
- Azure Blob Storage - part of Azure extension.
No other implementations are planned to be added to the framework, but it is trivial to write your own.
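To give a feel for how little a custom implementation needs, here is a minimal in-memory sketch of ITapeStream. It is not the implementation that ships with Lokad.CQRS. The shapes of TapeRecord and TapeAppendCondition are not shown in this post, so the versions below (including the VersionIs helper and the rule that the version equals the number of appended records) are assumptions made purely for illustration, as is the MemoryTapeStream name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical shape: the post does not define TapeRecord.
public sealed class TapeRecord
{
    public readonly long Version;
    public readonly byte[] Data;
    public TapeRecord(long version, byte[] data) { Version = version; Data = data; }
}

// Hypothetical shape: a null expected version means "append unconditionally".
public struct TapeAppendCondition
{
    public readonly long? ExpectedVersion;
    TapeAppendCondition(long? expected) { ExpectedVersion = expected; }
    public static readonly TapeAppendCondition None = new TapeAppendCondition(null);
    public static TapeAppendCondition VersionIs(long version) { return new TapeAppendCondition(version); }
}

// Interface as listed in the post.
public interface ITapeStream
{
    IEnumerable<TapeRecord> ReadRecords(long offset, int maxCount);
    long GetCurrentVersion();
    bool TryAppend(byte[] buffer, TapeAppendCondition appendCondition);
}

public sealed class MemoryTapeStream : ITapeStream
{
    readonly List<byte[]> _blocks = new List<byte[]>();
    readonly object _lock = new object();

    public IEnumerable<TapeRecord> ReadRecords(long offset, int maxCount)
    {
        lock (_lock)
        {
            // Materialize the page under the lock so callers enumerate a stable copy.
            // Assumption: the record at zero-based index i carries version i + 1.
            return _blocks
                .Skip((int)offset)
                .Take(maxCount)
                .Select((data, i) => new TapeRecord(offset + i + 1, data))
                .ToArray();
        }
    }

    public long GetCurrentVersion()
    {
        lock (_lock) { return _blocks.Count; }
    }

    public bool TryAppend(byte[] buffer, TapeAppendCondition appendCondition)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        lock (_lock)
        {
            var expected = appendCondition.ExpectedVersion;
            // Optimistic concurrency: refuse the append if someone else wrote first.
            if (expected.HasValue && expected.Value != _blocks.Count) return false;
            _blocks.Add((byte[])buffer.Clone());
            return true;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        ITapeStream stream = new MemoryTapeStream();
        var version = stream.GetCurrentVersion();
        var condition = TapeAppendCondition.VersionIs(version);

        Console.WriteLine(stream.TryAppend(Encoding.UTF8.GetBytes("first"), condition));    // True
        // A second writer still holding the stale version is rejected.
        Console.WriteLine(stream.TryAppend(Encoding.UTF8.GetBytes("conflict"), condition)); // False

        foreach (var record in stream.ReadRecords(0, 10))
            Console.WriteLine(record.Version + ": " + Encoding.UTF8.GetString(record.Data)); // 1: first
    }
}
```

A caller that loses the race would typically re-read the stream, re-apply its decision against the fresh version and try the append again.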
I'll address the question of plugging these stores into event sourcing a bit later. In the meantime, let me share some details on the file-based implementation of Tape Storage.
File Tape Storage will be the primary persistence option for developing decoupled systems with Lokad.CQRS and deploying them to production in on-premises or custom cloud scenarios. This covers the fully portable scenario of Lokad.CQRS.
In the File tape storage, we have a file per stream. Implementation is optimized for relatively fast atomic reads and fast file enumerations. Replication and redundancy are easy to implement.
In addition to these common features, we have added:
- SHA1 hashing of the data - to detect cases where the server was killed before finishing the commit;
- signature-based block boundaries - to support (manual) partial data recovery if a really important file is corrupted by the file system (and you didn't have event replication);
- readability.
The last one is fun and is based on internal Lokad experience. I found myself more productive in the initial phases of development when I could open a data file in Notepad and read it. This helps in debugging and visualizing complex systems.
Hence, my choice for all new projects is to use human-readable serialization whenever possible. For example, using JSON serialization for Atomic Storage and message contracts. You can always "optimize" by switching system to more efficient formats later (since storage interfaces are designed to be simple, migrating them is also simple on purpose).
However, with the file-based tape storage, it didn't work out nicely in the beginning. You see, we have to write some binary system information to the file in addition to the actual data blocks (SHA1 hashes, signature bytes, length specifiers etc). This leads to files that look garbled, even if the actual data contents are human-readable.
So I took the effort to make sure that if the contents of the binary blocks are human-readable, then the entire file is human-readable as well, without any unicode-gone-wrong garbage.
The header and footer comment lines around each block in such a file are actually system information bits and signature lines that by some coincidence happen to be readable when viewed in a text editor.
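The idea of system information that doubles as readable text can be sketched as follows. This is not the actual Lokad.CQRS file format, which is not shown in this post. The signatures, the hex length field and the footer layout are hypothetical, chosen only to show a block framed by text lines, with a SHA1 hash that lets the reader reject a block whose write was cut short.

```csharp
using System;
using System.Globalization;
using System.IO;
using System.Security.Cryptography;
using System.Text;

public static class ReadableBlock
{
    // Hypothetical text signatures, not the real Lokad.CQRS ones.
    const string HeaderSignature = "/* header ";
    const string FooterSignature = "/* footer ";
    const string LineEnd = " */\r\n";

    public static void Write(Stream output, byte[] payload)
    {
        // Header carries the payload length as 8 hex digits; footer carries the SHA1 hash.
        WriteAscii(output, HeaderSignature + payload.Length.ToString("x8") + LineEnd);
        output.Write(payload, 0, payload.Length);
        WriteAscii(output, "\r\n" + FooterSignature + HashHex(payload) + LineEnd);
    }

    // Returns the payload, or null when the block is truncated or fails verification.
    public static byte[] TryRead(Stream input)
    {
        var header = ReadAscii(input, HeaderSignature.Length + 8 + LineEnd.Length);
        if (header == null
            || !header.StartsWith(HeaderSignature, StringComparison.Ordinal)
            || !header.EndsWith(LineEnd, StringComparison.Ordinal))
            return null;

        int length;
        var hex = header.Substring(HeaderSignature.Length, 8);
        if (!int.TryParse(hex, NumberStyles.HexNumber, CultureInfo.InvariantCulture, out length) || length < 0)
            return null;

        var payload = ReadBytes(input, length);
        if (payload == null) return null;

        var expectedFooter = "\r\n" + FooterSignature + HashHex(payload) + LineEnd;
        var footer = ReadAscii(input, expectedFooter.Length);
        return footer == expectedFooter ? payload : null;
    }

    static byte[] ReadBytes(Stream input, int count)
    {
        var buffer = new byte[count];
        var read = 0;
        while (read < count)
        {
            var n = input.Read(buffer, read, count - read);
            if (n == 0) return null; // end of file before the block was complete
            read += n;
        }
        return buffer;
    }

    static string ReadAscii(Stream input, int count)
    {
        var bytes = ReadBytes(input, count);
        return bytes == null ? null : Encoding.ASCII.GetString(bytes);
    }

    static void WriteAscii(Stream output, string text)
    {
        var bytes = Encoding.ASCII.GetBytes(text);
        output.Write(bytes, 0, bytes.Length);
    }

    static string HashHex(byte[] payload)
    {
        using (var sha1 = SHA1.Create())
            return BitConverter.ToString(sha1.ComputeHash(payload)).Replace("-", "").ToLowerInvariant();
    }
}

public static class Program
{
    public static void Main()
    {
        // "CustomerRenamed" is a made-up event used only for this example.
        var payload = Encoding.UTF8.GetBytes("{\"event\":\"CustomerRenamed\"}");
        var file = new MemoryStream();
        ReadableBlock.Write(file, payload);
        Console.Write(Encoding.UTF8.GetString(file.ToArray())); // the whole "file" is plain text

        file.Position = 0;
        Console.WriteLine(ReadableBlock.TryRead(file) != null);      // True: block is intact

        // Simulate a server that died before the commit finished.
        var truncated = new MemoryStream(file.ToArray(), 0, (int)file.Length - 10);
        Console.WriteLine(ReadableBlock.TryRead(truncated) != null); // False: block is rejected
    }
}
```

Because the framing is pure ASCII, a JSON payload yields a file that opens cleanly in any text editor, while a binary payload is still framed and verified in exactly the same way.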
A side benefit is that if you are storing system initialization info or BDD scripts for your ES aggregates in version control using Lokad.CQRS tape storage and readable serialization, then these files are actually readable and diff-able as well.
If you are dumping binary-encoded data into the same tape storage (for example, using Google Protocol Buffers instead of ServiceStack Text Serializers), then this implementation of tape storage can still handle it, adding just 108 bytes of overhead per block (this includes positioning pointers, signatures and the SHA1 hash). We can still scan a file of 100MB (10k records of 1024 bytes) in 1.5 seconds, including all hashing and transformations. This is without any performance optimization.
BTW, normally your aggregates would rarely go beyond 10000 events (for normal business scenarios). And different aggregate instances are stored in different files anyway.
However, if your system grows to the point where this becomes the bottleneck, then it will be justified to take any of the provided implementations as a sample and write a custom tape storage implementation, fine-tuned for blazing performance and the details of your unique project. Since the interfaces are designed to be simple on purpose, that shouldn't be hard to do. Besides, by then you would have months of production usage behind you, and human-readable event sourcing files would offer no additional benefit. BTW, in a few months we should probably also have the maintenance console open-sourced and capable of visualizing any provided tape storage in human-readable form.
As the story develops, there are a few more interesting things planned in the short term, as we move forward with the distributed development and Lokad.CQRS. Stay tuned!
Monday, July 4, 2011 at 23:55
Reader Comments (2)
Hi Rinat,
How do you handle snapshots? I didn't see a snapshot interface or a snapshot-like implementation.
Thanks
Al, snapshots are just one optimization to speed up reads. I actually do something even simpler in my systems: cache event streams in memory. If you need to implement actual snapshots for some reason, you can just serialize the aggregate state and use it.