Start ordering with Uber Eats

Order now
Backend, Engineering

Cadence Multi-Tenant Task Processing

December 16, 2021 / Global

Introduction

Cadence is a multi-tenant orchestration framework that helps developers at Uber to write fault-tolerant, long-running applications, also known as workflows. It scales horizontally to handle millions of concurrent executions from various customers. It is currently used by hundreds of different teams within Uber to support their business logic. As we onboard different use cases to Cadence, resource isolation became a pressing issue. Bursty traffic or workflow tasks from one customer usually consumes all the resources in a cluster, causing significant task processing delays for others. This is especially common and undesirable when batch processing jobs were triggered during month-ends, causing delays in human interactive and latency-sensitive workflows. Moreover, the traffic in the clusters may become so heavy that it ultimately overloads our downstream database.

A typical solution to resource isolation is to apply rate limiters to APIs. For example, one may rate limit the speed of starting or signaling workflows, or even the rate at which they make progress by controlling certain worker-related APIs. However, this solution alone is not enough for Cadence. When processing workflows, Cadence generates tasks that need to be processed asynchronously, thus directly rate limiting customer requests will not help throttle the background task processing load. Consider the case where one customer gradually schedules a large number of timers to be fired at the same time: the external requests RPS is quite low, but when all the timers start to fire, the background task processing load will explode.

In order to provide better resource isolation between tenancies, we redesigned the task processing logic in Cadence while taking multi-tenancy into consideration. As a result, we introduced 2 new components: host-level priority task processing and multi-cursor task queue.

 

Background Task Processing in Cadence

Before diving into our new task processing logic, let’s take a step back to understand the basics of Cadence’s previous logic. 

As a general workflow orchestration engine, Cadence moves workflow states from one to another by generating and completing tasks. Such tasks can be timer tasks, signal tasks, activity tasks/child workflow tasks, or tasks for recording workflow metadata in ElasticSearch. Cadence atomically commits the tasks with workflow state transition.

 

Figure 1

 

This is where the background task processing logic comes in. Every workflow, along with its async tasks, will be mapped to a shard (a partition for Cadence workflows), and each shard has its own workers for processing workflow tasks in the background. To be more specific, each shard has a queue processor responsible for loading tasks from the task queue in the database, and sending them to a shard-level worker pool for processing. The queue processor also tracks the state of each task, and deletes completed tasks from the database when it’s safe to do so. 

 

Figure 2

 

The queue processor also maintains a set of cursors to track the current state of processing. The 2 most important cursors are ACK-level cursor and read-level cursor. ACK level traces the progress of task processing. It draws a line between tasks that have been successfully processed (which will be periodically deleted) and those that have not. It’s also the start point of task processing when a shard is loaded, since we know all tasks before this point have already been successfully processed. Read level traces the task loading progress in each task queue, marking a point in the queue beyond which the tasks are yet to be loaded. 

 

Figure 3

 

Although this task processing logic works well most of time, it has 2 major issues:

  1. There is usually a large number of shards in the production environment to achieve scalability. Since each shard has its own worker pool, the total number of workers is enormous. Even if we make the worker pool in each shard relatively small, collectively it is still a huge figure. In fact, it is not practical to use small worker pools, as it might limit processing speed when one workflow is very busy. When the system load is low, most of the workers sit idle; thus a lot of resources are wasted. When the system load is high, the workers become too active and overload our downstream database.
  2. Workflow tasks are loaded and processed in the order they are generated, without considering tenancy. A burst of tasks from one customer will block task processing for other customers and cause delay in their workflows.

 

Host-Level Priority Task Processing

As its name suggests, the host-level priority task processing logic attempts to address the above issue with two new concepts–“host level” and “priority.” 

The idea behind “host level” is to change the worker pool from a shard-level component to one that is shared across shards on a host, so that tasks from all the shards on a host will be processed by a single, large worker pool. The shared pool coordinates and processes tasks in a more efficient and flexible way, allowing us to process the same amount of tasks with a much smaller worker pool. Moreover, by limiting the number of workers, the database is less likely to be overloaded even when all the workers are busy.

