Project Homepage: GitHub
Over the past several years, the increasing processing power of computing machines has driven a wave of advances in machine learning. More and more, algorithms exploit parallelism and rely on distributed training to process enormous amounts of data. However, the resulting need to scale up both data and training imposes great challenges on the software that manages and utilizes large-scale computational resources.
At Uber, we’ve developed algorithms such as POET, Go-Explore, and GTN that leverage a large amount of computation to train neural network models. To enable future generations of large-scale computation for algorithms like these, we developed Fiber, a new distributed computing library that helps users scale local computation methods to hundreds or even thousands of machines with ease. Fiber makes it fast, easy, and resource-efficient to power large-scale computation projects using Python, simplifying the ML model training process and leading to better results.
The challenge of large-scale distributed computation
In an ideal world, scaling an application that runs on one machine to an application that runs on a cluster of machines should be as easy as changing a command-line argument. However, this isn’t such an easy task in the real world.
While working with many people who run large-scale distributed computing jobs on a daily basis, we found several reasons why it is so hard to harness distributed computing today:
- There is a huge gap between making code work locally on laptops or desktops and running code on a production cluster. You can make MPI work locally but it’s a completely different process to run it on a computer cluster.
- Dynamic scaling isn’t available. If you launch a job that requires a large amount of resources, you’ll most likely need to wait until everything is allocated before you can run your job. This waiting period makes scaling up less efficient.
- Error handling is missing. While running, some jobs may fail. This could force you to recover part of the result or discard the whole run.
- There is a high learning cost. Each system has different APIs and conventions for programming. To launch jobs with a new system, a user has to learn a set of completely new conventions.
The new Fiber platform addresses each of these issues specifically. In doing so, it makes seamless large-scale distributed computing a possibility for a much wider population of users.
Fiber is a Python-based distributed computing library for modern computer clusters. Instead of programming only a single desktop or laptop, users can leverage this system to program the whole computer cluster. It was originally developed to power large-scale parallel scientific computation projects like POET, and Uber has used it to power similar projects. Fiber is so powerful because it:
- Is easy to use. Fiber allows users to write programs that run on a computer cluster without needing to dive into the details of the computer cluster.
- Is easy to learn. Fiber provides the same API as Python’s standard multiprocessing library. Engineers who know how to use multiprocessing can easily program a computer cluster with Fiber.
- Is fast and dependable. Fiber’s communication backbone is built on top of Nanomsg, a high-performance asynchronous messaging library that allows swift and reliable communication.
- Doesn’t require deployment. Fiber runs the same way as a normal application on a computer cluster. It handles resource allocation and communication for users automatically.
- Provides reliable computation. Fiber’s built-in error handling allows users to focus on writing actual application code instead of dealing with crashes. This is especially valuable when running a pool of workers.
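As a rough illustration of the "same API as multiprocessing" point above, the sketch below writes a worker pool against a module parameter rather than a hard-coded import. The helper names (`square`, `run_pool`) are invented for this example. The `import fiber as mp` swap assumes Fiber is installed and that its `Pool` offers `map`, `close`, and `join` the way multiprocessing does; without Fiber, the code falls back to the standard library.

```python
import multiprocessing


def square(x):
    return x * x


def run_pool(mp, values, workers=4):
    """Map square over values with any module that follows the multiprocessing Pool API."""
    pool = mp.Pool(processes=workers)
    try:
        return pool.map(square, values)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    try:
        import fiber as mp  # assumption: Fiber is installed and configured
    except ImportError:
        mp = multiprocessing  # local fallback that exposes the same API
    print(run_pool(mp, range(8)))
```

Because the pool is only ever reached through `mp`, moving between a laptop and a cluster would, under these assumptions, come down to which module is bound to that name.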
In addition to these benefits, Fiber can be used in conjunction with other specialized frameworks in areas where performance is critical. For example, for Stochastic Gradient Descent (SGD), Fiber’s Ring feature can help set up distributed training jobs on computer clusters, allowing it to operate in conjunction with Horovod or torch.distributed.
Fiber helps users who are working on large-scale distributed computing reduce the time to go from having an idea to actually running distributed jobs on computation clusters. It can also shield users from dealing with the tedious details of configuration and resource allocation tasks, as well as enabling faster debug cycles and simplifying the transition from local to cluster development.
Fiber bridges the classical multiprocessing API with a flexible selection of back ends that can run on different cluster management systems. To achieve this integration, Fiber is split into three different layers: the API layer, back-end layer, and cluster layer. The API layer provides basic building blocks for Fiber like processes, queues, pools, and managers. These have the same semantics as in multiprocessing, but we’ve extended them to work in distributed environments. The back-end layer handles tasks like creating or terminating jobs on different cluster managers. When a user adds a new back end, all the other Fiber components (queues, pools, etc.) do not need to be changed. Finally, the cluster layer consists of different cluster managers. Although they are not part of Fiber itself, they help Fiber manage resources and keep track of different jobs, thereby reducing the number of items that Fiber needs to track. This overall architecture is summarized in Figure 2, below:
Fiber introduces a new concept called job-backed processes (also called Fiber processes). These are similar to the processes in Python’s multiprocessing library, but more flexible: while a process in multiprocessing only runs on a local machine, a Fiber process can run remotely on a different machine or locally on the same machine. When starting a new Fiber process, Fiber creates a new job with the proper Fiber back end on the current computer cluster.
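To make the layering concrete, here is a minimal sketch of how an API-layer process object might delegate to a back end that creates cluster jobs. Every name in it (`JobSpec`, `Backend`, `InMemoryBackend`, `JobBackedProcess`, the image tag) is hypothetical and chosen for illustration; it is not Fiber's actual class hierarchy, and the stand-in back end only records jobs instead of talking to a cluster manager.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from itertools import count


@dataclass
class JobSpec:
    """What the API layer hands to a back end (hypothetical fields)."""
    image: str
    command: list


class Backend(ABC):
    """Back-end layer: translates job requests for one cluster manager."""

    @abstractmethod
    def create_job(self, spec): ...

    @abstractmethod
    def terminate_job(self, job_id): ...


class InMemoryBackend(Backend):
    """Stand-in for a real cluster manager; it only records jobs."""

    def __init__(self):
        self.jobs = {}
        self._ids = count(1)

    def create_job(self, spec):
        job_id = next(self._ids)
        self.jobs[job_id] = spec
        return job_id

    def terminate_job(self, job_id):
        self.jobs.pop(job_id, None)


class JobBackedProcess:
    """API layer: looks like a process, but start() creates a cluster job."""

    def __init__(self, backend, target_name, image="parent-image:latest"):
        self.backend = backend
        # Children reuse the parent's container image for a consistent environment.
        self.spec = JobSpec(image=image, command=["python", "-m", target_name])
        self.job_id = None

    def start(self):
        self.job_id = self.backend.create_job(self.spec)

    def terminate(self):
        self.backend.terminate_job(self.job_id)
        self.job_id = None


backend = InMemoryBackend()
workers = [JobBackedProcess(backend, "worker") for _ in range(3)]
for w in workers:
    w.start()
print(sorted(backend.jobs))  # ids of the jobs recorded by the stand-in back end
```

Swapping `InMemoryBackend` for another `Backend` subclass leaves `JobBackedProcess` untouched, which is the property the text attributes to adding a new back end.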
Fiber uses containers to encapsulate the running environment of current processes, as shown in Figure 3, above. All required files, input data, and other dependent program packages are included, ensuring every element is self-contained. All the child processes are started with the same container image as the parent process to guarantee a consistent running environment. Because each process is a cluster job, its life cycle is the same as any job on the cluster. To make it easy for users, Fiber is designed to directly interact with computer cluster managers. Because of this, Fiber doesn’t need to be set up on multiple machines or bootstrapped by any other mechanisms, unlike Apache Spark or ipyparallel. It only needs to be installed on a single machine as a normal Python pip package.
Fiber implements most multiprocessing APIs on top of Fiber processes, including pipes, queues, pools, and managers.
Queues and pipes in Fiber behave the same way as in multiprocessing. The difference is that queues and pipes in Fiber are shared by multiple processes running on different machines. Two processes can read from and write to the same pipe. Furthermore, queues can be shared among many processes on different machines, and each process can send to or receive from the same queue at the same time. Fiber’s queue is implemented with Nanomsg, a high-performance asynchronous message queue system.
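The many-writers, one-queue pattern described above can be sketched with the standard multiprocessing names. The function names are invented for this example, and the module is passed in as `mp`. That Fiber accepts exactly these names (`Queue`, `Process`) is an assumption based on the stated API compatibility, not something confirmed here, so the demo call uses the standard library.

```python
import multiprocessing


def producer(queue, worker_id, count):
    """Each producer tags its messages so the reader can tell senders apart."""
    for step in range(count):
        queue.put((worker_id, step))


def fan_in(mp, num_producers=3, items_each=2):
    """Start several producers that all write to one shared queue."""
    queue = mp.Queue()
    procs = [mp.Process(target=producer, args=(queue, i, items_each))
             for i in range(num_producers)]
    for p in procs:
        p.start()
    # Drain before joining so no producer blocks on a full buffer.
    received = [queue.get() for _ in range(num_producers * items_each)]
    for p in procs:
        p.join()
    return sorted(received)


if __name__ == "__main__":
    print(fan_in(multiprocessing))
```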
Pools are also supported by Fiber, as shown in Figure 5, below. These allow the user to manage a pool of worker processes. Fiber extends pools with job-backed processes so that it can manage thousands of (remote) workers per pool. Users can also create multiple pools at the same time.
Managers and proxy objects enable Fiber to support shared storage, which is critical in distributed systems. Usually, external storage systems on the computer cluster, such as Cassandra and Redis, handle this function. Fiber instead provides built-in in-memory storage for applications to use. The interface is the same as that of the manager types in multiprocessing.
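For readers unfamiliar with the manager interface being referred to, the following sketch uses the standard library's `multiprocessing.managers.BaseManager` to host a small in-memory store behind a proxy. `KeyValueStore` and `StoreManager` are hypothetical names. The example runs locally and only illustrates the interface style; whether Fiber's manager classes can be substituted unchanged is an assumption.

```python
from multiprocessing.managers import BaseManager


class KeyValueStore:
    """Plain in-memory store; the manager hosts one instance in its own process."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key, default=None):
        return self._data.get(key, default)


class StoreManager(BaseManager):
    pass


# Registering the class makes manager.KeyValueStore() return a proxy object.
StoreManager.register("KeyValueStore", KeyValueStore)


def main():
    with StoreManager() as manager:
        store = manager.KeyValueStore()  # calls are forwarded to the manager process
        store.put("best_reward", 1.5)
        print(store.get("best_reward"))


if __name__ == "__main__":
    main()
```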
Rings are an extension to the multiprocessing API that can be helpful in distributed computing settings. A Ring in Fiber refers to a group of processes that work together as relative equals. Unlike a Pool, a Ring does not have the concept of a primary process and worker processes; all the members inside the Ring share roughly the same level of responsibility. Fiber’s Ring models a topology (shown in Figure 6, below) that is very common in machine learning when doing distributed SGD, as in torch.distributed and Horovod. Generally, it is very challenging to start this kind of workload on a computer cluster; Fiber provides this Ring feature to help set up such a topology.
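To show why a ring of equal members suits distributed SGD, the sketch below simulates ring all-reduce, a well-known collective in which every member ends up with the element-wise sum of all members' vectors. This is a single-process simulation written for illustration, not Fiber's Ring implementation; in practice a library such as Horovod or torch.distributed would run the collective on the topology that a Ring sets up. The function name and the one-number-per-chunk simplification are assumptions of this example.

```python
def ring_allreduce(vectors):
    """Simulate ring all-reduce for n members, each holding n chunks (one number per chunk)."""
    n = len(vectors)
    if any(len(v) != n for v in vectors):
        raise ValueError("each member needs exactly one chunk per ring member")
    data = [list(v) for v in vectors]

    # Phase 1, reduce-scatter: after n - 1 steps member r owns the full sum of chunk (r + 1) % n.
    for step in range(n - 1):
        # Collect all messages first so every member sends simultaneously.
        messages = [((r + 1) % n, (r - step) % n, data[r][(r - step) % n])
                    for r in range(n)]
        for receiver, chunk, value in messages:
            data[receiver][chunk] += value

    # Phase 2, all-gather: completed chunks travel around the ring and overwrite stale copies.
    for step in range(n - 1):
        messages = [((r + 1) % n, (r + 1 - step) % n, data[r][(r + 1 - step) % n])
                    for r in range(n)]
        for receiver, chunk, value in messages:
            data[receiver][chunk] = value
    return data


gradients = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]
print(ring_allreduce(gradients))
```

Each member only ever talks to its neighbor, and no member acts as a coordinator, which matches the "relative equals" description above.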
With each of the flexible components described above, we can now build applications with Fiber. Here, we show two ways Fiber can help users build their distributed applications.
Powering new applications
In the example below, we demonstrate how engineers can apply Fiber to enable large-scale distributed computation. This example is a demo of a reinforcement learning (RL) algorithm. The communication pattern for distributed RL usually involves sending different types of data between machines, including actions, neural network parameters, gradients, per-step/episode observations, and rewards.
Fiber implements pipes and Pools to transmit this data. Under the hood, Pools are normal Unix sockets, providing near line-speed communication for the applications using Fiber. Modern computer networking usually has bandwidth as high as hundreds of gigabits per second. Transmitting smaller amounts of data over a network is generally fast.
Additionally, the inter-process communication latency does not increase much if there are many different processes sending data to one process because data transfer can happen in parallel. This fact makes Fiber’s Pools suitable for providing the foundation of many RL algorithms because simulators can run in each Pool worker process and the results can be transmitted back in parallel.
The sample below shows simplified RL code implemented with Fiber:
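import gym
from fiber.managers import AsyncManager  # assumed: the class lines were lost from this listing

# fiber.BaseManager is a manager that runs remotely (async variant assumed, since calls return handles)
class RemoteEnvManager(AsyncManager):
    pass

class Env(gym.Env):
    # gym env
    pass

RemoteEnvManager.register('Env', Env)

def build_model():
    # create a new policy model
    ...

def update_model(model, observations):
    # update model with observed data
    ...

def train():
    model = build_model()
    manager = RemoteEnvManager()
    num_envs = 10
    envs = [manager.Env() for _ in range(num_envs)]
    handles = [env.reset() for env in envs]
    obs = [handle.get() for handle in handles]
    for i in range(1000):
        actions = model(obs)
        handles = [env.step(action) for env, action in zip(envs, actions)]
        obs = [handle.get() for handle in handles]
        model = update_model(model, obs)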
Enabling existing multiprocessing applications
Many Python users leverage multiprocessing. Fiber opens up broad opportunities for such applications: by changing only a few lines of code, they can run in a distributed setup on a computer cluster managed by a system such as Kubernetes.
As an example, OpenAI Baselines is a very popular library for people doing RL and it has many reference algorithms, such as DQN and PPO. Its downside is that it only works on a single machine. If you want to train PPO on a large scale, you have to create your own MPI-based system and manually set up the cluster.
In contrast, with Fiber, things are much easier. It can seamlessly expand RL algorithms like PPO to leverage hundreds of distributed environment workers. Fiber provides the same API as multiprocessing, which is what OpenAI Baselines uses to harvest multicore CPU processing power locally. The change needed to make OpenAI Baselines work with Fiber is just one line:
With this code change, OpenAI Baselines can run on Kubernetes. We have provided a full guide for running OpenAI Baselines on Kubernetes here.
Fiber implements Pool-based error handling. When a new Pool is created, an associated task queue, result queue, and pending table are also created. Users can then add newly created tasks to the task queue, which is shared between the master process and worker processes. Each of the workers fetches a single task from the task queue and then runs task functions within that task. Each time a user removes a task from the task queue, Fiber adds an entry in the pending table. Once the worker finishes that task, it puts its results in the result queue. Fiber then removes the entry associated with that task from the pending table.
Figure 7: On the left is a normal Fiber Pool with four workers. On the right, Worker 3 fails and Fiber consequently starts a new worker process (Worker 5), which is then ready to be added to the pool.
If a Pool worker process fails in the middle of processing, as shown in Figure 7, above, that failure is detected by the parent Pool that serves as the process manager of all the worker processes. Then the parent Pool puts the pending task from the pending table back into the task queue if the previously failed process has a pending task. Next, it starts a new worker process to replace the previously failed process and binds the newly-created worker process to the task queue and the result queue.
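The bookkeeping described above (task queue, result queue, pending table, and worker replacement) can be traced with a small single-process simulation. This is a sketch for illustration, not Fiber's implementation: the function name, the `crash_on` parameter, and the round-robin scheduling are all assumptions, and a "crash" is simply simulated the first time a chosen task is attempted.

```python
from collections import deque


def run_resilient_pool(func, inputs, num_workers=4, crash_on=frozenset()):
    """Simulate a pool that re-queues the pending task of a failed worker."""
    task_queue = deque(enumerate(inputs))   # (task_id, argument) pairs
    result_queue = []                       # (task_id, result) pairs
    pending = {}                            # worker id -> task it is running
    workers = deque(range(1, num_workers + 1))
    next_worker_id = num_workers + 1
    already_crashed = set()

    while task_queue:
        worker = workers.popleft()
        task = task_queue.popleft()
        pending[worker] = task              # entry added when the task is fetched
        task_id, arg = task
        if task_id in crash_on and task_id not in already_crashed:
            already_crashed.add(task_id)
            # The parent pool detects the failure: the pending task goes back
            # into the task queue and a fresh worker replaces the failed one.
            task_queue.append(pending.pop(worker))
            worker = next_worker_id
            next_worker_id += 1
        else:
            result_queue.append((task_id, func(arg)))
            del pending[worker]             # entry removed once the result is in
        workers.append(worker)

    results = [value for _, value in sorted(result_queue)]
    return results, sorted(workers)


results, workers = run_resilient_pool(lambda x: x * x, range(6), crash_on={2})
print(results)
print(workers)
```

With four workers and a simulated failure on task 2, the task is picked up by Worker 3, re-queued, and eventually completed by the replacement Worker 5, so no result is lost.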
One of the most important applications of Fiber is scaling the computation of algorithms like RL and population-based methods like ES. In these applications, latency is critical. RL and population-based methods are typically applied in a setup that requires frequent interaction with simulators (such as ALE, Gym, and Mujoco) to evaluate policies and collect experiences. The latency introduced by waiting for results from the simulators critically impacts the overall training performance.
In order to test Fiber, we evaluated its performance in comparison to other frameworks. We also added Ray in our framework overhead test to provide some preliminary results, and expect to add more detailed results in the future.
There are generally two ways to reduce the latency from RL algorithms and population-based methods. Either we can reduce the amount of data that needs to be transferred or we can make the communication channel between different processes faster. In order to speed up processing communication, Fiber implements pipes and Pools with Nanomsg. In addition, users can even further enhance their performance with libraries like speedus.
A framework’s own components consume computing resources, so we tested Fiber’s overhead. We compared Fiber, the Python multiprocessing library, Apache Spark, Ray, and ipyparallel. For our testing procedure, we created a batch of workloads that took a fixed amount of time in total to finish. The duration of each single task ranged from one second to one millisecond.
We ran five workers for each framework locally and adjusted the batch size to make sure the total finish time for each framework was roughly one second (e.g., for a task duration of one millisecond, we ran 5,000 tasks). Our hypothesis was that Fiber should perform similarly to multiprocessing because neither Fiber nor multiprocessing relies on complex scheduling mechanisms. In contrast, we thought Apache Spark, Ray, and ipyparallel would be slower than Fiber because they rely on schedulers in the middle.
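The batch sizing follows from simple arithmetic: with five workers running in parallel, a batch takes about one second when each worker receives one second's worth of tasks. The helper below is a sketch of that calculation; its name and the use of integer milliseconds are choices made for this example.

```python
def tasks_per_batch(num_workers, task_ms, target_ms=1000):
    """Number of tasks that keeps num_workers busy for target_ms of wall-clock time."""
    if task_ms <= 0 or target_ms % task_ms:
        raise ValueError("task_ms must be positive and divide target_ms")
    return num_workers * (target_ms // task_ms)


for task_ms in (1000, 100, 10, 1):
    print(f"{task_ms} ms tasks: {tasks_per_batch(5, task_ms)} per batch")
```

Any time a framework needs beyond the one-second target on such a batch can then be read as its overhead.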
Fiber showed almost no difference when task durations were 100 milliseconds or greater, and was much closer to the multiprocessing library than the other frameworks as the task duration dropped to ten milliseconds or one millisecond.
We used multiprocessing as a reference because it is very lightweight and does not implement any additional features beyond creating new processes and running tasks in parallel. Additionally, it exploits communication mechanisms only available locally (e.g., shared memory, Unix domain sockets, etc.). This makes it difficult for other frameworks that support distributed resource management systems to surpass multiprocessing, since these systems cannot exploit similar mechanisms.
Compared to Fiber, ipyparallel and Apache Spark fell well behind at each task duration. When the task duration was one millisecond, ipyparallel took almost 24 times longer than Fiber, and Apache Spark took 38 times longer. This result highlighted that both ipyparallel and Apache Spark introduce considerable overhead when the task duration is short, and are not as suitable as Fiber for RL and population-based methods, which use a simulator and have a response time of a couple of milliseconds. We also showed that Ray takes about 2.5 times longer than Fiber when running one millisecond tasks.
Distributed task test
To probe the scalability and efficiency of Fiber, we compared it with ipyparallel, leaving out Apache Spark due to the results of our earlier performance testing. We also left out the Python multiprocessing library because it does not scale beyond a single machine. We tested the scalability and efficiency of both Fiber and ipyparallel based on the time it took to run 50 iterations of evolution strategies (ES).
With the same workload, we expected Fiber to finish faster because it has much less overhead than ipyparallel, as shown in the previous test. For both Fiber and ipyparallel, we used a population size of 2,048, so that the total computation was fixed regardless of the number of workers. We also implemented the same shared noise table trick in both, where every eight workers share one noise table. The experimental domain in this work is a modified version of the Bipedal Walker Hardcore environment of the OpenAI Gym with modifications described here.
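As a sketch of the setup described above, the code below splits a fixed population of 2,048 across a given number of workers and assigns one noise table to every eight workers. The `NoiseTable` class shows one common way such a table can work (noise generated once from a seed, with workers exchanging offsets into it instead of full vectors). Its name, seed, and size are hypothetical, and the experiment's actual implementation is not shown in this post.

```python
import random


def plan_workers(num_workers, population=2048, workers_per_table=8):
    """Return (individuals per worker, number of shared noise tables)."""
    if population % num_workers:
        raise ValueError("population must divide evenly across workers")
    num_tables = -(-num_workers // workers_per_table)  # ceiling division
    return population // num_workers, num_tables


def table_for(worker_id, workers_per_table=8):
    """Index of the noise table that a given worker shares with its neighbors."""
    return worker_id // workers_per_table


class NoiseTable:
    """Pre-generated Gaussian noise that workers index by offset."""

    def __init__(self, seed, size):
        rng = random.Random(seed)
        self.values = [rng.gauss(0.0, 1.0) for _ in range(size)]

    def slice(self, offset, dim):
        return self.values[offset:offset + dim]


for workers in (32, 256, 1024):
    per_worker, tables = plan_workers(workers)
    print(f"{workers} workers: {per_worker} individuals each, {tables} noise tables")
```

Because the population is fixed, adding workers shrinks each worker's share, so total computation stays constant while communication and coordination grow.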
The main result was that Fiber scaled much better than ipyparallel and finished each test significantly faster. The time Fiber took to run decreased gradually as the number of workers increased from 32 to 1,024. In contrast, the time for ipyparallel to finish increased from 256 to 512 workers, and ipyparallel did not finish the run with 1,024 workers due to communication errors between its processes. This failure undermined ipyparallel’s ability to run large-scale parallel computation. Due to Amdahl’s law, we saw diminishing returns for Fiber when the number of workers increased past 512. In that case, the speed with which the master process can process data became the bottleneck.
Overall, Fiber’s performance exceeded that of ipyparallel for all numbers of workers tested. Additionally, unlike ipyparallel, Fiber also finished the run with 1,024 workers. This result highlighted Fiber’s better scalability compared to ipyparallel.
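Amdahl's law bounds the speedup from n workers at 1 / ((1 - p) + p / n), where p is the fraction of the work that can run in parallel. The snippet below evaluates that formula. The value p = 0.999 is purely illustrative and is not a measurement from these experiments.

```python
def amdahl_speedup(parallel_fraction, num_workers):
    """Upper bound on speedup when only part of the work can be parallelized."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / num_workers)


for n in (32, 512, 1024):
    print(n, round(amdahl_speedup(0.999, n), 1))
```

Even with a serial share of only 0.1%, doubling from 512 to 1,024 workers yields far less than a twofold gain, because the serial part (here, the master process handling results) comes to dominate.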
Fiber is a new Python distributed library that is now open-sourced. We designed it to enable users to implement large-scale computation easily on a computer cluster. The experiments here highlight that Fiber achieved many of our goals, including efficiently leveraging a large amount of heterogeneous computing hardware, dynamically scaling algorithms to improve resource usage efficiency, and reducing the engineering burden required to make complex algorithms work on computer clusters.
We hope that Fiber will further enable progress in solving difficult engineering problems by making it easier to develop methods and run them at the scale necessary to truly see them shine. For more details, please see the Fiber GitHub repository.