The details and examples of Schemaless triggers, a key feature of the datastore that’s kept Uber Engineering scaling since October 2014. This is the third installment of a three-part series on Schemaless; the first part is a design overview and the second part is a discussion of architecture.
Schemaless triggers is a scalable, fault-tolerant, and lossless technique for listening to changes to a Schemaless instance. It is the engine behind our trip processing workflow, from the driver partner pressing “End Trip” and billing the rider to data entering our warehouse for analytics. In this last installment of our Schemaless series, we will dive into the features of Schemaless triggers and how we made the system scalable and fault-tolerant.
To recap, the basic entity of data in Schemaless is called a cell. It is immutable, and once written, it cannot be overwritten. (In special cases, we can delete old records.) A cell is referenced by a row key, column name, and ref key. A cell’s content is updated by writing a new version with a higher ref key but same row key and column name. Schemaless does not enforce any schema on the data stored within it; hence the name. From Schemaless’s point of view, it just stores JSON objects.
Schemaless Triggers Example
Let’s see how Schemaless triggers works in practice. The code below shows a simplified version of how we do billing asynchronously (UPPERCASE denotes Schemaless column names). The example is in Python:
We define the trigger by adding a decorator, @trigger, to a function and specifying a column in the Schemaless instance. This tells the Schemaless triggers framework to call the function (in this case, bill_rider) whenever a cell is written to the given column. Here, the column is BASE, and a new cell written in BASE indicates that a trip has finished. This fires the trigger, and the row key (here, the trip UUID) is passed into the function. If more data is needed, the programmer has to fetch it from the Schemaless instance; in this case, from Mezzanine, the trip store.
The flow of information for the bill_rider trigger function is shown in the diagram below for the case where the rider is billed. The directions of the arrows indicate the caller and callee, and the numbers next to them indicate the order of the flow:
First, the trip enters Mezzanine, which makes the Schemaless triggers framework call bill_rider. When called, the function asks the trip store for the latest version of the STATUS column. In this case, the is_completed field does not exist, which means the rider has not been billed. The function then fetches the trip information from the BASE column and calls the credit card provider to bill the rider. In this example the charge succeeds, so the function records the success by writing a new cell to the STATUS column in Mezzanine with is_completed set to True.
The trigger framework guarantees that the bill_rider function is called at least once for every cell written to the column it listens to (here, BASE). A trigger function is typically fired once, but it may be called multiple times if errors occur, either in the trigger function itself or transiently outside it. The trigger function therefore needs to be idempotent, which this example handles by checking whether the cell has already been processed and, if so, returning immediately.
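The billing flow and its idempotency check can be sketched in Python against an in-memory stand-in for Mezzanine. Everything here is hypothetical: FakeSchemaless, its latest method, the trigger registry, and charge_credit_card are illustrative assumptions, not the actual Schemaless client API. The sketch shows why a duplicate delivery is harmless once the STATUS check is in place.

```python
from typing import Callable, Dict, List, Optional, Tuple

Version = Tuple[int, dict]  # (ref_key, cell body)


class FakeSchemaless:
    """Hypothetical in-memory stand-in for a Schemaless instance such as Mezzanine."""

    def __init__(self) -> None:
        self.cells: Dict[Tuple[str, str], List[Version]] = {}

    def put_cell(self, row_key: str, column: str, ref_key: int, body: dict) -> None:
        # Cells are never overwritten; a new version is appended with its own ref key.
        self.cells.setdefault((row_key, column), []).append((ref_key, body))

    def latest(self, row_key: str, column: str) -> Optional[Version]:
        # Hypothetical helper: the version with the highest ref key, if any.
        versions = self.cells.get((row_key, column))
        return max(versions, key=lambda v: v[0]) if versions else None


# Hypothetical trigger registry: column name -> callbacks taking a row key.
TRIGGERS: Dict[str, List[Callable[[str], None]]] = {}


def trigger(column: str):
    def register(fn: Callable[[str], None]) -> Callable[[str], None]:
        TRIGGERS.setdefault(column, []).append(fn)
        return fn
    return register


mezzanine = FakeSchemaless()
charges: List[str] = []  # records every call to the fake card processor


def charge_credit_card(trip: dict) -> bool:
    # Hypothetical stand-in for the credit card provider; always succeeds here.
    charges.append(trip["trip_id"])
    return True


@trigger(column="BASE")
def bill_rider(row_key: str) -> None:
    status = mezzanine.latest(row_key, "STATUS")
    # Idempotency: if a previous call already billed this trip, do nothing.
    if status is not None and status[1].get("is_completed"):
        return
    base = mezzanine.latest(row_key, "BASE")
    if base is None:
        return
    if charge_credit_card(base[1]):
        next_ref = status[0] + 1 if status is not None else 1
        mezzanine.put_cell(row_key, "STATUS", next_ref, {"is_completed": True})


# A BASE cell is written, and the framework delivers it twice (at-least-once).
mezzanine.put_cell("trip-1", "BASE", 1, {"trip_id": "trip-1"})
for callback in TRIGGERS["BASE"]:
    callback("trip-1")
    callback("trip-1")
print(charges)  # ['trip-1']: the duplicate delivery did not bill the rider twice
```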
As you read about how Schemaless supports this flow, keep the example in mind. We'll explain how Schemaless can be viewed as a log of changes, discuss the Schemaless API as it relates to triggers, and share the techniques we use to make the flow scalable and fault-tolerant.
Schemaless as a Log
Schemaless retains all cells, which means it contains every version for a given (row key, column key) pair. Because it holds this history of cell versions, Schemaless acts as a log of changes in addition to a random-access key-value store. In fact, it is a partitioned log, where each shard is its own log, as shown in the diagram below:
Every cell is written to a specific shard based on the row key, which is a UUID. Within a shard, all cells are assigned a unique identifier, called the added ID. The added ID is an auto-incrementing field that denotes the insertion order of the cells (newer cells will get higher added IDs). In addition to the added ID, every cell has a datetime for when the cell was written. The added ID for a cell is identical on all replicas of the shard, which is important for failover cases.
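A shard as an append-only log with auto-incrementing added IDs can be sketched as below. The shard-selection rule (UUID modulo the shard count) and the starting added ID of 1 are assumptions for illustration; the text says only that the shard is derived from the row key and that 4096 shards is typical.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List

NUM_SHARDS = 4096  # "typically 4096" per the article


def shard_for(row_key: uuid.UUID) -> int:
    # Assumption: the article only says the shard is derived from the row key UUID.
    return row_key.int % NUM_SHARDS


@dataclass
class LoggedCell:
    added_id: int  # insertion order within the shard
    written_at: datetime  # when the cell was written
    row_key: uuid.UUID
    column: str
    ref_key: int
    body: dict


@dataclass
class Shard:
    cells: List[LoggedCell] = field(default_factory=list)
    next_added_id: int = 1  # assumption: added IDs start at 1

    def append(self, row_key: uuid.UUID, column: str, ref_key: int, body: dict) -> LoggedCell:
        cell = LoggedCell(self.next_added_id, datetime.now(timezone.utc),
                          row_key, column, ref_key, body)
        self.next_added_id += 1  # auto-increment: newer cells get higher added IDs
        self.cells.append(cell)
        return cell


shards: Dict[int, Shard] = {}


def write(row_key: uuid.UUID, column: str, ref_key: int, body: dict) -> LoggedCell:
    shard = shards.setdefault(shard_for(row_key), Shard())
    return shard.append(row_key, column, ref_key, body)


trip = uuid.UUID("12345678-1234-5678-1234-567812345678")
first = write(trip, "BASE", 1, {"fare": 10})
second = write(trip, "STATUS", 1, {"is_completed": True})
print(shard_for(trip), first.added_id, second.added_id)  # 1656 1 2
```

Both cells share a row key, so they land in the same shard and receive consecutive added IDs, which is what makes the shard readable as an ordered log.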
The Schemaless API supports both random access and log-style access. The random-access API addresses individual cells, each identified by the triple row_key, column_key, ref_key:
put_cell(row_key, column_key, ref_key, cell):
// Inserts a cell with given row key, column key, and ref key
get_cell(row_key, column_key, ref_key):
// Returns the cell designated (row key, column key, ref key)
// Returns the cell designated (row key, column key) with the highest ref key
Schemaless also contains batch versions of these API endpoints, which we omit here. The trigger function bill_rider, shown earlier, uses these functions to retrieve and manipulate individual cells.
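The random-access semantics (immutable cells, updates as new versions with higher ref keys) can be modeled in a few lines. This is a hypothetical in-memory sketch, not Schemaless itself; get_latest_cell is an illustrative name for the highest-ref-key lookup, and the linear scan stands in for a real index.

```python
from typing import Dict, Optional, Tuple

CellKey = Tuple[str, str, int]  # (row_key, column_key, ref_key)


class RandomAccessStore:
    """Hypothetical in-memory model of the random-access API."""

    def __init__(self) -> None:
        self._cells: Dict[CellKey, dict] = {}

    def put_cell(self, row_key: str, column_key: str, ref_key: int, cell: dict) -> None:
        key = (row_key, column_key, ref_key)
        if key in self._cells:
            # Cells are immutable: an update must use a new, higher ref key.
            raise ValueError(f"cell {key} already exists")
        self._cells[key] = dict(cell)

    def get_cell(self, row_key: str, column_key: str, ref_key: int) -> Optional[dict]:
        return self._cells.get((row_key, column_key, ref_key))

    def get_latest_cell(self, row_key: str, column_key: str) -> Optional[dict]:
        # Hypothetical name for the "highest ref key" lookup; a linear scan keeps it short.
        refs = [ref for (rk, ck, ref) in self._cells if rk == row_key and ck == column_key]
        return self._cells[(row_key, column_key, max(refs))] if refs else None


store = RandomAccessStore()
store.put_cell("trip-1", "STATUS", 1, {"is_completed": False})
store.put_cell("trip-1", "STATUS", 2, {"is_completed": True})
print(store.get_cell("trip-1", "STATUS", 1))      # {'is_completed': False}
print(store.get_latest_cell("trip-1", "STATUS"))  # {'is_completed': True}
```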
For the log-style access API, we care about the shard number of a cell and its position within that shard, expressed as either a timestamp or an added ID (the location):
get_cells_for_shard(shard_no, location, limit):
// Returns ‘limit’ cells after ‘location’ from shard ‘shard_no’
As with the random-access API, the log-style access API has additional endpoints for batch-fetching cells from multiple shards at once, but the endpoint above is the important one. The location can be either a timestamp or an added ID. In addition to the cells, get_cells_for_shard returns the next added ID to use as the location for the following call. For example, if you call get_cells_for_shard with location 1000 and ask for 10 cells, the next location offset returned would be 1010.
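The offset arithmetic in that example can be checked with a small sketch of get_cells_for_shard that handles added-ID locations only (timestamp locations are left out). The Cell type and the in-memory list standing in for a shard are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Cell:
    added_id: int
    row_key: str
    column: str
    body: dict


def get_cells_for_shard(shard: List[Cell], location: int, limit: int) -> Tuple[List[Cell], int]:
    """Return up to `limit` cells written after `location` (an added ID), plus the
    next location to poll from. `shard` is assumed sorted by added ID."""
    batch = [cell for cell in shard if cell.added_id > location][:limit]
    next_location = batch[-1].added_id if batch else location
    return batch, next_location


shard_1 = [Cell(i, f"trip-{i}", "BASE", {}) for i in range(1, 1501)]
cells, next_location = get_cells_for_shard(shard_1, 1000, 10)
print(cells[0].added_id, cells[-1].added_id, next_location)  # 1001 1010 1010
```

When the caller is caught up, the batch is empty and the location is returned unchanged, so the next poll asks for the same position again.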
Tailing the Log
With the log-style access API, you can tail a Schemaless instance, much like you would tail a file on your system (e.g., tail -f) or an event queue (e.g., Kafka), by polling for the latest changes. The client keeps track of the offsets it has seen and uses them when polling. To bootstrap the tailing, you start from the first entry (i.e., location 0) or from any later time or offset.
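A client-side tailing loop over one shard might look like the sketch below. The fetch callback has the same shape as get_cells_for_shard; tail_shard, fake_fetch, and the max_polls stop condition (added so the example terminates) are hypothetical.

```python
import time
from typing import Callable, List, Optional, Tuple

# fetch(shard_no, location, limit) -> (cells, next_location), shaped like get_cells_for_shard
Fetch = Callable[[int, int, int], Tuple[List[dict], int]]


def tail_shard(fetch: Fetch, shard_no: int, start: int,
               handle: Callable[[dict], None], limit: int = 100,
               poll_interval: float = 1.0, max_polls: Optional[int] = None) -> int:
    """Poll one shard like `tail -f`, handing each new cell to `handle`.
    Returns the last offset seen (only reached when max_polls is set)."""
    location, polls = start, 0
    while max_polls is None or polls < max_polls:
        cells, location = fetch(shard_no, location, limit)
        for cell in cells:
            handle(cell)
        if not cells:
            time.sleep(poll_interval)  # caught up: wait before polling again
        polls += 1
    return location


log = [{"added_id": i, "row_key": f"trip-{i}"} for i in range(1, 6)]


def fake_fetch(shard_no: int, location: int, limit: int) -> Tuple[List[dict], int]:
    batch = [c for c in log if c["added_id"] > location][:limit]
    return batch, (batch[-1]["added_id"] if batch else location)


seen: List[str] = []
final = tail_shard(fake_fetch, 0, 0, lambda c: seen.append(c["row_key"]),
                   limit=2, poll_interval=0, max_polls=4)
print(seen, final)  # ['trip-1', 'trip-2', 'trip-3', 'trip-4', 'trip-5'] 5
```

The offset lives only in this loop's memory, so a crash loses it; persisting it, spreading shards across processes, and recovering from failures is what the triggers framework adds on top.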
Schemaless triggers accomplishes the same tailing by using the log-style access API, and it keeps track of the offsets. The benefit over polling the API directly is that Schemaless triggers makes the process fault-tolerant and scalable. Client programs link into the Schemaless triggers framework by configuring which Schemaless instance and which columns to poll data from. Functions or callbacks are attached to this data stream in the framework and are called, or triggered, by Schemaless triggers when new cells are inserted into the instance. In return, the framework spins up the desired number of worker processes on the hosts where the program is running. The framework divides the work over the available processes and handles failing processes gracefully, by spreading the work from the failing process over the remaining healthy processes. This division of work means that the programmer only has to write the handler (i.e., trigger function) and make sure it’s idempotent. The rest is handled by Schemaless triggers.
In this section, we will discuss how Schemaless triggers scales and minimizes the impact of errors. The diagram below shows the architecture from a high-level perspective, taking the billing service example from earlier:
The billing service that uses Schemaless triggers runs on three different hosts, where we (for brevity) assume one worker process per host. The Schemaless triggers framework divides the shards between the worker processes so that only one worker process handles a specific shard. Notice that Worker Process 1 pulls data from shard 1, while Worker Process 2 handles shards 2 and 5, and Worker Process 3 handles shards 3 and 4. A worker process only deals with cells for the assigned shards by fetching new cells and calling the registered callbacks for these shards. One worker process is designated leader and is responsible for assigning shards to worker processes. If a process goes down, the leader reassigns the shards for the failing process to other processes.
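The leader's job of giving each shard to exactly one worker, and moving a failed worker's shards elsewhere, can be sketched as follows. The article does not describe the leader's placement policy; round-robin assignment and least-loaded reassignment are assumptions, and the resulting layout differs from the one in the diagram.

```python
from typing import Dict, List


def assign_shards(shards: List[int], workers: List[str]) -> Dict[str, List[int]]:
    """Leader-side sketch: give every shard to exactly one worker, round-robin."""
    if not workers:
        raise ValueError("no live workers")
    assignment: Dict[str, List[int]] = {w: [] for w in workers}
    for i, shard in enumerate(sorted(shards)):
        assignment[workers[i % len(workers)]].append(shard)
    return assignment


def reassign_on_failure(assignment: Dict[str, List[int]], failed: str) -> Dict[str, List[int]]:
    """Spread a failed worker's shards over the remaining workers, least-loaded first."""
    remaining = {w: list(s) for w, s in assignment.items() if w != failed}
    if not remaining:
        raise ValueError("no live workers left")
    for shard in assignment.get(failed, []):
        target = min(remaining, key=lambda w: len(remaining[w]))
        remaining[target].append(shard)
    return remaining


before = assign_shards([1, 2, 3, 4, 5], ["worker-1", "worker-2", "worker-3"])
after = reassign_on_failure(before, "worker-1")
print(before)  # {'worker-1': [1, 4], 'worker-2': [2, 5], 'worker-3': [3]}
print(after)   # {'worker-2': [2, 5, 4], 'worker-3': [3, 1]}
```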
Within a shard, cells are triggered in the order in which they were written¹. This also means that if triggering a particular cell always fails because of a programming error, it will stall processing of every later cell in that shard. To prevent such delays, you can configure Schemaless triggers to mark cells that fail repeatedly and put them on a separate queue, so that it can continue with the next cell. If the number of marked cells exceeds a certain threshold, triggering stops, since this often indicates a systematic error that needs to be fixed by a human.
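In-order processing with a side queue for repeatedly failing cells can be sketched as below. The parameter names max_attempts and max_marked and the TooManyFailures exception are hypothetical, and the periodic retry of marked cells mentioned in the footnote is not modeled.

```python
from typing import Callable, List


class TooManyFailures(Exception):
    """Raised when marked cells exceed the threshold, which suggests a systematic bug."""


def process_shard_in_order(cells: List[dict], handle: Callable[[dict], None],
                           max_attempts: int = 3, max_marked: int = 10) -> List[dict]:
    """Trigger cells in write order. A cell that fails `max_attempts` times in a row
    is marked and moved to a side queue so it no longer stalls the shard."""
    side_queue: List[dict] = []
    for cell in cells:
        for _ in range(max_attempts):
            try:
                handle(cell)
                break
            except Exception:
                continue  # retry the same cell before moving on
        else:
            side_queue.append(cell)  # every attempt failed: mark it and continue
            if len(side_queue) > max_marked:
                raise TooManyFailures(f"{len(side_queue)} marked cells; stopping")
    return side_queue


processed: List[str] = []


def handler(cell: dict) -> None:
    if cell["row_key"] == "bad":
        raise RuntimeError("programming error")
    processed.append(cell["row_key"])


marked = process_shard_in_order([{"row_key": "a"}, {"row_key": "bad"}, {"row_key": "b"}], handler)
print(processed, marked)  # ['a', 'b'] [{'row_key': 'bad'}]
```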
Schemaless triggers keeps track of triggering progress by storing, for each shard, the added ID of the latest successfully triggered cell. The framework persists these offsets to shared storage, such as ZooKeeper or the Schemaless instance itself, so if the program is restarted, triggering continues from the offsets stored there. The shared storage is also used for metadata, such as coordinating leader election and discovering added or removed worker processes.
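Checkpointing per-shard offsets and resuming after a restart can be sketched with a local JSON file standing in for the shared storage. OffsetStore and its methods are hypothetical; the real framework uses ZooKeeper or the Schemaless instance itself.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict


class OffsetStore:
    """Hypothetical stand-in for the shared offset storage: a JSON file
    mapping shard number -> added ID of the latest successfully triggered cell."""

    def __init__(self, path: Path) -> None:
        self.path = path

    def load(self) -> Dict[int, int]:
        if not self.path.exists():
            return {}
        return {int(k): v for k, v in json.loads(self.path.read_text()).items()}

    def commit(self, shard_no: int, added_id: int) -> None:
        # Call only after the cell's trigger succeeded, so a restart re-runs unfinished work.
        offsets = self.load()
        offsets[shard_no] = added_id
        tmp = self.path.with_suffix(".tmp")
        tmp.write_text(json.dumps(offsets))
        tmp.replace(self.path)  # rename over the old file so readers never see a partial write

    def resume_point(self, shard_no: int) -> int:
        # Continue after the stored offset, or from location 0 on first start.
        return self.load().get(shard_no, 0)


with tempfile.TemporaryDirectory() as tmp_dir:
    path = Path(tmp_dir) / "offsets.json"
    OffsetStore(path).commit(7, 1010)
    restarted = OffsetStore(path)  # a fresh object, as after a process restart
    resumed = (restarted.resume_point(7), restarted.resume_point(8))
print(resumed)  # (1010, 0)
```

Committing after the handler succeeds means a crash between the two steps replays that cell on restart, which is one reason trigger functions must be idempotent.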
Scalable and Fault-Tolerant
Schemaless triggers is designed for scalability. For any client program, we can add worker processes up to the number of shards (typically 4096) in the tailed Schemaless instance, since each shard is handled by only one worker process at a time. Moreover, we can add or remove workers online to handle varying load, independently of other trigger clients of the same Schemaless instance. Because progress is tracked entirely in the framework, we can attach as many clients as needed to a Schemaless instance. There is no logic on the server side to keep track of the clients or push state to them.
Schemaless triggers is also fault-tolerant. Any process involved can go down without hurting the system:
- If a client worker process goes down, the leader will distribute the work from the failing process, ensuring that all shards get processed.
- If the leader among the Schemaless triggers nodes goes down, a new node will be elected as leader. During leader election, cells are still processed, but work can’t be redistributed and processes can’t be added or removed.
- If the shared storage (e.g., ZooKeeper) goes down, cells are still processed. However, like during leader election, work can’t be redistributed and processes can’t be changed while shared storage is down.
- Lastly, the Schemaless triggers framework is insulated from failures inside the Schemaless instance. Any database node can go down without problem, since Schemaless triggers can read from any replica.
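Reading from any replica works because a cell's added ID is identical on all replicas of a shard, so an offset obtained from one replica is valid on another. The sketch below models replicas as plain functions; fetch_with_failover, ReplicaDown, and the replica callables are hypothetical.

```python
from typing import Callable, List, Optional, Tuple


class ReplicaDown(Exception):
    pass


# A replica read: (location, limit) -> added IDs of the cells after `location`.
Replica = Callable[[int, int], List[int]]


def fetch_with_failover(replicas: List[Replica], location: int, limit: int) -> Tuple[List[int], int]:
    """Try each replica in turn. A cell's added ID is identical on every replica,
    so the same `location` offset is valid whichever replica answers."""
    last_error: Optional[Exception] = None
    for replica in replicas:
        try:
            ids = replica(location, limit)
            return ids, (ids[-1] if ids else location)
        except ReplicaDown as err:
            last_error = err
    raise ReplicaDown("all replicas unavailable") from last_error


log = list(range(1, 21))  # added IDs 1..20, the same on every replica


def healthy(location: int, limit: int) -> List[int]:
    return [i for i in log if i > location][:limit]


def dead(location: int, limit: int) -> List[int]:
    raise ReplicaDown("database node down")


ids, next_location = fetch_with_failover([dead, healthy], 10, 5)
print(ids, next_location)  # [11, 12, 13, 14, 15] 15
```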
From an operational standpoint, Schemaless triggers has been a pleasant companion. Schemaless is ideal storage for source-of-truth data, as the data can be accessed either via the random-access API or via the log-style access API. Using Schemaless triggers on top of the log-style access API decouples the producers and consumers of the data, allowing programmers to focus on processing and ignore scaling and fault-tolerance issues. Finally, we can add more storage servers at runtime to increase both data capacity and performance, as we get more spindles and memory. Today, the Schemaless triggers framework drives the trip processing flow, including ingestion into our analytics warehouse and cross-datacenter replication. We are excited about its prospects for the remainder of 2016 and beyond.
¹ Except for retry events, where the trigger callback is periodically given the old and failed cells.
Jakob Holdgaard Thomsen
Jakob Holdgaard Thomsen is a Principal Engineer at Uber, working out of the Aarhus office, helping to make Uber's systems more performant and more reliable.
Posted by Jakob Holdgaard Thomsen