Uber’s services rely on the accuracy of our event prediction and forecasting tools. From estimating rider demand on a given date to predicting when an UberEATS order will arrive, Uber uses forecasting algorithms to enhance user experiences (UX) across our product portfolio.
To architect a precise and easily interpretable forecasting experience for engineering and operations, we built a custom prediction system on the open source ELK stack: Elasticsearch, a distributed RESTful search engine; Logstash, a data indexing pipeline; and Kibana, a visualization tool. Simple but powerful, our resulting architecture is easily scalable and works in real time.
In this article we discuss why and how we built this system and share best practices and lessons learned from working with Elasticsearch.
Designing a custom prediction system
The quality of our predictions is measured by how closely they match the final results of trip features such as uberPOOL match rate and match quality.
We needed to design a highly customizable architecture so that our prediction system could sync with our existing products because:
- Some trip features only apply to uberPOOL, for example, match rate and match quality.
- Specific characteristics of our products—uberPOOL, uberX, UberBLACK, etc.—often impact trip features. For instance, dynamic pickup and dropoff is a feature that exists in uberPOOL, but not uberX or UberBLACK, and may impact the overall equation.
- Rider and driver behavior, such as taking a route different from the one Uber suggests, can affect predictions.
To fulfill the specifications above, we defined the product requirements of our new prediction system as:
- Being product-aware. Accurately distinguishing which product the rider was using and altering our predictions if necessary is critical for a seamless UX.
- Ensuring a low mean error per product. Each product’s average prediction error needs to be as close to zero as possible to maintain the overall accuracy of our predictions.
- Maintaining a low mean absolute error per product. Moreover, each product’s average absolute prediction error needs to be as close to zero as possible to ensure the individual time or location accuracy.
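The two error requirements above measure different things, and a small example makes the difference concrete. The sketch below is illustrative only: the function name, the record layout, and the sample numbers are assumptions, not Uber's implementation. It computes mean error (signed, so over- and under-predictions cancel out) and mean absolute error (unsigned) for each product.

```python
from collections import defaultdict


def error_metrics_by_product(records):
    """Compute mean error and mean absolute error per product.

    records: iterable of (product, predicted, actual) tuples (assumed layout).
    """
    errors = defaultdict(list)
    for product, predicted, actual in records:
        errors[product].append(predicted - actual)
    return {
        product: {
            # Signed average: positive and negative errors cancel.
            "mean_error": sum(errs) / len(errs),
            # Unsigned average: every miss counts, regardless of direction.
            "mean_absolute_error": sum(abs(e) for e in errs) / len(errs),
        }
        for product, errs in errors.items()
    }


# Made-up sample data for illustration.
sample = [
    ("uberPOOL", 12.0, 10.0),
    ("uberPOOL", 8.0, 10.0),
    ("uberX", 5.0, 6.0),
]
metrics = error_metrics_by_product(sample)
```

In this made-up sample, uberPOOL has a mean error of zero even though each individual prediction is off by two, which is why a low mean error alone is not enough and both metrics are tracked.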
In addition to meeting product requirements, we also needed to abide by specific engineering standards. High availability, low latency, scalability, and operations friendliness are crucial to the success of our prediction system, since the model would serve millions of users across hundreds of cities and dozens of countries. After setting these guidelines, we laid out the blueprints for algorithms that could meet these benchmarks, assessing both online and offline designs.
Architecting an online algorithm
At Uber, we often use a combination of historical and real-time trip data to train our trip prediction algorithms. While we can predict some patterns like trip density and match rate depending on location, date, time, and other set variables, these patterns adjust as our systems advance and operations expand to new markets. As a result, capturing system dynamics in real time or near-real time is critical to prediction accuracy.
In our effort to maximize precision, we decided to use an online algorithm that does not require much training and model maintenance work. We chose the k-nearest neighbors (KNN) algorithm, which finds the k nearest neighbors (meaning similar historic trips over a period of time) and then performs a regression on them to create a prediction. This two-step algorithm works by:
- Choosing k candidates based on our self-defined similarity function, derived from features such as geolocation and time.
- Calculating a weight for each selected candidate from the similarity function, then outputting the weighted average of each response variable.
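The second step of the algorithm can be sketched in a few lines. This is a minimal illustration, not the production code: it assumes each candidate already carries a positive similarity score and that the score is used directly as the regression weight, which the article does not specify. The function name `knn_predict` is hypothetical.

```python
def knn_predict(candidates, k):
    """Weighted-average KNN regression over scored candidates.

    candidates: list of (similarity, response) pairs, similarity >= 0.
    Assumption: the similarity score itself is used as the weight.
    """
    # Keep the k most similar candidates.
    top_k = sorted(candidates, key=lambda c: c[0], reverse=True)[:k]
    total_weight = sum(similarity for similarity, _ in top_k)
    if total_weight == 0:
        raise ValueError("no candidates with positive similarity")
    # Weighted average of the response variable.
    return sum(similarity * response for similarity, response in top_k) / total_weight


# Made-up candidates: (similarity, observed trip metric).
prediction = knn_predict([(0.5, 10.0), (0.25, 20.0), (0.25, 40.0), (0.1, 100.0)], k=3)
```

With k set to 3, the least similar candidate is dropped and the remaining three are averaged in proportion to their similarity.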
While the second part of this algorithm is mostly local computation and presents little engineering challenge, the first part of this algorithm is more challenging. Essentially a large-scale search problem, this step requires a relatively complex similarity function to sort candidates and select the top-ranked K as input for the second part of the algorithm. Our goal here is to group the most similar historical trips out of a huge number of trips. It is very difficult to sort all of the trips using the similarity function in a search engine because of the diversity and quantity of data. Instead, we achieve this in two steps:
- Reduce the search space by applying high-level filtering logic, for example, filtering data by city or product ID.
- Perform similarity-based ranking on the reduced data set and select the top-ranked k candidates from it.
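The filter-then-rank approach can be illustrated with an in-memory sketch. Everything here is an assumption made for illustration: the field names (`city_id`, `product_id`, `lat`, `lng`, `hour`), the toy `similarity` function, and the use of plain Python lists in place of a search engine.

```python
import heapq
import math


def similarity(query, trip):
    """Toy similarity: closer in space and hour of day means a higher score."""
    distance = math.hypot(query["lat"] - trip["lat"], query["lng"] - trip["lng"])
    hour_gap = abs(query["hour"] - trip["hour"])
    hour_gap = min(hour_gap, 24 - hour_gap)  # wrap around midnight
    return 1.0 / (1.0 + distance + 0.1 * hour_gap)


def top_k_similar(trips, query, k):
    # Step 1: cheap, high-level filtering shrinks the search space.
    reduced = (
        t for t in trips
        if t["city_id"] == query["city_id"] and t["product_id"] == query["product_id"]
    )
    # Step 2: similarity-based ranking on the reduced set only.
    return heapq.nlargest(k, reduced, key=lambda t: similarity(query, t))


trips = [
    {"id": 1, "city_id": 1, "product_id": "pool", "lat": 37.77, "lng": -122.42, "hour": 9},
    {"id": 2, "city_id": 1, "product_id": "pool", "lat": 37.78, "lng": -122.41, "hour": 9},
    {"id": 3, "city_id": 1, "product_id": "pool", "lat": 37.90, "lng": -122.30, "hour": 23},
    {"id": 4, "city_id": 2, "product_id": "pool", "lat": 37.77, "lng": -122.42, "hour": 9},
    {"id": 5, "city_id": 1, "product_id": "x", "lat": 37.77, "lng": -122.42, "hour": 9},
]
query = {"city_id": 1, "product_id": "pool", "lat": 37.77, "lng": -122.42, "hour": 9}
nearest = top_k_similar(trips, query, k=2)
```

Trips 4 and 5 match the query location exactly but are excluded by the filter before any similarity score is computed, which is the point of reducing the search space first.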
Although powerful, KNN is a challenging algorithm to use when dealing with large-scale data. To use it effectively, we needed a robust store and search engine able to deal with thousands of queries per second (QPS) and hundreds of millions of records at a time. We also needed geospatial query support to assist with filtering k candidates.
Using ELK as a data store and search engine
While we briefly looked at other databases that support geospatial queries, such as MySQL, our schema flexibility demands and search requirements made it an easy decision to use an open source ELK solution to power our data store and search engine.
ELK, built with Elasticsearch, Logstash and Kibana, is an integrated solution for searching and analyzing data in real time. Elasticsearch, the centerpiece of the solution, is a search engine built on top of Apache Lucene. It provides distributed and full-text search with a RESTful interface and schema-free JSON documents. Logstash is a data-collection and log-parsing engine, and Kibana is a data visualization and analytics plugin for Elasticsearch.
ELK was an easy choice for us because it offers full-text search, geospatial query, schema flexibility, and an easy-to-assemble data pipeline. (ELK can load data in near real time by ingesting Kafka topics.) ELK provided the flexibility necessary to meet our engineering guidelines and deliver an accurate, quick prediction system for our riders.
At a basic level, our prediction architecture was built using Kafka for data streaming, Hive as a data warehouse for managing queries and analytics, and four separate in-house services that use the ELK stack to create a robust prediction system. Below, we outline these four services and depict the overall architecture:
- Prediction service: Serves predictions in real-time to users
- Training service: Trains parameters and data pipelines offline
- Trip service: Manages the state of trips through their lifecycle, from request to completion
- Configuration service: Stores and serves trained parameters for other services
Our data store consists of completed trip data, including metrics such as trip time, distance, and cost. To work quickly and efficiently, our pipeline must be able to ingest and serve this trip data in near real time. Our solution is to publish data to a Kafka topic when a trip ends. From there, Logstash ingests the Kafka topic, transforming and indexing the data into Elasticsearch. The data pipeline is set up for immediate data query, so predictions can draw on the most recent trips. We also need a primary store for storage and analytics; we leverage Hive as an extract, transform, and load (ETL) tool to manage data processed by Kafka. With these two data pipelines, we use ELK for real-time query and Hive for training and analytics.
KNN regression is an online algorithm, which means we do not need to train a model for it. The value of k and the regression parameters, however, still need to be trained and selected, so we built a training service for this purpose. This service can train these parameters at different granularities, outputting global default values, per-country values, per-city values, and per-product values. Once these parameters are trained, the training service pushes the values into our dynamic configuration service.
From there, the configuration service serves actionable parameters to our prediction service. The trip service then requests predicted metrics from our prediction service for the upcoming trip. In this step, our prediction service queries Elasticsearch for candidates and the configuration service for the parameters, feeding them into the algorithm to generate predictions. Prediction results are returned to our trip service and stored as trip data, later to be published to Elasticsearch and Hive so we can track the performance of our predictions, train future parameters, and improve accuracy.
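Because parameters are trained at several granularities (global, per-country, per-city, and per-product), the prediction service needs some rule for picking one set. The sketch below shows one plausible rule, a most-specific-first fallback. The precedence order, the key layout, the function name `resolve_params`, and the sample values are all assumptions; the article does not describe how the configuration service resolves them.

```python
def resolve_params(params, country=None, city=None, product=None):
    """Return the most specific trained parameters available.

    params: dict keyed by (scope, identifier) tuples (assumed layout).
    Assumed precedence: product, then city, then country, then global.
    """
    for key in (("product", product), ("city", city), ("country", country)):
        if key[1] is not None and key in params:
            return params[key]
    # The global default is assumed to always exist.
    return params[("global", None)]


# Hypothetical trained values for illustration.
trained = {
    ("global", None): {"k": 50},
    ("country", "US"): {"k": 40},
    ("city", "san_francisco"): {"k": 30},
}
sf_params = resolve_params(trained, country="US", city="san_francisco", product="uberPOOL")
```

Here no per-product entry exists for uberPOOL, so the lookup falls back to the San Francisco value rather than the country or global default.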
Growing at Uber-scale
Our prediction system was optimized for horizontal scalability across markets and products, so it was relatively easy to implement and maintain at first. However, as we expanded to incorporate more products in more cities, we had to contend with high latency caused by garbage collection pauses and cascading failures induced by CPU overuse. Both issues stem from overloaded Elasticsearch nodes, so we focused our efforts on improving search performance and designing a more scalable architecture.
Reducing query size to improve search performance
Data size matters in Elasticsearch; a large index means too much data, caching churn, and poor response time. When query volumes are large, ELK nodes become overloaded, causing long garbage collection pauses or even system outages.
To address this, we switched to hexagonal queries, dividing our maps into hexagonal cells. Each hexagonal cell has a string ID determined by the hexagon resolution level. A geodistance query can be roughly translated to a ring of hexagon IDs; although a hexagonal ring is not a circular surface, it is close enough for our use case. Due to this adjustment, our system’s query capacity more than tripled.
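To make the idea concrete, the sketch below enumerates a center cell plus every cell within k steps of it on a hexagonal grid, then builds an Elasticsearch `terms` filter over the resulting IDs. The axial coordinate system, the `resolution:q:r` ID format, and the `hex_id` field name are assumptions for illustration; the article does not describe the actual cell indexing scheme.

```python
def hex_disk(center_q, center_r, k):
    """Axial coordinates of all hexagonal cells within k steps of the center."""
    cells = []
    for dq in range(-k, k + 1):
        # Bounds keep (dq, dr) within hex distance k of the origin.
        for dr in range(max(-k, -dq - k), min(k, -dq + k) + 1):
            cells.append((center_q + dq, center_r + dr))
    return cells


def hex_id(resolution, q, r):
    """Hypothetical string ID: resolution level plus axial coordinates."""
    return f"{resolution}:{q}:{r}"


def hex_terms_filter(resolution, center_q, center_r, k, field="hex_id"):
    """Build an Elasticsearch terms filter matching any cell in the disk."""
    ids = [hex_id(resolution, q, r) for q, r in hex_disk(center_q, center_r, k)]
    return {"terms": {field: ids}}


query_filter = hex_terms_filter(resolution=8, center_q=10, center_r=-4, k=1)
```

A lookup against a short list of exact string IDs replaces a per-document distance computation, at the cost of approximating the circle with a hexagonal shape.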
Horizontally scaling with virtual clusters
As the system expands to new markets, we need to add more nodes to our clusters. A common method of scaling Elasticsearch is to add more nodes horizontally. To do so, we organize trip data into daily indexes, with each index possessing a single shard replicated to all data nodes in the cluster. A HAProxy is deployed to balance search traffic to all data nodes in the cluster. In this model, the system is linearly scalable relative to the number of cluster nodes.
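With daily indexes, a query for recent trips only needs to name the indexes for the days it cares about. The helper below is a small sketch of that idea. The `trips-` prefix and the function name are hypothetical; the `YYYY.MM.dd` date suffix follows the convention Logstash uses for its default daily index names.

```python
from datetime import date, timedelta


def daily_indexes(end, days, prefix="trips-"):
    """Names of the daily indexes covering `days` days ending on `end`."""
    return [
        f"{prefix}{(end - timedelta(days=offset)).strftime('%Y.%m.%d')}"
        for offset in range(days)
    ]


recent = daily_indexes(date(2017, 3, 2), days=3)
```

Limiting a search to a handful of daily indexes keeps each query away from older data and makes expiring old trips a matter of dropping whole indexes.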
As nodes are added, however, the cluster destabilizes. A single-cluster architecture can handle around three thousand queries per second, but our system totals 40 nodes (and counting) at a size of 500GB per node. At this scale, the cost of inter-node communication within the cluster starts to outweigh the benefit of using a large cluster.
To overcome this limitation, we developed a virtual cluster. An uncommon architectural choice, virtual clusters consist of multiple physical clusters sharing the same cluster alias. Applications access the virtual cluster through HAProxy by specifying the cluster alias and requests can be routed to any of the clusters with the cluster alias. With this architecture, we achieve virtually unlimited scalability by just adding more clusters.
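The routing idea can be sketched as a lookup from a cluster alias to a pool of physical clusters. This is only an illustration of the concept: in the article, HAProxy does the routing, whereas the class below, its round-robin policy, and the host names are assumptions.

```python
import itertools


class VirtualClusterRouter:
    """Routes requests for an alias to one of several physical clusters."""

    def __init__(self, clusters_by_alias):
        self._cycles = {}
        for alias, clusters in clusters_by_alias.items():
            if not clusters:
                raise ValueError(f"alias {alias!r} has no physical clusters")
            # Assumed policy: plain round-robin across physical clusters.
            self._cycles[alias] = itertools.cycle(list(clusters))

    def route(self, alias):
        """Pick the next physical cluster registered under the alias."""
        if alias not in self._cycles:
            raise LookupError(f"unknown cluster alias: {alias!r}")
        return next(self._cycles[alias])


# Hypothetical cluster addresses.
router = VirtualClusterRouter({"trips": ["es-a.internal:9200", "es-b.internal:9200"]})
targets = [router.route("trips") for _ in range(3)]
```

Adding capacity then means registering another physical cluster under the same alias, with no change to the applications that query it.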
Lessons learned: Elasticsearch as a NoSQL database
We use Elasticsearch as a secondary store, pushing data to it asynchronously, because it is not designed for consistency. To make effective use of its valuable search features, we needed to operate Elasticsearch carefully. Below, we outline some lessons learned working with this powerful but complex database:
- Provision enough resources. Elasticsearch works like a charm when powered by ample resources. It is designed for speed and powerful features with the assumption that hardware and staff are abundant. However, it falters when resources are constrained. To make the most of Elasticsearch, you need to understand the scale you are dealing with and provision enough resources.
- Organize data according to your business logic. When the size of your data is too big to fit into one node, you need to organize your data intelligently.
- Order Elasticsearch queries for efficiency gains. Elasticsearch supports many filters in its queries, and their order greatly affects performance. More specific filters should be placed before less specific filters so that they filter out as much data as possible as early as possible in the querying process. Less resource-intensive filters should be placed before more resource-intensive filters so that the expensive filters have less data to process. Similarly, cacheable filters should go before non-cacheable filters so that they can best leverage caches.
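The ordering heuristics in the last point can be expressed as a sort key. The sketch below is an illustration under stated assumptions: the `FilterSpec` type, its fields, the sample estimates, and the precedence among the three rules (cacheable first, then more specific, then cheaper) are all hypothetical. Whether Elasticsearch honors clause order also depends on the version and query type, so this shows the heuristic rather than a guaranteed optimization.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FilterSpec:
    name: str
    cacheable: bool
    selectivity: float  # estimated fraction of documents that pass (lower is more specific)
    cost: int           # relative cost of evaluating the filter per document


def order_filters(filters):
    """Sort filters: cacheable first, then most specific, then cheapest."""
    return sorted(filters, key=lambda f: (not f.cacheable, f.selectivity, f.cost))


# Made-up estimates for illustration.
ordered = order_filters([
    FilterSpec("geo_distance", cacheable=False, selectivity=0.01, cost=10),
    FilterSpec("product_id", cacheable=True, selectivity=0.30, cost=1),
    FilterSpec("city_id", cacheable=True, selectivity=0.05, cost=1),
])
names = [f.name for f in ordered]
```

In this example the cheap, cacheable city and product filters run first, leaving the expensive geospatial filter to evaluate only the documents that survive them.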
Uber’s new real-time prediction system has now been rolled out across 400 cities worldwide. There is plenty more to do, so if architecting prediction algorithms for systems at Uber-scale sounds interesting to you, consider joining our growing herd on the Shared Rides or Infrastructure teams.
Our journey with prediction and ELK has only just begun—are you game?
Guocheng Xie is a software engineer on Uber’s Shared Rides team. Yanjun Huang is an engineer on Uber’s Infrastructure team.
Update: Since Uber first began using ELK, the ELK stack has expanded to include additional technologies and is now referred to as the Elastic Stack.
Guocheng Xie is a senior software engineer on Uber's Marketplace Data and Simulation team.
Yanjun Huang was a senior software engineer on Uber's Core Infrastructure team and is an Elasticsearch Expert.
Posted by Guocheng Xie, Yanjun Huang