Claude-skill-registry Data Quality Checks

Comprehensive guide to data quality validation, testing frameworks, anomaly detection, and data observability for production data pipelines

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/data-quality-checks" ~/.claude/skills/majiayu000-claude-skill-registry-data-quality-checks && rm -rf "$T"
manifest: skills/data/data-quality-checks/SKILL.md
source content

Data Quality Checks

What is Data Quality?

Data Quality: Ensuring data is accurate, complete, consistent, and reliable for business use.

Dimensions of Data Quality

Accuracy:     Data is correct
Completeness: No missing values
Consistency:  Same across systems
Timeliness:   Data is fresh
Validity:     Conforms to rules
Uniqueness:   No duplicates

Why Data Quality Matters

  • Bad decisions: Wrong data → wrong insights
  • Broken pipelines: Invalid data breaks downstream
  • Lost trust: Users stop trusting data
  • Compliance: Regulations require data quality

Types of Data Quality Checks

Schema Validation

# Check column exists
assert "customer_id" in df.columns

# Check data type
assert df.schema["customer_id"].dataType == IntegerType()

# Check column count
assert len(df.columns) == 10

Completeness Checks

# No nulls in required columns
null_count = df.filter(col("customer_id").isNull()).count()
assert null_count == 0, f"Found {null_count} null customer_ids"

# Minimum row count
row_count = df.count()
assert row_count > 1000, f"Expected >1000 rows, got {row_count}"

Uniqueness Checks

# No duplicates
total_rows = df.count()
unique_rows = df.select("customer_id").distinct().count()
assert total_rows == unique_rows, f"Found {total_rows - unique_rows} duplicates"

Range Checks

# Values within expected range
invalid_ages = df.filter((col("age") < 0) | (col("age") > 120)).count()
assert invalid_ages == 0, f"Found {invalid_ages} invalid ages"

# Amount is positive
invalid_amounts = df.filter(col("amount") < 0).count()
assert invalid_amounts == 0, f"Found {invalid_amounts} negative amounts"

Referential Integrity

# Foreign key exists
orders_df = spark.read.table("orders")
customers_df = spark.read.table("customers")

orphan_orders = orders_df.join(
    customers_df,
    orders_df.customer_id == customers_df.customer_id,
    "left_anti"
).count()

assert orphan_orders == 0, f"Found {orphan_orders} orders without customers"

Format Validation

# Email format
import re

email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
invalid_emails = df.filter(
    ~col("email").rlike(email_pattern)
).count()

assert invalid_emails == 0, f"Found {invalid_emails} invalid emails"

# Date format
invalid_dates = df.filter(
    ~col("order_date").cast("date").isNotNull()
).count()

assert invalid_dates == 0, f"Found {invalid_dates} invalid dates"

Great Expectations

Setup

from great_expectations.dataset import SparkDFDataset

# Wrap DataFrame
ge_df = SparkDFDataset(df)

Common Expectations

# Column exists
ge_df.expect_column_to_exist("customer_id")

# No nulls
ge_df.expect_column_values_to_not_be_null("customer_id")

# Unique values
ge_df.expect_column_values_to_be_unique("customer_id")

# Values in set
ge_df.expect_column_values_to_be_in_set(
    "status",
    ["pending", "completed", "cancelled"]
)

# Range
ge_df.expect_column_values_to_be_between(
    "age",
    min_value=0,
    max_value=120
)

# Regex
ge_df.expect_column_values_to_match_regex(
    "email",
    r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
)

# Row count
ge_df.expect_table_row_count_to_be_between(
    min_value=1000,
    max_value=1000000
)

Validation

# Run all expectations
results = ge_df.validate()

# Check if passed
if results["success"]:
    print("All checks passed!")
else:
    print("Some checks failed:")
    for result in results["results"]:
        if not result["success"]:
            print(f"  - {result['expectation_config']['expectation_type']}")

Data Docs

# Generate HTML documentation
great_expectations docs build

dbt Tests

Schema Tests