Now that the first issue has been solved by limiting the total amount of resources, the next question is how to distribute the limited resources more fairly, so that customers with higher loads will not block others. Remember that the workflow tasks are loaded and processed in the order that they are generated. So we assign priorities to each task before processing. Generally, a task will get low priority if it is from a customer that has already generated too many tasks. In order to determine the number of tasks from each customer, we first send all the tasks to go through the Priority Assigner, which is essentially a per-domain rate limiter. To put it simply, if there is no available token in a certain domain’s rate limiter, then that domain will be deemed as having generated too many tasks, and later tasks from this particular domain will get low priority.

The worker pool will process tasks with different priority levels based on a weighted round-robin policy. This policy ensures that: 

  1. High-priority tasks will be processed more frequently
  2. Low-priority tasks will also get a chance to be processed without waiting for too long
  3. There will be no delays for low-priority tasks to be processed when there are no high-priority tasks

 

Figure 4

 

In the new design, we also improved the retry logic in task processing. Originally, if a workflow task fails to be processed, it would be retried repeatedly and stuck in the worker pool. When such stuck tasks accumulate, the entire shard will be blocked. We now send a task back to its owning shard if it fails consecutively after a specified number of retry attempts. 

To summarize, with the host-level priority task processing, we manage to: 

  1. Greatly reduce the worker pool size and prevent databases from being overloaded
  2. Provide resource isolation among customers. 

Both aforementioned problems are therefore solved. Yet this is not a perfect solution. When we throttle low-priority tasks, we are buffering them in memory. And we have to keep loading new tasks from the database, otherwise we are unable to know if the next task should be high priority or not. When more and more tasks are being buffered, eventually our system will run out of memory. 

Another issue from buffering a large number of tasks is that the ACK level moves much slower. This is because the ACK level will stop at the first buffered task that is not yet processed, regardless of the state of the tasks afterwards. Since tasks have been assigned different priorities, it is possible that a task which comes later in the queue gets a higher priority and has already been processed. Remember that when a shard reloads, task processing will start from the persisted ACK level. So in a case where the ACK level gets stuck at a certain buffered task for a long time, resources will be wasted re-processing a large number of tasks, even though correctness is guaranteed by the idempotent task processing logic. 

 

Multi-Queue/Cursor Processing

We introduced the multi-queue/cursor processing logic to address the above issues in the host-level task processing—buffered tasks and stuck ACK level. 

The basic idea of multi-queue/cursor processing logic is to create multiple virtual queues which are responsible for a range of potentially low-priority tasks, so as to minimize their impact on the original queue. A separated virtual queue maintains its own set of cursors (read level, ACK level, etc.) and controls its own pace of loading and processing tasks. Specifically, the separate queue controls its read level to stop at a certain point and prevent loading more potential low-priority tasks. Meanwhile, when the original queue encounters a low priority task that falls into the responsibility of the new queue, it simply ignores the task and prevents itself from buffering too many low-priority tasks. The new queue also maintains its own ACK-level cursor. Although the ACK level in the new queue might still get stuck due to unprocessed low-priority tasks, it will not affect the original queue, which now contains generally high-priority tasks, thus significantly reducing the chance of reloading and reprocessing tasks upon shard reload.  

We call the process of creating a new queue “splitting.” When a queue is split, tasks in the original queue are divided into 2 (or more) disjoint groups based on the taskID and the splitting attributes, and are put in separate queues. Note that the splitting attribute is designed to be general (domain, workflow ID, task type, etc.) as long as after the split, tasks in new queues are mutually disjoint and add up to cover all tasks in the original queue. In our implementation, we use domain name as our splitting attribute as it best serves our vision of a multi-tenancy platform.

 

Split Policy

Since we have a splitting process, now comes the more interesting question of when to create such separate queues, and what tasks should we put in them. We introduced a split policy to govern such decisions.

Periodically, the split policy will check the number of buffering tasks from each domain in a queue. If the number reaches a threshold, that domain will be split out and a new queue will be created for it. The taskID range assigned to this new queue is determined by [original queue’s ACK level, original queue’s read level + look-ahead]. Using the original queue’s ACK and read level ensures all buffered tasks from that domain will be moved to the new queue. The look-ahead part is used to throttle future task loading from that domain to deal with any memory pressure and prevent buffering too many low priority tasks. Its value can be configured per domain based on experience and the customer’s use case.

