Blog
chevron-right
Progressive Event Stream Analytics at Scale Using Kubernetes, DragonflyDB and TypeScript

Progressive Event Stream Analytics at Scale Using Kubernetes, DragonflyDB and TypeScript

Przemyslaw Maciolek
September 11, 2024

We have been asked how Motif Analytics work underneath on more than one occasion. There is a simplified answer for that question (”workers process SOL query and we map/reduce results”), but it does not bring all the interesting design decisions we did and optimized over the last two years.

When we started working on it, we initially used an approach similar to our local (in-browser) mode, hoping to reuse a lot of abstractions. I.e. run the main process and a bunch of workers tightly coupled with it. Unfortunately, Worker Threads just didn’t work as well as Web Workers for us. We’ve been hitting some unexpected scaling limits rather quickly and using Worker Threads with Typescript (and the multi-module code transpilation) was painful on its own. Add to this the limitations in memory sharing between the “threads” and we ended up seeing almost no benefits of using them. So we took a step back and fully decoupled the worker from the main thread, leveraging the shared-nothing architecture approach. It took us a while to get there but we ended up with a pretty robust and scalable architecture.

I am an engineer myself and concluded it might be fun to read, hold tight!

The challenge

Motif is sequence analytics

One of the fundamental distinguishing features for Motif Analytics is that it operates on the concept of sequence, rather than tabular data or event streams. A sequence is essentially an ordered list of events, grouped by actor and can be trivially generated from a table, It brings new exciting capabilities and requires special tools to operate on it. For that reason, we brought SOL (Sequence Operations Language) which allows working with this data model.

In database taxonomy, Motif Analytics might be considered a NoSQL Array Database. Was it really required to come up with another database? In their paper: “What Goes Around Comes Around… And Around…”, Michael Stonebraker and Andrew Pavlo argue that repeated efforts to replace either SQL or the Relational Model are destined to failure. They review the last 20 years in databases and conclude that all alternatives were either already replaced by RDBMS/SQL or are going to get merged soon. However, one exception is Array Databases:

They may become more important because RDBMSs cannot efficiently store and analyze arrays despite new SQL/MDA enhancements.

Even if we’re building a database engine, it’s not as hard as it used to be. We reuse a lot of existing parts and Apache Arrow makes it much easier than it was, say, 10 years ago. Yet still, it’s not a trivial project.

Motif is progressive analytics

It is our core belief that we should remove friction, to the extent possible, from the analytics workflow. One way to look at it is that not all queries are created equal and different tradeoffs might be associated with them.

  • exploration (S1) queries are formed many times, should be interactive (e.g. <2s) and might have lower sampling rate associated
  • for hypothesis forming (S2) the demands on confidence intervals are tighter, though the queries can also be sampled and should still be fairly fast (e.g. <10s)
  • for reporting (S3) the queries are run less frequently and can be even considered a batch job, yet these should not be sampled anymore

All three modes are available with Motif. Additionally, rather than implying a predefined sampling rate, Motif uses adaptive random sampling techniques, which ultimately means that it’s all effortless to use. To top it off, the sampling rate can be progressively increased, if needed, via our incremental query engine.

We believe in simplicity

I very much remember my first interview with Theron, CTO and co-founder of Motif. He mentioned how important it is for him to focus on simplicity in engineering and I could not agree with him more. This comes at a caveat though: building complex is simple, yet building simple is hard!

Simplicity comes in different shapes and forms. I see it mainly as (to the extent possible):

  • use stateless, ephemeral processes
  • favour composition over inheritance
  • balance new abstractions
  • keep unidirectional flow
  • prefer idempotent operations
  • support fault tolerance
  • leverage existing standards, formats, services and libraries if they fit well and don’t come at high maintenance cost
Our code is 100% TypeScript (at least right now)

TypeScript is great and makes it easy to build things running both on the server and in the browser. It can deliver outstanding performance for a scripting language, yet it also has some limitations which make it challenging for high-throughput data processing engines. Perhaps the biggest one is inherent cost of serializing/deserializing data, which significantly reduces the usability of distributed caching. We found that in many cases serializing and then deserializing data using the fastest means possible takes as much time as fully processing it from raw input.

Motif Cloud Processing Architecture

Motif provides two ways to work with data. In the local mode, data can be loaded and processed fully in the browser, which works great for evaluating smaller datasets (typically, up to 2M events). For larger datasets, we offer Cloud Mode, which currently supports up to 10B+ event datasets (we’re working on supporting larger datasets), runs on Kubernetes and handles autoscaling.

Before we get there, lets look at our processing pipeline bottom-up.

Data Blocks

Motif Analytics introduces two core model primitives. You probably know these fairly well already. We just provide some standardisation here:

  • event is a unique record, which is labeled by a name, associated with an actor and has some monotonic order field (typically by timestamp)
  • sequence is an ordered collection of events associated with a single actor

The ingestion first partitions the raw events by specified actor field, then sorts it and stores in data blocks (with all normalization and transformation done along the way). A block is a unique partition of data, which contains a collection of sequences. It might contain a single sequence (when the sequence is very large) or tens of thousands of them (when these are shorter). What’s crucial though is that a single sequence cannot span more than one data block. This approach simplifies feeding the query engine with data and allows to run many parallel query operations. As an added benefit, sampling for large datasets can be handled through randomly picking data blocks and processing them.

