Claude-skill-registry Data Quality Checks
Comprehensive guide to data quality validation, testing frameworks, anomaly detection, and data observability for production data pipelines
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/data-quality-checks" ~/.claude/skills/majiayu000-claude-skill-registry-data-quality-checks && rm -rf "$T"
manifest:
skills/data/data-quality-checks/SKILL.md
Data Quality Checks
What is Data Quality?
Data Quality: Ensuring data is accurate, complete, consistent, and reliable for business use.
Dimensions of Data Quality
- Accuracy: Data is correct
- Completeness: No missing values
- Consistency: Same across systems
- Timeliness: Data is fresh
- Validity: Conforms to rules
- Uniqueness: No duplicates
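Each dimension is directly measurable. A minimal sketch (assuming a PySpark DataFrame `df` with `customer_id` and `email` columns; the simple email pattern is illustrative):

from pyspark.sql import functions as F

# Illustrative per-dimension metrics on a PySpark DataFrame `df`
total = df.count()

metrics = {
    # Completeness: share of non-null customer_ids
    "completeness": df.filter(F.col("customer_id").isNotNull()).count() / total,
    # Uniqueness: share of distinct customer_ids
    "uniqueness": df.select("customer_id").distinct().count() / total,
    # Validity: share of emails matching a simple pattern
    "validity": df.filter(F.col("email").rlike(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")).count() / total,
}
print(metrics)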
Why Data Quality Matters
- Bad decisions: Wrong data → wrong insights
- Broken pipelines: Invalid data breaks downstream
- Lost trust: Users stop trusting data
- Compliance: Regulations require data quality
Types of Data Quality Checks
Schema Validation
from pyspark.sql.types import IntegerType

# Check column exists
assert "customer_id" in df.columns

# Check data type
assert df.schema["customer_id"].dataType == IntegerType()

# Check column count
assert len(df.columns) == 10
Completeness Checks
from pyspark.sql.functions import col

# No nulls in required columns
null_count = df.filter(col("customer_id").isNull()).count()
assert null_count == 0, f"Found {null_count} null customer_ids"

# Minimum row count
row_count = df.count()
assert row_count > 1000, f"Expected >1000 rows, got {row_count}"
Uniqueness Checks
# No duplicates
total_rows = df.count()
unique_rows = df.select("customer_id").distinct().count()
assert total_rows == unique_rows, f"Found {total_rows - unique_rows} duplicates"
Range Checks
# Values within expected range
invalid_ages = df.filter((col("age") < 0) | (col("age") > 120)).count()
assert invalid_ages == 0, f"Found {invalid_ages} invalid ages"

# Amount is positive
invalid_amounts = df.filter(col("amount") < 0).count()
assert invalid_amounts == 0, f"Found {invalid_amounts} negative amounts"
Referential Integrity
# Foreign key exists: every order must reference an existing customer
orders_df = spark.read.table("orders")
customers_df = spark.read.table("customers")

orphan_orders = orders_df.join(
    customers_df,
    orders_df.customer_id == customers_df.customer_id,
    "left_anti"
).count()
assert orphan_orders == 0, f"Found {orphan_orders} orders without customers"
Format Validation
# Email format (Spark's rlike applies the regex natively)
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
invalid_emails = df.filter(~col("email").rlike(email_pattern)).count()
assert invalid_emails == 0, f"Found {invalid_emails} invalid emails"

# Date format (a failed cast to date yields null)
invalid_dates = df.filter(col("order_date").cast("date").isNull()).count()
assert invalid_dates == 0, f"Found {invalid_dates} invalid dates"
Great Expectations
Setup
# Legacy dataset API: wrap a Spark DataFrame for inline expectations
from great_expectations.dataset import SparkDFDataset

ge_df = SparkDFDataset(df)
Common Expectations
# Column exists
ge_df.expect_column_to_exist("customer_id")

# No nulls
ge_df.expect_column_values_to_not_be_null("customer_id")

# Unique values
ge_df.expect_column_values_to_be_unique("customer_id")

# Values in set
ge_df.expect_column_values_to_be_in_set(
    "status", ["pending", "completed", "cancelled"]
)

# Range
ge_df.expect_column_values_to_be_between(
    "age", min_value=0, max_value=120
)

# Regex
ge_df.expect_column_values_to_match_regex(
    "email", r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
)

# Row count
ge_df.expect_table_row_count_to_be_between(
    min_value=1000, max_value=1000000
)
Validation
# Run all expectations
results = ge_df.validate()

# Check if passed
if results["success"]:
    print("All checks passed!")
else:
    print("Some checks failed:")
    for result in results["results"]:
        if not result["success"]:
            print(f"  - {result['expectation_config']['expectation_type']}")
Data Docs
# Generate HTML documentation
great_expectations docs build
dbt Tests
Schema Tests
# models/schema.yml
version: 2

models:
  - name: customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - not_null
          - unique
      - name: status
        tests:
          - accepted_values:
              values: ['active', 'inactive', 'suspended']
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_date"
Custom Tests
-- tests/assert_positive_revenue.sql
-- The test passes when this query returns zero rows
select *
from {{ ref('fct_orders') }}
where total_amount < 0
Relationships Test
models:
  - name: orders
    columns:
      - name: customer_id
        tests:
          - relationships:
              to: ref('customers')
              field: customer_id
Anomaly Detection
Statistical Anomalies
from scipy import stats
import numpy as np

# Z-score: distance from the mean in standard deviations
# (toPandas collects to the driver, so this suits data that fits in memory)
df_pd = df.toPandas()
z_scores = np.abs(stats.zscore(df_pd['amount']))
anomalies = df_pd[z_scores > 3]  # >3 std devs
print(f"Found {len(anomalies)} anomalies")
Time Series Anomalies
from pyspark.sql.functions import avg, col
from pyspark.sql.window import Window

# Detect sudden spikes/drops
daily_counts = df.groupBy("date").count().orderBy("date")

# Calculate a trailing moving average over the last 7 rows
window = Window.orderBy("date").rowsBetween(-7, 0)
daily_counts = daily_counts.withColumn(
    "moving_avg", avg("count").over(window)
)

# Flag anomalies (>2x moving average)
anomalies = daily_counts.filter(col("count") > col("moving_avg") * 2)
Null Rate Anomalies
from pyspark.sql.functions import col, count, when
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing the builtin

# Track null rate over time
null_rates = df.groupBy("date").agg(
    (spark_sum(when(col("email").isNull(), 1).otherwise(0)) / count("*")).alias("null_rate")
)

# Alert if null rate > 5%
high_null_days = null_rates.filter(col("null_rate") > 0.05)
Data Observability
Freshness Checks
from datetime import datetime, timedelta
from pyspark.sql.functions import max as spark_max  # avoid shadowing the builtin

# Check data freshness
latest_timestamp = df.agg(spark_max("created_at")).collect()[0][0]
age = datetime.now() - latest_timestamp

# Alert if data is >24 hours old
assert age < timedelta(hours=24), f"Data is {age} old (stale!)"
Volume Checks
from pyspark.sql.functions import col, current_date

# Check row count is within expected range
today_count = df.filter(col("date") == current_date()).count()
expected_min = 10000
expected_max = 100000

assert expected_min <= today_count <= expected_max, \
    f"Row count {today_count} outside expected range [{expected_min}, {expected_max}]"
Distribution Checks
# Check value distribution
status_dist = df.groupBy("status").count().collect()

# Expected distribution
expected = {
    "active": (0.7, 0.9),      # 70-90%
    "inactive": (0.05, 0.2),   # 5-20%
    "suspended": (0.0, 0.1),   # 0-10%
}

total = df.count()
for row in status_dist:
    status = row["status"]
    count = row["count"]
    pct = count / total
    if status in expected:
        min_pct, max_pct = expected[status]
        assert min_pct <= pct <= max_pct, \
            f"{status}: {pct:.1%} outside expected range [{min_pct:.1%}, {max_pct:.1%}]"
Data Quality Frameworks
Great Expectations
import great_expectations as ge

# Create context
context = ge.get_context()

# Create expectation suite
suite = context.create_expectation_suite("my_suite")

# Add expectations
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="my_suite"
)
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_unique("customer_id")

# Save suite
validator.save_expectation_suite()

# Run validation
results = context.run_checkpoint(checkpoint_name="my_checkpoint")
dbt Tests
# Run all tests
dbt test

# Run tests for specific model
dbt test --select customers

# Run tests in CI/CD
dbt test --fail-fast
Monte Carlo
- SaaS data observability platform
- Automatic anomaly detection
- No code required
Soda
# checks.yml
checks for customers:
  - row_count > 1000
  - missing_count(customer_id) = 0
  - duplicate_count(customer_id) = 0
  - invalid_count(email) = 0:
      valid format: email
  - freshness(created_at) < 24h
# Run checks
soda scan -d my_datasource -c configuration.yml checks.yml
Implementing Data Quality Pipeline
Pipeline Structure
from pyspark.sql.functions import col

def run_quality_checks(df, checks):
    """Run data quality checks and return results"""
    results = []
    for check in checks:
        try:
            check(df)
            results.append({
                "check": check.__name__,
                "status": "PASS",
                "error": None
            })
        except AssertionError as e:
            results.append({
                "check": check.__name__,
                "status": "FAIL",
                "error": str(e)
            })
    return results

# Define checks
def check_no_nulls(df):
    null_count = df.filter(col("customer_id").isNull()).count()
    assert null_count == 0, f"Found {null_count} nulls"

def check_no_duplicates(df):
    total = df.count()
    unique = df.select("customer_id").distinct().count()
    assert total == unique, f"Found {total - unique} duplicates"

def check_row_count(df):
    count = df.count()
    assert count > 1000, f"Expected >1000 rows, got {count}"

# Run checks
checks = [check_no_nulls, check_no_duplicates, check_row_count]
results = run_quality_checks(df, checks)

# Log results
for result in results:
    if result["status"] == "FAIL":
        print(f"❌ {result['check']}: {result['error']}")
    else:
        print(f"✅ {result['check']}")

# Fail pipeline if any check failed
if any(r["status"] == "FAIL" for r in results):
    raise Exception("Data quality checks failed")
Quarantine Bad Data
from pyspark.sql.functions import col

# Separate good and bad data
good_data = df.filter(
    col("customer_id").isNotNull() &
    col("email").rlike(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
)
bad_data = df.subtract(good_data)

# Write good data to production
good_data.write.format("delta").mode("append").save("/prod/customers")

# Write bad data to quarantine
bad_data.write.format("delta").mode("append").save("/quarantine/customers")

# Alert on bad data (count once to avoid recomputing)
bad_count = bad_data.count()
if bad_count > 0:
    send_alert(f"Found {bad_count} bad records in quarantine")
Monitoring and Alerting
Metrics to Track
- Freshness: How old is the data?
- Volume: Row count per day
- Completeness: % of non-null values
- Uniqueness: % of unique values
- Validity: % of values passing validation
- Schema changes: Columns added/removed/changed
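Several of these metrics can be computed in a single aggregation pass. A minimal sketch (assuming a PySpark DataFrame `df` with `created_at`, `customer_id`, and `email` columns; the names are illustrative):

from pyspark.sql import functions as F

# Illustrative one-pass metric computation; column names are assumptions
summary = df.agg(
    F.max("created_at").alias("latest_timestamp"),                         # freshness
    F.count("*").alias("row_count"),                                       # volume
    (F.count("email") / F.count("*")).alias("email_completeness"),         # completeness
    (F.countDistinct("customer_id") / F.count("*")).alias("id_uniqueness") # uniqueness
).collect()[0]

print(summary.asDict())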
Alerting Rules
from datetime import timedelta

# Alert if data is stale
if data_age > timedelta(hours=24):
    send_alert("Data is stale")

# Alert if volume drops >50%
if today_count < yesterday_count * 0.5:
    send_alert("Volume dropped significantly")

# Alert if null rate increases >10 percentage points
if today_null_rate > yesterday_null_rate + 0.1:
    send_alert("Null rate increased")
Dashboard
Metrics Dashboard:
- Data freshness (last updated)
- Row count trend (7 days)
- Null rate trend (7 days)
- Test pass rate (% passing)
- Failed tests (list)
Best Practices
1. Test Early and Often
- Bronze layer: Schema validation
- Silver layer: Completeness, uniqueness
- Gold layer: Business logic validation
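One way to wire this up, sketched with the `run_quality_checks` helper and check functions from the pipeline example above (`check_schema` is a hypothetical bronze-layer check added here for illustration):

# check_no_nulls, check_no_duplicates, check_row_count come from the
# pipeline example above; check_schema is a hypothetical placeholder
def check_schema(df):
    assert "customer_id" in df.columns, "Missing customer_id column"

LAYER_CHECKS = {
    "bronze": [check_schema],
    "silver": [check_no_nulls, check_no_duplicates],
    "gold": [check_row_count],  # plus business-logic checks
}

def validate_layer(df, layer):
    return run_quality_checks(df, LAYER_CHECKS[layer])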
2. Fail Fast
# Stop pipeline if critical checks fail
if critical_check_failed:
    raise Exception("Critical check failed, stopping pipeline")
3. Quarantine Bad Data
Don't drop bad data → Quarantine for investigation → Fix and reprocess
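The reprocess step might look like this sketch, reusing the quarantine and production paths from the pipeline example above (the lowercase/trim fix is purely illustrative):

from pyspark.sql import functions as F

# Illustrative reprocessing: read quarantined rows, apply a fix,
# re-validate, and promote only the rows that now pass
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

quarantined = spark.read.format("delta").load("/quarantine/customers")
fixed = quarantined.withColumn("email", F.lower(F.trim(F.col("email"))))
repaired = fixed.filter(
    F.col("customer_id").isNotNull() & F.col("email").rlike(email_pattern)
)
repaired.write.format("delta").mode("append").save("/prod/customers")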
4. Monitor Trends
- Track metrics over time
- Alert on anomalies
- Investigate root cause
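Trends only exist if metrics are persisted. A sketch that appends one metrics row per run to a Delta table (path and column names are assumptions):

from datetime import date
from pyspark.sql import functions as F

# Illustrative sketch: persist daily metrics so trends can be charted and alerted on
total = df.count()
nulls = df.filter(F.col("email").isNull()).count()

metrics_row = spark.createDataFrame(
    [(str(date.today()), total, nulls / total if total else 0.0)],
    ["run_date", "row_count", "email_null_rate"],
)
metrics_row.write.format("delta").mode("append").save("/metrics/customers_quality")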
5. Document Expectations
# Document what "good" data looks like
customers:
  - customer_id: unique, not null
  - email: valid format, unique
  - age: 0-120
  - status: active, inactive, suspended
Summary
Data Quality: Ensuring data is accurate, complete, consistent
Dimensions:
- Accuracy, Completeness, Consistency
- Timeliness, Validity, Uniqueness
Check Types:
- Schema validation
- Completeness checks
- Uniqueness checks
- Range checks
- Referential integrity
- Format validation
Frameworks:
- Great Expectations (Python)
- dbt tests (SQL)
- Soda (YAML)
- Monte Carlo (SaaS)
Anomaly Detection:
- Statistical (Z-score)
- Time series (moving average)
- Null rate tracking
Data Observability:
- Freshness checks
- Volume checks
- Distribution checks
Best Practices:
- Test early and often
- Fail fast on critical checks
- Quarantine bad data
- Monitor trends
- Document expectations