At Uber, data informs every decision. Presto is one of the very core engines that powers all sorts of data analytics at Uber. For example, the operations team makes heavy use of Presto for services such as dashboarding; the Uber Eats and marketing teams rely on the results of these queries to make decisions on prices. In addition, Presto is also used in Uber’s compliance department, growth marketing department, and ad-hoc data analytics.
The scale of Presto at Uber is large. Currently, Presto has 9,000 daily active users, processing 500K queries per day and handling over 50PB of data. Uber’s infrastructure encompasses 2 data centers with 7,000 nodes and 20 Presto clusters across 2 regions.
Uber’s Presto Deployment
- The UI/Client layer: This includes internal dashboarding, Google Data Studio, Tableau, and other tools. In addition, we have some backend services that use JDBC or query parsing to communicate with Presto.
- The routing layer: This layer is responsible for dispatching queries to different Presto clusters. The dispatching is based on stats pulled from each Presto cluster, including the number of queries and tasks, CPU and Memory usage, and so on. We determine which cluster each query should be scheduled to based on such stats. In other words, this layer functions as a load-balancing and query-gating service.
- Presto clusters: At the bottom, multiple Presto clusters communicate with the underlying Hive, HDFS, Pinot, and others. Join operations can be performed between different connectors or different datasets.
In addition, for each layer of the above architecture, we have:
- Internal monitoring
- Support for using Kerberos security
The Presto workloads are divided into 2 categories:
- Interactive: Queries sent by data scientists and engineers
- Scheduled: Mainly Batch queries, which are scheduled and recurring, including Dashboard, ETL, etc.
Using Alluxio for Local Caching
Recently, we deployed Alluxio in our product environment in 3 clusters, each with more than 200 nodes. Alluxio is plugged in as a local library and it leverages the Presto workers’ local NVMe disks. We’re not caching all the data, but a subset of it through selective caching.
Below is a diagram of Alluxio as a local cache. The Alluxio Cache Library is a local cache that runs inside the Presto worker. We have implemented a layer on top of the default HDFS client implementation.
When any external API reads from an HDFS call, the system first looks at the inventory of the cache to see if it is a cache hit or miss. If it’s a cache hit, it will directly read the data from the local SSD. Otherwise, it will read from the remote HDFS and cache the data locally for the next read. In this process, the cache hit rate has a significant impact on the overall performance.
We will discuss the detailed design and improvement of the Alluxio local cache in part 2 of this blog series.
Key Challenges and Solutions
Challenge 1: Real-Time Partition Updates
The first challenge we encountered was the real-time partition updates. At Uber, a lot of tables/partitions are constantly changing because we upsert data constantly into Hudi tables.
The challenge is that the partition ID alone as a caching key is not sufficient. The same partition may have changed in Hive, while Alluxio still caches the outdated version. In this case, partitions in cache are outdated so the users will get outdated results while running a query if the data is served from caching, resulting in an inconsistent experience.
Solution: Add Hive’s Latest Modification Time to Caching Key
Our solution is adding the latest modification time to the caching key as below:
- Previous caching key: hdfs://<path>
- New caching key: hdfs://<path><mod time>
Presto currently has the access to the latest file modification time by making an HDFS API call on Hive split read. Specifically, when processing a split, Presto worker will explicitly call HDFS listDirectory API and as part of the information returned by HDFS, there is the file latest modification. Through this solution, the new partition with the latest modification gets cached, ensuring that users always get a consistent view of their data. Note that there could be a race condition window that after Presto worker gets the latest modification time, the remote files get updated again and Presto worker still misses the latest changes. On the one hand, two consecutive updates within such a small interval is rare; on the other hand, such scenarios are no worse than when there is no caching, but the table directories are changed during query execution. In such cases, even the existing, non-caching execution can lead to inconsistent behavior. Another note is there is a trade-off: outdated partitions still present in cache, wasting caching space until evicted. Currently, we are working to improve the caching eviction strategy.
Challenge 2: Cluster Membership Change
In Presto, Soft Affinity Scheduling is implemented by the simple, mod-based algorithm. This algorithm has the disadvantage that if a node is added or deleted, the whole ring is messed up with a different cache key. Therefore, if a node joins or leaves a cluster, it can hurt the cache efficiency of all the nodes, which is problematic.
Presto locates the same set of nodes for a given partition key for this reason. As a result, we always hit the same set of nodes both for caching and queries. While this is good, the problem is that Presto previously used a simple hash function that can break when the cluster changes.
As shown below, currently, we use a simple hash mod-based node lookup: key 4 % 3 nodes = worker#1. Now node #3 goes down, new lookup: key 4 % 2 nodes = 0, but worker#0 does not have the bytes.
Solution: Node IDbased Consistent Hashing
Consistent hashing is the solution. Instead of the mod-based function, all nodes are placed on a virtual ring. Relative ordering of nodes on the ring doesn’t change, regardless of joining or leaving. Instead of using mode-based hash, we always look up the key on the ring. We can ensure that no matter how many changes are made, they are always based on the same set of nodes. Additionally, we use replication to improve robustness. This is the solution to the cluster membership issue.
Challenge 3: Cache Size Restriction
Uber’s data lake is at large scale, Presto clusters scans 50PB of data per day. However, our local disk space is only 500 GB per node. The amount of data accessed by Presto queries is much larger than the disk space available on Worker nodes. Although it is possible to put everything in the cache, heavy eviction can hurt overall cache performance.
Solution: Cache Filter
The idea is to only cache a selected subset of data, which includes certain tables and a certain number of partitions. The solution is to develop a cache filter, a mechanism that decides whether to cache a table and how many partitions. Below is a sample configuration:
Having a cache filter has greatly increased cache hit rate from ~65% to >90%. The following are the areas to pay attention to when it comes to cache filter:
- Manual, static configuration
- Should be based on traffic pattern, e.g.:
- Most frequently accessed tables
- Most common # of partitions being accessed
- Tables that do not change too frequently
- Ideally, should be based on shadow caching numbers and table-level metrics
We’ve also achieved observability through monitoring/dashboarding, which is integrated with Uber’s internal metrics platform using JMX metrics emitted to a Grafana-based dashboard.
In the sections below we discuss the improvements to the local cache metadata.
File-Level Metadata for Local Cache
First, we want to prevent stale caching. The underlying data files might be changed by the third-party frameworks. Note that this situation might be rare in Hive tables, but very common in Hudi tables.
Second, the daily reads of unduplicated data from HDFS can be large, but we don’t have enough cache space for caching all the data. Therefore, we can introduce scoped quota management by setting a quota for each table.
Third, metadata should be recoverable after server restart. We have stored metadata in the local cache in memory instead of disk, which makes it impossible to recover metadata when the server is down and restarted.
Therefore, we propose the file-level metadata, which holds and keeps the last modified time and the scope of each data file we cached. The file-level metadata store should be persistent on disk so the data will not disappear after restarting.
With the introduction of file-level metadata, there will be multiple versions of the data. A new timestamp is generated when the data is updated, corresponding to a new version. A new folder storing the new page is created corresponding to this new timestamp. At the same time, we will try to remove the old timestamp.
Cache Data and Metadata Structure
As shown above, we have two folders corresponding to two timestamps: timestamp1 and timestamp2. Usually, when the system is running, there will not be 2 timestamps simultaneously because we will delete the old timestamp1 and keep only timestamp2. However, in the case of a busy server or high concurrency, we may not be able to remove the timestamp on time, in which case we may have 2 timestamps at the same time. In addition, we maintain a metadata file that holds the file information in protobuf format and the latest timestamp. This ensures that Alluxio’s local cache only reads data from the latest timestamp. When the server restarts, the timestamp information is read from the metadata file so that the quota and last modified time can be managed correctly.
Since Alluxio is a generic caching solution, it still needs the compute engine, like Presto, to pass the metadata to Alluxio. For this, we use the HiveFileContext on the Presto side. For each data file from the Hive table or Hudi table, Presto creates a HiveFileContext. Alluxio makes use of this information when opening a Presto file.
When calling openFile, Alluxio creates a new instance of PrestoCacheContext, which holds the HiveFileContext and has the scope (4 levels: database, schema, table, partition), quota, cache identifier (i.e., the MD5 value of the file path), and other information. We will pass this cache context to the local file system. Alluxio can thus manage metadata and collect metrics.
Per-Query Metrics Aggregation on Presto Side
In addition to passing data from Presto to Alluxio, we can also call back to Presto. When performing query operations, we will know some internal metrics, such as how many bytes of data read hit the cache and how many bytes of data were read from external HDFS storage.
As shown below, we pass the HiveFileContext containing the PrestoCacheContext to the local cache file system (LocalCacheFileSystem), after which the local cache file system calls back (IncremetCounter) to the CacheContext. This callback chain will continue to the HiveFileContext, and then to RuntimeStats.
In Presto, RuntimeStats is used to collect metrics information when executing queries so that we can perform aggregation operations. After that, we can see the information about the local cache file system in Presto’s UI or the JSON file. We can make Alluxio and Presto work closely together with the above process. On the Presto side, we have better statistics; on the Alluxio side, we have a clearer picture of the metadata.
First, we would like to onboard more tables and improve the process of table onboarding with automation, in which Alluxio Shadow Cache (SC) will be helpful. Second, we want to have better support for the changing partitions/Hudi tables. Lastly, load balancing is another optimization we can implement. There’s still a long way to go along our journey.
As compute-storage separation continues to be the trend along with containerization in big data, we believe a unified layer that bridges the compute and storage like Alluxio will continue to play a key role.
Because the callback process described above makes the CacheContext’s lifecycle grow considerably, we have encountered some problems with rising GC latency, which we are working to address.
Adopt Semantic Cache (SC)
We will implement Semantic Cache (SC) based on the file-level metadata we propose. For example, we can save the data structures in Parquet or ORC files, such as footer, index, etc.
More Efficient Deserialization
To achieve more efficient deserialization, we will use flatbuf instead of the protobuf. Although protobuf is used in the ORC factory to store metadata, we found that the ORC’s metadata brings more than 20-30% of the total CPU usage in Alluxio’s collaboration with Facebook. Therefore, we are planning to replace the existing protobuf with a flatbuf to store cache and metadata, which is expected to improve the performance of deserialization significantly.
In this article, we discussed the design and implementation of Uber’s Presto caching solution, to improve interactive query performance across various use cases at Uber. We shared the journey of Presto’s adoption of Alluxio local disk caching at Uber, discussed how we tailored and extended existing solutions to address challenges we encountered that are specific to Uber’s scale and use cases. The solution has been running in production for over a quarter and with minimum maintenance overhead.
Chen Liang is a Senior Software Engineer on Uber's Interactive Analytics team working with Presto and Alluxio integration. Before joining Uber, Chen was a Staff Software Engineer on LinkedIn's Big Data Platform team. Chen is also a committer and PMC member of Apache Hadoop. Chen holds master degrees from Duke University and Brown University.
Dr. Beinan Wang is a Software Engineer from Alluxio and is a committer of PrestoDB. Before Alluxio, he was the Tech Lead of the Presto team at Twitter, and he built large-scale distributed SQL systems for Twitter’s data platform. He has twelve-year experience working on performance optimization, distributed caching, and volume data processing. He received his Ph.D. in Computer Engineering from Syracuse University on symbolic model checking and runtime verification of distributed systems.
How Uber Serves Over 40 Million Reads Per Second from Online Storage Using an Integrated Cache
February 15 / Global
DataCentral: Uber’s Big Data Observability and Chargeback Platform
February 1 / Global
Jupiter: Config Driven Adtech Batch Ingestion Platform
Public transport agencies trial mixed fleets to implement local on-demand transport
How Uber Serves Over 40 Million Reads Per Second from Online Storage Using an Integrated Cache
Building Scalable, Real-Time Chat to Improve Customer Experience