In the graph below, different colors represent tasks from different customers. When trying to split the orange domain out, without the look-ahead, the read cursors in Q1 and Q2 become meaningless, because they are equal to the maximum taskID assigned to the original Q (“Max Level” in the graph) and there is no further loading required. For Q3, its loading will start immediately and whenever it encounters a task from the orange domain, it will buffer the task in memory, which defeats our goal of reducing buffered tasks. In contrast, by adding the look-ahead range, whenever there is pressure on memory, we can stop the loading in Q2 (orange domain) until the look-ahead maximum is reached. 

 

Figure 5

 

We abstract this logic into a split policy interface and implement it for various scenarios, including cases where a domain buffers lots of tasks in memory, where tasks get stuck due to invalid workflow state, and where potential new features are involved (e.g., suspending a domain processing). We also implement a random split policy to test the queue split code path more easily.

 

Leveled Queue Splitting 

Theoretically, nothing prevents us from splitting queues until we have one virtual queue for each split attribute (user domain in this case). However, it is not efficient to have too many such queues. We can only filter tasks based on domain after loading them from the database, so more queues means more task loading efforts and more DB IO operations. Because of this we decided to limit the total number of processing queues.

To achieve this goal, we introduce a queue level concept to help organize all virtual queues. The basic idea is that each virtual queue belongs to a specific queue level. Say that the original queue belongs to Level 0. When a split happens, the new queue will go down to the next level, Level 1. When we split and create more new queues from Level 0, if an existing queue in Level 1 has an overlapping task ID range with the new queue, these two queues will be merged to ensure there is only one virtual queue in each level for a certain taskID range. In addition, we also cap the max queue level by disabling split beyond a certain level. Combining these two, we are able to control the max number of times tasks are loaded for a certain taskID range.

Continuing with the previous example, if after splitting the orange domain out, we decide to split the blue domain out as well, instead of creating a separate queue only for the blue domain, it will be merged with the existing queue on Level 1, ensuring that tasks in that range are loaded only twice. 

Figure 6

 

Other Challenges 

Rolling out the new task processing feature to production is challenging. One of the major issues is that the priority task processing logic has a couple of critical knobs to tune, such as the task RPS threshold (beyond which tasks will be assigned low priority), task channel size, and weight in the round robin policy. And we didn’t have enough data to help us pick the optimal value at the beginning. Moreover, such config values depend on use cases and load patterns. We gradually gained insights by observing production traffic and tested various sets of values through trials and errors. We finally found the config values that worked pretty well for most of our use cases.

Deciding the values for the multi-cursor queue is even more challenging because it’s much harder to get signal from the production cluster, as most of the burst loads are handled by the priority task processing logic gracefully and the multi-cursor queue is seldomly used. We shadowed production traffic and did a couple of controlled experiments in our testing environment to gain insights for tuning the values.

Another challenge is to ensure that all tasks are processed quickly, as the total number of workers in the host-level worker pool is limited. Even if the task can’t be processed for some reason, it should fail fast instead of blocking others. Although this is generally true, in rare circumstances, some tasks might still experience high latency. To minimize the risk, we made great efforts to hunt each case down and mitigate it to a degree that won’t occupy all worker resources or block other tasks.

 

Result

With the host-level priority task processor, we successfully reduced the number of worker Goroutines on each Cadence history host from 16K to 100 while handling the same amount of load, a reduction of more than 95% that ensures the database won’t get overloaded during peaks. It also solves all bursty load situations in our production cluster by being able to throttle any bursty domains while ensuring a low processing latency for the rest.

Figure 7

 

In terms of multi-cursor queue, we tested the case where 3 million timers are scheduled to fire within 10 minutes (5K timers per second), which is way higher than the load our testing cluster can support. As shown in the following figure, the multi-cursor queue logic is able to spread the entire load over around 1.6 hours, gradually process them, and ensure low latency for new tasks from other domains.

Figure 8

 

Conclusion

Resource isolation and multi-tenancy have become pressing needs as Cadence at Uber onboards more and more customers. Prior to this re-architecture effort, Cadence’s design and implementation focused little on that, and customers used to experience high workflow latency when any spiky load was running. Now with the multi-tenant task processing logic, we are able to build adequate resource isolation by properly distributing the limited resources across all customers (host-level priority task processing) and isolating individual Cadence domains in extreme cases (multi-queue/cursor processing). We have also optimized the implementation to avoid head-of-line blocking and overloading the downstream database. Together these upgrades have made our production cluster more stable and improved our customers’ experience.