Introduction
Uber operates on a massive scale, facilitating over 2.2 billion trips every quarter. Deriving even simple insights necessitates a scaled solution. In our case, we needed to count the number of jobs someone had participated in while on the Uber platform, for arbitrary time windows. This article focuses on the challenges faced and lessons learned as we integrated Apache Pinot™ into our solution.
Background
Specifically our solution needed to resolve:
- Several permutations of job counts, broken down by role, marketplace, and completeness axes
- Point-in-time tenure at a given trip, or a given timestamp (i.e., where does job X lie on person A’s job history, chronologically?)
Our previous solution was simple: retrieve jobs for a given subject with a page size limit of 50, and paginate the result until there are no further jobs. In Uber’s early days, with no single account accruing comparatively much tenure, this worked well. However, Uber ventured into new verticals, and some accounts began to present tenure in the tens of thousands, it became clear that we needed a more robust solution.
Trip-specific tenures are a narrow use case
A main product requirement was that this solution must be able to compute tenure lookback. This by itself would have been tenable, but accompanied by our data retention policy, it was deemed unreasonable to accommodate by our downstream team.
Access rescission of data older than 2 years
The same team, in the interest of cost savings, determined that data older than 2 years would be sequestered into a higher latency storage tier. However, a change of plans mid-project resulted in them dropping online access to this data altogether.
Reduced complexity
Our solution at the time required stitching three data sources for each of Uber’s marketplaces: Rides and Eats.
Similar use cases in the broader team
Immediately after presenting our design to the wider team, we found teams across Uber having adjacent projects with similar requirements, each reimplementing their own solution to count job tenure independently.
Taking these constraints into account, we considered several architectures. One we seriously considered was backed by Apache Hive™, and Docstore (Uber’s in-house distributed database), finally landing on a solution leveraging Apache Pinot™. An interesting feature here is the hybrid table, presenting a seamless interface which stitches real-time and offline data together.
Architecture
Development Challenges
Apache Pinot™ is an extremely powerful product, giving us unparalleled flexibility with our schema. However, we encountered several roadblocks along the way, and below, we detail the challenges and our solutions to these issues that may be insightful to the reader.
Challenge #1a – Capacity Planning
Our first challenge was to formulate a capacity requirement to provision the hardware required for our dedicated tenant. Apache Pinot™ utilizes compression techniques, making it difficult to measure space on disk a priori. Here we opted to take a 10% sample dataset and projecting disk usage based on the space taken by the sample was sufficient in our case.
Challenge #1b – Query Performance
One interesting side note here is that, while we were able to approximate dataset size on disk via sampling, we were unable to accurately predict query performance. In this case, we had to wait until after we scaled up for the entire dataset before we were able to acquire crucial metrics, e.g. p99 read time, disk read throughput, etc. Once we were able to do so, production-sized traffic brought our dedicated cluster to its knees, seeing read times exceeding 10s broker limits, maxing out read throughput on SSDs, and CPU usage. We proceeded to immediately investigate optimizations, as it was not unexpected, since each query was tantamount in load to a full table scan.
For reference, our query is of the shape:
SELECT
*FROM
pinot_hybrid_table
WHERE provider_id = ‘…’
AND requester_id = ‘…’
AND timestamp >= … AND timestamp <= …
This concerned the team, and we took several steps to improve performance, including:
- Sorted provider_id column
This colocates all trips made by the same provider on the same day, minimizing segments visited per query.
Without this, all the jobs fulfilled by a provider on a given day would be equally distributed amongst all the segments for a given day. It follows that if a provider has performed multiple jobs on the same day, in order for the broker to fulfill our read query, it would quickly converge to retrieving all segments for every day a provider has fulfilled a job. - Adding inverted indices on provider_id and requester_id
We also enabled inverted indices on the provider_id and requester_id columns. In combination with a sorted column, this provides a sorted inverted index on the provider_id column. This allows a lookup time complexity of log(n), as it performs a binary search to find the rows that correspond to the given provider_id value.
One surprising fact which caught us off-guard in a big way, was that these inverted indices are created for each segment. This is a very significant difference between this, and say, a traditional RDBMS index. This index is not a global structure shared amongst all the segments, directly pointing to the data being queried. In our case, the broker must still memory-map a portion of each segment in the table in order to use the index. This is extremely inefficient without implementing additional segment-pruning techniques, which we implemented soon after. - Adding bloom filters on provider_id and requester_id
Bloom filters are a probabilistic data structure, testing whether an element is the member of a set, giving two answers: possibly in set, or not in set. When enabled for a column, Apache Pinot™ creates one Bloom filter per segment, and allows the broker to skip segments altogether when fulfilling queries. If a Bloom filter exists for a column on a segment, and there is an equality predicate for that column in the query, the broker is able to quickly determine whether the record exists in the segment or not. As our dataset does not totally fit in memory, we chose an MMAP (memory-mapped) off-heap configuration, whereby segments are lazily loaded into memory, and previously loaded segments will be unmapped if there is insufficient physical memory by the operating system (as in our case). However, the segment’s associated Bloom filters can be stored on heap (in-memory), and will remain there, even if their underlying segments are no longer in physical memory.
The speedup observed should be linked to the number of segments skipped, thus it benefits datasets where the read pattern does not require fetching a large proportion of total segments (e.g. a full table scan).
See Figures 3, and 4, where one can observe the significant drop in (numSegmentsQueried – numSegmentsProcessed) after enabling Bloom filters.
- Increased segments per day
To create our Apache Pinot™ segments, we schedule an Apache Spark™ job running daily, creating and uploading new segments composing the offline table. The number of segments created per schedule interval is adjustable, and we initially started with four segments a day. However, as segments began to grow very large (over 4GB a segment), we incrementally increased this to 8, then 16, and finally landed on 32 segments a day.
The tradeoff made here was that while increased segment count can cause increased load on Zookeeper metadata storage, and increased Apache Pinot™ servers’ memory heap usage, smaller segment sizes causes segments to be more quickly read off disc, and proportionally more of the data on the segment to be included in the result. Empirically, we observed a significant reduction in p99 read latency, and unnoticeable broker CPU usage increase.
- Added a cross-datacenter cache to our upstream consumer
While not specific to Apache Pinot™, our main upstream consumer executes identical queries between staging and production environments. While we expect the time delay between queries to be minimal, we decided that 30 minutes of staleness is acceptable.
Statistic | Before Bloom Filters | After Bloom Filters |
timeUsedMs | 387 | 1740 |
numDocsScanned | 21 | 21 |
totalDocs | 50,520,067,053 | 50,550,326,486 |
numServersQueried | 18 | 18 |
numServersResponded | 18 | 18 |
numSegmentsQueried | 20,491 | 20,488 |
numSegmentsProcessed | 4,829 | 48 |
numSegmentsMatched | 17 | 17 |
numConsumingSegmentsQueried | 2 | 2 |
numEntriesScanned | 16,466,904 | 147 |
Challenge #2 – Business edge cases
As Uber continues to add features to the platform, so do their downstream impacts on source data. Currently, we can receive trip-level information via three modalities: Apache Hive™, Apache Kafka™ Topic, and API response, each complete with a different schema. Retrofitting and representing new features into an existing schema in a sensible way can be difficult, in particular, Apache Hive™ schemas. Historical data holds a lot of inertia and can make migrating schemas unreasonable.
For example, consider the Fare Split feature, where the cost of a ride can be split amongst multiple riders. Prior to this feature, it was always true that a job had one rider, which was always the payer, and each Hive record implied one job performed by the driver on the order. These invariants no longer hold true. The choice was made here to duplicate records and set the status to FARE_SPLIT, while setting driver_uuid column to NULL.
It is complications like this that make performing a simple COUNT DISTINCT aggregation not possible. For each business case, deciding whether or not a record contributes to a subject’s tenure must be clearly thought out:
Redispatched rides (Another driver is dispatched to fulfill an already assigned job) | To whom should the job tenure be attributed to? Both drivers? If these are represented as multiple records, should it count as 2 jobs for the rider? |
Guest Rides | Due to the nature of these accounts, they can cause hot shards, causing table scan queries to be expensive. |
Unfulfilled orders | Should unfulfilled orders contribute to tenure? |
Requester cancellation, Order failure, etc. | |
Multiple dispatch of same provider on the same order | 1 order with multiple jobs fulfilled by the same driver should contribute to how much tenure for the requester? |
Scheduled orders | Should orders that haven’t occurred yet accrue tenure against a subject? |
Challenge #3 – Slow data
Another challenge we had to solve here was slow upstream data. While most data arrives in seconds, it’s possible that a trip may not appear in upstream data sources for up to a week. To solve this, we created a pipeline that dynamically generates backfill pipelines, but only scheduled to run between T – 7d and T.
Alongside this, we also perform offline data quality checks which are simple COUNT(*) queries for T – 1d date to ensure our source Apache Hive™ tables are in sync with results from our Apache Pinot™ hybrid table.
Challenge #4 – Bursty upstream loads
Upstream traffic with spiky traffic patterns was also an issue. In this case, the specific rate limiting implementation caused a large spike in traffic every ten seconds, each of which was breaching the ten-second Apache Pinot™ broker timeout, failing the request.
We addressed this by adding jitter to our upstream client at request time to more evenly distribute our queries over time.
After overcoming these challenges, we have been serving live production traffic for almost a year, and performing load testing shows at least 200% headroom, after including a buffer for failover traffic. Our p99 read latency is ~1s: impressive, as some of our upstream queries can hit over 2,000 segments, with each segment consuming approximately 90 MB of disk space. After iterating on our solution, we began to publish metrics comparing our previous downstream with our solution. Our Apache Pinot™-based solution presented an almost 1:1 accuracy, giving us the confidence to rely on it. We first gated the change with a config flag, and after some time, cut over fully and deprecated the previous implementation.
Conclusion
With some simple additions, we are confident we will be able to answer more powerful questions. For example:
- Which city did someone take the plurality of their last 50 trips in?
- Who amongst Uber’s population are high-tenure?
Finding point-in-time tenure at job granularity is expensive. Avenues to improve performance and reduce storage costs here are still being explored. While still in the design process, we expect that by dropping the job-granularity requirement, we should be able to significantly increase read throughput.
Acknowledgments
Special thanks to Caner Balci, Qiaochu Liu, Jacob Sevart, and Ujwala Tulshigiri for their contributions to this post.
Apache®, Apache Hive™, Apache Kafka®, Apache Spark™ and Apache Pinot™ are registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.
Ryan Woo
Ryan is a Senior Software Engineer on the Safety and Insurance Team, based in San Francisco.
Sameer Kapoor
Sameer is an Engineering Manager on the Safety and Insurance team, based in San Francisco.
Posted by Ryan Woo, Sameer Kapoor
Related articles
Most popular
Differential Backups in MyRocks Based Distributed Databases at Uber
Upgrading Uber’s MySQL Fleet to version 8.0
Sparkle: Standardizing Modular ETL at Uber
Charting the mobility evolution: excerpts from Uber’s latest industry paper
Products
Company