# models/schema.yml
version: 2

models:
  - name: customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      
      - name: email
        tests:
          - not_null
          - unique
      
      - name: status
        tests:
          - accepted_values:
              values: ['active', 'inactive', 'suspended']
      
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_date"

Custom Tests

-- tests/assert_positive_revenue.sql
select *
from {{ ref('fct_orders') }}
where total_amount < 0

Relationships Test

models:
  - name: orders
    columns:
      - name: customer_id
        tests:
          - relationships:
              to: ref('customers')
              field: customer_id

Anomaly Detection

Statistical Anomalies

from scipy import stats
import numpy as np

# Z-score (standard deviations from mean)
df_pd = df.toPandas()
z_scores = np.abs(stats.zscore(df_pd['amount']))
anomalies = df_pd[z_scores > 3]  # >3 std devs

print(f"Found {len(anomalies)} anomalies")

Time Series Anomalies

# Detect sudden spikes/drops
daily_counts = df.groupBy("date").count().orderBy("date")

# Calculate moving average
from pyspark.sql.window import Window

window = Window.orderBy("date").rowsBetween(-7, 0)
daily_counts = daily_counts.withColumn(
    "moving_avg",
    avg("count").over(window)
)

# Flag anomalies (>2x moving average)
anomalies = daily_counts.filter(
    col("count") > col("moving_avg") * 2
)

Null Rate Anomalies

# Track null rate over time
null_rates = df.groupBy("date").agg(
    (sum(when(col("email").isNull(), 1).otherwise(0)) / count("*")).alias("null_rate")
)

# Alert if null rate > 5%
high_null_days = null_rates.filter(col("null_rate") > 0.05)

Data Observability

Freshness Checks

from datetime import datetime, timedelta

# Check data freshness
latest_timestamp = df.agg(max("created_at")).collect()[0][0]
now = datetime.now()
age = now - latest_timestamp

# Alert if data is >24 hours old
assert age < timedelta(hours=24), f"Data is {age} old (stale!)"

Volume Checks

# Check row count is within expected range
today_count = df.filter(col("date") == current_date()).count()
expected_min = 10000
expected_max = 100000

assert expected_min <= today_count <= expected_max, \
    f"Row count {today_count} outside expected range [{expected_min}, {expected_max}]"

Distribution Checks

# Check value distribution
status_dist = df.groupBy("status").count().collect()

# Expected distribution
expected = {
    "active": (0.7, 0.9),      # 70-90%
    "inactive": (0.05, 0.2),   # 5-20%
    "suspended": (0.0, 0.1)    # 0-10%
}

total = df.count()
for row in status_dist:
    status = row["status"]
    count = row["count"]
    pct = count / total
    
    if status in expected:
        min_pct, max_pct = expected[status]
        assert min_pct <= pct <= max_pct, \
            f"{status}: {pct:.1%} outside expected range [{min_pct:.1%}, {max_pct:.1%}]"

Data Quality Frameworks

Great Expectations

import great_expectations as ge

# Create context
context = ge.get_context()

# Create expectation suite
suite = context.create_expectation_suite("my_suite")

# Add expectations
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="my_suite"
)

validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_unique("customer_id")

# Save suite
validator.save_expectation_suite()

# Run validation
results = context.run_checkpoint(checkpoint_name="my_checkpoint")

dbt Tests

# Run all tests
dbt test

# Run tests for specific model
dbt test --select customers

# Run tests in CI/CD
dbt test --fail-fast

Monte Carlo

# SaaS data observability platform
# Automatic anomaly detection
# No code required

Soda

# checks.yml
checks for customers:
  - row_count > 1000
  - missing_count(customer_id) = 0
  - duplicate_count(customer_id) = 0
  - invalid_count(email) = 0:
      valid format: email
  - freshness(created_at) < 24h
# Run checks
soda scan -d my_datasource -c configuration.yml checks.yml

Implementing Data Quality Pipeline

Pipeline Structure

