Execution Engine

The main responsibility of Epsio's execution engine is to plan views (pre-defined, named queries) and maintain their results. Each view receives changes from the forwarder and streams them through its internal operators until they reach the sink, where they are consolidated and written to the user's database. This continues until the view is terminated, either by a user action or by an unrecoverable error. The view operators rely heavily on intermediate state, which lets them stream changes efficiently without ever performing a full recalculation.

Planning the Dataflow

To create a view, users run the epsio.create_view('<view_name>', 'SELECT ... '); command on their database. This command is forwarded to the execution engine via the Commander, where the planning will begin. Epsio's planning takes several steps:

  1. Break down the query into an AST (abstract syntax tree).
  2. Create a logical plan from the AST. This plan defines what high-level steps logically need to take place (such as JOIN, GROUP BY, etc). If we ran
CALL epsio.create_view('my_view', 'SELECT count(*) FROM example_table e JOIN numbers_table n ON e.b = n.a WHERE e.b > 30')

the unoptimized plan would look something like this:

Projection: COUNT(*)
  Aggregate: groupBy=[[]], aggr=[[COUNT(*)]]
    Filter: e.b > Int64(30)
      Inner Join:  Filter: e.b = n.a
        SubqueryAlias: e
          TableScan: example_table
        SubqueryAlias: n
          TableScan: numbers_table
  3. Optimize the logical plan. There is a large series of optimizations, both statistical (such as JOIN order & type) and non-statistical (such as pushing down filters & projections). For example, the unoptimized plan for the above query would be optimized to:

Aggregate: groupBy=[[]], aggr=[[COUNT(*)]]
  Inner Join: e.b = n.a
    SubqueryAlias: e
      Filter: example_table.b IS NOT NULL AND example_table.b > Int64(30)
        TableScan: example_table projection=[b], partial_filters=[example_table.b > Int64(30)]
    SubqueryAlias: n
      Filter: numbers_table.a IS NOT NULL AND numbers_table.a > Int64(30)
        TableScan: numbers_table projection=[a], partial_filters=[numbers_table.a > Int64(30)]
As can be seen in this example, projections are pushed down as well as filters (in order to avoid pulling unnecessary columns or rows during the initial view population stage).

  4. Convert the logical plan into a physical plan, and run physical optimizations, such as grouping multiple operations together. The complete physical plan is a graph of nodes, with the sink (the node that is responsible for sinking back into the user's database) always being the last. Each node is called a Collection; there are ~15 types of Collections.
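
The filter pushdown in step 3 can be illustrated with a small sketch. This is not Epsio's optimizer: the `Scan`, `Join`, and `Filter` classes and the `push_down_filter` function are hypothetical names invented for illustration. It only shows why a predicate on one join key of an inner equi-join can be applied to both inputs, and it leaves out the `IS NOT NULL` predicates and projections that appear in the optimized plan above.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Scan:
    table: str
    filters: tuple = ()      # predicates applied while scanning (hypothetical field)


@dataclass(frozen=True)
class Join:                  # inner equi-join: left.left_col = right.right_col
    left: Scan
    right: Scan
    left_col: str
    right_col: str


@dataclass(frozen=True)
class Filter:                # predicate "<table>.<col> <op> <value>" above a child node
    child: object
    table: str
    col: str
    op: str
    value: int


def push_down_filter(plan):
    """Push a filter on a join key below an inner equi-join, onto both scans."""
    if not (isinstance(plan, Filter) and isinstance(plan.child, Join)):
        return plan
    join = plan.child
    join_keys = {join.left.table: join.left_col, join.right.table: join.right_col}
    if join_keys.get(plan.table) != plan.col:
        return plan          # not a join-key predicate; this sketch leaves it alone

    def with_filter(scan, col):
        predicate = f"{scan.table}.{col} {plan.op} {plan.value}"
        return replace(scan, filters=scan.filters + (predicate,))

    # Both keys are equal in every joined row, so the predicate holds on both sides.
    return replace(
        join,
        left=with_filter(join.left, join.left_col),
        right=with_filter(join.right, join.right_col),
    )


plan = Filter(
    Join(Scan("example_table"), Scan("numbers_table"), "b", "a"),
    table="example_table", col="b", op=">", value=30,
)
optimized = push_down_filter(plan)
print(optimized.left.filters, optimized.right.filters)
```

Filtering at the scans means fewer rows have to be pulled and joined during the initial population of the view.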

Executing a View

The final stage of planning a view converts the logical plan (built from the query) into a physical one. This physical plan is essentially a graph of nodes, with the final node being the sink node (the node that is responsible for sinking back into the user's database). These nodes are called Collections. Each Collection is responsible for performing a certain operation and then transferring the relevant changes (referred to internally as diffs) to the next Collection. For example, the above view

CALL epsio.create_view('my_view', 'SELECT count(*) FROM example_table e JOIN numbers_table n ON e.b = n.a WHERE e.b > 30')

would be planned out into a series of collections roughly like this:

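[Diagram: the collections planned for this view, ending at the sink; stateful collections are marked with "database" icons]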

Stateful vs Stateless Collections

Some collections are stateless, while others are not. A collection holds state only when it needs it to stream diffs efficiently. In the above diagram, as the "database" icons show, only the reduce and join collections hold state. The join collection, which is responsible for JOINing two collections on an equal key, keeps state that maps each input's keys to their respective values. When a diff comes in on one side, the join collection looks up the state it holds for the other side and finds the matching diffs that need to be "joined" to it. The reduce collection, which is responsible for aggregating incoming diffs and outputting aggregated diffs, holds the state of all aggregations. In the above example, the reduce collection would hold a single value: the total count. Upon receiving a diff, it would add to or subtract from the total count, and then output one diff removing the old count and another diff adding the new count.
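
The behaviour of the two stateful collections described above can be sketched in a few lines. This is a simplified illustration, not Epsio's implementation: the class names, the in-memory dictionaries (Epsio keeps state in RocksDB), and the representation of a diff as a value plus a multiplicity of `+1` (insert) or `-1` (delete) are all assumptions made for the example.

```python
from collections import defaultdict


class JoinCollection:
    """Hypothetical join: keeps, per input side, a map of key -> {value: multiplicity}."""

    def __init__(self):
        self.state = {
            "left": defaultdict(lambda: defaultdict(int)),
            "right": defaultdict(lambda: defaultdict(int)),
        }

    def on_diff(self, side, key, value, mult):
        other = "right" if side == "left" else "left"
        # Record the change in this side's state.
        values = self.state[side][key]
        values[value] += mult
        if values[value] == 0:
            del values[value]
        # Look at the other side's state and emit one joined diff per match.
        out = []
        for other_value, other_mult in self.state[other].get(key, {}).items():
            pair = (value, other_value) if side == "left" else (other_value, value)
            out.append((key, pair, mult * other_mult))
        return out


class CountCollection:
    """Hypothetical reduce for COUNT(*) without GROUP BY: holds a single total."""

    def __init__(self):
        self.count = 0

    def on_diffs(self, diffs):
        old = self.count
        self.count += sum(mult for _key, _pair, mult in diffs)
        if self.count == old:
            return []
        # Remove the old count, then add the new one.
        return [(old, -1), (self.count, +1)]


join, count = JoinCollection(), CountCollection()
join.on_diff("left", 40, "e-row", +1)             # no match on the right side yet
joined = join.on_diff("right", 40, "n-row", +1)   # matches the stored left row
print(count.on_diffs(joined))
```

Because each side's rows are kept by key, a change on either input only touches the matching entries on the other side instead of re-running the whole join.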

A filter collection, conversely, does not need to hold any state in order to filter out incoming diffs. Another such collection which does not require state is map (a collection whose job is to map one diff to another, for example projecting columns).

All stateful collections in Epsio are based on a highly optimized fork of RocksDB. Each stateful collection has its own RocksDB state, and each state is uniquely configured to work efficiently for its collection.

Execution of Collections

Regardless of the number of views, Epsio opens several threadpools, based on Tokio, Rayon, and RocksDB:

  1. An AsyncIO threadpool, with as many threads as there are CPUs, for non-CPU-intensive tasks that need to be executed with extremely low latency regardless of CPU pressure. Tasks in this threadpool often open other tasks and await them (leaving room for other tasks to run on these threads until the opened tasks finish), meaning these threads should usually be available.
  2. A threadpool for CPU-intensive tasks (also with as many threads as there are CPUs), such as sorting diffs or consolidating diffs together.
  3. A threadpool for IO tasks. This threadpool usually has roughly 512 threads, regardless of the number of CPUs.
  4. Internal RocksDB threadpools, which are used for flushing memory to disk, compactions, and more.

Each collection creates a task which is run in the first threadpool (the AsyncIO threadpool). When a collection receives diffs, it opens tasks in the CPU and IO threadpools and awaits them. Collections are highly parallelized and break down the job they need to do into many tasks; if enough threads are available in the threadpool to run all the tasks, a single Collection will fully utilize all available resources. If not, the Collection's tasks may simply run sequentially on the same thread. Because of many optimizations done in Rayon, most CPU-intensive work will still take advantage of thread locality if possible.
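
The pattern of a lightweight async task that hands work to other pools and awaits the results can be sketched with Python's standard library. This is only a structural analogy under stated assumptions: Epsio uses Tokio and Rayon, while the sketch uses `asyncio` and `ThreadPoolExecutor`, and `collection_task`, `sort_chunk`, and `write_chunk` are hypothetical names. Python threads also do not run CPU-bound work in parallel the way Rayon does.

```python
import asyncio
import heapq
import os
from concurrent.futures import ThreadPoolExecutor

# Stand-ins for the CPU and IO threadpools; sizes follow the description above.
CPU_POOL = ThreadPoolExecutor(max_workers=os.cpu_count() or 1)
IO_POOL = ThreadPoolExecutor(max_workers=512)   # worker threads are created on demand


def sort_chunk(chunk):
    """CPU-intensive piece of work: sort one chunk of diffs."""
    return sorted(chunk)


def write_chunk(chunk, store):
    """IO-style piece of work: persist diffs (here, append to a list)."""
    store.extend(chunk)


async def collection_task(diffs, store, chunk_size=4):
    """Runs on the event loop; opens tasks in the other pools and awaits them."""
    loop = asyncio.get_running_loop()
    chunks = [diffs[i:i + chunk_size] for i in range(0, len(diffs), chunk_size)]
    # Break the job into many tasks; while they run, the event loop stays free.
    sorted_chunks = await asyncio.gather(
        *(loop.run_in_executor(CPU_POOL, sort_chunk, chunk) for chunk in chunks)
    )
    merged = list(heapq.merge(*sorted_chunks))
    await loop.run_in_executor(IO_POOL, write_chunk, merged, store)
    return merged


store = []
result = asyncio.run(collection_task([5, 3, 9, 1, 7, 2, 8], store))
print(result)
```

While the chunks are being sorted, the coroutine is suspended, so the thread running the event loop stays available for other collections' tasks.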

Transferring Messages between Collections

Epsio views are transactionally consistent and never show intermediate results. To ensure high throughput, transactions that arrive at the same time can be batched together (the default batching interval is 50ms). Each collection can receive two types of messages:

  1. Diffs - a micro-batch of diffs.
  2. TimeEnd - an end to a given series of transactions.

When a collection has processed all diffs and receives a TimeEnd, it is consistent transactionally with the user's database. For joins (or any other collection with more than one input), it must receive TimeEnds from all inputs before it considers a time "done".
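
The rule for collections with more than one input can be sketched as a small bookkeeping structure. The `TimeEndTracker` class and the idea of identifying a time by an integer are assumptions made for illustration, not Epsio's actual types.

```python
class TimeEndTracker:
    """Hypothetical helper: a time is done only once every input has sent its TimeEnd."""

    def __init__(self, inputs):
        self.inputs = set(inputs)
        self.ended = {}   # time -> set of inputs that have already sent TimeEnd

    def on_time_end(self, input_name, time):
        seen = self.ended.setdefault(time, set())
        seen.add(input_name)
        if seen == self.inputs:
            del self.ended[time]   # the time is done; forget its bookkeeping
            return True
        return False


tracker = TimeEndTracker(["left", "right"])
print(tracker.on_time_end("left", 1))    # the right input has not finished time 1 yet
print(tracker.on_time_end("right", 1))   # both inputs have now finished time 1
```

Only when `on_time_end` reports that a time is done would such a collection forward its own TimeEnd downstream.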

For example:

[Diagram: two Diff batches and one TimeEnd travelling between collections towards the Sink]

In the above diagram, three messages are going through: two Diff batches and one TimeEnd. The Sink will receive diffs and consolidate them with each other, but only upon receiving a TimeEnd will it sink all received diffs into the user's database.
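
The sink behaviour described above can be sketched as follows. This is an illustration under assumptions: the `Sink` class is hypothetical, a `Counter` stands in for the table in the user's database, and each diff is a row plus a multiplicity of `+1` or `-1`.

```python
from collections import Counter


class Sink:
    """Hypothetical sink: consolidates diffs, writes them out only on TimeEnd."""

    def __init__(self, initial_rows=()):
        self.table = Counter(initial_rows)   # stand-in for the user's table
        self.pending = Counter()             # row -> net multiplicity since last TimeEnd

    def on_diffs(self, diffs):
        # Consolidate: an insert and a delete of the same row cancel each other out.
        for row, mult in diffs:
            self.pending[row] += mult

    def on_time_end(self):
        for row, mult in self.pending.items():
            if mult == 0:
                continue                     # the row never needs to reach the database
            self.table[row] += mult
            if self.table[row] == 0:
                del self.table[row]
        self.pending.clear()


sink = Sink(initial_rows=[(0,)])             # the view currently shows a count of 0
sink.on_diffs([((0,), -1), ((1,), +1)])      # first Diff batch: count 0 -> 1
sink.on_diffs([((1,), -1), ((2,), +1)])      # second Diff batch: count 1 -> 2
print(dict(sink.table))                      # unchanged until the TimeEnd arrives
sink.on_time_end()
print(dict(sink.table))
```

The intermediate count of 1 is cancelled during consolidation, so it is never written to the table.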

There are two different ways to transfer messages:

  1. Mmap-based buffers - Buffers are created between the sending and receiving collections. Each collection coordinates with a centralized memory regulator to pull a safe number of diffs from the buffer, and then processes that micro-batch of diffs.
  2. Synchronously - The collection receives the next collection's operation as a callback, and simply runs it on each outputted diff. This is common for stateless operations such as filter or map, where the cost of having a buffer is not worth the ability to run asynchronously to the next operation.
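
The two transfer styles can be contrasted in a short sketch. Everything here is hypothetical: `make_filter`, `make_map`, and `Buffer` are illustrative names, the buffer is an in-memory deque rather than an mmap-backed one, and the `max_diffs` argument stands in for whatever amount the memory regulator would allow.

```python
from collections import deque


def make_filter(predicate, next_op):
    """Stateless filter that runs the next operation directly on each kept diff."""
    def on_diff(diff):
        if predicate(diff):
            next_op(diff)
    return on_diff


def make_map(fn, next_op):
    """Stateless map that transforms a diff and hands it straight to the next operation."""
    def on_diff(diff):
        next_op(fn(diff))
    return on_diff


class Buffer:
    """Stand-in for a buffer between a sending and a receiving collection."""

    def __init__(self):
        self.items = deque()

    def push(self, diff):
        self.items.append(diff)

    def pull(self, max_diffs):
        # The receiver pulls at most max_diffs diffs as one micro-batch.
        count = min(max_diffs, len(self.items))
        return [self.items.popleft() for _ in range(count)]


buffer = Buffer()
# filter -> map are chained synchronously; only the map's output is buffered.
pipeline = make_filter(
    lambda diff: diff[0]["b"] > 30,
    make_map(lambda diff: (diff[0]["b"], diff[1]), buffer.push),
)
for diff in [({"b": 40}, +1), ({"b": 10}, +1), ({"b": 35}, -1)]:
    pipeline(diff)

print(buffer.pull(max_diffs=1))
print(buffer.pull(max_diffs=10))
```

Chaining the filter and map as plain function calls avoids a buffer between them, while the buffer at the end lets the receiving collection pull diffs at its own pace.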