
Introduction
In 2017, we introduced Horovod, an open source framework for scaling deep learning training across hundreds of GPUs in parallel. At the time, most of the deep learning use cases at Uber were related to the research and development of self-driving vehicles, while in Michelangelo, the vast majority of production machine learning models were tree models based on XGBoost.
Now in 2021, the state of deep learning is changing. Increasingly, deep learning models are outperforming tree-based models for tabular data problems, and more deep learning models are moving from research to production. As a result, we've had to rethink our existing deep learning platform to accommodate increasing demand and new requirements:
- Autoscaling and Fault Tolerance
- Hyperparameter Search
- Unified Compute Infrastructure
Autoscaling and Fault Tolerance
Our previous deep learning platform allocated a fixed-size Horovod cluster for training an individual model. When running on-premise, users would frequently find themselves unable to start large training jobs due to lack of resources. We provided the option to train in cloud data centers, but the lack of autoscaling meant running on dedicated instances, which often cost 3-5x as much as preemptible or "spot" instances.
Even when capacity is not a primary concern, fault tolerance often is. As jobs train for more epochs with more machines, the chance of failure increases. Frequent checkpointing can help mitigate this, but requires a lot of additional bespoke platform tooling to support.
Elastic Horovod
In Horovod v0.20, we introduced Elastic Horovod to allow distributed training that scales the number of workers dynamically throughout the training process. With a few lines of code added to an existing Horovod training script, jobs can now continue training with minimal interruption when machines come and go from the job.
At the API level, Elastic Horovod solves the problem of autoscaling the training process, but does not operationalize it. We wrote Elastic Horovod to be agnostic to any particular orchestration system or cloud provider, which meant leaving the following components as interfaces to be implemented:
- Discovery of hosts that can be added to (or should be removed from) the training job
- Resource requests for additional workers as they become available and the job can make use of them
Our original assumption was that we would need to implement each of these interfaces within separate components for each major cloud provider (e.g., AWS, Azure, GCP) or orchestration system (e.g., Kubernetes, Peloton) we wanted to support for our internal and open source users.
As it turned out, there was already an open source solution to this problem of multi-cloud distributed computing: Ray.
Elastic Horovod on Ray
Ray is a distributed execution engine for parallel and distributed programming. Developed at UC Berkeley, Ray was initially built to scale out machine learning workloads and experiments with a simple class/function-based Python API.
Since its inception, the Ray ecosystem has grown to include a variety of features and tools useful for training ML models on the cloud, including Ray Tune for distributed hyperparameter tuning, the Ray Cluster Launcher for cluster provisioning, and load-based autoscaling. Now, Ray also integrates with a variety of machine learning libraries, such as RLlib, XGBoost, and PyTorch.
With the new ElasticRayExecutor API, Horovod is able to leverage Ray to simplify the discovery and orchestration of the underlying hosts. To leverage Ray with Horovod for elastic training, you first need to start a Ray runtime consisting of multiple nodes (a Ray cluster).
Ray Runtime/Clusters: Ray programs are able to parallelize and distribute work by leveraging an underlying Ray runtime. The Ray runtime can be started on one or multiple nodes, forming a Ray cluster.
Ray is bundled with a lightweight cluster launcher that simplifies provisioning a cluster on any cloud (AWS, Azure, GCP, or even cluster managers like Kubernetes and YARN). The cluster launcher provisions clusters according to a given cluster configuration, like the example shown below:
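A minimal configuration for AWS might look like the following sketch. The instance types, region, and autoscaling bounds here are illustrative assumptions, and the exact fields depend on your Ray version:

```yaml
# cfg.yaml: illustrative Ray cluster launcher config (field names may vary by Ray version)
cluster_name: horovod-elastic

# Autoscaling bounds for the GPU workers
min_workers: 0
max_workers: 8

provider:
    type: aws
    region: us-west-2

# CPU-only head node
head_node:
    InstanceType: c5.2xlarge

# Preemptible ("spot") GPU worker nodes
worker_nodes:
    InstanceType: p3.2xlarge
    InstanceMarketOptions:
        MarketType: spot

# Install dependencies on every node at startup
setup_commands:
    - pip install ray torch torchvision 'horovod[ray]'
```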
To start a Ray cluster on AWS, you can simply save the above configuration as `cfg.yaml` and then call `ray up cfg.yaml`. In the above example, note that the head node is a CPU-only node, while the workers are GPU preemptible instances.
Further, Ray clusters can be configured to be “autoscaling”, meaning that Ray can transparently provision new nodes under different workload requirements. For example, if a Ray program needs a GPU in the middle of the application, Ray can provision a cheap GPU instance, run the Ray task or actor on the instance, then terminate the instance when finished.
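For instance, simply declaring that a Ray task needs a GPU is enough to trigger the autoscaler. Below is a minimal sketch; the task body is a placeholder:

```python
import ray

ray.init(address="auto")  # connect to the running Ray cluster

@ray.remote(num_gpus=1)
def gpu_task(x):
    # Requesting num_gpus=1 prompts the autoscaler to provision a GPU node
    # if none is currently available; the node can be terminated once idle.
    import torch
    return torch.cuda.is_available(), x * 2

print(ray.get(gpu_task.remote(21)))
```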
You can check out the Ray documentation for an example of creating an autoscaling Ray program on AWS/GCP.
ElasticRayExecutor API: ElasticRayExecutor can leverage an autoscaling Ray cluster to simplify the discovery of hosts and request more resources when appropriate. To use this API, define a training function that wraps its inner training loop with the Horovod Elastic state synchronization decorator:
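With PyTorch, the training function might look roughly like the sketch below. The `build_model` and `build_data_loader` helpers are placeholders for your own model and data pipeline, and the hyperparameters are illustrative:

```python
import torch
import torch.nn.functional as F
import horovod.torch as hvd

def training_fn():
    hvd.init()
    torch.cuda.set_device(hvd.local_rank())

    model = build_model().cuda()  # placeholder helper
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())
    optimizer = hvd.DistributedOptimizer(
        optimizer, named_parameters=model.named_parameters())

    @hvd.elastic.run
    def train(state):
        # After any resize event, training resumes from the last committed state.
        for state.epoch in range(state.epoch, 90):
            for data, target in build_data_loader(epoch=state.epoch):  # placeholder helper
                optimizer.zero_grad()
                loss = F.cross_entropy(model(data.cuda()), target.cuda())
                loss.backward()
                optimizer.step()
            state.commit()  # synchronize and checkpoint state across workers

    # State object broadcast to new workers whenever the worker set changes.
    state = hvd.elastic.TorchState(model, optimizer, epoch=0)
    train(state)
```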
You can then attach to the underlying Ray cluster and execute the training function:
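In code, that looks roughly like the following sketch, based on the Horovod on Ray API; `training_fn` is the elastic training function defined above:

```python
import ray
from horovod.ray import ElasticRayExecutor

ray.init(address="auto")  # attach to the running Ray cluster

# Create elastic settings (worker discovery, timeouts, etc. use defaults here).
settings = ElasticRayExecutor.create_settings(verbose=True)

executor = ElasticRayExecutor(settings, use_gpu=True, cpus_per_slot=2)
executor.start()
executor.run(training_fn)
```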
In this example, the ElasticRayExecutor will create multiple GPU workers, each participating in a data-parallel training routine. The number of workers will be automatically determined by the number of GPU worker nodes in the cluster, meaning that training will not be stopped even when nodes are arbitrarily removed via preemption.
As you can see, integrating Horovod with Ray yields a solution that simplifies running elastic training jobs across major cloud providers.
Benchmarks and Experiments
One of the biggest unknowns for us when considering adopting elastic training was convergence. Specifically, we were unsure whether there was a limit to how often and to what extent the job could be resized during training. Would we need to make any adjustments during training to smooth out the effects of scaling the number of workers up and down at runtime?
To measure the effects of adjusting the number of workers dynamically on model convergence, we ran three jobs training a ResNet50 model on the CIFAR-10 dataset using 8 V100 GPUs in AWS (p3.2xlarge instances) over a fixed number of 90 epochs. We varied how frequently the number of workers would be adjusted up and down, and how many workers would be added or removed at a time, across three experiment configurations:
- Fixed number of 8 GPU workers with no autoscaling (8gpu_fixed).
- Initial 8 GPU workers scaled up or down by 1 every 60 seconds (8gpu_60_1).
- Initial 8 GPU workers scaled up or down by 3 every 180 seconds (8gpu_180_3).
For every resize event, if the number of workers is already at the max (8) we always scale down, and if the number of workers is at the min (2) we always scale up. Otherwise, we randomly choose to increase or decrease the number of workers with equal probability.
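The resize rule can be summarized in a few lines. The following is a sketch of the decision logic, not the exact benchmark harness:

```python
import random

MIN_WORKERS, MAX_WORKERS = 2, 8

def next_worker_count(current, delta):
    """Pick the next worker count for a resize event."""
    if current >= MAX_WORKERS:
        proposed = current - delta   # at the max: always scale down
    elif current <= MIN_WORKERS:
        proposed = current + delta   # at the min: always scale up
    else:
        # Otherwise scale up or down with equal probability.
        proposed = current + (delta if random.random() < 0.5 else -delta)
    # Clamp to the allowed range (an assumption about the harness).
    return max(MIN_WORKERS, min(MAX_WORKERS, proposed))

# Example: the 8gpu_180_3 configuration resizes by 3 workers every 180 seconds.
count = 8
for _ in range(5):
    count = next_worker_count(count, delta=3)
    print(count)
```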

As shown in the results above, we observed that as we increased the magnitude of the resize events (measured by the number of hosts added or removed at once) and decreased their frequency proportionately, the overall variance in model performance between epochs increased, yet overall model generalization actually improved relative to the baseline.
An additional benefit of elastic training is that it can even be used to reduce the overall variance in the training procedure when the timing of resize events can be controlled by the training process. As indicated by Smith et al. in Don’t Decay the Learning Rate, Increase the Batch Size, we can exploit the fact that increasing the batch size results in a simulated annealing effect that moves the model training from an exploration phase during the earlier epochs into an exploitation phase towards the end.
In practice, this procedure of periodically scaling up the batch size by increasing the number of workers allows the model to train to convergence in much less wall-clock time, often with higher generalization accuracy, than training with the initial number of workers for the entire duration.
To illustrate this effect, we ran an additional set of experiments, repeating the training procedure above with two new configurations:
- Fixed number of 2 GPU workers (2gpu_fixed).
- Dynamic number of workers that doubles every 30 epochs, starting at 2 and ending with 8 workers (8gpu_exp).

As expected, decreasing the number of workers from 8 to 2 improved the overall model convergence. This reveals one of the common pitfalls of data parallel distributed training when the model has been tuned for a smaller batch size. In practice, it’s recommended to use techniques like learning rate warmup / decay, Hyperparameter Search (see below), and Adasum to offset these effects.
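For example, a common Horovod pattern is to scale the learning rate by the number of workers and warm it up over the first few epochs. The sketch below illustrates the idea; the base learning rate and warmup length are illustrative assumptions:

```python
import horovod.torch as hvd

BASE_LR = 0.1       # learning rate tuned for a single worker (assumption)
WARMUP_EPOCHS = 5   # ramp-up period (assumption)

def adjust_learning_rate(optimizer, epoch):
    # Linearly warm up from the single-worker rate to the rate scaled by
    # the current number of workers, then hold the scaled rate.
    scale = hvd.size()
    if epoch < WARMUP_EPOCHS:
        lr = BASE_LR * (1.0 + (scale - 1.0) * epoch / WARMUP_EPOCHS)
    else:
        lr = BASE_LR * scale
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
```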
Another solution to achieve good convergence with high amounts of parallelism is illustrated by the third experiment shown above: scaling the parallelism up over time. Not only did this approach complete in less total wall clock time than the 2 GPU baseline, it did so with higher validation accuracy!
The intuition behind this, as explained in the aforementioned paper by Smith et al., is that the training process benefits from moving from an "exploration" phase in the beginning to an "exploitation" phase towards the end. Increasing the parallelism has the effect of increasing the effective batch size, which reduces the noise in the gradient estimates, leading to less exploration in the later stages of training.
Adding this capability to Elastic Horovod on Ray can be done using Horovod on Ray’s Callback API:
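The exact callback code is not reproduced here; the sketch below illustrates the idea by asking the Ray autoscaler for more GPU capacity at fixed epoch milestones, after which Elastic Horovod folds the new hosts into training as they join. The epoch schedule and the `on_epoch_end` integration point are assumptions, not the exact API:

```python
from ray.autoscaler.sdk import request_resources

# Hypothetical schedule: double the number of workers every 30 epochs (2 -> 4 -> 8).
WORKER_SCHEDULE = {0: 2, 30: 4, 60: 8}

def on_epoch_end(epoch):
    """Per-epoch hook invoked from the training loop (assumed integration point)."""
    if epoch in WORKER_SCHEDULE:
        num_workers = WORKER_SCHEDULE[epoch]
        # Ask the autoscaler to provision enough GPU nodes for the requested
        # number of workers; Elastic Horovod discovers them once they join.
        request_resources(bundles=[{"GPU": 1}] * num_workers)
```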


