Claude-Skills senior-data-engineer

Install

Source -- clone the upstream repo:

git clone https://github.com/borghei/Claude-Skills

Claude Code -- install into ~/.claude/skills/:

T=$(mktemp -d) && git clone --depth=1 https://github.com/borghei/Claude-Skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/engineering/senior-data-engineer" ~/.claude/skills/borghei-claude-skills-senior-data-engineer && rm -rf "$T"

Manifest: engineering/senior-data-engineer/SKILL.md

Senior Data Engineer

The agent generates pipeline configurations (Airflow, Prefect, Dagster), validates data quality with profiling and anomaly detection, and optimizes SQL/Spark performance with actionable recommendations.


Quick Start

# Generate an Airflow DAG for incremental PostgreSQL -> Snowflake
python scripts/pipeline_orchestrator.py generate \
  --type airflow --source postgres --destination snowflake \
  --tables orders,customers --mode incremental --schedule "0 5 * * *"

# Validate data quality against a schema
python scripts/data_quality_validator.py validate data.csv \
  --schema schema.json --detect-anomalies --json

# Profile a dataset
python scripts/data_quality_validator.py profile data.csv --json

# Optimize a slow SQL query
python scripts/etl_performance_optimizer.py analyze-sql query.sql \
  --warehouse snowflake --json

# Estimate query cost
python scripts/etl_performance_optimizer.py estimate-cost query.sql \
  --warehouse bigquery --stats data_stats.json --json

Tools Overview

| Tool | Subcommands | Purpose |
| --- | --- | --- |
| `pipeline_orchestrator.py` | `generate`, `validate`, `template` | Generate Airflow/Prefect/Dagster pipeline code, validate DAGs |
| `data_quality_validator.py` | `validate`, `profile`, `generate-suite`, `contract`, `schema` | Schema validation, profiling, anomaly detection, Great Expectations |
| `etl_performance_optimizer.py` | `analyze-sql`, `analyze-spark`, `optimize-partition`, `estimate-cost`, `template` | SQL/Spark optimization, partition strategy, cost estimation |

All subcommands support `--json` for machine-readable output and `--output` for writing results to a file.
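
Because every subcommand can emit JSON, the tools are easy to chain from Python. A minimal sketch, assuming the profile subcommand prints a single JSON object to stdout when --json is set (the exact fields depend on the validator's output):

import json
import subprocess

# Profile a dataset and capture the machine-readable report.
result = subprocess.run(
    ["python", "scripts/data_quality_validator.py", "profile", "data.csv", "--json"],
    capture_output=True, text=True, check=True,
)
report = json.loads(result.stdout)
print(sorted(report.keys()))  # inspect whichever top-level fields the profiler emits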


Workflow 1: Batch ETL Pipeline (PostgreSQL -> dbt -> Snowflake)

Step 1 -- Generate extraction config.

python scripts/pipeline_orchestrator.py generate \
  --type airflow --source postgres --tables orders,customers,products \
  --mode incremental --watermark updated_at --output dags/extract_source.py

Step 2 -- Create dbt staging model.

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
)
SELECT order_id, customer_id, order_date, total_amount, status, _extracted_at
FROM source
WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)

Step 3 -- Create incremental mart model.

-- models/marts/fct_orders.sql
{{ config(materialized='incremental', unique_key='order_id', cluster_by=['order_date']) }}

SELECT o.order_id, o.customer_id, c.customer_segment, o.order_date, o.total_amount, o.status
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}

Step 4 -- Wire into Airflow DAG.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('daily_etl', schedule_interval='0 5 * * *', start_date=datetime(2024, 1, 1), catchup=False, tags=['etl']) as dag:
    extract = BashOperator(task_id='extract', bash_command='python scripts/extract.py --date {{ ds }}')
    transform = BashOperator(task_id='dbt_run', bash_command='dbt run --select marts.*')
    test = BashOperator(task_id='dbt_test', bash_command='dbt test --select marts.*')
    extract >> transform >> test

Step 5 -- Validate.

python scripts/data_quality_validator.py validate --table fct_orders --checks all --output report.json

Validation checkpoint: DAG runs end-to-end. Data quality report shows 0 failures on uniqueness, completeness, and freshness.
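
To enforce that checkpoint in CI, the JSON report from Step 5 can act as a hard gate. A minimal sketch, assuming the report lists failed checks under a top-level failures key (an illustrative field name, not a documented one):

import json
import sys

with open("report.json") as f:
    report = json.load(f)

# "failures" is an assumed key; adjust to the validator's actual report layout.
failures = report.get("failures", [])
if failures:
    print(f"{len(failures)} data quality checks failed: {failures}")
    sys.exit(1)
print("All data quality checks passed.")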


Workflow 2: Real-Time Streaming (Kafka -> Spark -> Delta Lake)

Step 1 -- Define event schema and Kafka topic.

kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic user-events --partitions 12 --replication-factor 3 \
  --config retention.ms=604800000
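
Step 2 parses the Kafka payload with a schema object. A minimal PySpark definition covering the fields the later steps reference (field names are illustrative; align them with the real event payload):

from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# Passed to from_json() in Step 2.
schema = StructType([
    StructField("event_id", StringType(), nullable=False),
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("event_timestamp", TimestampType()),
])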

Step 2 -- Implement Spark Structured Streaming.

from pyspark.sql.functions import approx_count_distinct, col, count, from_json, window

# Read raw events from Kafka and parse them with the schema defined in Step 1
events_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest").load()

parsed_df = events_df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

aggregated_df = parsed_df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(window(col("event_timestamp"), "5 minutes"), col("event_type")) \
    .agg(count("*").alias("event_count"), approx_count_distinct("user_id").alias("unique_users"))

aggregated_df.writeStream.format("delta").outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .trigger(processingTime="1 minute").start()

Step 3 -- Handle errors with dead letter queue.

from pyspark.sql.functions import col, lit

def process_with_dlq(batch_df, batch_id):
    # Split each micro-batch: records missing an event_id go to the dead letter queue
    valid_df = batch_df.filter(col("event_id").isNotNull())
    invalid_df = batch_df.filter(col("event_id").isNull())
    valid_df.write.format("delta").mode("append").save("/data/lake/user_events")
    if invalid_df.count() > 0:
        invalid_df.withColumn("error_reason", lit("missing_event_id")) \
            .write.format("delta").mode("append").save("/data/lake/dlq/user_events")
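
The function is attached to the stream with foreachBatch, replacing the direct Delta sink from Step 2 -- a sketch (the separate checkpoint path is an illustrative choice):

# Route each micro-batch through the DLQ-aware writer.
parsed_df.writeStream \
    .foreachBatch(process_with_dlq) \
    .option("checkpointLocation", "/checkpoints/user-events-dlq") \
    .trigger(processingTime="1 minute") \
    .start()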

Validation checkpoint: Consumer lag stays under threshold. DLQ table has < 0.1% of total events.
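
One way to verify the DLQ ratio at this checkpoint, assuming both Delta tables written in Step 3 are readable from the same session:

# Alert if dead-lettered records exceed 0.1% of everything processed.
events = spark.read.format("delta").load("/data/lake/user_events").count()
dlq = spark.read.format("delta").load("/data/lake/dlq/user_events").count()
dlq_ratio = dlq / max(events + dlq, 1)
print(f"DLQ ratio: {dlq_ratio:.4%}")
assert dlq_ratio < 0.001, "DLQ exceeds 0.1% of total events"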


Workflow 3: Data Quality Framework

Step 1 -- Generate a Great Expectations suite from data.

python scripts/data_quality_validator.py generate-suite data.csv --output expectations.json

Step 2 -- Validate against a data contract.

# contracts/orders_contract.yaml
contract:
  name: orders_data_contract
  version: "1.0.0"
schema:
  properties:
    order_id: { type: string, format: uuid }
    total_amount: { type: decimal, minimum: 0 }
    status: { type: string, enum: [pending, confirmed, shipped, delivered, cancelled] }
sla:
  freshness: { max_delay_hours: 1 }
  completeness: { min_percentage: 99.9 }
  accuracy: { duplicate_tolerance: 0.01 }

python scripts/data_quality_validator.py contract data.csv --contract orders_contract.yaml --json

Step 3 -- Add dbt tests for ongoing validation.

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range: { min_value: 0, max_value: 1000000 }

Validation checkpoint: Quality score >= 95%. Zero duplicates. Freshness under SLA threshold.
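
Before relying on the scheduled dbt tests, the same checkpoint can be spot-checked locally. A pandas sketch, assuming the extract carries the order_id key from the contract and an _extracted_at timestamp like the staging model in Workflow 1:

import pandas as pd

df = pd.read_csv("data.csv")

# Zero duplicates on the contract's unique key.
dupes = int(df["order_id"].duplicated().sum())

# Freshness: newest record must land within the contract's 1-hour SLA.
extracted = pd.to_datetime(df["_extracted_at"], utc=True)
lag_hours = (pd.Timestamp.now(tz="UTC") - extracted.max()).total_seconds() / 3600

print(f"duplicates={dupes}, freshness_lag_hours={lag_hours:.2f}")
assert dupes == 0 and lag_hours <= 1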


Architecture Decision Framework

| Question | Batch | Streaming |
| --- | --- | --- |
| Latency requirement | Hours to days | Seconds to minutes |
| Processing complexity | Complex transforms, ML | Simple aggregations |
| Cost sensitivity | More cost-effective | Higher infra cost |
| Error handling | Easy reprocessing | Requires careful DLQ design |

Decision tree:

Real-time insight needed?
  Yes -> Exactly-once needed?
    Yes -> Kafka + Flink/Spark Structured Streaming
    No  -> Kafka + consumer groups
  No  -> Daily volume > 1TB?
    Yes -> Spark/Databricks
    No  -> dbt + warehouse compute
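
The same tree can be encoded as a small helper for scaffolding or documentation -- a sketch with illustrative thresholds:

def recommend_stack(needs_realtime: bool, needs_exactly_once: bool = False,
                    daily_volume_tb: float = 0.0) -> str:
    """Map the decision tree above to a suggested processing stack."""
    if needs_realtime:
        if needs_exactly_once:
            return "Kafka + Flink/Spark Structured Streaming"
        return "Kafka + consumer groups"
    if daily_volume_tb > 1:
        return "Spark/Databricks"
    return "dbt + warehouse compute"

print(recommend_stack(needs_realtime=False, daily_volume_tb=0.2))  # dbt + warehouse compute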

| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) |
| --- | --- | --- |
| Best for | BI, SQL analytics | ML, unstructured data |
| Storage cost | Higher (proprietary) | Lower (open formats) |
| Flexibility | Schema-on-write | Schema-on-read |

Anti-Patterns

  1. Full table reload on every run -- use incremental loads with watermark columns (see the sketch after this list).
  2. No dead letter queue -- failed records silently dropped. Always route failures to a DLQ.
  3. Timezone mismatch -- normalize all timestamps to UTC at extraction.
  4. Missing freshness checks -- add `dbt source freshness` before transforms start.
  5. Skipping schema drift detection -- use the `mergeSchema` option or data contracts to catch new columns.
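
For anti-pattern 1, the incremental pattern reduces to filtering on a watermark column and persisting the new high-water mark after a successful load. A minimal sketch (table and column names are illustrative; state storage is up to the orchestrator):

from datetime import datetime, timezone

def build_incremental_query(table: str, watermark_column: str, last_watermark: datetime) -> str:
    """Extract only rows changed since the last successful run."""
    return (
        f"SELECT * FROM {table} "
        f"WHERE {watermark_column} > '{last_watermark.isoformat()}' "
        f"ORDER BY {watermark_column}"
    )

query = build_incremental_query("orders", "updated_at", datetime(2024, 1, 1, tzinfo=timezone.utc))
print(query)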

Troubleshooting

| Problem | Cause | Solution |
| --- | --- | --- |
| Pipeline silently produces zero rows | Timezone mismatch on watermark column | Normalize to UTC; add a row-count assertion |
| Spark shuffle 10x slower than expected | Data skew on join key | Salt the key or broadcast the smaller table |
| Airflow shows "no tasks to run" | Circular dependency or import error | Run `airflow dags list-import-errors`; fix the import |
| dbt succeeds but dashboards stale | Source freshness not checked | Add `dbt source freshness` as a prerequisite task |
| Kafka consumer lag grows unbounded | Throughput < producer rate | Increase partitions, scale consumers, tune `max.poll.records` |
| Quality validator false-positive anomalies | Z-score threshold too tight | Raise the threshold or switch to IQR mode |

References

| Guide | Path |
| --- | --- |
| Pipeline Architecture | references/data_pipeline_architecture.md |
| Data Modeling Patterns | references/data_modeling_patterns.md |
| DataOps Best Practices | references/dataops_best_practices.md |

Integration Points

| Skill | Integration |
| --- | --- |
| senior-data-scientist | Feature engineering consumes curated mart data |
| senior-ml-engineer | ML pipelines depend on feature store tables |
| senior-devops | CI/CD for dbt, Airflow deployment, container orchestration |
| senior-architect | Architecture reviews for lakehouse vs warehouse decisions |
| code-reviewer | Pipeline code reviews for DAGs, dbt models, Spark jobs |

Last Updated: April 2026 · Version: 1.1.0