The ideal data block size is the one which allows for maximum parallelism, while not adding unnecessary overhead. In our case, this means that minimally, single block overall processing P95 latency should be below our allowed interactive query runtime (<2s).

Apache Arrow as the physical model (mostly)

We use a columnar model underneath and while initially we started with a custom implementation (for performance reasons) we eventually settled on Apache Arrow. This has a slight impact on the overall throughput (due to increased access times to values stored in memory through Apache Arrow), but makes it much easier to integrate with other frameworks and libraries. For example, we use DuckDB for reading the Parquet files (which we use for storing the Data Blocks). When the query needs to access only several dimensions, it is trivial to push them down to Parquet reader in DuckDB and avoid unnecessary deserialization and processing of the unused columns.

Of course, Apache Arrow columnar model still represents tabular data rather than a collection of sequences. To deal with that, we built a lightweight representation of sequences. Our ingestion layer ensures that all events in data block are ordered by actor and timestamp, which means we can assume they the sequences are contiguous in the input. Then, each sequence can be described by an array of positions in the Arrow Table. For example, if a sequences has 4 events, which are stored as records at row indexes 100-103, then the array would contain [100,101,102,103].

Removing and adding events

While update operations of individual dimension values can be handled directly through Apache Arrow API, it’s more complex for operations which remove or add events.

Actually, removal is the simplest case. If, through REPLACE operation, an event would be removed or reordered, it is simply a matter of updating the event array indexes accordingly. E.g. removing the second event would update the example array to 100,102,103

Adding events (again, through SOL REPLACE operation) requires more effort. Adding new records through Apache Arrow directly is rather expensive and would require rewriting the data. To avoid that, we use overlay columns and append new data at the indexes following the original Apache Arrow input data. For example, if now a new event was added to our example sequence, we might have added a new entry at row 200001 using the overlays and the array would now contain [100,102,103,200001]

Computing at scale

The datasets we handle for our customers are frequently made out of tens of thousands of data blocks. This allows for great parallelism, but also requires some coordination of the effort. Also, we do not have unlimited resources. If the dataset contains 10,000 data blocks, it does not mean we will run it concurrently on 10,000 workers. Instead, we have a pool of available workers (which autoscales as needed) and we feed blocks to it.

Let us bring some abstractions we use in our architecture:

  • Metadata is a file which contains information about the dataset and its specific data blocks, including each block path, number of sequences, etc.
  • Object Storage stores Data Blocks and Metadata
  • Task Scheduler is a subsystem which manages adding (or removing) tasks for ongoing request. It is aware of what blocks are available for specified dataset and which were processed already.
  • Worker is a process which can consume and process tasks from Scheduler and output the results down the stream. It fetches the blocks directly from the Object Storage
  • Combiner can combine multiple outputs from Workers for a specific request (to reduce the further processing volume)
  • Reducer does the last step of processing, gathering all results and outputting the data which can be consumed by the client

The processing following a map-combine-reduce flow (you can think of it as of a version of leaf→intermediate aggregator→root aggregator, as described in Facebook Scuba paper). The  map operations are most processing intensive. They take data blocks, SOL query and requests params on the input.

It’s critical to keep low latency for all of this this and handle all failures gracefully. We found that DragonflyDB is a great choice for that. It is lightweight, has great developer experience and implements the Redis protocol, which nicely complements our core beliefs in simplicity:

The Redis protocol has served us well in the past years, showing that, if carefully designed, a simple human readable protocol is not the bottleneck in the client server communication, and that the simplicity of the design is a major advantage in creating a healthy client libraries ecosystem.
Task Scheduler

The task processing management can be broken into four interconnected parts:

  • task management, which decides whether more tasks need to be pushed (or the request should be cancelled); as a simple rule, it can be assumed that each task represents one input data block for a specific query request
  • regular task stream, which allows to pull the task by a random worker
  • cached task stream, which sends the task to a specific worker, which advertise to have cache results available
  • the worker itself, which gets and processes the task

When the request from the client comes to the backend, the request state is initialised and a new task scheduler instance is created. It will create a list of blocks to process from the dataset metadata and start pushing these for the workers to handle. Each iteration the current capacity is checked. These properties are tracked in DragonflyDB instance and updated by workers on starting and finishing the tasks. If there are task slots available for the given request, a block ID is removed from the list and a new task is created. It includes the request details and is associated with a single data block. It also includes information about where to route the responses back, so these could be reduced when ready.

The task is sent using Streams and Consumer Groups. These can be considered a type of a message queue, which some extra capabilities for delivering and acknowledging the messages.

The task scheduler keeps scheduling the tasks until all of them were processed or timeout happens. E.g. if it’s an interactive task, the scheduler can decide it will return the responses after 2s basing on what it collected already and cancel the already scheduled tasks.

Request cancellation notifications are handled through a separate pubsub channel, so workers are aware of the requests which should be no longer processed and tasks can be dropped.