def run_quality_checks(df, checks):
    """Run data quality checks and return results"""
    results = []
    
    for check in checks:
        try:
            check(df)
            results.append({
                "check": check.__name__,
                "status": "PASS",
                "error": None
            })
        except AssertionError as e:
            results.append({
                "check": check.__name__,
                "status": "FAIL",
                "error": str(e)
            })
    
    return results

# Define checks
def check_no_nulls(df):
    null_count = df.filter(col("customer_id").isNull()).count()
    assert null_count == 0, f"Found {null_count} nulls"

def check_no_duplicates(df):
    total = df.count()
    unique = df.select("customer_id").distinct().count()
    assert total == unique, f"Found {total - unique} duplicates"

def check_row_count(df):
    count = df.count()
    assert count > 1000, f"Expected >1000 rows, got {count}"

# Run checks
checks = [check_no_nulls, check_no_duplicates, check_row_count]
results = run_quality_checks(df, checks)

# Log results
for result in results:
    if result["status"] == "FAIL":
        print(f"❌ {result['check']}: {result['error']}")
    else:
        print(f"✅ {result['check']}")

# Fail pipeline if any check failed
if any(r["status"] == "FAIL" for r in results):
    raise Exception("Data quality checks failed")

Quarantine Bad Data

# Separate good and bad data
good_data = df.filter(
    col("customer_id").isNotNull() &
    col("email").rlike(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
)

bad_data = df.subtract(good_data)

# Write good data to production
good_data.write.format("delta").mode("append").save("/prod/customers")

# Write bad data to quarantine
bad_data.write.format("delta").mode("append").save("/quarantine/customers")

# Alert on bad data
if bad_data.count() > 0:
    send_alert(f"Found {bad_data.count()} bad records in quarantine")

Monitoring and Alerting

Metrics to Track

Freshness: How old is the data?
Volume: Row count per day
Completeness: % of non-null values
Uniqueness: % of unique values
Validity: % of values passing validation
Schema changes: Columns added/removed/changed

Alerting Rules

# Alert if data is stale
if data_age > timedelta(hours=24):
    send_alert("Data is stale")

# Alert if volume drops >50%
if today_count < yesterday_count * 0.5:
    send_alert("Volume dropped significantly")

# Alert if null rate increases >10%
if today_null_rate > yesterday_null_rate + 0.1:
    send_alert("Null rate increased")

Dashboard

Metrics Dashboard:
- Data freshness (last updated)
- Row count trend (7 days)
- Null rate trend (7 days)
- Test pass rate (% passing)
- Failed tests (list)

Best Practices

1. Test Early and Often

Bronze layer: Schema validation
Silver layer: Completeness, uniqueness
Gold layer: Business logic validation

2. Fail Fast

# Stop pipeline if critical checks fail
if critical_check_failed:
    raise Exception("Critical check failed, stopping pipeline")

3. Quarantine Bad Data

Don't drop bad data
→ Quarantine for investigation
→ Fix and reprocess

4. Monitor Trends

Track metrics over time
Alert on anomalies
Investigate root cause

5. Document Expectations

# Document what "good" data looks like
customers:
  - customer_id: unique, not null
  - email: valid format, unique
  - age: 0-120
  - status: active, inactive, suspended

Summary

Data Quality: Ensuring data is accurate, complete, consistent

Dimensions:

  • Accuracy, Completeness, Consistency
  • Timeliness, Validity, Uniqueness

Check Types:

  • Schema validation
  • Completeness checks
  • Uniqueness checks
  • Range checks
  • Referential integrity
  • Format validation

Frameworks:

  • Great Expectations (Python)
  • dbt tests (SQL)
  • Soda (YAML)
  • Monte Carlo (SaaS)

Anomaly Detection:

  • Statistical (Z-score)
  • Time series (moving average)
  • Null rate tracking

Data Observability:

  • Freshness checks
  • Volume checks
  • Distribution checks

Best Practices:

  • Test early and often
  • Fail fast on critical checks
  • Quarantine bad data
  • Monitor trends
  • Document expectations