D3: An Automated System to Detect Data Drifts
23 February 2023 / GlobalIntroduction
Data powers almost all critical, customer-facing flows at Uber. Bad data quality impacts our ML models, leading to a bad user experience (incorrect fares, ETAs, products, etc.) and revenue loss.
Still, many data issues are manually detected by users weeks or even months after they start. Data regressions are hard to catch because the most impactful ones are generally silent. They do not impact metrics and ML models in an obvious way until someone notices something is off, which finally unearths the data issue. But by that time, bad decisions are already made, and ML models have already underperformed.
This makes it critical to monitor data quality thoroughly so that issues are caught proactively.
Example of a Data Issue
Let’s take an example to understand the impact of a data incident.
Incident
Uber fares are composed of different components like surges, toll fees, etc. Riders’ reaction to these different components and trip conversion rates are critical to building fares ML models. We had an incident where fare component “X” was missing in the critical fares dataset for 10% of the sessions across key US cities.
Root Cause
The root cause was an app experiment that started logging fares differently.
How was it Detected?
This incident was detected after 45 days manually by one of the data scientists.
Impact
- This dataset is used to train critical fares ML models. Fares components are an important feature used to train this model. To quantify the impact of such data incidents, the Fares data science team has built a simulation framework that replicates corrupted data from real production incidents and assesses the impact on the fares data model performance. Corrupted fares component in 30% of the sessions severely impacts the model performance leading to a 0.23% decline in the incremental gross booking. An incident that corrupts the fare component data in 10% of sessions across major US cities for 45 days would translate to millions of dollars of lost revenue.
- The percentage of sessions with a particular fare component is a key metric that leadership and global ops use to make important decisions and understand the marketplace’s health.
- Additionally, when a data incident happens, multiple teams across data science, data engineering, and product engineering lose productivity as they try to determine the root cause of the data incident.
Data Incident Categories
It was important to dive deep into these incidents to devise a strategy to catch these issues faster. We analyzed data incidents at Uber in 2022 and divided them into various categories. We filtered out the incidents unrelated to data quality in datasets (e.g., access control and query layer issues).
We further divided the remaining incidents into ones that led to partially missing data and the others that led to complete data missing. The latter are usually caused by ETL pipeline failure or infra failures (e.g., Spark, Yarn, Hive outages); these issues are easier to detect and mitigate. But incidents that lead to partial/incomplete data are generally silent and are harder to detect.
The difficulty in monitoring and detecting partial data incidents is reflected by the fact that average TTD (Time to Detect) for these incidents is 5X more than that of data incidents caused by ETL/infra failures.
The goal of the framework we were designing was to bring down the TTD for partial data incidents to the same level as infra/ETL outages.
Detection Strategies
The reasons for incomplete data are many and spread throughout the stack:
New Features and Experiments
New features may alter the logs making the backend datasets inaccurate (e.g., a field may be omitted due to schema changes, or the meaning of a field may be changed).
ETL Logic Changes
Upstream ETL changes may corrupt the data introducing inaccuracies in a dataset (e.g., an erroneous change in join implementation, because of which data may start missing).
Upstream Outages and Migrations
An upstream service might stop logging records due to a bug or a large-scale migration.
Quality Problems in Third-Party Data Sources
Often, datasets ingest logs from third-party data sources whose quality is outside our control. These third-party data sources may produce inconsistencies in the data.
We looked at various approaches to monitor and detect these issues. Following is the distribution of the monitoring approaches by the number of incidents the approach would have caught:
We identified that a column-level test suite that checks for drift in column values could have detected many (almost 50%) incidents.
What is a Monitor?
Monitor, a term used frequently throughout this blog, denotes the computed stats values based on a certain type of aggregation, which is used to identify the drift in the column value.
Examples of Monitors
- Null Percentage
- False Percentage
- Percentile (P50, P75, P99, P1)
- Standard Deviation, Mean, Median
- Count Distinct
What Monitors Did We Require?
Based on our experience managing critical datasets and handling data incidents, we found the following monitors useful:
Null Checks
Check for drift in the percentage of rows with a null column value.
Foreign Key (FK) Tests (cross-dataset column comparison)
Foreign-key tests check for entity count consistency across datasets (e.g., compare trips in table X against the Source of Truth (SOT) table for trips).
Check for Drift in Numeric Columns Using Percentile Checks
A drastic change in numeric column values indicates the column’s meaning has changed.
Distribution in Categorical Columns
Check for drift in enum column value distribution e.g. a column with values X, Y, and Z appearing uniformly has had a disproportionately high count of Xs.
Other Requirements
Monitoring by Dimensions
Data issues generally start with a city or newly released app version. Monitoring by dimensions catches these issues faster.
Why Manually Applying Column Level Monitors Does Not Scale
- It requires thousands of person-weeks worth of effort
- Manually set static thresholds do not scale for the dynamic and trendy data we have at Uber
- Disparate tests across datasets do not scale for petabytes of data
- Keeping up with schema updates (new columns) is hard
- Setting up dimension based monitors for hundreds of cities is infeasible
Introducing D3 (Dataset Drift Detector)
From the above sections, it is clear that we needed a robust automated system to measure and monitor column-level data quality. D3 or Dataset Drift Detector was built with this goal in mind.
D3 Features
Automated Onboarding. The framework determines important columns in a dataset based on offline usage and applies monitors on these columns. It requires little to no configuration from the dataset owners to onboard.
Automated monitoring across dimensions. The framework monitors by dimensions (like app version and city_id) on its own. This helps detect data issues faster. This also gives a more accurate view of the data quality because a data issue that severely impacts a few cities still affects data consumers even if the overall data quality did not move much.
Automated anomaly detection. No need to manually set thresholds.
Architecture
The following diagram represents the high-level component view in the D3 architecture:
There are some core systems in this architecture managed by Uber Data Platform which are relevant to us. We refer to these systems occasionally in this blog:
- Databook: It’s an internal portal inside Uber to explore datasets, their columns, lineage, data quality tests, metrics, SLA, data consistency, duplicates, etc.
- UDQ: UDQ stands for Unified Data Quality. It’s a centralized system at Uber that takes care of how to define, execute, and maintain data quality tests on datasets and alerting mechanisms for those tests at scale. Dataset owners can create UDQ tests through Databook.
- Michelangelo: It’s a platform where ML models are deployed for real-time predictions across datasets inside Uber.
D3 architecture consists of 3 main components, which we will cover in detail:
- Compute Layer
- Anomaly Detection
- Orchestrator
Compute Layer
Compute layer forms the crux of the D3 framework. Any onboarded D3 datasets lifecycle will have two types of jobs that get executed as part of the compute layer:
- A one-time data profiler job that computes historic column monitor stats for the dataset for the past 90 days. This forms the training data for the anomaly detection model.
- A daily scheduled job that computes the column monitor stats for the latest day and uses an anomaly detection algorithm to predict the stats threshold and identify column drift for the day.
These compute jobs are generic Spark jobs that get executed against each dataset. Monitors are SQL statements expressed as Jinja templates. The D3 dataset config is used to translate the templates to real SQL queries executed inside the Spark application. The computed stats from the data profiler and daily scheduled jobs are persisted in the Hive D3 stats table. Alerting and monitoring is set up on top of this table.
Pluggable Monitors
Each dataset’s D3 config consists of a list of columns that needs to be part of monitoring and the type of monitors those columns are to be configured on. Some examples of monitor types are Null percentage, False percentage, Zero percentage, Percentile checks(25th percentile, 50th percentile and 75th percentile values), Mean, Standard deviation etc.
The new monitor addition in the D3 framework and in the compute layer is a pluggable component using the jinja template. Once the new monitors are added to the framework, any dataset can be configured with its own set of D3 supported monitor types for the column monitors.
Dynamic Filters
There is also a config option for providing dynamic filtering for each of the column monitors. The filters can be any valid hive SQL expressions that are applied in a where clause statement.
Dimensions
Having dimension-based stats computation is another critical requirement in the framework. We often see issues in all our upstream data whenever a new version of an app gets released or when a launch happens in a particular city with a bug. In such scenarios, there will not be any drift in the dataset columns at an overall level, but we can see significant spikes in the computed stats against those dimension cuts, such as city level or at the app version and device OS level.
But there are a few challenges in having dimension based stats compute support in the framework:
- First are the scalability challenges that arise if a dimension has high cardinality, resulting in high resource consumption for computing.
- Second, is the risk of getting high false positive alerts due to lower sampling data available to learn the trend.
Currently, we are supporting some fixed sets of dimensions to start with. One is a single dimension support against city_id, and the other is a two dimension support against app version and device OS, as these dimension use cases are common in identifying the data drift. But there are plans to support any ad hoc dimensions in our future release and have up to 5 dimension column combination support in the framework.
Anomaly Detection
As of today, data observability at Uber is based on manually curated SQL-based tests with static alerting thresholds. This requires constant attention and re-calibration to accommodate ever-changing data trends. With Anomaly Detection in place, we could have more flexible and dynamic alerting. For the D3 use case, we are tuning the models for high precision in order to reduce false alerts.
Anomaly Detection Integration
Integration of any anomaly detection model into the D3 framework is plug and play. It is based on generic UDF interfaces and any model can have its own implementation defined. While configuring, users can select anomaly detection of their choice without worrying about what’s happening under the hood.
For any anomaly detection model, the input is the time series data, and the output expected is the predicted limits within which the monitor value should fall. Base limits can sometimes be more aggressive and generate many false positives. As we want a high-precision model, typically, we define conservative alerting limits on top of the base ones. These alerting limits are a function of base limits and vary dynamically.
Prophet
We have integrated Prophet as default anomaly detection for D3. Prophet is a nonlinear regression model with a step improvement over intuitive techniques like moving averages. It is fully automated and adjusts well for changing trends and seasonality. It works well even with less training data compared to some other anomaly detection models.
Dealing with Outliers (Feedback)
Real data often contain outlying observations. Dealing with them can sometimes be troublesome and can mess up the forecasts. Outliers are observations that are very different from the majority of the observations in the time series. They may be errors or issues seen in case of any failures or bugs. In order to minimize the long-term effect of these observations propagating into forecasts, a feedback mechanism is in place. Here users are enabled options to manually tag outliers as true or false. These inputs are fed to the anomaly detection models where the data points corresponding to true alerts are dropped from considerations for future predictions.
Dealing with Noise (Diagnosis)
We have seen many time series that are naturally noisy (dips and spikes) and result in lots of alerts. To address this, we have an explicit diagnosis job that helps identify and filter noisy time series where the percentage error in predictions is high. The monitors corresponding to these time series are disabled for alerting.
Orchestrator
Orchestrator is a service component through which D3 capabilities are exposed to the outside world. It acts as a mediator between the Uber data platform and D3.
Role of the Orchestrator
Resource Management
The orchestrator manages two important resources:
Metadata: Every dataset has some metadata: dimensions, aggregators, supported monitor types, excluded columns, dataset partition-related information, etc. Metadata determines what monitors can be defined on the dataset. It also helps in one-click automated onboarding of the datasets.
Monitor: The orchestrator exposes gRPC endpoints to fetch and upsert monitor-level information for a given dataset.
Lifecycle Management
The orchestrator:
- Manages D3 monitor lifecycle–profiling data, stats compute, anomaly detection, status update to the Uber Data Platform components
- Keeps D3 in sync with dataset schema changes
- Supports scheduled and ad hoc trigger-based stats computation
- Facilities monitor update: if there is some metadata change (e.g., dimension or aggregator change) or monitor level attribute updates (e.g., threshold or monitor type updates, etc.), the corresponding monitors have to be updated, and stats should be updated.
Orchestrator Integration
Since D3 is targeted to find data quality issues at scale as fast as possible, an effective way to utilize D3 is to create D3-enabled tests through the Databook UI. For that to happen, the orchestrator has been integrated with the Uber Data Platform.
Generic Data Platform Contract
Uber Data platform (more specifically, UDQ) has a generic API contract using which any system can integrate, create and maintain data quality tests and their life cycle. The orchestrator implements these functionalities to serve the data consumers’ need:
- Suggest Test: recommend monitors to onboard
- CRUD APIs
- Backfill Test: recompute stats for past days
Challenges
Scale
To give a brief description of the scaling complexity, Uber has more than 1000+ tier 1 datasets, and on average, each dataset has 50+ columns. Considering at least two monitor types (such as Null Percentage and P75) getting added to a monitored column and if the dataset has at least 1 dimension and 1 aggregator, this can easily translate into 100+ monitors per dataset and 100k+ monitors for Uber-wide datasets.
This results in some of the below complexities that need to be addressed:
- High resource utilization for both data profiler and daily scheduled jobs
- Tedious manual process in onboarding monitors for a dataset
- Data profiling job trigger deduplication for different scenarios, such as during backfilling, schema evolution, and ad-hoc triggers
Compute Layer Optimization
Given that the compute jobs are generic Spark jobs that get applied against any dataset in Uber and with the above-mentioned scale, this can result in at least 100+ monitors per dataset and 100+ queries per Spark job. This translates into millions of queries for Uber-wide datasets. Both data profiler and daily scheduled Spark jobs are optimized and fine-tuned to have minimal shuffle and data transfer by clubbing these multiple SQL queries into a fixed logical set of templates based on the type of projection, grouping, and filtering expressions (sample example shown in the above diagram). This optimization resulted in reducing the number of queries per dataset from 200+ to 8 queries and in turn, reduced significant shuffle and data parsing, resulting in a 100x improvement in resource consumption. The average cost of computing per dataset was reduced from $1.5 to $0.01.
Another major bottleneck in this compute layer is the data profiler job or the historical data computation for high volume datasets. We need to tune the data profiler job to support datasets that have >1TB data per day volume for historical computation of 90+ days. For such datasets, the jobs are executed in smaller chunks and in parallel to reduce the amount of shuffle that gets transferred at each stage. Along with a few other Spark parameters tuning, we could support any high volume dataset data profiler job with optimal resource utilization.
Automated Onboarding
When we started onboarding datasets, we noticed that onboarding 100+ monitors per dataset is a tedious process. This required a lot of manual user input, such as reaching out to the owners to find out which monitors should be implemented. It was taking days. We wanted D3 to support single-click onboarding with minimal user intervention.
We currently support all the datasets which are partitioned on date in YYYY-MM-DD format and are updated at least on a daily basis. We start onboarding by:
- Fetching Top X% important columns based on usage
- Configuring monitors based on column datatype
- Configuring default monitor types and dimension mapping for the dataset
- Handling data/partition delays due to late arriving data by running D3 on T – X partition
With this approach, we reduced the onboarding time from days to a few seconds and also made it a fully automated, single-click operation.
Handling Stats Recomputation
Multiple scenarios result in stats recomputations. It could be due to the addition of new monitors or updates of existing ones or custom reruns, or incident backfills requested by users for true positive alerts. Users are expected to do source backfill and request for recomputation of stats and thresholds.
One of the main challenges here is that multiple recomputation requests can lead to multiple jobs processing the same data repeatedly. This could be highly inefficient, as the number of monitors can be overwhelming. To address this, we devised a batching-based solution that asynchronously processes recomputation requests (triggers).
A periodic Piper task is scheduled to hit the Orchestrator trigger handler endpoint every two hours and asynchronously process the triggers. The trigger handler internally fetches all the pending requests and batches them together into a single trigger request per dataset. It further initiates the stats computation pipeline if not already running for the dataset.
Data Quality Alerting and Monitoring
The computed stats value and the dynamic thresholds generated from the data profiler and daily scheduled jobs are persisted in a Hive-based stats table. The stats table is used in validating the data quality and used in tagging the completeness uptime for the dataset. Alerting is integrated with databook monitoring service and pager duty oncall like any other uber service. An alert is triggered whenever the threshold breach occurs in the persisted stats table. The stats table is also used in configuring the tableau dashboard for visualization, which is integrated with the databook.
Alerting View
Dashboard Views
Sample Time Series Trend in a Dataset Monitor
Sample Alerts Trend in a Dataset
Our Progress So Far
The fares data incident mentioned at the beginning was detected manually after 45 days. After the release of the D3 framework, we have monitored more than 300 Tier 1 datasets. The TTD has reduced drastically by more than 20 X, the average being 2 days. We have detected more than 6 production issues on marketplace/fares datasets with high accuracy (95.23%) on fact tables. The kind of incidents we have seen –
- critical column being null due to upstream changes or faulty releases
- critical column being null because of the corrupt source data
- spike in unexpected values in the numeric column
- heavily queried dataset (about 13K queries per week) having missing data
These columns are used to derive critical metrics, analyze rider behavior, and compute the fares riders see on the app. According to simulations described at the blog’s beginning, detecting these data issues early saved us tens of millions of dollars in incremental revenue.
What’s Next
Motivated by the need for better data quality, we built the D3 framework as the one-stop solution for monitoring and measuring column-level data quality. There is a lot of scope to enhance this framework and apply it in novel scenarios.
ML Model Quality
The features used to train ML models must be of high quality. It’s important to detect bad features whose values drift and filter them out. Also, the data used to train an ML model must not be drastically different from the online data used while serving it. Maintaining high-quality ML models by monitoring data drift is fast becoming table stakes. We plan to onboard these use cases to the D3 framework.
Custom Dimensions
As noted earlier, dimension-based monitoring is key to faster time to detection. We plan to support custom, dataset-specific dimensions on D3 metrics.
Support Non-Partitioned or Differently Partitioned Datasets
D3 supports only date-partitioned datasets. This is because we can easily extract time series to monitor from such datasets. But there are many critical datasets like dimension tables that are not partitioned. We plan to onboard such datasets into the D3 framework in the future.
New Anomaly Detection Algorithms
As mentioned above, D3 currently supports only Prophet’s anomaly detection library, but in the future, we can integrate with new anomaly detection libraries like ARIMA and others built in house.
New Monitors and Profiling Methodologies
We will incrementally add new monitors to make the D3 framework more effective at detecting anomalies. We will also adopt new ways of profiling data (like data sketches) instead of raw monitors. Data consumers can merge multiple data sketches to answer broader questions than just a raw metric.
Anshal Shukla
Anshal Shukla is a Senior Staff Engineer on the Data Intelligence team. He is focused on systems that improve data reliability and simplify data consumption and production at Uber.
Vineeth Tatipathri
Vineeth Tatipathri is an engineer on the Data Intelligence team. He is one of the initial developers to work on D3 from the ground up, evolving it to scale and generating Uber wide impact.
Nipun Vats
Nipun Vats is an engineer on the Data Intelligence team at Uber. He is a main contributor to the Dataset Drift Detector framework. He is currently working on enhancing and scaling the framework to save millions of dollars.
Dinesh Jagannathan
Dinesh Jagannathan is a Staff Engineer on the Data Intelligence team. He is focused on building scalable data products to improve data quality, standardizing best practices, and improving developer productivity.
Kousik Nath
Kousik Nath is a Senior Engineer on the Data Intelligence team. He is one of the contributors to the D3 framework. Apart from D3, he also contributes to fares related systems.
Posted by Anshal Shukla, Vineeth Tatipathri, Nipun Vats, Dinesh Jagannathan, Kousik Nath
Related articles
Enabling Security for Hadoop Data Lake on Google Cloud Storage
30 July / Global
Most popular
Differential Backups in MyRocks Based Distributed Databases at Uber
Upgrading Uber’s MySQL Fleet to version 8.0
Sparkle: Standardizing Modular ETL at Uber
Charting the mobility evolution: excerpts from Uber’s latest industry paper
Products
Company