Uber’s goal is to ignite opportunity by setting the world in motion, and big data is a very important part of that. Presto® and Apache Kafka® play critical roles in Uber’s big data stack. Presto is the de facto standard for query federation and has been used for interactive queries, near-real-time data analysis, and large-scale data analysis. Kafka is the backbone for data streaming and supports many use cases such as pub/sub and stream processing. In this article we discuss how we connected these two important services to enable lightweight, interactive SQL queries directly over Kafka via Presto at Uber scale.
Presto at Uber
Uber uses open source Presto to query nearly every data source, both in motion and at rest. Presto’s versatility empowers us to make smart, data-driven business decisions. We operate around 15 Presto clusters spanning more than 5,000 nodes. We have around 7,000 weekly active users running approximately 500,000 queries daily, which read around 50 PB from HDFS. Today, Presto is used to query a variety of data sources like Apache Hive™, Apache Pinot™, AresDb, MySQL, Elasticsearch, and Apache Kafka, through its extensible data source connectors. You can find more information about Presto in some of our previous blog posts.
Apache Kafka at Uber
Uber has one of the largest deployments of Apache Kafka, processing trillions of messages and multiple petabytes daily. As Figure 2 shows, today we position Apache Kafka as a cornerstone of our technology stack. It empowers a large number of different workflows, including a pub/sub message bus for passing event data from the Rider and Driver apps, streaming analytics (e.g., Apache Flink®), streaming database changelogs to downstream subscribers, and ingesting all sorts of data into Uber’s Apache Hadoop® data lake. Plenty of interesting work has gone into making sure it’s performant, reliable, and user-friendly.
Over the years, data teams at Uber have seen a growing need for analytics over Kafka streams, because timely, ad-hoc analytics over real-time data gives data scientists, engineers, and operations teams valuable information.
Here is an example of a typical request to the Kafka team: an operations team is investigating why several messages were not processed by a critical service, a problem with direct end-customer impact. The team collects several UUIDs from the reported issues and asks whether they exist in the input/output Kafka streams of the service. As Figure 3 depicts, the request can be formulated as the query: “Is the order with UUID X missing in the Kafka topic T?”
Such a problem is usually solved by real-time analytics in Big Data. Among the various technologies available in this domain, we focused on two categories of open-source solutions: stream processing and real-time OLAP datastores.
Stream processing engines such as Apache Flink, Apache Storm™, or ksql process the stream continuously and either output a processed stream or incrementally maintain an updatable view. Stream processing is not a good fit for the aforementioned problem, because users want to perform point lookups or run analytical queries over events in the past.
Real-time OLAP datastores such as Apache Pinot, Apache Druid™, and ClickHouse®, on the other hand, are better suited. These OLAP stores are equipped with advanced indexing techniques, so they can index Kafka streams to serve low-latency queries. In fact, Uber adopted Apache Pinot several years ago, and today Pinot is a key technology inside the Uber Data Platform, powering multiple mission-critical real-time analytics applications. You can read our previous blog on the use of Pinot at Uber.
However, real-time OLAP requires a non-trivial onboarding process to create a table that ingests from the Kafka stream and tune the table for optimal performance. In addition, a real-time OLAP store also takes storage and compute resources for serving, so this solution is recommended for use cases that query the table repeatedly and demand lower latency (such as user-facing applications), but not ad-hoc troubleshooting or exploration.
As a result, this problem motivated the Kafka and Presto teams to jointly explore a lightweight solution, with the following considerations in mind:
- It reuses the existing Presto deployment, a mature technology that has already been battle-tested at Uber for years
- It does not require any onboarding—a Kafka topic can be discovered and is available to be queried immediately after creation
- Presto is known for its powerful query federation capabilities across multiple data sources, so it allows the correlation between Kafka and other data sources like Hive/MySQL/Redis to derive insights across data platforms
However, this Presto approach also has its limitations. For example, it is less performant than real-time OLAP stores, as the Kafka connector does not have an index built and therefore has to scan the Kafka stream in a range of offsets. In addition, there are other challenges to be addressed on the connector to meet the scalability requirements at Uber, which we elaborate on in the next section.
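To illustrate why the missing index matters, here is a minimal sketch of a point lookup that scans a range of offsets. The in-memory partition, the `Message` type, and `scan_for_uuid` are hypothetical stand-ins invented for illustration, not the connector's actual code. The only point is that the work grows with the size of the offset range, not with the number of matching messages.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class Message:
    offset: int
    uuid: str


def scan_for_uuid(partition: Sequence[Message], uuid: str,
                  start_offset: int, end_offset: int) -> List[Message]:
    """Return messages with the given uuid in [start_offset, end_offset).

    Assumption for this sketch: the list index equals the message offset.
    With no index available, every message in the range is inspected.
    """
    matches = []
    for message in partition[start_offset:end_offset]:
        if message.uuid == uuid:
            matches.append(message)
    return matches


# Toy partition with 12 messages whose uuids cycle through four values.
partition = [Message(offset=i, uuid=f"order-{i % 4}") for i in range(12)]
found = scan_for_uuid(partition, "order-3", start_offset=4, end_offset=12)
print([m.offset for m in found])  # [7, 11]
```

Narrowing the offset range is the only way to make this kind of lookup cheaper.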
Challenges at Uber
Presto already has a Kafka connector that supports querying Kafka over Presto. However, the solution doesn’t fully fit into the large-scale Kafka architecture that we have at Uber. There are several challenges:
- Kafka Topic and Cluster Discovery: At Uber, where we provide Kafka as a service, users can onboard new topics to Kafka through the self-service portal at any time. Therefore, we need Kafka topic discovery to be dynamic. However, Kafka topic and cluster discovery in the current Presto Kafka connector is static, which requires a connector restart every time we onboard a new topic.
- Data Schema Discovery: Similar to Kafka topic and cluster discovery, we provide schema registry as a service and support user self-service onboarding. Therefore, we need the Presto-Kafka connector to be able to retrieve the latest schema on demand.
- Query Restriction: Limiting the amount of data each query can consume from Kafka is important to us. There are many large Kafka topics at Uber where the byte rate can go up to 500 MB/s. Because a Presto-Kafka query is relatively slow compared to the alternatives, a query that pulls a large amount of data from Kafka takes a long time to finish. This hurts both the user experience and the health of the Kafka clusters.
- Quota Control: As a distributed query engine, Presto could consume messages from Kafka concurrently at very high throughput, which might degrade the Kafka clusters. Limiting the maximum Presto consumption throughput is critical for the stability of the Kafka clusters.
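As a rough sense of scale for the query restriction concern, the following back-of-envelope calculation takes the peak byte rate quoted above to be 500 megabytes per second (an assumption about the unit) and assumes the rate is constant. The time windows are illustrative only.

```python
# Assumed peak byte rate: 500 megabytes per second (unit is an assumption).
BYTES_PER_SECOND = 500 * 10**6


def bytes_in_window(seconds: int, rate: int = BYTES_PER_SECOND) -> int:
    """Bytes a topic accumulates over a window at a constant byte rate."""
    return rate * seconds


one_hour = bytes_in_window(3600)
one_day = bytes_in_window(24 * 3600)
print(f"1 hour: {one_hour / 10**12:.1f} TB")  # 1 hour: 1.8 TB
print(f"1 day:  {one_day / 10**12:.1f} TB")   # 1 day:  43.2 TB
```

Under these assumptions, a query with no bound on time or offset would have to read terabytes from a single topic.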
Uber’s data ecosystem provides a way for users to write an SQL query and submit it to the Presto cluster for execution. Each Presto cluster has one coordinator node, which is responsible for parsing SQL statements, planning queries, and scheduling tasks for worker nodes to execute. The Kafka connector inside Presto exposes Kafka topics as tables, where each message in a topic is represented as a row in Presto. On receiving a query, the coordinator determines whether it has appropriate filters. Once validation completes, the Kafka connector fetches the cluster and topic information from the Kafka cluster management service, then fetches the schema from the schema service. The Presto workers then talk to the Kafka cluster in parallel to fetch the required Kafka messages. We also have a broker quota on the Kafka cluster for Presto users, which prevents cluster degradation.
The following sections dive deep into the improvements we made to overcome the limitations on the existing Presto Kafka Connector and enable it for large-scale use cases.
Kafka Cluster/Topic and Data Schema Discovery
We made changes to enable on-demand cluster/topic and schema discovery. First, because Kafka topic metadata and data schemas are fetched through KafkaMetadata at runtime, we extracted a TableDescriptionSupplier interface to supply that metadata. We then extended the interface and implemented a new strategy that reads Kafka topic metadata from the in-house Kafka cluster management service and schema registry at runtime. Similarly, we refactored KafkaClusterMetadataSupplier and implemented a new strategy that reads cluster metadata at runtime. Since the cluster metadata is fetched on demand, we are also able to support multiple Kafka clusters in a single Kafka connector. We added a cache layer for all of this metadata to reduce the number of requests that hit the Kafka cluster management and schema services.
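The combination of on-demand lookup and caching can be sketched as follows. This is a language-neutral illustration in Python, not the connector's Java code: `CachingSupplier`, `fetch_table_description`, the 60-second TTL, and the shape of the returned description are all assumptions made for the example.

```python
import time
from typing import Callable, Dict, Generic, Hashable, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class CachingSupplier(Generic[K, V]):
    """Wraps a remote metadata lookup with a simple time-to-live cache."""

    def __init__(self, fetch: Callable[[K], V], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[K, Tuple[float, V]] = {}

    def get(self, key: K) -> V:
        now = self._clock()
        cached = self._entries.get(key)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]              # fresh entry: no remote call
        value = self._fetch(key)          # missing or expired: fetch on demand
        self._entries[key] = (now, value)
        return value


calls = []


def fetch_table_description(topic: str) -> dict:
    # Hypothetical stand-in for a call to the cluster management
    # and schema services.
    calls.append(topic)
    return {"topic": topic, "columns": ["uuid", "status"]}


fake_now = [0.0]  # controllable clock so the example is deterministic
supplier = CachingSupplier(fetch_table_description, ttl_seconds=60,
                           clock=lambda: fake_now[0])
supplier.get("order")   # first lookup hits the remote service
supplier.get("order")   # served from the cache
fake_now[0] = 61.0
supplier.get("order")   # entry expired, fetched again
print(len(calls))       # 2
```

A short TTL is one way to trade metadata freshness against load on the metadata services; a newly created topic is not cached at all, so its first lookup always goes to the service.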
Query Restriction
In order to improve the reliability of Kafka and Presto clusters, we wanted to avoid large queries reading too much data. To achieve this, we added column filter enforcement that checks for the presence of either _timestamp or _partition_offset in the filter constraints of Presto queries for Kafka. Queries without these filters would be rejected.
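The enforcement rule can be sketched as a check over the set of columns that carry a filter constraint. In this illustration the set of constrained column names is assumed to have been extracted from the query plan already; `enforce_column_filter` and `QueryRejectedError` are hypothetical names, not part of Presto.

```python
from typing import Iterable

# Column names taken from the text above.
REQUIRED_FILTER_COLUMNS = frozenset({"_timestamp", "_partition_offset"})


class QueryRejectedError(Exception):
    """Raised when a query has no bounding filter on the Kafka stream."""


def enforce_column_filter(constrained_columns: Iterable[str]) -> None:
    """Reject the query unless at least one required column is constrained."""
    if REQUIRED_FILTER_COLUMNS.isdisjoint(constrained_columns):
        raise QueryRejectedError(
            "query must filter on _timestamp or _partition_offset"
        )


# A query that filters on uuid and _timestamp passes the check.
enforce_column_filter({"uuid", "_timestamp"})

# A query that filters only on uuid is rejected.
try:
    enforce_column_filter({"uuid"})
    rejected = False
except QueryRejectedError:
    rejected = True
print(rejected)  # True
```

Note that this sketch only checks that a filter is present; it does not check how wide the time or offset range is.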
Quota Control on Kafka Cluster
Kafka is an important piece of infrastructure for Uber with many real-time use cases. Degradation of a Kafka cluster could have a huge impact, so we want to avoid it at any cost. As a distributed query engine, Presto could spin up hundreds of consumer threads to fetch messages from Kafka concurrently. This kind of consumption pattern might exhaust network resources and lead to Kafka cluster degradation, which we want to prevent.
One option is to limit the consumption rate at the Presto cluster level, but that is not easy to achieve from the Presto side. As an alternative, we decided to leverage Kafka’s broker quota to achieve our goal. We made a change that allows us to specify a Kafka consumer client ID in the connector configuration. With this change, we are able to use a static Kafka client ID for all the workers in Presto, so that they are all subject to the same quota pool.
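The effect of a shared client ID can be illustrated with a simplified model. Kafka consumers expose a `client.id` setting, and brokers can enforce byte-rate quotas per client ID. The sketch assumes the same quota applies to every distinct client ID on a single broker; the quota value, the worker count, and the ID naming are hypothetical.

```python
from typing import Iterable


def aggregate_fetch_ceiling(client_ids: Iterable[str],
                            quota_bytes_per_sec: int) -> int:
    """Upper bound on the combined fetch rate against one broker.

    Simplified model (an assumption): the broker applies the same byte-rate
    quota to each distinct client ID, and consumers that share a client ID
    share one quota.
    """
    return len(set(client_ids)) * quota_bytes_per_sec


QUOTA = 10 * 10**6  # hypothetical quota: 10 MB/s per client ID
WORKERS = 100       # hypothetical number of Presto workers

unique_ids = [f"presto-worker-{i}" for i in range(WORKERS)]
shared_ids = ["presto"] * WORKERS

print(aggregate_fetch_ceiling(unique_ids, QUOTA))  # 1000000000
print(aggregate_fetch_ceiling(shared_ids, QUOTA))  # 10000000
```

In this model the ceiling for distinct IDs grows with the number of workers, while a static ID keeps the total bounded by a single quota no matter how many workers Presto schedules.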
Of course, this approach comes at a price: because all queries share one quota, multiple Presto queries running concurrently take longer to finish. This is the sacrifice we have to make. In practice, since we have the query filter, most queries are able to finish within a reasonable amount of time.
We have seen a great productivity improvement in ad-hoc exploration since rolling out the feature. Before it, engineers would take tens of minutes or even longer to look up data for the example we mentioned above, but now we can write a simple SQL query, SELECT * FROM kafka.cluster.order WHERE uuid = '0e43a4-5213-11ec', and the result is returned within a few seconds.
As of this writing, more and more users are adopting Presto on Kafka for ad-hoc exploration. There are 6,000 daily queries, and we also get good feedback from Presto users, who say that Presto on Kafka has made their data analysis much easier.
Moving forward, we plan to contribute the improvements we made back into the open-source community. You can also check out our PrestoCon talk for more details on the work we have done.
If you are interested in building large-scale data infrastructure at Uber, consider applying for a role with us. We are hiring!
MySQL is a registered trademark of Oracle and/or its affiliates.
ClickHouse is a registered trademark of ClickHouse, Inc. https://clickhouse.com.
Apache®, Apache Kafka®, and Kafka®, Apache Flink®, Apache Hadoop®, Apache Storm™, Apache Pinot, Apache Druid™ are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.
Yang Yang is a Staff Software Engineer on Uber’s Streaming Data Team. She works on building a highly scalable, reliable Kafka ecosystem at Uber, including uReplicator, Kafka Consumer Proxy, and other internal tooling.
Yupeng Fu is a Principal Software Engineer on Uber’s Data team. He leads several streaming teams building scalable, reliable, and performant streaming solutions. Yupeng is an Apache Pinot committer.
Hitarth Trivedi is a Senior Software Engineer on Uber’s Data Analytics team. Hitarth primarily works on Presto.