Real-Time Analytics with Go and LMDB

I'd like to share a few details about a project I've been working on this year: a lean platform for real-time analytics and batch reporting.

Project Inception

The project started as an internal tool for running ad-hoc reports on business event streams. Existing tools took 5-8 hours to download a billion events, replay them and produce useful reports.

In my spare time I started working on experimental tooling to speed up the process and to figure out approaches that combine iterative and incremental event processing with occasional full replays.

Constraints

This project had a few constraints from the start:

  • No multi-node data processing frameworks. Although Kafka and Spark could do the job, they come with dependencies like ZooKeeper and DevOps-related overhead. I needed something simple that could efficiently utilize the hardware of a single machine.
  • Encryption and compression had to be supported by the infrastructure from the start. While modern file systems can handle both aspects, in some specific cases it is more efficient to let the application layer handle these concerns (a small sketch of that idea follows this list).
  • Throughput over latency is a trade-off that avoids a lot of complexity in the code. Analytical pipelines are likely to spend seconds or minutes digging through the numbers anyway, so there is no need to optimize for sub-millisecond latency on individual events.
  • Simplicity and reuse - an inherent constraint, since the project initially ran on a limited time budget (roughly 15 hours per month).
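To make the encryption and compression constraint concrete, here is a minimal Go sketch of sealing an event chunk at the application layer with gzip followed by AES-GCM. The codec choice, the sealChunk name and the key handling are illustrative assumptions, not details from the actual platform.

    package main

    import (
        "bytes"
        "compress/gzip"
        "crypto/aes"
        "crypto/cipher"
        "crypto/rand"
        "fmt"
        "io"
    )

    // sealChunk compresses a chunk of events with gzip and then encrypts it
    // with AES-GCM. The nonce is prepended so the chunk is self-contained.
    func sealChunk(key, events []byte) ([]byte, error) {
        var buf bytes.Buffer
        zw := gzip.NewWriter(&buf)
        if _, err := zw.Write(events); err != nil {
            return nil, err
        }
        if err := zw.Close(); err != nil {
            return nil, err
        }

        block, err := aes.NewCipher(key) // key must be 16, 24 or 32 bytes long
        if err != nil {
            return nil, err
        }
        gcm, err := cipher.NewGCM(block)
        if err != nil {
            return nil, err
        }
        nonce := make([]byte, gcm.NonceSize())
        if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
            return nil, err
        }
        return gcm.Seal(nonce, nonce, buf.Bytes(), nil), nil
    }

    func main() {
        key := make([]byte, 32)
        if _, err := io.ReadFull(rand.Reader, key); err != nil {
            panic(err)
        }
        sealed, err := sealChunk(key, []byte(`{"event":"impression","cpm":1.2}`))
        if err != nil {
            panic(err)
        }
        fmt.Printf("sealed %d bytes\n", len(sealed))
    }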

First Version in .NET

The initial implementation was mostly done in .NET, since that is the platform I was most familiar with.

The implementation consisted of:

  • Analytical event storage, optimized for throughput and supporting a single writer with multiple readers (a minimal sketch of the append-only idea follows this list).
  • Data processing stages responsible for incremental event import, compression, filtering and aggregation; they operated on event storage streams (similar to how Kafka is used in multi-node systems), using a durable in-process database for indexes, metadata and non-stream artifacts.
  • Full replay stages, which operated on compressed event streams, feeding them to in-memory actors for anomaly detection and fine-grained analysis.
  • Visualization and report generation, done with Python (charting) and Lisp (HTML dashboard generation) stages.
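The original stages were written in .NET and the storage format isn't published, but the single-writer, append-only chunk idea behind the event storage could look roughly like this in Go (the file layout and the length-prefixed record format are assumptions):

    package main

    import (
        "encoding/binary"
        "fmt"
        "os"
    )

    // appendEvent writes a length-prefixed record to an append-only chunk file.
    // A single writer owns the file; readers can tail it concurrently because
    // records are only ever appended, never rewritten.
    func appendEvent(f *os.File, payload []byte) error {
        var hdr [4]byte
        binary.LittleEndian.PutUint32(hdr[:], uint32(len(payload)))
        if _, err := f.Write(hdr[:]); err != nil {
            return err
        }
        _, err := f.Write(payload)
        return err
    }

    func main() {
        f, err := os.OpenFile("chunk-000001.events", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
        if err != nil {
            panic(err)
        }
        defer f.Close()
        if err := appendEvent(f, []byte(`{"type":"bid","cpm":0.42}`)); err != nil {
            panic(err)
        }
        fmt.Println("event appended")
    }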

[Image: Dashboard]

This overall setup was similar to what you'd do these days with Apache Kafka, Spark and Spark Streaming, but it lived on a single machine. It was able to provide up-to-date visual and numeric answers to complex questions in a matter of minutes instead of hours.

[Images: Anomaly detection, Weekly patterns]

This worked because the "heaviest" data transformations were incremental, while full replays operated on optimized datasets.

LMDB - The Transactional Heart

Almost all of the storage needs were handled with LMDB, which saved a ton of time in development without sacrificing performance.

LMDB responsibilities (a small usage sketch follows this list):

  • meta-data of the analytical event store: event chunk headers, event processing checkpoints;
  • indexes, lookup tables, aggregated counters and mappings used by the data processing stages;
  • final reporting artifacts: tables, lookups, time-series and meta-data.
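The post doesn't include code, but as an illustration of the checkpoint and metadata role, here is roughly how a stream-processing checkpoint could be written and read back using the bmatsuo/lmdb-go binding. The binding choice, database name and key layout are assumptions, not the project's actual schema.

    package main

    import (
        "encoding/binary"
        "fmt"
        "os"

        "github.com/bmatsuo/lmdb-go/lmdb"
    )

    func main() {
        // One environment, one named database for processing checkpoints.
        if err := os.MkdirAll("analytics.lmdb", 0o755); err != nil {
            panic(err)
        }
        env, err := lmdb.NewEnv()
        if err != nil {
            panic(err)
        }
        defer env.Close()
        env.SetMaxDBs(4)
        env.SetMapSize(1 << 30) // 1 GiB map; the data file grows lazily
        if err := env.Open("analytics.lmdb", 0, 0o644); err != nil {
            panic(err)
        }

        // Single writer: record how far the "impressions" stream was processed.
        var checkpoints lmdb.DBI
        err = env.Update(func(txn *lmdb.Txn) error {
            dbi, err := txn.OpenDBI("checkpoints", lmdb.Create)
            if err != nil {
                return err
            }
            checkpoints = dbi
            var offset [8]byte
            binary.BigEndian.PutUint64(offset[:], 123456)
            return txn.Put(checkpoints, []byte("impressions"), offset[:], 0)
        })
        if err != nil {
            panic(err)
        }

        // Readers use cheap read-only transactions and never block the writer.
        err = env.View(func(txn *lmdb.Txn) error {
            v, err := txn.Get(checkpoints, []byte("impressions"))
            if err != nil {
                return err
            }
            fmt.Println("impressions processed up to offset", binary.BigEndian.Uint64(v))
            return nil
        })
        if err != nil {
            panic(err)
        }
    }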

Switching to golang

At some point along the way, I started porting core parts of the platform to golang for the following reasons:

  • Golang offers lightweight concurrency primitives that work well for real-time data ingestion, aggregation, and incremental and batch processing (see the sketch after this list).
  • I wanted to migrate deployments to Linux/Unix to reduce hosting costs and gain access to its ecosystem: tools and knowledge for performance optimization.
  • Golang also works well for fast and efficient API gateways that ingest data into the system or expose the resulting data to other systems.
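A toy illustration of the first point - channel-based ingestion feeding an aggregator goroutine. The Event shape and the per-placement totals are made up for the example; in the real pipeline the consumer would persist its results instead of printing them.

    package main

    import (
        "fmt"
        "sync"
    )

    // Event is a trimmed-down stand-in for a bid/impression event.
    type Event struct {
        Placement string
        CPM       float64
    }

    func main() {
        events := make(chan Event, 1024)

        // Aggregator goroutine: folds the incoming stream into per-placement totals.
        totals := make(map[string]float64)
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ev := range events {
                totals[ev.Placement] += ev.CPM
            }
        }()

        // Ingestion side: in the real system this would be fed by API handlers.
        for i := 0; i < 3; i++ {
            events <- Event{Placement: "sidebar", CPM: 1.2}
            events <- Event{Placement: "leaderboard", CPM: 0.8}
        }
        close(events)
        wg.Wait()

        fmt.Println(totals)
    }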

First Linux deployment

The first deployment of the new version was rolled out recently. It is an analytical platform that captures events from real-time bidding auctions (header bidding and Google AdX) coming from thousands of sessions running in parallel. This data can then be used for real-time insight, experiments and long-term reporting driven by financial indicators.

This deployment handles 10M impressions per day across 4 different shards.
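The post doesn't say how traffic is routed across those shards; one plausible, purely illustrative scheme is to hash the session id so that all events from a session land on the same shard:

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    const shardCount = 4

    // shardFor picks a shard for a session; the FNV hash is an assumption,
    // not the scheme the deployment actually uses.
    func shardFor(sessionID string) uint32 {
        h := fnv.New32a()
        h.Write([]byte(sessionID))
        return h.Sum32() % shardCount
    }

    func main() {
        for _, id := range []string{"sess-1", "sess-2", "sess-3"} {
            fmt.Println(id, "-> shard", shardFor(id))
        }
    }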

[Image: AdOps Overview]

A few interesting features:

  • real-time dashboards with statistics for CPM, revenue (including a Google AdX estimate), ad size performance, bidder performance and responses, placements, user segments and sessions;
  • aggregating and slicing data across placements, bidders, shards and experiments (a tiny sketch of the composite-key idea follows this list);
  • running experiments (fine-grained A/B testing) and tracking their financial impact in real time;
  • long-term data retention and analysis, which enables tracking revenue indicators per session and user lifetime, bidder behavior changes and the long-term impact of experiments.
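A tiny sketch of the slicing idea: keep fine-grained counters under a composite key (placement, bidder, experiment) and roll them up along whichever dimension a report needs. In the real system such counters live in LMDB; the in-memory map and field names below are assumptions made to keep the example short.

    package main

    import "fmt"

    // sliceKey identifies one fine-grained revenue counter. In practice this
    // could be encoded into an LMDB key so ranges can be scanned.
    type sliceKey struct {
        Placement  string
        Bidder     string
        Experiment string
    }

    func main() {
        revenue := map[sliceKey]float64{}
        add := func(placement, bidder, experiment string, amount float64) {
            revenue[sliceKey{placement, bidder, experiment}] += amount
        }

        add("sidebar", "bidderA", "control", 0.0012)
        add("sidebar", "bidderA", "variant", 0.0015)
        add("sidebar", "bidderB", "control", 0.0009)

        // Slicing: roll the fine-grained counters up by bidder.
        byBidder := map[string]float64{}
        for k, v := range revenue {
            byBidder[k.Bidder] += v
        }
        fmt.Println(byBidder)
    }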

[Images: Bidder Responses, Sessions]

Implementation Details

The complete tech stack of this solution is rather simple:

  • Ubuntu 16.04 LTS running on a dedicated server (e.g. Intel Xeon with DDR4 ECC RAM and NVMe SSD RAID);
  • Nginx for API intake and SSL termination (a minimal Go intake handler sketch follows this list);
  • Golang and LMDB as the data processing heart of the system;
  • Partitioned analytical event storage with AES encryption of data at rest (high throughput, single writer and multiple readers);
  • Telegraf with InfluxDB and Grafana for real-time display;
  • Python for custom reports that don't fit the real-time UI.
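As a sketch of how the Nginx-fronted intake could look on the Go side: a small HTTP handler that decodes events and hands them off to a consumer. The /intake path, the Impression payload and the local port are assumptions; Nginx terminates SSL and proxies requests to this process.

    package main

    import (
        "encoding/json"
        "log"
        "net/http"
    )

    // Impression is a made-up payload shape; the real event schema isn't published.
    type Impression struct {
        SessionID string  `json:"session_id"`
        Placement string  `json:"placement"`
        CPM       float64 `json:"cpm"`
    }

    func main() {
        events := make(chan Impression, 4096)

        // Consumer stub: in the real system this would append to the event store.
        go func() {
            for ev := range events {
                log.Printf("ingested %s %s %.4f", ev.SessionID, ev.Placement, ev.CPM)
            }
        }()

        http.HandleFunc("/intake", func(w http.ResponseWriter, r *http.Request) {
            var ev Impression
            if err := json.NewDecoder(r.Body).Decode(&ev); err != nil {
                http.Error(w, "bad payload", http.StatusBadRequest)
                return
            }
            events <- ev
            w.WriteHeader(http.StatusAccepted)
        })

        // Nginx handles SSL and proxies to this local port.
        log.Fatal(http.ListenAndServe("127.0.0.1:8080", nil))
    }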

[Image: System overview]

What Next?

It will be interesting to push this platform to its limits, either with heavier data loads or with challenging ML tasks. So far, event streams arriving at a rate of 10-30M events per day from thousands of connected devices aren't a big challenge for a single small-ish server with Linux, golang and LMDB.

Ping me if you need to capture events in real time and run analytics and reports over this data. This could be done either as a managed service with a fixed monthly cost or as a delivered project. Dedicated data engineering support could also be provided on a monthly basis (tweaking reports, exploring data and tuning ML algorithms).
