Computing frameworks like Apache Spark have been widely adopted to build large-scale data applications. For Uber, data is at the heart of strategic decision-making and product development. To help us better leverage this data, we manage massive deployments of Spark across our global engineering offices.
While Spark makes data technology more accessible, right-sizing the resources allocated to Spark applications and optimizing the operational efficiency of our data infrastructure require more fine-grained insights about these systems, namely their resource usage patterns.
To mine these patterns without changing user code, we built and open sourced JVM Profiler, a distributed profiler to collect performance and resource usage metrics and serve them for further analysis. Though built for Spark, its generic implementation makes it applicable to any Java virtual machine (JVM)-based service or application. Read on to learn how Uber uses this tool to profile our Spark applications, as well as how to use it for your own systems.
On a daily basis, Uber supports tens of thousands of applications running across thousands of machines. As our tech stack grew, we quickly realized that our existing performance profiling and optimization solution would not be able to meet our needs. In particular, we wanted the ability to:
Correlate metrics across a large number of processes at the application level
In a distributed environment, multiple Spark applications run on the same server and each Spark application has a large number of processes (e.g. thousands of executors) running across many servers, as illustrated in Figure 1:
Our existing tools could only monitor server-level metrics and did not gauge metrics for individual applications. We needed a solution that could collect metrics for each process and correlate them across processes for each application. Additionally, we do not know in advance when these processes will launch or how long they will run. To be able to collect metrics in this environment, the profiler needs to be launched automatically with each process.
Make metrics collection non-intrusive for arbitrary user code
In their current implementations, Spark and Apache Hadoop libraries do not export performance metrics; however, there are often situations where we need to collect these metrics without changing user or framework code. For example, if we experience high latency on a Hadoop Distributed File System (HDFS) NameNode, we want to check the latency observed from each Spark application to see whether the application is affected by the same issue. Since the NameNode client code is embedded inside our Spark library, it is cumbersome to modify its source code to add this specific metric. To keep up with the perpetual growth of our data infrastructure, we need to be able to take the measurements of any application at any time and without making code changes. Dynamically injecting code into Java methods at load time would give us this kind of non-intrusive metrics collection.
Introducing JVM Profiler
To address these two challenges, we built and open sourced our JVM Profiler. There are some existing open source tools, like Etsy’s statsd-jvm-profiler, which can collect metrics at the individual application level, but they do not provide the capability to dynamically inject code into existing Java binaries to collect metrics. Inspired by some of these tools, we built our profiler with even more capabilities, such as arbitrary Java method/argument profiling.
What does the JVM Profiler do?
The JVM Profiler offers three key features that make it easier to collect performance and resource usage metrics, and then serve these metrics (e.g. via Apache Kafka) to other systems for further analysis:
- A Java agent: By incorporating a Java agent into our profiler, users can collect various metrics (e.g. CPU/memory usage) and stack traces for JVM processes in a distributed way.
- Advanced profiling capabilities: Our JVM Profiler allows us to trace arbitrary Java methods and arguments in the user code without making any actual code changes. This feature can be used to trace HDFS NameNode RPC call latency for Spark applications and identify slow method calls. It can also trace the HDFS file paths each Spark application reads or writes to identify hot files for further optimization.
- Data analytics reporting: At Uber, we use the profiler to report metrics to Kafka topics and Apache Hive tables, making data analytics faster and easier.
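To make the Java agent mechanism more concrete, here is a minimal sketch of how load-time interception works with the standard java.lang.instrument API. It is not the JVM Profiler's actual code: the class names LoadTimeAgentSketch and ClassLoadRecorder are hypothetical, and the transformer only records which classes match a prefix instead of rewriting their bytecode.

```java
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of load-time interception; not the JVM Profiler's own code.
class LoadTimeAgentSketch {

    // Records the names of matching classes as the JVM loads them.
    static class ClassLoadRecorder implements ClassFileTransformer {
        // Class names arrive in internal form, e.g. "org/apache/hadoop/hdfs/DFSClient".
        private final String internalNamePrefix;
        final Set<String> matchedClasses = ConcurrentHashMap.newKeySet();

        ClassLoadRecorder(String internalNamePrefix) {
            this.internalNamePrefix = internalNamePrefix;
        }

        @Override
        public byte[] transform(ClassLoader loader, String className,
                                Class<?> classBeingRedefined,
                                ProtectionDomain protectionDomain,
                                byte[] classfileBuffer) {
            if (className != null && className.startsWith(internalNamePrefix)) {
                // A real profiler would rewrite classfileBuffer here to add timing code.
                matchedClasses.add(className);
            }
            return null; // null tells the JVM to keep the original bytecode
        }
    }

    // The JVM calls premain before the application's main method when the process
    // is started with -javaagent:<jar>=<agentArgs> and the JAR manifest names this
    // class in its Premain-Class attribute.
    public static void premain(String agentArgs, Instrumentation instrumentation) {
        String prefix = (agentArgs == null) ? "" : agentArgs;
        instrumentation.addTransformer(new ClassLoadRecorder(prefix));
    }
}
```

Because the agent is attached with a JVM argument, the application and framework code stay untouched, which is the property the features above rely on.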
Typical use cases
Our JVM Profiler supports a variety of use cases, most notably making it possible to instrument arbitrary Java code. Using a simple configuration change, the JVM Profiler can attach to each executor in a Spark application and collect Java method runtime metrics. Below, we touch on some of these use cases:
- Right-size executor: We use memory metrics from the JVM Profiler to track actual memory usage for each executor so we can set the proper value for the Spark “executor-memory” argument.
- Monitor HDFS NameNode RPC latency: We profile methods on the class org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB in a Spark application and identify long latencies on NameNode calls. We monitor more than 50,000 Spark applications each day, which together make several billion such RPC calls.
- Monitor driver dropped events: We profile methods like org.apache.spark.scheduler.LiveListenerBus.onDropEvent to trace situations during which the Spark driver event queue becomes too long and drops events.
- Trace data lineage: We profile file path arguments on the methods org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations and org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock to trace what files are read and written by the Spark application.
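The latency-oriented use cases above come down to timing a method call and aggregating the durations per method name. The sketch below shows one way to do that in plain Java. MethodDurationBuffer is a hypothetical name, and the explicit time(...) wrapper stands in for the timing code that the profiler injects into bytecode.

```java
import java.util.HashMap;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical in-process buffer that aggregates method latencies by name.
class MethodDurationBuffer {
    private final ConcurrentHashMap<String, LongSummaryStatistics> stats =
            new ConcurrentHashMap<>();

    // Adds one observation; compute() runs atomically per key.
    void record(String methodName, long durationNanos) {
        stats.compute(methodName, (name, current) -> {
            LongSummaryStatistics s = (current == null) ? new LongSummaryStatistics() : current;
            s.accept(durationNanos);
            return s;
        });
    }

    // Times a call and records its duration even if the call throws.
    <T> T time(String methodName, Supplier<T> call) {
        long start = System.nanoTime();
        try {
            return call.get();
        } finally {
            record(methodName, System.nanoTime() - start);
        }
    }

    // Removes and returns everything collected so far, ready to hand to a reporter.
    Map<String, LongSummaryStatistics> drain() {
        Map<String, LongSummaryStatistics> out = new HashMap<>();
        for (String name : stats.keySet()) {
            LongSummaryStatistics s = stats.remove(name);
            if (s != null) {
                out.put(name, s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        MethodDurationBuffer buffer = new MethodDurationBuffer();
        // The label is only a string key; no Hadoop classes are involved here.
        buffer.time("ClientNamenodeProtocolTranslatorPB.getBlockLocations", () -> "fake-result");
        buffer.drain().forEach((name, s) ->
                System.out.println(name + " count=" + s.getCount() + " maxNanos=" + s.getMax()));
    }
}
```

Keeping count, sum, min, and max per method keeps the buffer small no matter how many calls are made between two reporting intervals.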
Implementation details and extensibility
To make implementation as seamless as possible, JVM Profiler has a very simple and extensible design. People can easily add additional profiler implementations to collect more metrics and also deploy their own custom reporters for sending metrics to different systems for data analysis.
The JVM Profiler code is loaded into a Java process via a Java agent argument once the process starts. It consists of three main parts:
- Class File Transformer: instruments Java method bytecode inside the process to profile arbitrary user code and save metrics in an internal metric buffer.
- Metric Profilers:
  - CPU/Memory Profiler: collects CPU/Memory usage metrics via JMX and sends them to the reporters.
  - Method Duration Profiler: reads method duration (latency) metrics from the metrics buffer and sends them to the reporters.
  - Method Argument Profiler: reads method argument values from the metrics buffer and sends them to the reporters.
- Reporters:
  - Console Reporter: writes metrics in the console output.
  - Kafka Reporter: sends metrics to Kafka topics.
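As a rough illustration of the JMX-based collection mentioned for the CPU/Memory Profiler, the following sketch reads a few values from the standard platform MXBeans. The class name JmxSnapshot and the metric key names are assumptions made for this example and do not reflect the profiler's actual metric schema.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical one-shot collector built on the JDK's platform MXBeans.
class JmxSnapshot {

    // Reads current memory and CPU-related values for this JVM process.
    static Map<String, Object> collect() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        MemoryUsage nonHeap = memory.getNonHeapMemoryUsage();
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();

        Map<String, Object> metrics = new LinkedHashMap<>();
        // Key names below are illustrative only.
        metrics.put("processName", ManagementFactory.getRuntimeMXBean().getName());
        metrics.put("heapUsedBytes", heap.getUsed());
        metrics.put("heapCommittedBytes", heap.getCommitted());
        metrics.put("nonHeapUsedBytes", nonHeap.getUsed());
        metrics.put("availableProcessors", os.getAvailableProcessors());
        // May be negative on platforms where the load average is unavailable.
        metrics.put("systemLoadAverage", os.getSystemLoadAverage());
        return metrics;
    }

    public static void main(String[] args) {
        collect().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A periodic profiler would call something like collect() on a timer and pass each result to the configured reporter.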
How to extend the JVM Profiler to send metrics via a custom reporter
Users can implement their own reporters and specify them with the -javaagent option when launching the process.
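As an illustration of the reporter side, a custom reporter might look like the sketch below. The MetricReporter interface is a hypothetical stand-in defined here so the example is self-contained; the project's repository documents the actual reporter interface to implement and the exact agent argument syntax for selecting it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the profiler's reporter contract.
interface MetricReporter {
    void report(String profilerName, Map<String, Object> metrics);

    void close();
}

// Example custom reporter that keeps formatted metric lines in memory.
// A real one might write to a database or a message queue instead.
class LineBufferReporter implements MetricReporter {
    private final List<String> lines = new ArrayList<>();
    private boolean closed;

    @Override
    public synchronized void report(String profilerName, Map<String, Object> metrics) {
        if (closed) {
            throw new IllegalStateException("reporter is closed");
        }
        // Sort keys so every line has a stable, comparable layout.
        lines.add(profilerName + " " + new TreeMap<>(metrics));
    }

    @Override
    public synchronized void close() {
        closed = true;
    }

    // Returns a copy so callers cannot modify the internal buffer.
    synchronized List<String> lines() {
        return new ArrayList<>(lines);
    }
}
```

Keeping the contract to a report call and a close call is what lets the same profilers feed a console, Kafka, or any other destination.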
Integration with Uber’s data infrastructure
We integrated our JVM Profiler metrics with Uber’s internal data infrastructure to enable:
- Cluster-wide data analysis: Metrics are first fed to Kafka and ingested into HDFS; users then query them with Hive, Presto, or Spark.
- Real-time Spark application debugging: We use Flink to aggregate data for a single application in real time and write it to our MySQL database; users can then view the metrics via a web-based interface.
Using the JVM Profiler
Below, we provide instructions for how to use our JVM Profiler to trace a simple Java application:
First, we git clone the project:
Then we build the project by running the following command:
Next, we take the resulting JAR file (e.g. target/jvm-profiler-0.0.5.jar) and run the application with the JVM Profiler attached using the following command:
The command runs the sample Java application and reports its performance and resource usage metrics to the output console. For example:
The profiler can also send metrics to a Kafka topic via a command like the following:
Use the profiler to profile the Spark application
Now, let’s walk through how to run the profiler with a Spark application.
Assuming we already have an HDFS cluster, we upload the JVM Profiler JAR file to our HDFS:
Then we use the spark-submit command line to launch the Spark application with the profiler:
Metric query examples
At Uber, we send our metrics to Kafka topics and program background data pipelines to automatically ingest them to Hive tables. Users can set up similar pipelines and use SQL to query profiler metrics. They can also write their own reporters to send the metrics to a SQL database like MySQL and query from there. Below is an example of a Hive table schema:
Below, we offer an example result from running the previous SQL query, which shows the memory and CPU metrics for each Spark executor process:
Results and next steps
We applied the JVM Profiler to one of Uber’s biggest Spark applications (which uses 1,000-plus executors), and in the process, reduced the memory allocation for each executor by 2GB, going from 7GB to 5GB. For this Spark application alone, we saved 2TB of memory.
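The savings figure follows from simple arithmetic, sketched below. The executor count of exactly 1,000 is an assumption based on the "1,000-plus" figure, and ExecutorMemorySavings is a hypothetical helper name.

```java
// Hypothetical helper that reproduces the savings arithmetic.
class ExecutorMemorySavings {

    // Total memory saved, in GB, when every executor's allocation is reduced.
    static long savedGb(int executors, int beforeGb, int afterGb) {
        return (long) executors * (beforeGb - afterGb);
    }

    public static void main(String[] args) {
        // Assumes exactly 1,000 executors, each reduced from 7GB to 5GB.
        long gb = savedGb(1000, 7, 5);
        System.out.println(gb + " GB saved, about " + (gb / 1000.0) + " TB");
    }
}
```

With these inputs the program prints "2000 GB saved, about 2.0 TB", which matches the 2TB quoted above.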
We also applied the JVM Profiler to all Hive on Spark applications inside Uber, and found some opportunities to improve memory usage efficiency. Figure 3, below, shows one result we found: around 70 percent of our applications used less than 80 percent of their allocated memory. Our findings indicated that we could allocate less memory for most of these applications and increase memory utilization by 20 percent.
As we continue to grow our solution, we look forward to additional memory reduction across our JVMs.
JVM Profiler is a self-contained open source project and we welcome other developers to use this tool and contribute (e.g. submit pull requests) as well!
Our Big Data Engineering team in Seattle, Palo Alto, and San Francisco is working on tools and systems to expand the entire Hadoop ecosystem, including HDFS, Hive, Presto, and Spark. We build technologies on top of this family of open source software to help us make better, data-driven business decisions. If this sounds appealing to you, check out our job opportunities and consider joining our team!
Photo Credit Header: Seahorse, Roatan, Honduras by Conor Myhrvold.
Bo Yang was a Senior Software Engineer II at Uber. Bo worked in the Big Data area for 10+ years in various companies building large-scale systems including a Kafka-based streaming platform and Spark-based batch processing service.
Nan Zhu is a software engineer in Uber’s Seattle office focused on Spark Observability.
Felix Cheung is a senior software engineer in Uber's Seattle office focused on Spark.
Xu Ning is a Senior Engineering Manager in Uber’s Seattle Engineering office, currently leading multiple development teams in Uber’s Michelangelo Machine Learning Platform. He previously led Uber's Cherami distributed task queue, Hadoop observability, and Data security teams.
Posted by Bo Yang, Nan Zhu, Felix Cheung, Xu Ning