April 9, 2026
Accelerating Deep Learning: How Uber Optimized Petastorm for High-Throughput and Reproducible GPU Training

Introduction

At Uber, we train massive deep learning models to power our marketplace and core services. As these models grow in complexity, the infrastructure required to train them becomes a significant cost and performance factor. Specifically, maximizing the utilization of our high-performance GPUs during training, without sacrificing reproducibility, is a constant engineering challenge.

Recently, one of our core machine learning teams identified a critical bottleneck in their training pipeline. They were working with a massive dataset—tens of terabytes in size, containing tens of billions of rows and hundreds of features. At this scale, the data loading pipeline couldn’t keep up with the compute speed, causing GPUs to idle at low utilization rates (10-15%) while waiting for data.

At the same time, we observed another issue: reproducibility. As the training stack scaled to larger models and higher parallelism, we saw a significant increase in variance in key evaluation metrics—even with identical configurations and seeds. This made it harder to compare model architectures reliably and to ensure consistency across production training jobs.

In this blog, we first walk through how we engineered a solution to resolve these data bottlenecks within the Petastorm data loader. In our case study, these optimizations drastically improved throughput for these massive datasets, increasing GPU utilization to over 60%, reducing end-to-end training time from 22 hours to 3 hours, and slashing compute costs by nearly 80%.

In the second part, we dive into how we identified and eliminated hidden sources of randomness in Petastorm to enable reproducible training under fixed seeds.

Background

Our training stack relies on Petastorm, an open-source library developed at Uber for reading Apache Parquet™ datasets, and Horovod for distributed training.

For the large-scale deep learning models used in this case study, the standard data flow was as follows:

  1. Reading data. The worker pool reads Parquet row groups from HDFS™.
  2. Transforming and batching. Batch Dataloaders fetch these results, apply necessary transformations (like converting PyArrow tables to NumPy™ arrays), and batch them.
  3. Feeding the GPU. Async Dataloaders feed these batches to the GPU.
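The three stages above can be sketched as a toy producer/consumer pipeline. This is an illustration only: the function name, the float conversion, and the batching logic here are stand-ins, not the Petastorm API. Note that in this baseline design, the transformation happens in the main thread, which becomes relevant later.

```python
import queue
import threading

def training_data_flow(row_groups, batch_size):
    # Toy sketch of the three-stage flow; names are illustrative,
    # not Petastorm APIs.
    result_queue = queue.Queue()

    # 1. Reading: a worker pool reads row groups (stand-in for HDFS reads).
    def read(row_group):
        result_queue.put(row_group)

    workers = [threading.Thread(target=read, args=(rg,)) for rg in row_groups]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    # 2. Transforming and batching, in the main thread (the baseline design):
    #    convert rows (stand-in for PyArrow -> NumPy) and group into batches.
    rows = []
    while not result_queue.empty():
        rows.extend(float(x) for x in result_queue.get())
    batches = [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]

    # 3. Feeding: an async loader would hand each batch to the GPU.
    return batches
```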


Figure 1: Distributed training data flow.


From this data flow, we faced problems with CPU bottlenecks and reproducibility gaps. 

We observed that for every training epoch, the system was re-reading the exact same row groups from HDFS and re-applying the same CPU-intensive transformations. This created a massive I/O and CPU bottleneck. The GPUs were starving for data, spending the vast majority of their time waiting for the next batch rather than computing.

We also found that even when using the same seed, multi‑worker training jobs could observe different data orders across runs, leading to significant variance in offline metrics for large models.

Why not just load into RAM? For CPU bottlenecks, we considered alternative approaches, such as loading the entire dataset into memory or switching data formats. However, given that our datasets span tens of terabytes, loading everything into RAM was impossible. Furthermore, changing the underlying data format from Parquet would have required a massive migration of our data pipelines. We needed a solution that optimized the existing Parquet-based pipeline to work within the constraints of our infrastructure.

Challenge #1: Improving GPU Utilization

Profiling and Analysis: The Hunt for the Bottleneck

When we first observed the low GPU utilization (10-15%), we didn’t immediately assume it was a data loading issue. We needed to systematically rule out other potential causes to ensure we were solving the right problem.

Our first suspicion was that the model simply wasn’t complex enough to saturate high-performance GPUs. To test this, we ran synthetic benchmarks with significantly larger model architectures (deepening the hidden layers and increasing width). With this approach, utilization jumped to ~80% on synthetic data. This confirmed the GPUs were starving. With a larger model, the GPU spent more time computing each batch, effectively masking the data loading delay. Since we couldn’t arbitrarily increase the size of our production model, this proved that the data pipeline wasn’t delivering batches fast enough to keep up with the GPU’s speed.

Next, we tested if the network speed from HDFS was the culprit. We attempted to force a small subset of the dataset into RAM using an in-memory cache. We found that utilization spiked to over 60%, confirming that when data is available instantly, the GPUs remain busy. This convinced us that Network I/O was the villain. We hypothesized that if we simply cached the data on the local disk (avoiding HDFS), we’d match the in-memory performance.

We implemented a local disk cache to solve the I/O problem, expecting a massive win. But nothing happened. Utilization remained low. This failure forced us to dig deeper. We realized that our RAM Cache experiment had hidden a secondary bottleneck. When data is loaded into RAM, it’s transformed once and stored as ready-to-train NumPy arrays. However, our Disk/HDFS pipeline was performing this transformation (PyArrow → NumPy) just-in-time for every single batch in the main thread.

We had solved the Latency (Network I/O) problem, but we ran straight into a Compute (CPU transformation) wall. Even with the data on the local disk, the CPU couldn’t convert it fast enough to feed the GPU. We realized we needed to cache the transformed data, not just the raw files.

Optimization

Our training infrastructure runs on Ray™ clusters, using Horovod for distributed training and Petastorm for data loading. Because Ray manages the resources, we had the opportunity to leverage the local disk storage available on the worker nodes.

To solve the double bottleneck of network latency and CPU contention, we architected a solution focused on two pillars: push-down transformations and local disk caching (FanoutCache). 

In the original architecture, the transformation of data from PyArrow (the format used to read Parquet) to NumPy (the format needed for training) happened in the main thread of the dataloader. As our profiling showed, this blocked the propagation of batches to the GPU.

To fix this, we pushed this transformation down to the Petastorm worker pool. Instead of returning raw Arrow tables, the worker threads now apply the transformation immediately after reading the row group. The results stored in the queue are pre-transformed NumPy arrays.

The advantage of this approach is that it parallelizes the CPU-heavy transformation across the worker pool, ensuring the main thread does nothing but pass ready-to-use batches to the GPU.

However, the tradeoff is that PyArrow is more memory-efficient than NumPy. Storing transformed data in the queue increased RAM usage, but this was an acceptable trade-off for the throughput gains.
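A minimal sketch of the push-down pattern, with the actual PyArrow-to-NumPy conversion replaced by a trivial stand-in (`transform` and `read_and_transform` are illustrative names, not Petastorm internals):

```python
from concurrent.futures import ThreadPoolExecutor
import queue

def transform(row_group):
    # Stand-in for the PyArrow -> NumPy conversion (illustrative only).
    return [float(x) for x in row_group]

def read_and_transform(row_group, result_queue):
    # Push-down: the worker transforms the row group *before* queueing it,
    # so the main thread only ever dequeues ready-to-train arrays.
    result_queue.put(transform(row_group))

results = queue.Queue()
row_groups = [[1, 2], [3, 4], [5, 6]]
with ThreadPoolExecutor(max_workers=3) as pool:
    for rg in row_groups:
        pool.submit(read_and_transform, rg, results)

# Main thread: nothing left to do but forward batches to the GPU.
batches = [results.get() for _ in row_groups]
```

The CPU cost of the conversion is now paid in parallel across the pool instead of serially in the critical path.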

We also focused on local disk caching. Since we read the same row groups every epoch, paying the network I/O cost repeatedly was inefficient. To fix this, we implemented a Local Disk Cache using the FanoutCache class from the open-source diskcache library.

The optimized flow within the Petastorm worker pool is now:

  1. A Petastorm worker thread receives a task to read a specific row group.
  2. It checks the local FanoutCache on the node.
  3. If hit, it reads the pre-transformed data directly from the local disk (extremely fast).
  4. If miss, it fetches the row group from HDFS, applies the transformation, writes the result to the cache, and then returns it.
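The hit/miss logic above is classic cache-aside. The sketch below illustrates it with a plain dict standing in for the disk-backed `diskcache.FanoutCache`; `fetch_from_hdfs`, `transform`, and `load_row_group` are hypothetical names for illustration:

```python
# Illustrative cache-aside flow; in production the dict would be a
# diskcache.FanoutCache backed by local disk on the worker node.
cache = {}
hdfs_reads = []  # track how often we hit the slow network path

def fetch_from_hdfs(row_group_id):
    # Stand-in for reading a Parquet row group over the network.
    hdfs_reads.append(row_group_id)
    return [row_group_id * 10, row_group_id * 10 + 1]

def transform(rows):
    # Stand-in for the PyArrow -> NumPy conversion.
    return [float(r) for r in rows]

def load_row_group(row_group_id):
    hit = cache.get(row_group_id)
    if hit is not None:
        return hit                                   # hit: pre-transformed data
    data = transform(fetch_from_hdfs(row_group_id))  # miss: fetch + transform
    cache[row_group_id] = data                       # write-back for next epoch
    return data
```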


Figure 2: The optimized architecture moves transformation to the worker pool and caches the result locally.


Crucially, because we combined this with the push-down transformation, we’re caching the pre-processed data. This means on a cache hit, the CPU doesn’t even need to perform the PyArrow → NumPy conversion; it simply loads the ready-to-use array.

Development Challenges

Implementing these optimizations wasn’t without hurdles. Pushing the hardware to its limit exposed constraints we hadn’t hit before, requiring us to rethink how we handle storage, randomness, and concurrency.

The first challenge was handling datasets larger than the local disk. Our datasets are massive—often larger than the available local storage on a single worker node. This posed a cache management dilemma. Standard eviction policies (like LRU) work well for web services but poorly for training epochs where every data point must be read exactly once per cycle. Evicting a row group just to read it again in the next epoch would defeat the purpose of the cache.

To solve this, we implemented a quota management strategy instead of eviction. We cache row groups until the disk quota is hit, and then fall back to HDFS for the remainder. While this doesn’t remove the bottleneck entirely for the largest datasets, it significantly reduces it. 
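The quota-instead-of-eviction policy can be sketched as follows (all names and the quota value are illustrative; in practice the quota would be derived from free disk space):

```python
DISK_QUOTA_ROW_GROUPS = 2  # illustrative quota

cache = {}
fetch_count = {}

def fetch_from_hdfs(row_group_id):
    # Stand-in for the slow network read we fall back to past the quota.
    fetch_count[row_group_id] = fetch_count.get(row_group_id, 0) + 1
    return [row_group_id]

def load_with_quota(row_group_id):
    cached = cache.get(row_group_id)
    if cached is not None:
        return cached
    data = fetch_from_hdfs(row_group_id)
    # No eviction: once the quota is reached, the remainder of the dataset
    # is simply re-read from HDFS each epoch rather than displacing
    # already-cached row groups.
    if len(cache) < DISK_QUOTA_ROW_GROUPS:
        cache[row_group_id] = data
    return data
```

Over two simulated epochs of three row groups, the first two stay cached while the third falls back to HDFS each time; the cached portion never thrashes.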

The second challenge was preserving training randomness. Deep learning models require data to be fed in a random order to learn effectively. Caching introduces a risk: if we cache pre-batched data, we might feed the model the exact same sequence every epoch.

To fix this, we cache row groups, not batches. A row group is a smaller unit of data. Since the worker pool has multiple threads fetching these row groups in parallel, the order in which they arrive in the queue naturally varies. We also added a final shuffle step on the constructed batch in memory before sending it to the GPU. This ensures high throughput without compromising the stochastic nature of the training.
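The final shuffle step can be sketched like this (`assemble_batch` is a hypothetical name; the real batch assembly involves NumPy arrays rather than Python lists):

```python
import random

def assemble_batch(row_groups, seed):
    # Row groups arrive in whatever order the worker threads finish; a
    # final seeded shuffle on the assembled batch restores row-level
    # randomness even though the cached row groups themselves are fixed.
    rows = [row for rg in row_groups for row in rg]
    random.Random(seed).shuffle(rows)
    return rows
```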

The third challenge was thread pool stability and zombie threads. To maximize performance, we switched from a process pool to a thread pool. A process pool requires serializing (pickling) data to send it between processes, which adds CPU overhead. A thread pool shares memory, avoiding this cost.

Python threads are notoriously difficult to kill if they are blocked on an I/O operation (like a slow HDFS read). We started seeing "Abrupt Teardown Failures" where daemon threads stuck on network calls would prevent the job from exiting cleanly.

To fix this, we implemented robust sentinel logic in our ventilator to gracefully signal threads to stop, rather than relying on the interpreter to kill them. Additionally, we updated our HDFS configurations to enforce stricter timeouts and retry policies. This helped ensure that even if a thread was stuck on a network call, the main training process could still tear down resources without hanging the cluster.
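A minimal version of the sentinel pattern, assuming a shared work queue (the worker body and the doubled transform are illustrative stand-ins):

```python
import queue
import threading

STOP = object()  # sentinel placed on the work queue to request shutdown

def worker(work_queue, results):
    while True:
        item = work_queue.get()
        if item is STOP:
            work_queue.put(STOP)  # re-queue so sibling workers see it too
            break
        results.append(item * 2)  # stand-in for the real row-group work

work_queue = queue.Queue()
results = []
threads = [threading.Thread(target=worker, args=(work_queue, results))
           for _ in range(2)]
for t in threads:
    t.start()
for item in [1, 2, 3]:
    work_queue.put(item)
work_queue.put(STOP)   # ventilator signals a graceful stop
for t in threads:
    t.join(timeout=5)  # bounded join instead of relying on daemon teardown
```

Because each worker checks for the sentinel between tasks, threads exit at a safe point rather than being abandoned mid-I/O.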

Challenge #2: Improving Reproducibility

The other focus point was to ensure deterministic data loading by eliminating sources of randomness in the Petastorm dataloader.

In large-scale distributed training, some randomness is expected and often beneficial for model generalization. But as our models grew larger and training scaled, we began to see something concerning. Offline evaluation metrics started showing significant run-to-run variance, even when nothing else had changed. This made it increasingly difficult to answer basic questions with confidence:

  • Is this new architecture actually better?
  • Did performance improve, or did we just get lucky this run?
  • Why do two identical training jobs produce different results?

Where Randomness Hid

To make data loading reproducible, we first needed to understand where randomness was actually coming from.

At first glance, Petastorm already handled randomness carefully. High-level operations—such as shuffling row groups and partitioning data across GPU workers—were explicitly seed-controlled and deterministic. Yet in practice, identical training runs still produced different data orders.

To pinpoint why, we ran a series of controlled experiments using Petastorm, systematically varying the number of workers, shuffle settings, and random seeds. The pattern was unmistakable: most of the randomness didn’t come from shuffling—it came from race conditions across multiple worker threads.

A Closer Look at the Petastorm Data Path 

At a high level, Petastorm processes data as shown in Figure 3.


Figure 3: Petastorm architecture with shared ventilator and results queue.


Training data is stored as Parquet files on HDFS, each file containing multiple row groups. Before training starts, Petastorm builds a global list of row-group indices, shuffles it deterministically, and shards it across GPU workers so each worker sees a unique slice of the dataset.

The pipeline looks like this:

  1. Row-group shuffling and sharding (step 1). Deterministic and seed-controlled.
  2. Worker-thread processing. Worker threads consume row groups (step 2), apply row-level shuffling (step 3), and transform the data.
  3. Result collection and batching. Processed data is collected (step 4), batched, and sent to the GPU.

While the shuffling steps (steps 1 and 3) were deterministic, the queues connecting them were not. Both the ventilator queue (handing out work) and the result queue (collecting outputs) were shared across workers. Which worker picked up which row group—and when results arrived—depended on thread scheduling, I/O timing, and execution speed, not just the random seed.

As we increased the number of workers to improve throughput, these race conditions became more pronounced. Faster training inadvertently meant less reproducible data ordering.

We also uncovered a subtler issue: some parts of the pipeline relied on np.random.RandomState, a legacy API that can behave inconsistently when reseeded across distributed components.
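The fix for this last issue is shown below in miniature: np.random.default_rng returns an isolated Generator per call site, so reseeding one component cannot perturb any other, unlike code that shares the legacy global RandomState machinery. The helper name is illustrative:

```python
import numpy as np

def shuffled_indices(n, seed):
    # Each call gets its own explicitly seeded Generator, so the shuffle
    # order depends only on (n, seed), never on what other components
    # have done with the random state.
    rng = np.random.default_rng(seed)
    indices = np.arange(n)
    rng.shuffle(indices)
    return indices.tolist()
```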

How We Fixed It


Figure 4: Petastorm architecture with dedicated ventilator and results queues and round-robin strategy.


With the root causes identified, we implemented two targeted changes:

  • Reliable randomness. We replaced the legacy np.random.RandomState with np.random.default_rng, ensuring all shuffle operations behave consistently under fixed seeds.

  • Deterministic worker scheduling. We redesigned how work is assigned and results are collected:
    • Each worker now has its own dedicated ventilator and results queues.
    • Row groups are assigned in a fixed round-robin order.
    • Results are merged in the same deterministic round-robin order.

This removes all race conditions. Data order is now fully deterministic, independent of which worker finishes first.

Importantly, because each worker has its own queues, this deterministic setup doesn’t sacrifice throughput: workers continue to run in parallel.
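The per-worker queues and round-robin ordering can be sketched as follows (illustrative stand-ins, not Petastorm’s actual classes; the doubling transform is a placeholder):

```python
import queue
import threading

def deterministic_epoch(row_groups, num_workers):
    # Per-worker ventilator and results queues plus fixed round-robin order.
    ventilators = [queue.Queue() for _ in range(num_workers)]
    results = [queue.Queue() for _ in range(num_workers)]

    # Round-robin assignment: row group i always goes to worker i % N.
    for i, rg in enumerate(row_groups):
        ventilators[i % num_workers].put(rg)
    for v in ventilators:
        v.put(None)  # per-worker stop sentinel

    def worker(wid):
        while True:
            rg = ventilators[wid].get()
            if rg is None:
                break
            results[wid].put([x * 2 for x in rg])  # stand-in transform

    threads = [threading.Thread(target=worker, args=(wid,))
               for wid in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Round-robin merge: results are read back in the same fixed order,
    # so output order is identical no matter which worker finished first.
    return [results[i % num_workers].get() for i in range(len(row_groups))]
```

Workers still execute in parallel; only the assignment and merge orders are pinned, which is what makes the output order independent of thread timing.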

Impact and Results

While this work started as a targeted optimization, the results have been significant enough to warrant a broader rollout.

  • Training time: Reduced from ~22 hours to ~3 hours per run.
  • GPU utilization: Increased from 10-15% to 60%+.
  • Reproducibility: Data loading is now deterministic under fixed seeds, even with multiple workers.

Figure 5 shows the execution time before optimization. 


Figure 5: Execution time: 22 hours, Avg utilization: 12%.


Figure 6 shows the execution time after optimization.


Figure 6: Execution time: 3 hours, Avg utilization: 60%+ (5-6× increase).



Figure 7: Train loss versus step with high variance between runs; run-to-run MAP shift of ~0.5%.



Figure 8: Train loss versus step showing two highly reproducible runs; run-to-run MAP shift reduced to ~0.13%.


Note: The reproducibility improvements come from a combination of dataloader determinism and additional training technique improvements, the latter being outside the scope of this blog.

By saturating the GPUs, we achieved a 6× speedup for this model. This goes beyond just saving compute credits; it fundamentally changes the developer experience. Engineers can now iterate on a model in an afternoon rather than waiting a full day for results.

Most importantly, these improvements aren’t limited to our specific use case. Any model training pipeline facing data I/O bottlenecks—where the producers (data loaders) are slower than the consumers (GPUs)—stands to benefit from this architectural shift. These changes have already been merged into our internal Petastorm codebase and are currently being rolled out to machine learning teams across Uber.

Next Steps 

Our immediate focus is ensuring a smooth rollout across Uber, making this the default behavior for all deep learning models. Furthermore, since Petastorm is open-source and widely used in the industry, we plan to work with the community to help external organizations adopt these optimizations. Efficient and reproducible data loading is a universal challenge, and we’re excited to share these learnings with the broader engineering ecosystem.

Conclusion

At Uber’s scale, efficiency isn’t just about writing cleaner code—it’s about understanding the interaction between hardware, data, and software.

This project highlights the importance of full-stack profiling. Sometimes the biggest gains in AI don’t come from a new model architecture, but from optimizing how you feed data to the one you already have.

Looking back, the solution—caching transformed data to remove I/O bottlenecks and eliminating race conditions—seems almost obvious. But that’s the nature of engineering. The complexity wasn’t in the concept of a cache or round-robin implementation, but in the detective work required to uncover the hidden bottlenecks and sources of randomness masking the real problem. It serves as a reminder that sometimes the most impactful solutions are simple in logic, but rigorous in execution.

Acknowledgments

We’d like to thank the Matching ML and Michelangelo leadership teams—specifically Mas-ud Hussain, Dorna Bandari and Zhitao Li—for their guidance and support in prioritizing these infrastructure improvements. Their sponsorship was instrumental in turning a specific optimization into a platform-wide improvement.

Cover Photo Attribution: “Highway to the stars” by zeitfaenger.at is licensed under CC BY 2.0.

Apache®, Apache Parquet™, HDFS™, Apache Arrow, and the star logo are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.

NumPy™ is a trademark of the NumFOCUS Foundation. 

Ray is a trademark of Anyscale, Inc. 


Stay up to date with the latest from Uber Engineering—follow us on LinkedIn for our newest blog posts and insights.

Category
Data / ML, Engineering
Written by

Kashish Mittal

Staff Software Engineer

Kashish Mittal is a Staff Software Engineer and Tech Lead on the Matching ML team.

Di Yu

Staff Software Engineer

Di Yu is on Uber’s AI Platform team, working on large-scale distributed training infrastructure.

Roozbeh Ketabi

Sr Software Engineer

Ph.D. in applied ML in mobility modeling. Works in mobility matching and dispatch optimization.

Arushi Arora

Software Engineer

Software Engineer on Uber’s AI Platform, optimizing large-scale training systems and performance.

Brendon Lapp

Engineering Manager

Leads platform teams building scalable backend and ML systems for Uber’s Matching stack.

Peng Zhang

Engineering Manager

Leads AI Platform teams building training frameworks and GPU systems for efficient ML models.
