As Uber’s business has expanded, the underlying pool of data that powers it has grown exponentially and become ever more expensive to process. When Big Data rose to become one of our largest operational expenses, we began an initiative to reduce costs on our data platform, which divides the challenges into 3 broad pillars: platform efficiency, supply, and demand. In this post, we discuss our efforts to improve the efficiency of our data platform and bring down costs.
Big Data File Format Optimizations
The majority of our Apache® Hadoop® File System (HDFS) space is occupied by Apache Hive™ tables. Those tables are stored in either the Apache Parquet™ file format or the Apache ORC™ file format. Although we plan to consolidate to Parquet at some point in the future, we have not been able to make it happen yet due to many specific requirements, including situational compatibility and performance.
Both Parquet and ORC are block-based columnar formats, which means that a file comprises a number of blocks, each of which contains a large number of rows (say, 10,000) stored column by column.
We spent a good amount of time examining our files on HDFS and decided to pursue the following optimizations, focusing mainly on the Parquet format:
- Compression Algorithms: By default, we use GZIP Level 6 as the compression algorithm inside Parquet. Recent community development on Parquet’s support for ZSTD from Facebook caught our attention. In our experiments, ZSTD Level 9 and Level 19 reduced our Parquet file sizes by 8% and 12%, respectively, compared to GZIP-based Parquet files. Moreover, both ZSTD Level 9 and Level 19 decompress faster than GZIP Level 6. We decided to adopt ZSTD Level 9 for recompressing our data after 1 month, and ZSTD Level 19 after 3 months, because ZSTD Level 9 compression was 3 times faster than Level 19 in our experiments. Note that the recompression jobs are background maintenance jobs, which can run with non-guaranteed compute resources. Given the abundance of such resources, we can essentially treat those recompression jobs as free.
- Column Deletion: Many of our Hive tables, especially those ingested from Apache Kafka® logs, contain many columns, some of which are nested. When we looked into those columns, it was clear that some of them do not need to be kept long term. Examples include debugging metadata for each Kafka message and any fields that must be deleted after some time for compliance reasons. Given the columnar format, it’s technically possible to delete columns inside a file without decompressing and recompressing the other columns. This makes Column Deletion a very CPU-efficient operation. We implemented such a feature at Uber, used it extensively on our Hive tables, and contributed the code back to Apache Parquet.
- Row Reordering: Row order can dramatically affect the size of compressed Parquet files. This is due to both the Run-Length Encoding feature inside the Parquet format and the compression algorithm’s ability to take advantage of local repeats. We examined a list of the largest Hive tables at Uber, and performed manually tuned ordering that reduced the table sizes by more than 50%. A common pattern that we found for log tables is simply to order the rows by user ID, and then by timestamp. Most log tables have user ID and timestamp columns. This allows us to compress many denormalized columns associated with the user ID extremely well.
- Delta Encoding: Once we started to order rows by timestamp, we quickly noticed that Delta Encoding can help further reduce the data size, since the difference between adjacent timestamps is very small compared with the timestamp value itself. In some cases the logs have a steady rhythm, like a heartbeat, and so the difference is constant. However, in our environment where Apache Hive, Presto®, and Apache Spark™ are all used widely, enabling Delta Encoding in Parquet is not easy, as noted in the StackOverflow question. We are still exploring this direction.
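The effect of row ordering and delta encoding can be reproduced at toy scale. The sketch below is only an illustration under stated assumptions: it uses Python's standard zlib as a stand-in for the GZIP/ZSTD codecs, serializes made-up columns by hand rather than writing real Parquet files, and all data, column names, and helper functions are hypothetical. Parquet's own delta encoding is more elaborate than the plain differences shown here.

```python
import random
import struct
import zlib

random.seed(7)  # fixed seed so the sketch is repeatable

# Hypothetical log rows: (user_id, timestamp, city). The city is a
# denormalized attribute that depends only on the user.
cities = ["sf", "nyc", "london", "delhi", "sao_paulo"]
rows = [
    (uid, 1_600_000_000 + random.randrange(86_400), cities[uid % len(cities)])
    for uid in random.choices(range(200), k=20_000)
]

def compressed_size(table):
    """Serialize column by column (as a columnar format would) and compress."""
    users = struct.pack(f"<{len(table)}I", *(r[0] for r in table))
    stamps = struct.pack(f"<{len(table)}I", *(r[1] for r in table))
    places = "\n".join(r[2] for r in table).encode()
    return sum(len(zlib.compress(col, 6)) for col in (users, stamps, places))

# Row reordering: sort by user ID, then timestamp.
arrival_order = compressed_size(rows)
sorted_order = compressed_size(sorted(rows, key=lambda r: (r[0], r[1])))
print(f"arrival order: {arrival_order} bytes, sorted: {sorted_order} bytes")

def delta_encode(values):
    """Keep the first value, then store each difference from its predecessor."""
    return values[:1] + [b - a for a, b in zip(values, values[1:])]

def delta_decode(deltas):
    """Invert delta_encode with a running sum."""
    out, total = [], 0
    for d in deltas:
        total += d
        out.append(total)
    return out

# Delta encoding: a heartbeat-style log with one event every 5 seconds.
heartbeat = [1_600_000_000 + 5 * i for i in range(10_000)]
raw_bytes = zlib.compress(struct.pack("<10000q", *heartbeat))
delta_bytes = zlib.compress(struct.pack("<10000q", *delta_encode(heartbeat)))
print(f"raw timestamps: {len(raw_bytes)} bytes, deltas: {len(delta_bytes)} bytes")
```

The sorted table compresses better mainly because the user ID and city columns turn into long runs of repeated values, and the heartbeat column collapses to a near-constant stream once only the differences are stored.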
HDFS Erasure Coding
Erasure Coding can dramatically reduce the replication factor of HDFS files. Due to the potentially increased IOPS workload, at Uber we are mainly looking at the 3+2 and 6+3 schemes, which have effective replication factors of 1.67x and 1.5x, respectively. Given that the default HDFS replication factor is 3x, we can reduce the HDD space needs by nearly half!
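The arithmetic behind those factors is short enough to check directly. In this minimal sketch, the function name is ours, and a "d+p" scheme is taken to mean d data blocks plus p parity blocks:

```python
def storage_overhead(data_blocks, parity_blocks):
    """Raw bytes stored per byte of user data for a data+parity scheme."""
    return (data_blocks + parity_blocks) / data_blocks

REPLICATION = 3.0  # default HDFS replication factor

for data, parity in [(3, 2), (6, 3)]:
    overhead = storage_overhead(data, parity)
    saving = 1 - overhead / REPLICATION
    print(f"{data}+{parity}: {overhead:.2f}x raw storage, "
          f"{saving:.0%} less than 3x replication")
# 3+2: 1.67x raw storage, 44% less than 3x replication
# 6+3: 1.50x raw storage, 50% less than 3x replication
```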
There are several choices for Erasure Code though:
- Apache Hadoop 3.0 HDFS Erasure Code: This is the official Erasure Code implemented in the Apache Hadoop 3.0. The good thing about this implementation is that it works for both big and small files. The drawback is that the IO efficiency is not as good, since the blocks are very fragmented for Erasure Code.
- Client-side Erasure Code: This was first implemented by Facebook in the HDFS-RAID project. The good thing about this approach is that it’s very IO efficient. When all of the blocks are available, the read IO efficiency is the same as the baseline where the blocks are 3-way replicated. The drawback is that it doesn’t work for small files, since each block is a unit for Erasure Code computation.
After consulting with industry experts, we decided to go with Apache Hadoop 3.0 HDFS Erasure Code, since this is the direction of the community. We are still in the evaluation phase, but we are confident that it will have a huge impact on reducing our HDFS costs.
YARN Scheduling Policy Improvements
At Uber, we use Apache YARN™ to run the majority of our Big Data Compute workload (except Presto, which runs directly on dedicated servers). Just like many other companies, we started with the standard Capacity Scheduler inside YARN. The Capacity Scheduler allows us to configure a hierarchical queue structure with MIN and MAX settings for each of the queues. We have created a 2-level queue structure with organizations as the first level, and we allow users to create second-level queues based on sub-teams, priority tiers, or job types.
While the Capacity Scheduler gives us a good start in managing our YARN queue capacity, we soon started to face a dilemma in managing our YARN cluster’s capacity:
- High Utilization: We would like to keep the YARN cluster’s average utilization (measured by CPU and MemGB allocated / total CPU and MemGB capacity of the cluster) as high as possible;
- Meeting User Expectations: We would like to give users clear expectations of how much resource they can expect from the cluster.
Many of our users have spiky but predictable resource needs from our YARN cluster. For example, one queue may have a set of daily jobs, each of which starts at a particular time of day and consumes a similar amount of CPU/MemGB for a similar period of time.
If we set the MIN of the queue to the peak usage during the day, then the cluster utilization will be low, since the average resource needs of a queue are far below the MIN.
If we set the MAX of the queue to the peak usage during the day, then there is a chance that the queue may be abused over time to consistently take resources close to MAX, and that in turn may affect everyone else’s normal jobs in other queues.
How can we capture the users’ resource needs and set their expectations correctly? We came up with the following idea, called Dynamic MAX.
The Dynamic MAX algorithm uses the following settings:
- Set MIN of the queue to be the average usage of the queue
- Set MAX of the queue to be the value at which one hour of such usage and the past 23 hours of usage will average to the MIN of the queue
Dynamic_MAX = max(MIN, MIN * 24 - Average_Usage_In_last_23_hours * 23)
Dynamic_MAX is calculated at the beginning of each hour, and applied to the queue MAX for that hour.
The intuitions behind the Dynamic MAX algorithm here are:
- If the queue was not used at all in the last 23 hours, we allow the queue to spike to, at most, 24x its MIN. This is usually enough to handle the vast majority of our spiky workloads.
- If the queue was used on average at MIN in the last 23 hours, then we only allow the queue to be used at MIN for the next hour. With this rule, the average usage of the queue in the 24-hour period won’t exceed MIN, thus avoiding the abuse case mentioned above.
The Dynamic MAX algorithm above is easy to explain to users: their usage can spike to at most 24x their queue’s MIN, but, to keep the cluster fair, their accumulated usage within 24 hours cannot exceed that of constant usage at the MIN level.
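The formula and its two boundary cases can be checked with a few lines. This is a minimal sketch of the stated formula only, not the scheduler integration; the function and parameter names are ours, and the units (for example vCores) are arbitrary:

```python
def dynamic_max(queue_min, avg_usage_last_23h):
    """Queue MAX for the coming hour, following the Dynamic MAX formula."""
    return max(queue_min, queue_min * 24 - avg_usage_last_23h * 23)

QUEUE_MIN = 100  # hypothetical queue MIN

print(dynamic_max(QUEUE_MIN, 0))    # idle queue: may spike to 24x MIN -> 2400
print(dynamic_max(QUEUE_MIN, 100))  # already averaging MIN: capped at MIN -> 100
print(dynamic_max(QUEUE_MIN, 50))   # half-used queue -> 1250

# Spending one hour at the cap keeps the 24-hour average at MIN:
cap = dynamic_max(QUEUE_MIN, 50)
print((cap * 1 + 50 * 23) / 24)     # -> 100.0
```

The outer max() matters when a queue has already averaged more than its MIN over the last 23 hours: the formula's second term drops below MIN, and the queue is still guaranteed its MIN.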
In reality, we set the MIN to 125% of the queue’s average usage to account for up to 25% of daily usage variance. This in turn implies that our YARN cluster’s average utilization (measured by CPU/MemGB allocation) will be around 80%, which is a pretty good utilization level for cost efficiency.
Avoid the Rush Hours
Another problem we have with YARN resource utilization is that there is still a daily pattern at the whole-cluster level. Many teams decided to run their ETL pipelines between 00:00 and 01:00 UTC, since that’s supposedly when the previous day’s logs are ready. Those pipelines may run for 1-2 hours. This makes the YARN cluster extremely busy during those rush hours.
Instead of adding more machines to the YARN cluster, which would reduce the average utilization and hurt cost efficiency, we plan to implement time-based rates. Basically, when we calculate the average usage in the last 23 hours, we apply a scaling factor that depends on the hour of the day. For example, the scaling factor will be 2x for the rush hours from 00:00 to 04:00 UTC, and 0.8x for the rest of the day.
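One way to read the time-based rate is as a weight on each hour's usage before averaging. The sketch below assumes exactly that, and it treats the rush window as hours 0 through 3 UTC inclusive; both are our assumptions, since this is still a plan and the exact mechanics are not spelled out. All names are hypothetical.

```python
RUSH_HOURS = range(0, 4)  # assumed: 00:00-03:59 UTC

def rate_for_hour(hour_utc):
    """Scaling factor from the example: 2x in rush hours, 0.8x otherwise."""
    return 2.0 if hour_utc in RUSH_HOURS else 0.8

def scaled_average_usage(usage_by_hour):
    """Average of rate-weighted usage over (hour_utc, usage) pairs."""
    weighted = sum(usage * rate_for_hour(hour) for hour, usage in usage_by_hour)
    return weighted / len(usage_by_hour)

# Two hypothetical queues, each using 100 units for 4 hours of a 23-hour window.
rush_queue = [(h, 100 if h in RUSH_HOURS else 0) for h in range(23)]
offpeak_queue = [(h, 100 if 10 <= h < 14 else 0) for h in range(23)]

print(round(scaled_average_usage(rush_queue), 2))     # 34.78
print(round(scaled_average_usage(offpeak_queue), 2))  # 13.91
```

Fed into the Dynamic MAX formula, the rush-hour queue is charged more for the same raw usage and therefore gets a lower cap, which nudges teams to move flexible pipelines off-peak.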
Cluster of Clusters
As our YARN and HDFS clusters continued to grow bigger, we started to notice a performance bottleneck. Both HDFS NameNode and YARN ResourceManager started to slow down due to the ever-increasing cluster size. While this is mainly a scalability challenge, it also dramatically affects our cost efficiency goals.
To solve this issue, there are 2 strategic options in front of us:
A. Continue to improve the single node performance: For example, we can use machines with more CPU vCores and Memory. We can also run stacktraces and flamegraphs to find out the performance bottlenecks and optimize those one by one.
B. Cluster of Clusters (Federation): We can create a virtual cluster of many clusters. Each underlying cluster will have a size suitable for optimal performance of HDFS and YARN. The virtual cluster above that will handle all workload routing logic.
We chose to take on Option B for the following reasons:
- Most of the HDFS and YARN clusters in the world are smaller than what we need at Uber. If we run super-sized clusters, there is a high likelihood that we will run into unknown bugs that don’t appear in smaller-sized clusters.
- To make HDFS and YARN scale to Uber’s cluster size, we may need to change the source code to make a different trade-off between performance and sophisticated features. For example, we identified that the Capacity Scheduler has some complex logic that slows down task assignment. However, code changes that remove this logic could not be merged into Apache Hadoop trunk, since those sophisticated features may be needed by other companies.
To allow ourselves to leverage the open-source Hadoop ecosystem without forking, we decided to build out our Cluster of Clusters setup. In particular, we use Router-Based HDFS Federation and YARN Federation. Both of them are from open-source Apache Hadoop. As of now, we have already set up dozens of HDFS clusters and a handful of YARN clusters. HDFS Router-based Federation has been a cornerstone of our Big Data scalability effort that also improves Cost Efficiency.
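Conceptually, a federation router resolves each path to the sub-cluster that owns it through a mount table. The sketch below illustrates that idea with a longest-prefix lookup; it is not the Hadoop Router implementation or its API, and the mount points and cluster names are invented for illustration.

```python
# Hypothetical mount table: path prefix -> owning sub-cluster.
MOUNT_TABLE = {
    "/": "ns-default",
    "/user": "ns-users",
    "/data/warehouse": "ns-warehouse",
}

def resolve(path, mount_table=MOUNT_TABLE):
    """Return the sub-cluster whose mount point is the longest prefix of path."""
    best = None
    for mount_point, cluster in mount_table.items():
        prefix = mount_point.rstrip("/")
        # Match whole path components only, so "/username" does not hit "/user".
        if path == mount_point or path.startswith(prefix + "/"):
            if best is None or len(mount_point) > len(best[0]):
                best = (mount_point, cluster)
    if best is None:
        raise KeyError(f"no mount point covers {path}")
    return best[1]

print(resolve("/data/warehouse/trips/part-0"))  # ns-warehouse
print(resolve("/user/alice/report.csv"))        # ns-users
print(resolve("/username/notes.txt"))           # ns-default
```

Because clients only see the virtual namespace, data can be moved between sub-clusters by updating the table rather than by changing every job.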
Generalized Load Balancing
We described the P99 and Average Utilization challenge earlier. The solution on Cheap and Big HDDs in Part 3 will touch on the importance of IOPS P99.
In this section, we will talk about the generalized load balancing solution which applies to both HDFS and YARN in the following ways:
- HDFS DataNode Disk Space Utilization Balancing: Each DataNode may have a different ratio of disk space utilization. Within each DataNode, each HDD may have a different ratio of disk space utilization. All these need to be balanced to allow a high average disk space utilization.
- YARN NodeManager Utilization Balancing: At any point in time, each machine in YARN can have a different level of CPU and MemGB allocation and utilization. Again, we need to balance both the allocation and utilization in order to allow a high average utilization number.
The similarity between the above solutions leads to the generalized load balancing idea, which applies to many more use cases in and out of our Big Data Platform, e.g., microservice load balancing and primary storage load balancing. The common link among all of these is that the goal is always to reduce the gap between P99 and average.
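The "gap between P99 and average" can be made concrete with a small metric. The sketch below uses a nearest-rank percentile and made-up per-node utilization figures (in percent); the function names and numbers are ours, chosen so that both fleets have the same average.

```python
def p99(values):
    """Nearest-rank 99th percentile, using integer arithmetic for the rank."""
    ordered = sorted(values)
    rank = (99 * len(ordered) + 99) // 100  # ceil(0.99 * n)
    return ordered[rank - 1]

def p99_to_average_gap(values):
    """How far the hottest nodes sit above the fleet average."""
    return p99(values) - sum(values) / len(values)

# Hypothetical disk or CPU utilization (%) for 100 nodes, same average of 54.5%.
unbalanced = [50] * 90 + [95] * 10
balanced = [54] * 50 + [55] * 50

print(p99_to_average_gap(unbalanced))  # 40.5
print(p99_to_average_gap(balanced))    # 0.5
```

If the hottest nodes must stay below some safety limit, the unbalanced fleet has no room to grow even though its average is only 54.5%, while the balanced fleet can raise its average substantially before any node reaches the limit.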
Query Engines
We use several query engines in Uber’s Big Data ecosystem: Hive-on-Spark, Spark, and Presto. These query engines, in combination with the file formats (Parquet and ORC), have created an interesting trade-off matrix for our cost efficiency effort. Other options including SparkSQL and Hive-on-Tez make the decision even more complex.
Here are the major efforts on our query engines to improve the cost efficiency:
- Focus on Parquet File Format: Parquet and ORC file formats share some common design principles, like row groups, columnar storage, and block-level and file-level statistics. However, their implementations are completely independent, and have different compatibility with other proprietary systems that we use at Uber. Over time, we have seen better Parquet support in Spark, and better ORC support in Presto. Given the growing demand to add features to file formats, we have to decide on one major file format, and we have chosen Parquet. A single, major file format allows us to focus our energy on a single codebase and grow expertise over time.
- Nested Column Pruning: Uber’s Big Data tables have surprisingly highly nested data. This is partly due to the fact that a lot of our upstream data sets are stored in JSON format (see Designing Schemaless) and we enforce Avro schema on those. As a result, support for Nested Column Pruning is a key feature in query engines at Uber; otherwise, the deeply nested data would need to be read out of the Parquet files in full, even if we need just a single field inside that nested structure. We added Nested Column Pruning to both Spark and Presto. These changes have significantly improved our overall query performance, and have been contributed back to the open-source communities.
- Common Query Pattern Optimization: It’s not uncommon to see SQL queries with nearly one thousand lines in our workload. While the query engines we use all have a query optimizer, they do not specifically handle the patterns common at Uber. One example is the use of SQL constructs like “RANK() OVER PARTITION” and “WHERE rank = 1” with the goal of extracting the value of one column in the row where another column’s value is at its max, or “ARGMAX” in mathematical terms. Engines like Presto can run a lot faster when the query is rewritten to use the built-in function “MAX_BY”.
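The reason the rewrite helps can be seen by mimicking the two query shapes in plain Python. This is only an analogy for the two execution strategies, not how any engine is implemented: the rank-based form orders every partition in full, while the MAX_BY form keeps a single running best per group. The rows and function names are hypothetical, and timestamps are assumed to be unique within a group.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical rows: (rider_id, trip_time, city)
rows = [
    ("r1", 10, "sf"), ("r2", 12, "nyc"), ("r1", 30, "la"),
    ("r2", 5, "boston"), ("r1", 20, "seattle"),
]

def latest_city_by_rank(rows):
    """RANK() over each partition ordered by time DESC, then keep rank 1."""
    result = {}
    for rider, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        ranked = sorted(group, key=itemgetter(1), reverse=True)  # full sort
        result[rider] = ranked[0][2]
    return result

def latest_city_by_max_by(rows):
    """MAX_BY(city, trip_time) per rider: one pass, constant state per group."""
    best = {}
    for rider, trip_time, city in rows:
        if rider not in best or trip_time > best[rider][0]:
            best[rider] = (trip_time, city)
    return {rider: city for rider, (_, city) in best.items()}

print(latest_city_by_rank(rows))    # {'r1': 'la', 'r2': 'nyc'}
print(latest_city_by_max_by(rows))  # {'r1': 'la', 'r2': 'nyc'}
```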
In our experience, it’s hard to predict which engine will work the best for a particular SQL query. Hive-on-Spark is usually very scalable for a huge amount of shuffle data. Presto, in turn, is usually very fast with queries that touch a small amount of data. We are actively watching for improvements in open-source Big Data query engines, and will continue to shift our workload among them to optimize for cost efficiency.
Apache Hudi
One of the biggest cost efficiency opportunities we had in the Big Data Platform is efficient incremental processing. Many of our fact data sets can arrive late or be changed. For example, in many cases a rider doesn’t rate the driver of the last trip until he or she is about to ask for the next ride. A credit card chargeback for a trip can sometimes take a month to process.
Without an efficient, incremental processing framework, our Big Data users have to scan many days of old data every day to make their query results fresh. A much more efficient way would be to process only the incremental changes every day. That’s what the Hudi project is about.
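The difference between a full rescan and incremental processing can be sketched with a checkpoint on commit time. This is a conceptual illustration only and does not use the Hudi API; the record layout, field names, and functions are hypothetical.

```python
def full_recompute(records):
    """Baseline: rescan every record to rebuild the derived table."""
    return {r["trip_id"]: r["rating"] for r in records}

def incremental_update(derived, records, last_checkpoint):
    """Upsert only records committed after last_checkpoint.

    A real system would find the changed records through commit metadata
    instead of filtering the whole list as this toy version does.
    """
    changed = [r for r in records if r["commit_time"] > last_checkpoint]
    for r in changed:
        derived[r["trip_id"]] = r["rating"]
    new_checkpoint = max((r["commit_time"] for r in changed), default=last_checkpoint)
    return len(changed), new_checkpoint

# Day 1: trip t2 has not been rated yet.
day1 = [
    {"trip_id": "t1", "rating": 5, "commit_time": 1},
    {"trip_id": "t2", "rating": None, "commit_time": 1},
]
derived = full_recompute(day1)
checkpoint = 1

# Day 2: the late rating for t2 arrives, and a new trip t3 is ingested.
day2 = [
    {"trip_id": "t1", "rating": 5, "commit_time": 1},
    {"trip_id": "t2", "rating": 4, "commit_time": 2},
    {"trip_id": "t3", "rating": None, "commit_time": 2},
]
processed, checkpoint = incremental_update(derived, day2, checkpoint)
print(processed, derived)  # 2 {'t1': 5, 't2': 4, 't3': None}
```

Only the two changed records are applied on day 2, yet the derived table matches what a full recomputation would produce.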
We started the Hudi project in 2016, and submitted it to the Apache Incubator in 2019. Apache Hudi is now a Top-Level Project, and the majority of our Big Data on HDFS is in Hudi format. This has dramatically reduced the computing capacity needs at Uber.
Next Steps and Open Challenges
Big Data and Online Service Same-Host Colocation
While we have decided to let the Big Data Workload use the Online Service hosts when Online Services don’t need them, running both workloads on the same host exposes many extra challenges.
Many research papers study the performance impact of colocation. The main difference in our approach is that we plan to give the Big Data Workload a very low priority to minimize its impact on Online Services.
Convergence of the Online and Analytics Storage
A lot of our data sets are stored in both Online Storage systems (Schemaless stored in MySQL databases on flash) as well as the Analytics Storage system (Hive tables stored in HDFS on hard drives). Moreover, to allow instantaneous query speed, we are also investing in storage engines like Pinot. All of these resulted in many copies of the same logical data, although stored in different formats.
Is it possible to have a unified storage system that can handle both online and analytical queries? That will reduce the storage cost significantly.
Project HydroElectricity: Leveraging Maintenance Jobs to “Store” the Extra Compute Power
Compute power in a cluster is very similar to electricity supply. It’s usually fixed in supply, and suffers in situations where the demand is spiky or not uniform.
Pumped-Storage hydroelectricity can store surplus electricity in the form of gravitational potential energy of water, and then transform it back to electricity when demand peaks.
Can we apply the same idea to compute power? Yes we can! The key idea to introduce here is maintenance jobs, which are background tasks that can happen at any time over the next day or even a week. Typical maintenance jobs include LSM compaction, compression, secondary index building, data scrubbing, erasure code fixes, and snapshot maintenance. Almost all low-priority jobs without a stringent SLA can be considered maintenance jobs.
In most of our systems, we don’t explicitly split maintenance and foreground jobs. For example, our Big Data Ingestion system writes ZSTD-compressed Parquet files, which takes a lot of CPU power and generates very compact files. Instead, we can let Ingestion write lightly compressed Parquet files, which take a bit more disk space but far less CPU. Then we have a maintenance job that runs at a later time to recompress the files. In this way, we can reduce the foreground CPU needs significantly.
Maintenance jobs can take non-guaranteed compute power to run. As we described earlier, we have plenty of resources for that purpose.
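The split between a cheap foreground write and a heavier background rewrite can be sketched as follows. The sketch assumes zlib levels 1 and 9 as stand-ins for light and heavy ZSTD settings and compresses a plain byte payload rather than a Parquet file; the function names and sample data are hypothetical.

```python
import zlib

def ingest(payload):
    """Foreground path: light, cheap compression (zlib level 1 as a stand-in)."""
    return zlib.compress(payload, 1)

def recompress(blob):
    """Maintenance path: rewrite at a heavier level when spare CPU is available."""
    return zlib.compress(zlib.decompress(blob), 9)

# Hypothetical log payload.
cities = ["sf", "nyc", "london"]
payload = "\n".join(
    f"trip_id={i},status=completed,city={cities[i % 3]}" for i in range(5000)
).encode()

light = ingest(payload)    # written immediately by the ingestion job
heavy = recompress(light)  # produced later by a background maintenance job

print(len(payload), len(light), len(heavy))
```

The data is identical after either step, so the maintenance job can run at any point in the following days, whenever non-guaranteed capacity is free.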
Pricing Mechanism for Big Data Usage
Given the multi-tenant Big Data Platform, we are often in situations where it’s hard to satisfy every customer’s resource needs. How do we optimize for the total utility of the limited hardware budget? Is the Dynamic_MAX with Rush Hour multiplier the best option?
We believe there are even better solutions. However, they will require us to come up with a more refined pricing mechanism. Examples that we would like to consider include fake money that each team can spend on our cluster, boost credits that users can spend to raise the priority of their jobs, etc.
Conclusion
In this blog post, we shared efforts and ideas in improving the platform efficiency of Uber’s Big Data Platform, including file format improvements, HDFS erasure coding, YARN scheduling policy improvements, load balancing, query engines, and Apache Hudi. These improvements have resulted in significant savings. In addition, we explored some open challenges like analytics and online colocation, and pricing mechanisms. However, as the framework outlined in our previous post established, platform efficiency improvements alone do not guarantee efficient operation. Controlling the supply and the demand of data is equally important, which we will address in an upcoming post.
Apache®, Apache Hadoop®, Apache Kafka®, Apache Hive, Apache ORC, Apache Parquet, Apache Spark, Apache YARN, Hadoop, Kafka, Hive, ORC, Parquet, Spark, and YARN are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.
Presto® is a registered trademark of LF Projects, LLC in the United States and/or other countries. No endorsement by LF Projects, LLC is implied by the use of this mark.