Three years ago, Uber Engineering adopted Hadoop as the storage (HDFS) and compute (YARN) infrastructure for our organization’s big data analysis. This analysis powers our services and enables the delivery of more seamless and reliable user experiences.
We use Hadoop for both batch and streaming analytics across a wide range of use cases, such as fraud detection, machine learning, and ETA calculation. As Uber’s business grew over the last few years, our volume of data and the associated access loads increased exponentially; in 2017 alone, the amount of data stored on HDFS grew more than 400 percent.
Scaling our infrastructure while maintaining high performance was no easy feat. To accomplish this, Uber’s Data Infrastructure team massively overhauled our approach to scaling our HDFS by implementing several new adjustments and features, including View File System (ViewFs), frequent HDFS version upgrades, NameNode garbage collection tuning, limiting the number of small files that filter through the system, an HDFS load management service, and a read-only NameNode replica. Read on to learn how Uber implemented these improvements to facilitate the continued growth, stability, and reliability of our storage system.
HDFS was designed as a scalable distributed file system to support thousands of nodes within a single cluster. With enough hardware, scaling to over 100 petabytes of raw storage capacity in one cluster can be easily—and quickly—achieved.
For Uber, however, the rapid growth of our business made it difficult to scale reliably without slowing down data analysis for our thousands of users making millions of Hive or Presto queries each week.
Currently, Presto accounts for more than half of the access to HDFS, and 90 percent of Presto queries take ~100 seconds to process. If our HDFS infrastructure is overloaded, Presto queries in the queue pile up, resulting in query completion delays. On top of this, one queried, we need data to be available on HDFS as soon as possible.
For our original storage infrastructure, we engineered extract, transform, and load (ETL) to occur in the same clusters where users run queries to reduce replication latency. The dual duties of these clusters results in the generation of small files to accommodate frequent writes and updates, which further clogs the queue.
On top of these challenges, multiple teams require a large percentage of stored data, making it impossible to split clusters by use case or organization without duplications and in turn decreasing efficiency while increasing costs.
The root of these slowdowns—the main bottleneck of our ability to scale our HDFS without compromising the UX—was the performance and throughput of the NameNode, the directory tree of all files in the system that tracks where data files are kept. Since all metadata is stored in the NameNode, client requests to an HDFS cluster must first pass through it. Further complicating this, a single ReadWriteLock on the NameNode namespace limits the maximum throughput the NameNode can support because any write request will exclusively hold the write lock and force any other requests to wait in the queue.
In late 2016, we started experiencing high NameNode remote procedure call (RPC) queue time as a result of this combination. At times, NameNode queue time could exceed 500 milliseconds per request (with the slowest queue time reaching nearly a second), meaning that every single HDFS request waited for at least half a second in the queue—a stark slowdown compared to our normal process time of under 10 milliseconds.
Enabling scaling & improving performance
Ensuring the high performance of our HDFS operations while continuing to scale led us to develop several solutions in parallel to avoid outages in the short term. At the same time, these solutions let us build a more reliable and extensible system capable of supporting future growth in the long term.
Below, we outline some of the improvements that enabled us to expand our HDFS infrastructure more than 400 percent while at the same time improving the system’s overall performance:
Scaling out using ViewFs
Inspired by a similar effort at Twitter, we utilized View File System (ViewFs) to split our HDFS into multiple physical namespaces and used ViewFs mount points to present a single virtual namespace to users.
To accomplish this, we separated our HBase from the same HDFS cluster as our YARN and Presto operations. This adjustment not only greatly reduced the load on the main cluster, but also made our HBase much more stable, reducing the HBase cluster restart from hours to minutes.
We also created a dedicated HDFS cluster for aggregated YARN application logs. YARN-3269 is needed to make log aggregation support ViewFs. Our Hive scratch directory was also moved to this cluster. The results of this added functionality were very satisfactory; currently, the new cluster serves around 40 percent of the total write requests and most files on it are small files which also relieve the file count pressure on the main cluster. Because no client-side changes were needed for existing user applications, this transition was very smooth.
And finally, we implemented separate HDFS clusters behind ViewFs instead of the infrastructure’s HDFS Federation. With this set-up, HDFS upgrades can be gradually rolled out to minimize the risk of large-scale outages; additionally, complete isolation also helps improve system reliability. One downside to this fix, however, is that maintaining separate HDFS clusters leads to slightly higher operational costs.
A second solution for our scaling challenges was to upgrade our HDFS to keep up with the latest release versions. We installed two major upgrades in one year, first from CDH 5.7.2 (HDFS 2.6.0 with a lot of patches) to Apache 2.7.3, and then to 2.8.2. To execute on this, we also had to rebuild our deployment framework on top of Puppet and Jenkins to replace third party cluster management tools.
The upgrades brought us critical scalability improvements, including HDFS-9710, HDFS-9198, and HDFS-9412. For instance, after upgrading to Apache 2.7.3, the amount of incremental block reports decreased, leading to a reduction in NameNode’s load.
Upgrading HDFS can be risky because it can cause downtime, performance degradation, or data loss. To counter these possible issues, we spent several months validating 2.8.2 before deploying it in production. However, there was still one bug (HDFS-12800) that caught us by surprise while we were upgrading our largest production cluster. Even though we caught it late, having isolated clusters, a staged upgrade process, and contingency rollback plans helped us mitigate its effects.
The ability to run different versions of YARN and HDFS on the same servers at the same time also turned out to be very critical to our scaling efforts. Since both YARN and HDFS are part of Hadoop, they are normally upgraded together. However, major YARN upgrades take much longer to fully validate and roll out because some production applications running on YARN might need to be updated due to YARN API changes or JAR dependency conflicts between YARN and these applications. While YARN scalability has not been an issue in our environment, we did not want the critical HDFS upgrades to be blocked by YARN upgrades. To prevent possible blockages, we currently run an earlier version of YARN than HDFS, which works well for our use case. (However, this strategy might not work when we adopt features like Erasure Coding because of the required client-side changes.)
NameNode garbage collection tuning
Garbage collection (GC) tuning has also played an important role in our optimization and given us much needed breathing room as we continue to scale out our storage infrastructure.
We prevent long GC pauses by forcing Concurrent Mark Sweep collectors (CMS) to do more aggressive old generation collections by tuning CMS parameters like CMSInitiatingOccupancyFraction, UseCMSInitiatingOccupancyOnly, and CMSParallelRemarkEnabled. While this increases CPU utilization, we are fortunate to have enough spare CPU cycles to support this functionality.
During heavy Remote Procedure Call (RPC) loads, a large number of short-lived objects are created in young generation, which forces the young generation collector to perform stop-the-world collection frequently. By increasing the young generation size from 1.5GB to 16GB and tuning the ParGCCardsPerStrideChunk value (set at 32,768), the total time our production NameNode spent on GC pause decreased from 13 percent to 1.7 percent, increasing throughput by more than 10 percent. Benchmark results (Figure 3) show additional improvements in read-only scenarios.
For reference, our customized GC-related Java Virtual Machine (JVM) arguments for NameNode with 160GB heap size are:
- XX:ParGCCardsPerStrideChunk=32768 -XX:+UseParNewGC
- XX:+UseConcMarkSweepGC -XX:+CMSConcurrentMTEnabled
- XX:+CMSParallelRemarkEnabled -XX:+UseCondCardMark
We are also in the process of evaluating whether or not to integrate the Garbage-First Garbage Collector (G1GC) with our system. Although we did not see advantages when using G1GC in the past, new versions of the JVM bring additional garbage collector performance improvements, so revisiting the choice of our collector and configuration is occasionally necessary.
Controlling the number of small files
Because NameNode loads all file metadata in memory, storing small files increases memory pressure on the NameNode. In addition, small files lead to an increase of read RPC calls for accessing the same amount of data when clients are reading the files, as well as an increase in RPC calls when files are generated. To reduce the number of small files in our storage, we took two main approaches.
First, our Hadoop Data Platform team built new ingestion pipelines based on our Hoodie library that generates much bigger files than those created by our original data pipelines. As an interim solution before these were available, however, we also built a tool (referred to internally as a ‘stitcher’) that merges together small files into larger ones, mostly greater than 1GB in size.
Second, we set a strict namespace quota on Hive databases and application directories. To enforce this, we created a self-service tool for users to manage quotas within their organizations. The quota is allocated at the ratio of 256MB per file to encourage users to optimize their output file size. Hadoop teams also provide optimization guides and file merge tools to help users adopt these best practices. For example, turning on auto-merge on Hive and tuning the number of reducers can greatly reduce number of files generated by Hive insert-overwrite queries.
HDFS load management service
One of the biggest challenges of running a large multi-tenant infrastructure like HDFS is detecting which applications are causing unusually large loads, and from there, taking quick action to fix them. We built an in-house HDFS load management service, referred to as Spotlight, for this purpose.
In our current implementation of Spotlight, an audit log is streamed from active NameNode and processed in real time through a backend based on Flink and Kafka. The analysis output is then shown through a dashboard and used to automatically disable accounts or kill workflows that are causing HDFS slowdown.
We are working on the development of Observer NameNode (HDFS-12975), a new HDFS feature designed as a read-only NameNode replica aimed at reducing the load on the active NameNode cluster. Because more than half of our HDFS RPC volume and growth comes from read-only Presto queries, we expect Observer NameNodes to help us scale the overall NameNode throughput by close to 100 percent in the first release. We have finished validation on this tool and are in the process of putting it into production.
As we scaled our HDFS infrastructure, we picked up a few best practices that might be valuable for other organizations facing similar issues, outlined below:
- Layer your solutions: Implementing large scalability improvements like Observer NameNode or splitting HDFS into more clusters takes significant effort. Short-term measures like GC tuning and merging smaller files into larger ones via our stitcher gave us a lot of breathing room to develop long-term solutions.
- Bigger is better: Since small files are a threat to HDFS, it is better to tackle them earlier rather than later. Providing tools, documents, and training to users are all very effective approaches to help enforce best practices critical to running a multi-tenant HDFS infrastructure.
- Participate in the community: Hadoop has been around for more than 10 years and its community is more active than ever before, leading to scalability and functionality improvements introduced in nearly every release. Participating in the Hadoop community by contributing your own discoveries and tools is very important for the continued scaling of your infrastructure.
While we have made great progress over the last couple of years, there is always more to be done to further improve our HDFS infrastructure.
For instance, in the near future we plan to integrate various new services into our storage system, as depicted in Figure 6. These additions will enable us to further scale our infrastructure and make Uber’s storage ecosystem more efficient and easier to use.
Below, we highlight what is in store for two of our main projects, a Router-based HFDS Federation and tiered storage:
Router-based HDFS Federation
We currently utilize ViewFs to scale out HDFS when subclusters become overloaded. The main problem with this approach is that client configuration changes are required every time we add or replace a new mount point on ViewFs, and it is very difficult to roll out these adjustments without affecting production workflows. This predicament is one of the main reasons we currently only split the data that does not require large-scale client-side changes, e.g. YARN log aggregation.
Microsoft’s new initiative and implementation of Router-based Federation (HDFS-10467, HDFS-12615), which is currently included in the HDFS 2.9 release, is a natural extension to a ViewFs-based partitioned federation. This federation adds a layer of software capable of centralizing HDFS namespaces. By offering the same interface (a combination of RPC and WebHDFS), its extra layer gives users transparent access to any subclusters, and lets subclusters manage their data independently.
By providing a rebalancing tool, the federation layer would also support transparent data movement across subclusters for balancing workloads and implementing tiered storage. The federation layer maintains the state of the global namespace in a centralized state store and allows multiple active routers to get up and running while directing user requests to the correct subclusters.
We are actively working on bringing Router-based HDFS Federation to production at Uber while closely collaborating with the Hadoop community on further open source improvements, including WebHDFS support.
As our infrastructure’s scale increases, so too does the importance of reducing storage costs. Research conducted among our technical teams shows that users access recent data (“hot” data) more frequently than old data (“warm” data). Moving old data to a separate, less resource-intensive tier will significantly reduce our storage costs. HDFS Erasure Coding, Router-based Federation, high density (more than 250TB) hardware, and a data mover service (to handle moving data between “hot” tier clusters and “warm” tier clusters) are key components of our forthcoming tiered storage design. We plan to share our experience on our tiered storage implementation in a future article.
If you are interested in scaling large-scale distributed systems, consider applying for a role on our team!