In an incremental task scheduler mode, rather than cancelling the task, it’s paused instead and its current state is kept in DragonflyDB. When a continuation request comes for this particular instance, it’s unpaused and processing continues. This provides continuous progressive analytics experience.

Worker

The worker receives the task and processes it. It first fetches the associated block from Object Storage, then executes SOL, does the Map operation (relevant to specific request, e.g. for Metrics API, it calculates aggregates and groups them accordingly) and eventually pushes the results out. They typically go through combiner using PUBSUB first and are periodically combined with other worker responses for the same request and then are stored in DragonflyDB for the final reduce phase.

We use heavily the Redis Stream/Consumer Group concepts (though implemented via DragonflyDB). When a message (task request) arrives at the worker using XREADGROUP, it starts being processed and is acknowledged on success (via XACK). If the worker fails for whatever reason, the message status will timeout and eventually it will be picked by one of the other workers through XAUTOCLAIM, unless the request was already cancelled. This allows to scale the number of workers quite significantly (we have successfully run several thousand concurrently so far).

There is however one interesting detail which justifies getting into another layer of depth: caching.

Regular vs Cached Task Stream

Out of all processing steps, running SOL is typically the phase which takes the biggest chunk of time (though frequently on par with deserializing the data from Parquet into Apache Arrow). Frequently, the user navigates through various visualisations or changes some params, which trigger a new API request, though fundamentally based on the same SOL query, for the same dataset.

If our backend would be written in Rust, Zig, Java or C++ (or many other languages), we would probably try reusing a distributed cache framework for these. However, with our backend in Javascript, things are more complicated. TLDR, the serialization/deserialization costs are prohibitive for using a distributed cache. So we came up with something on our own. We built a worker-aware local SOL results cache.

When the worker sees a specified SOL query + dataset combination for the first time, it caches the query results in a local, in-memory LRU-cache and sends a notification to a centralized key-value store (a Hashmap in DragonflyDB), indicating that a specified block is cached at a specified worker instance. The cache has some expiration time associated and additionally, if worker removes the entry from its local cache, the central key-value store is updated with information that the cache is no longer available.

Then, when the task scheduler is processing another block, it checks if it was potentially cached. If it was, instead of pushing a regular Redis Protocol Stream message, it instead sends a message directly to the specified worker PUBSUB channel and to another Redis Protocol Stream, dedicated for cached results. Worker then has one second (by default) to process the message. If it does not succeed, the cache request expires and it is later processed just like any other request through Redis Protocol Streams.

This approach has the best of both worlds - we have the ability to reuse the cached results, yet the request has some guarantees of not being lost, even when the worker is abruptly shutdown (e.g. due to Kubernetes Pod being killed).

Fault Resilience and Tolerance

Workers

Using message queue for feeding workers with tasks makes it much easier to reason about modes of handling failures. This is a capability not unique to Redis protocol, though it provides some unique semantics for dealing with it. On reading the message, it enters a Pending Entries List (PEL). This indicates the message is being processed. When worker processing the message unexpectedly dies, the message can be eventually picked up again through XAUTOCLAIM, which lists the messages that were idle for specified duration (essentially, a timeout). Only when message is acknowledged (XACK) it can be considered processed and is taken down from the PEL.

There’s a built-in safety mechanism for duplicate message processing. If the message was already acknowledged or removed from the queue, XACK will return “0” when trying to acknowledge it again, so it can be stopped from sending down the stream.

It’s worth noting though that in case of interactive queries, to not delay getting the response, a failed task might be not retried, in favour of returning partial (sampled) results at reduced latency.

Server and Task Scheduler

The fault-tolerance gets more interesting when dealing with server/task-scheduler failures. These can be actually continued! The key to achieve that is realising that our Incremental Task Scheduler does not really keep a state in memory but rather updates/reads state via DragonflyDB.

In case request fails (e.g. network connection issue), the client can actually retry it. By providing the same UUID of the request ID, the server can actually continue it rather than restart, reusing the already processed task results. This also makes the overall architecture much more resilient, since the only single-point-of-failure is now DragonflyDB (and it provides replication for high-availability).

Wrap Up

Where does this bring us? For the past three months we saw following stats:

The system is stable, scales well and haven’t caused a single outage (knock on the wood). We achieved that largely through keeping the design simple, but not too simple!. The key decisions were:

  • breaking the dataset into independent data blocks (great for parallelism and fault resiliency)
  • using Apache Arrow whenever possible, yet keeping a lightweight logical model over it (reusing existing OSS components but retaining the Sequence semantics)
  • building a task scheduler on top of DragonflyDB (Redis Protocol) Streams & Consumer Groups (high throughput, low latency and good fault resilience)
  • making service instances stateless and instead keeping the state in DragonflyDB (great for autoscaling, fault resilience and pausing/continuing the requests)

Our Cloud Architecture provides us with the fabric, which allows us run the SOL Query Engine over the data effortlessly. Our sequence-oriented language helps solve complex problems elegantly, though implementing it brings some interesting challenges from engineering perspective. Going to cover it in the subsequent post. Subscribe and stay tuned!

Sign up for an account to try Motif
Get started for free