Stay up to date with the latest from Uber Engineering

Follow us on LinkedIn

Engineering, Data / ML

Building Uber’s Data Lake: Batch Data Replication Using HiveSync

September 4 / Global

Introduction

Uber’s massive 350 PB dual-region data lake, among the largest globally, is meticulously designed for high availability through an active/standby configuration. At Uber, HiveSync plays a critical role in this architecture by ensuring cross-region data consistency, a cornerstone of Uber’s disaster recovery strategy. It diligently manages data replication and synchronization, providing robust protection against potential hardware failures and regional disruptions. This blog post delves into the technical intricacies of HiveSync and its pivotal contribution to safeguarding Uber’s multi-region data ecosystem.

A Tale of Two Regions 

Uber’s batch data infrastructure historically ran across two data center regions (primary and secondary) to ensure redundancy. However, the secondary region sat idle—incurring costs equal to the primary—just to maintain high availability. To fix this inefficiency, Uber launched the SRC (Single Region Compute) program, consolidating batch compute operations into a single primary region while still guaranteeing data reliability for disasters. Eliminating batch compute in the secondary region significantly reduced Uber’s hardware footprint.

Under SRC, all batch processing runs in the primary region, and HiveSync replicates the data to the secondary region. ETL jobs write data exclusively to the primary data lake (DC1), so every byte must be copied to the secondary data center (DC2) for robust redundancy and disaster recovery. This also improves data locality for analytics in DC2, enabling faster cross-region access. We developed HiveSync to meet these needs, ensuring synchronized, consistent data availability across distributed regions.

Figure 1: Bi-regional architecture of the Uber data lake.

Figure 1 shows that ETL (Extract-Transform-Load) processes run continuously in data center 1 (DC1), while HiveSync replicates data from DC1 to data center 2 (DC2) to ensure data proximity, allowing reader pipelines to operate in both DC1 and DC2.

Understanding HiveSync 

Uber’s HiveSync service is a bi-directional, permissions-aware data replication system that enables active-active synchronization across regions. It was initially built on the open-source Airbnb® ReAir project, but Uber extensively redesigned HiveSync for seamless replication, atomic copying, and real-time consistency monitoring to meet the company’s unique requirements.

Launched in 2016, HiveSync has scaled to handle Uber’s entire data lake, supporting both bulk and incremental replication on Apache HDFS. The core architecture has remained stable even as the data lake grew hundredfold, making HiveSync a robust solution for Uber’s ever-evolving big data infrastructure.

Figure 2: HiveSync architecture: control plane and data plane separation.

Figure 2 shows HiveSync’s Control Plane and Data Plane. The Control Plane handles task orchestration, scheduling, and recovery from failures (such as server memory pressure, network slowdowns, or outages of Oracle MySQL® or the Apache Hive Metastore, HMS) via horizontal scaling and exponential-backoff retries. The Data Plane performs the actual data transfer and copy operations.

Architecture Overview

HiveSync comprises two primary components—the HiveSync Replication Service and the Data Reparo Service—and also exposes an HTTP interface widely used by platform teams.

HiveSync Replication Service 

Figure 3: HiveSync replication service.

HiveSync operates as an event-driven system. Here’s how its data processing works:

  1. Event trigger: Any DDL/DML operation on the source Hive cluster that modifies HDFS data triggers an event in the HMS.
  2. Event logging: A custom hook (Hive Metastore Event Listener) listens to this HMS event and logs it in a MySQL database as an audit entry.
  3. Job creation: HiveSync’s back-end service, also known as the Replication Server, periodically polls the audit log database. It then converts these events into replication jobs.
  4. Job execution: Each job is processed by an async executor, serving as an FSM (Finite State Machine) and managing the job’s lifecycle asynchronously. Replication job states are stored in the MySQL database for persistence.
  5. Data copying: Hive metadata and HDFS data are synchronized across clusters.
    • RPC (Remote Procedure Calls): For smaller jobs, HiveSync uses direct Apache Hadoop® filesystem calls from the server (avoiding YARN overhead). This method handles ~90% of jobs, especially those with many small files.
    • Hadoop distributed copy: For larger jobs, HiveSync submits a distributed copy (MapReduce) job to the Hadoop YARN cluster (progress is monitored via YARN).
  6. Job completion: Once data replication is done (either successful or failed), the FSM completes the remaining job states, marking it as completed.
  7. Data access: With replication completed, users can query the latest data on the destination Hive cluster.
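
The choice between the two copy paths in step 5 can be sketched roughly as follows. This is a minimal illustration, not HiveSync's actual code: the `CopyJob` type, the `choose_copy_method` function, and the size threshold are all assumptions made for the example, since the post does not say how the cutoff between "smaller" and "larger" jobs is defined.

```python
from dataclasses import dataclass

# Hypothetical cutoff; the post does not state the real value.
MAX_RPC_BYTES = 256 * 1024 * 1024  # assume jobs up to 256 MiB are copied by the server


@dataclass(frozen=True)
class CopyJob:
    table: str
    total_bytes: int


def choose_copy_method(job: CopyJob) -> str:
    """Return "rpc" for small jobs and "distcp" for large ones."""
    if job.total_bytes <= MAX_RPC_BYTES:
        return "rpc"      # direct filesystem calls from the server, no YARN overhead
    return "distcp"       # distributed copy submitted to YARN
```

Keeping small jobs off YARN avoids paying job-submission overhead for the roughly 90% of jobs that the post says take the RPC path.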

Is Copying Enough? 

Data replication alone isn’t enough to guarantee consistency across regions. For example, if someone manually deletes a partition’s files directly in HDFS, the HMS might still think the partition exists (since no Hive event was logged). Similarly, when users write directly to HDFS paths, bypassing HMS-triggering mechanisms, HiveSync is unaware of the new data. Replication can also fail when the destination region has insufficient HDFS quota (which Uber enforces at the database level).

Uber’s Data Reparo service scans DC1 and DC2 for anomalies (missing or extra partitions) and fixes any mismatches to ensure cross-region consistency, targeting over 99.99% accuracy.    

Figure 4: Data Reparo analyzes and resolves inconsistencies across data centers DC1 and DC2.
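
The kind of comparison Data Reparo performs can be pictured as a set difference over partition listings. The sketch below is an assumption-laden illustration: the post does not describe how Reparo gathers or compares its inputs, and the function name `diff_partitions` and the partition-name strings are hypothetical.

```python
from typing import Dict, List, Set


def diff_partitions(dc1: Set[str], dc2: Set[str]) -> Dict[str, List[str]]:
    """Compare partition names seen in DC1 (source) and DC2 (destination)."""
    return {
        "missing_in_dc2": sorted(dc1 - dc2),  # candidates to re-replicate
        "extra_in_dc2": sorted(dc2 - dc1),    # candidates to drop from DC2
    }


# Example: one partition never reached DC2, one stale partition lingers there.
report = diff_partitions(
    {"ds=2024-01-01", "ds=2024-01-02", "ds=2024-01-03"},
    {"ds=2024-01-01", "ds=2024-01-02", "ds=2023-12-31"},
)
```

A real check would also need to compare file contents, since a partition can exist on both sides while its files differ.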

Copying with Strict SLAs

Copying data from DC1 to DC2 isn’t a leisurely task—it must meet a strict SLA. DC2 isn’t just a backup—it supports active analytics, so its data is as critical as DC1. HiveSync needs to replicate data to DC2 almost immediately after it lands in DC1 to keep pipelines in the secondary region in sync. 

HiveSync currently upholds a four-hour SLA for replication, with the 99th percentile lag around 20 minutes (latency varies with event rates and data sizes). To achieve this, the HiveSync service maintains about 99.99% availability, in line with the uptime of the underlying data infrastructure. 

Replication Service

Now we’ll examine the core components of the HiveSync replication service in more detail. HiveSync monitors Hive for changes and manages replication jobs reliably.

Hive Metastore Event Listener

HiveSync uses a Hive Metastore Event Listener (a Hive hook) to monitor metadata changes on Hive tables and partitions in real time. Whenever a table or partition is created, altered, or deleted, the listener serializes the full metadata of the affected object into an audit log.

The serialized audit logs are stored in a MySQL table and are written asynchronously to avoid slowing down Hive operations. In rare cases, an event might not get logged due to this asynchronicity; the Data Reparo service later detects such missed events.

Figure 5: Creation of audit logs for replication via Hive Metastore Event Listener.

The Hive Metastore listener writes a detailed audit log entry to MySQL for each Hive DDL/DML event. HiveSync’s replication server uses a dedicated Audit Log Reader thread to continually fetch new audit log entries and record the last processed ID. If the service restarts and the last ID wasn’t saved, it may re-read some entries (ensuring at-least-once delivery). Each audit log entry is then transformed into a replication job for further processing.
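
The checkpointing behavior described above can be sketched as follows. This is a simplified, in-memory stand-in, not the real reader: `poll_audit_log`, `job_key`, and the dictionary-based checkpoint are hypothetical names, and the list `log` (assumed sorted by ID) stands in for the MySQL audit table.

```python
from typing import Callable, Dict, List, Tuple

AuditEntry = Tuple[int, str]  # (monotonically increasing id, serialized event)


def poll_audit_log(
    log: List[AuditEntry],
    checkpoint: Dict[str, int],
    create_job: Callable[[AuditEntry], None],
    batch_size: int = 100,
) -> int:
    """Process entries newer than the checkpoint; return how many were handled."""
    last_id = checkpoint.get("last_id", 0)
    batch = [entry for entry in log if entry[0] > last_id][:batch_size]
    for entry in batch:
        create_job(entry)                 # the job is created first...
        checkpoint["last_id"] = entry[0]  # ...then the read position is saved
    return len(batch)


def job_key(entry: AuditEntry) -> str:
    """Derive a stable key so a re-read entry maps to the same job."""
    return f"audit-{entry[0]}"
```

Because the position is saved only after the job is created, a crash between the two steps causes a re-read rather than a lost event. Keying jobs by audit ID, as `job_key` does here, is one way to make that re-read harmless.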


Job

A job is represented as a finite-state machine (FSM), with its state stored in MySQL to ensure it can resume reliably after a failure or service restart.

Figure 6: FSM for a Copy Partition job.

For example, copying a Hive table partition from source to destination involves a Copy Partition job that passes through four states: begin, copy data (replicate the HDFS files), copy metadata (replicate the Hive metastore information), and end. These states execute sequentially as the job progresses.
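
A resumable state machine of this shape might look like the sketch below. It is an illustration under assumptions: the `State` enum mirrors the four states named above, but `run_job`, the `store` dictionary (standing in for the MySQL job table), and the action callbacks are hypothetical.

```python
from enum import Enum
from typing import Callable, Dict


class State(Enum):
    BEGIN = "begin"
    COPY_DATA = "copy_data"
    COPY_METADATA = "copy_metadata"
    END = "end"


NEXT = {
    State.BEGIN: State.COPY_DATA,
    State.COPY_DATA: State.COPY_METADATA,
    State.COPY_METADATA: State.END,
}


def run_job(job_id: str, store: Dict[str, str],
            actions: Dict[State, Callable[[], None]]) -> State:
    """Advance a job from its last persisted state until it reaches END."""
    state = State(store.get(job_id, State.BEGIN.value))
    while state is not State.END:
        nxt = NEXT[state]
        actions.get(nxt, lambda: None)()  # do the work for the next state
        store[job_id] = nxt.value         # persist only after the step succeeds
        state = nxt
    return state
```

If the metadata step raises, the store still records `copy_data`, so a later call to `run_job` skips the data copy and retries only the metadata step.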

Replication Server

Figure 7: Replication server workflow.

The Replication server polls the audit log table for new events and creates replication jobs accordingly. An executor pool schedules jobs and ensures that jobs for the same table run in the correct order. HiveSync uses a directed acyclic graph (DAG) manager to manage job dependencies and locking so that no two jobs conflict on the same table or partition.

Each replication job is executed by worker threads following the FSM. When a job reaches the data copy step, it’s dispatched to a specialized executor pool based on the data volume:

  • RPC Executor: A pool of 100 threads handles small-to-medium file transfers directly. If a job can’t be completed immediately due to resource limits, it goes into a retry queue with a delayed retry time. 
  • DistCp Executor: Larger data jobs are offloaded to Hadoop via distributed copy (DistCp) on YARN. If the YARN queue is full, the job is retried with exponential backoff. Too many failed attempts will mark the job as failed, and a circuit breaker will pause new submissions if failures spike. An async thread monitors YARN jobs and updates each job’s status when it finishes. 
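
The retry and circuit-breaker behavior of the DistCp Executor can be sketched as below. The base delay, cap, failure threshold, and window size are assumed values chosen for illustration, and `backoff_delay` and `CircuitBreaker` are hypothetical names; the post gives none of these details.

```python
from collections import deque


def backoff_delay(attempt: int, base_seconds: float = 30.0,
                  cap_seconds: float = 1800.0) -> float:
    """Delay before retry `attempt` (1-based): base * 2**(attempt - 1), capped."""
    return min(cap_seconds, base_seconds * 2 ** (attempt - 1))


class CircuitBreaker:
    """Pauses new submissions when too many recent attempts have failed."""

    def __init__(self, max_failures: int = 5, window: int = 10) -> None:
        self.max_failures = max_failures
        self.recent = deque(maxlen=window)  # True = success, False = failure

    def record(self, succeeded: bool) -> None:
        self.recent.append(succeeded)

    def allows_submission(self) -> bool:
        return self.recent.count(False) < self.max_failures
```

With these defaults the delays run 30 s, 60 s, 120 s, and so on, up to the 30-minute cap. A production breaker would typically also reopen after a cool-down period, which this sketch leaves out.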

After the data copy step (whether via RPC or DistCp), HiveSync verifies that the source and destination data are identical (by comparing checksums and permissions). The job is then marked SUCCESSFUL (or FAILED on error), and its status is recorded in MySQL. Because each job’s state is saved after every stage, if the server crashes mid-job, the job can resume from the last saved state without data loss or duplication.
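
The post-copy verification can be pictured as a per-file comparison of checksums and permissions. In this sketch the `FileInfo` record, the `find_mismatches` function, and the idea of keying files by relative path are assumptions; how HiveSync actually obtains and compares these values is not described in the post.

```python
from typing import Dict, List, NamedTuple


class FileInfo(NamedTuple):
    checksum: str
    permission: str  # for example "rw-r-----"


def find_mismatches(src: Dict[str, FileInfo], dst: Dict[str, FileInfo]) -> List[str]:
    """Return relative paths whose checksum or permission differs, or that exist on one side only."""
    return [path for path in sorted(set(src) | set(dst)) if src.get(path) != dst.get(path)]
```

An empty result would correspond to marking the job SUCCESSFUL; any returned path would mean the job fails or is retried.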

Ensure Correct Ordering of Audit Log Events 

HiveSync must apply Hive events in the exact order they occurred to keep data consistent. If events are applied out of order (like renaming a partition before adding it), replication will fail because the expected state isn’t present. HMS’s asynchronous logging can occasionally produce out-of-order events, so HiveSync’s design uses careful synchronization (locks and sharding) to enforce the correct sequence of events.

Local Synchronization via Locks

Within each HiveSync server, an exclusive/shared locking scheme preserves consistency for jobs on the same table or partition. 

A job must acquire all its required locks before it runs. Exclusive locks exclude all other jobs on that resource, whereas shared locks are granted only if no exclusive lock is held or pending on the resource. HiveSync’s DAG Manager coordinates lock acquisition to avoid deadlocks, ensuring that a waiting job starts only after the jobs holding its needed locks have finished.

The DAG Manager is central to our system, managing lock assignments by tracking job dependencies. It grants locks, but if a job cannot immediately acquire them, it identifies parent jobs (those holding the necessary locks) and waits for their release before proceeding.

The DAG Manager tracks:

  1. Jobs holding shared locks: IDs of jobs currently sharing a lock.
  2. Jobs holding exclusive locks: ID of the job with an exclusive lock.
  3. Jobs ready for processing: IDs of jobs with acquired locks, ready for the FSM flow queue.
  4. Jobs waiting for locks: IDs of jobs awaiting lock release.

This tracking ensures smooth, efficient, in-order job execution, preventing deadlocks and conflicts.

Figure 8: Directed acyclic graph of replication jobs.

Figure 8 shows the DAG manager processing four jobs sequentially. All jobs operate on table (T) and partition (P) with specific lock needs. Job 1 runs (green) with shared access on T and exclusive access on P. Jobs 2, 3, and 4 wait (red) due to lock dependencies. Job 2 needs shared access on T and P, waiting for Job 1 to release P. Job 3 needs exclusive P and shared T, blocked by Job 2 for P. Job 4 waits for Job 3’s exclusive lock on P. The DAG manager atomically assigns all required locks or none, ensuring orderly job processing.
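
The Figure 8 scenario can be reproduced with a small dependency tracker. This is a sketch of the general technique, not the real DAG Manager: the class and method names are hypothetical, and because the text does not say which locks Job 4 requests, the usage example assumes shared locks on both T and P.

```python
from typing import Dict, List, Set

SHARED, EXCLUSIVE = "S", "X"


class DagManager:
    """Tracks which earlier jobs each job must wait for (its parents)."""

    def __init__(self) -> None:
        self._requests: Dict[int, Dict[str, str]] = {}  # job id -> {resource: mode}
        self._parents: Dict[int, Set[int]] = {}

    def submit(self, job_id: int, locks: Dict[str, str]) -> None:
        # A parent is any unfinished earlier job that touches the same resource
        # where at least one of the two requests is exclusive. Pending jobs
        # count too, so a shared request queues behind a pending exclusive one.
        parents = {
            other for other, held in self._requests.items()
            if any(res in held and EXCLUSIVE in (mode, held[res])
                   for res, mode in locks.items())
        }
        self._requests[job_id] = dict(locks)
        self._parents[job_id] = parents

    def ready(self) -> List[int]:
        """Jobs with no unfinished parents: they hold all their locks at once."""
        return [job for job, parents in self._parents.items() if not parents]

    def finish(self, job_id: int) -> None:
        del self._requests[job_id]
        del self._parents[job_id]
        for parents in self._parents.values():
            parents.discard(job_id)
```

Submitting Job 1 (`{"T": SHARED, "P": EXCLUSIVE}`), Job 2 (`{"T": SHARED, "P": SHARED}`), Job 3 (`{"T": SHARED, "P": EXCLUSIVE}`), and Job 4 (`{"T": SHARED, "P": SHARED}`) in order leaves only Job 1 ready, and each `finish` call releases the next job in sequence. Dependencies only ever point to earlier jobs, so the graph cannot contain a cycle and the jobs cannot deadlock.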

Global Synchronization via Sharding

ReAir, designed initially for single machines, can’t scale horizontally due to local synchronization limitations. Local synchronization’s reliance on in-memory lock states prevents job distribution across multiple service instances. Even with external lock state storage, distributed locks would be needed to prevent conflicts for related audit log events, complicating management. To overcome this, we adopted sharding for distributed synchronization and order maintenance.

The first level involves static sharding. Uber categorizes datasets into five tiers, with tier 1 being the highest priority. We initially used static sharding with five servers, each handling specific tiers by filtering audit logs. A default shard managed unassigned tables, automatically routing new ones.

Figure 9: Static sharding across multiple servers.

Figure 9 shows this with three shards. Although all shards received audit logs, irrelevant ones were filtered. This setup, however, was operationally complex for new shards or datasets and didn’t address data skew. To enhance scalability, we moved to dynamic sharding to horizontally scale individual shards during high load.
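
Static sharding by tier, with a default shard for unassigned tables, can be sketched like this. The table names, shard names, and the `shard_for` and `filter_events` helpers are all hypothetical; only the tier-based routing and the default shard come from the description above.

```python
from typing import Dict, List

# Hypothetical assignments for illustration only.
TABLE_TIER: Dict[str, int] = {"rides.trips": 1, "eats.orders": 2}
TIER_TO_SHARD: Dict[int, str] = {1: "shard-tier-1", 2: "shard-tier-2"}
DEFAULT_SHARD = "shard-default"


def shard_for(table: str) -> str:
    """Route a table to its tier's shard, or to the default shard if unassigned."""
    tier = TABLE_TIER.get(table)
    return TIER_TO_SHARD.get(tier, DEFAULT_SHARD)


def filter_events(shard: str, events: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Every shard sees all audit events and keeps only those it owns."""
    return [event for event in events if shard_for(event["table"]) == shard]
```

The mapping is fixed, so a hot tier cannot spread its load, which is the data-skew problem that motivated dynamic sharding.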

The second level is dynamic sharding, which lets each shard server host multiple instances. Each dataset is assigned an instance ID via a hash-mod-n approach using a partition key. This distributes datasets across instances while ensuring that every job for a dataset is processed by its assigned instance.

Here’s the hash-mod-n approach calculation:

instance ID = hash(partition key) mod n, where n is the number of instances

This approach aligns with our earlier limitation: while jobs for the same table can’t be processed by multiple instances, jobs for different tables can.
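
A hash-mod-n assignment can be written in a few lines. The sketch below makes two assumptions not stated in the post: the partition key is a `db.table` string, and the hash is MD5, chosen only because it is stable across processes (Python's built-in `hash()` is randomized per process for strings). The function name `instance_id_for` is hypothetical.

```python
import hashlib


def instance_id_for(partition_key: str, num_instances: int) -> int:
    """Map a partition key to an instance ID in [0, num_instances)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_instances


# All jobs for the same table land on the same instance.
owner = instance_id_for("rides.trips", 3)
```

Changing the number of instances remaps most keys, so in-flight jobs would need to drain or be reassigned when an instance is added.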

Figure 10: Dynamic sharding for a single server.

Figure 10 shows three instances on the shard 1 server, each retrieving its replication jobs from the JOBS 1 MySQL table via getJobsForInstanceId using its instance ID.

Static and dynamic sharding together enable HiveSync’s horizontal scalability.

Usage at Uber 

HiveSync is essential for managing large-scale data replication at Uber, and its scope has grown dramatically since its early days. Initially, HiveSync handled only about 1% of the data lake’s size. By 2023 it was replicating the entire data lake. This represents roughly a hundredfold increase in scale, alongside new features added over time.

Expanding Scale of Uber’s Data Lake

Today, HiveSync operates at a massive scale: 

  • Tables: 800,000 Hive tables (approximately 300 PB of data in total) 
  • Partitions: Ranging from a few hundred up to 1 million partitions per table
  • Table sizes: Varying from a few GB to tens of PB each 
  • Daily operations: Over 5 million Hive DDL/DML events per day (about 8 PB of data replicated daily)

Our goal is to onboard all on-prem datasets into HiveSync to enable automatic, user-transparent replication of new production tables across clusters.

Key Role in Single-Region Transition

In 2022, Uber shifted its batch data platform from active-active (two regions running jobs) to active-passive (one primary region for compute). The primary data center now handles almost all batch processing, and HiveSync continuously replicates data to the passive secondary region. This change doubled HiveSync’s workload and made the secondary copy more critical. With the primary now a single point of failure, a fully synced secondary copy ensures that if the primary fails, Uber’s data workflows can quickly fail over to the secondary region.

Support for One-Time Replication Service (OTRS)

Onboarding a large existing dataset (many terabytes of historical data) via incremental updates alone could leave the secondary far behind initially, breaching SLAs. To prevent this, HiveSync offers One-Time Replication support to bootstrap large datasets. OTRS performs a one-off bulk copy of all historical data from the source to the destination before incremental replication begins. 

OTRS bypasses the normal event stream by directly enqueuing replication jobs for the entire dataset. The replication server executes these jobs to copy everything in one pass. After this backfill, incremental replication can start with both regions already in sync, so the secondary won’t lag and freshness SLAs are maintained.
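
Enqueuing a backfill directly, without going through audit events, might look like the sketch below. The queue, the job dictionary fields, and the `enqueue_backfill` function are hypothetical stand-ins; the post says only that OTRS enqueues replication jobs for the entire dataset.

```python
from collections import deque
from typing import Deque, Dict, Iterable


def enqueue_backfill(table: str, partitions: Iterable[str],
                     queue: Deque[Dict[str, str]]) -> int:
    """Enqueue one copy job per existing partition; return the number enqueued."""
    count = 0
    for partition in sorted(partitions):
        queue.append({"table": table, "partition": partition, "origin": "otrs"})
        count += 1
    return count


jobs: Deque[Dict[str, str]] = deque()
enqueued = enqueue_backfill("rides.trips", ["ds=2024-01-02", "ds=2024-01-01"], jobs)
```

Tagging each job with its origin is one way to keep backfill traffic distinguishable from incremental replication when scheduling or monitoring.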

Next Steps 

In this blog post, we introduced the HiveSync service at Uber, covering its architecture and core functions. We plan to open-source this replication service and will continue to develop new features to meet the increasing demands for scalability and lower latency. As Uber migrates its batch data analytics and ML training systems to Google Cloud Platform, HiveSync plays a critical role in replicating data seamlessly across both on-premises and cloud regions. We’ll cover this in detail in a separate blog post.

Cover Photo Attribution: “2010-05-11 Centrale LAN Network Center” by orcmid is licensed under CC BY 2.0.

Apache®, Apache Hadoop®, Apache Hive, Apache Hudi, Apache Spark, Apache Hadoop YARN, and Apache Hadoop HDFS are registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.

Google Cloud Platform is a trademark of Google LLC and this blog post is not endorsed by or affiliated with Google in any way.

Java, MySQL, and NetSuite are registered trademarks of Oracle® and/or its affiliates. Airbnb® is a trademark of Airbnb, Inc. and this blog post is not endorsed by or affiliated with Airbnb in any way.

Radhika Patwari

Radhika Patwari is a Software Engineer II on the HiveSync team at Uber. She has worked on enhancing DistCp performance and is currently focused on developing disaster recovery solutions for SRC (Single Region Compute) failover drills.

Trivedhi Talakola

Trivedhi Talakola is a Software Engineer on the HiveSync team at Uber. He has worked on bootstrapping Apache Hudi datasets to secondary regions and on performance issues in their incremental replication.

Rajan Jaiswal

Rajan Jaiswal is a Software Engineer I on the HiveSync team at Uber. He has contributed to the cloud migration efforts for Uber’s batch data lake, authored the chain replication and topology analyzer, and is one of the lead contributors to the system’s reliability.

Chayanika Bhandary

Chayanika Bhandary is a former Senior Software Engineer on the HiveSync team at Uber. She has contributed to improving the replication SLA of datasets and efficient fixing of any inconsistencies that might have crept in.

Mukesh Verma

Mukesh Verma is a Staff Software Engineer on the HiveSync team at Uber. He re-architected HiveSync for horizontal scalability, kickstarted cloud replication, and led key infrastructure upgrades, including the DistCp 3.x migration.

Sanjay Sundaresan

Sanjay Sundaresan is a Senior Engineering Manager for the batch storage infra team. He leads the HiveSync and Cloud Migration teams at Uber.

Posted by Radhika Patwari, Trivedhi Talakola, Rajan Jaiswal, Chayanika Bhandary, Mukesh Verma, Sanjay Sundaresan