Vibecosystem data-pipeline-patterns

ETL/ELT patterns, batch vs streaming, idempotency, data quality framework, and pipeline orchestration

Install

Source · Clone the upstream repo
git clone https://github.com/vibeeval/vibecosystem

Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/vibeeval/vibecosystem "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data-pipeline-patterns" ~/.claude/skills/vibeeval-vibecosystem-data-pipeline-patterns && rm -rf "$T"

Manifest: skills/data-pipeline-patterns/SKILL.md

Source content

Data Pipeline Patterns

ETL vs ELT Decision

| Criterion | ETL | ELT |
|---|---|---|
| Transform location | In the pipeline | In the data warehouse |
| Data volume | Small to medium | Large |
| Flexibility | Low | High |
| Cost | Compute-heavy | Storage-heavy |
| Use case | Legacy, compliance | Modern analytics |
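The ELT column can be sketched with the standard library's sqlite3 standing in for the warehouse (table and column names here are illustrative): raw records are loaded untouched, and the transform runs as SQL inside the warehouse after the load.

```python
import sqlite3

# In-memory database stands in for the warehouse.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_events (user_id INTEGER, amount TEXT)")

# Load: copy source records as-is; no transformation in the pipeline.
rows = [(1, "10.50"), (1, "4.25"), (2, "7.00")]
conn.executemany("INSERT INTO raw_events VALUES (?, ?)", rows)

# Transform: runs inside the warehouse as SQL, after the load.
conn.execute("""
    CREATE TABLE user_totals AS
    SELECT user_id, SUM(CAST(amount AS REAL)) AS total
    FROM raw_events
    GROUP BY user_id
""")
totals = dict(conn.execute("SELECT user_id, total FROM user_totals"))
```

Because the raw table keeps the untyped source data, the SQL transform can be rewritten and replayed later without re-extracting, which is the flexibility the table above refers to.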

Batch vs Streaming

| Criterion | Batch | Streaming |
|---|---|---|
| Latency | Minutes to hours | Seconds to milliseconds |
| Complexity | Low | High |
| Cost | Low | High |
| Use case | Reporting, ETL | Real-time alerts, dashboards |
| Tool | Airflow, dbt | Kafka Streams, Flink |
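To make the contrast concrete, here is a dependency-free sketch of the core streaming primitive: a tumbling-window aggregation that emits a result per window as events arrive, instead of waiting for a nightly batch (the window size and event shape are illustrative, not from any specific framework).

```python
from collections import defaultdict

def tumbling_window_counts(events, window_seconds=60):
    """Group (timestamp, key) events into fixed-size windows and count per key.

    A batch job would see all events at once; a streaming job applies the
    same aggregation continuously, closing each window as time advances.
    """
    windows = defaultdict(lambda: defaultdict(int))
    for ts, key in events:
        window_start = (ts // window_seconds) * window_seconds
        windows[window_start][key] += 1
    return {w: dict(counts) for w, counts in sorted(windows.items())}

events = [(0, "click"), (30, "click"), (61, "view"), (62, "click")]
result = tumbling_window_counts(events)
```

Frameworks like Flink or Kafka Streams provide this windowing (plus state, watermarks, and fault tolerance) out of the box; the extra machinery is where the "high complexity" in the table comes from.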

Idempotency Patterns

# Pattern 1: Upsert
INSERT INTO target (id, name, updated_at)
VALUES (%(id)s, %(name)s, %(ts)s)
ON CONFLICT (id) DO UPDATE SET
  name = EXCLUDED.name,
  updated_at = EXCLUDED.updated_at

# Pattern 2: Partition overwrite (run as one transaction so reruns never leave a half-written partition)
BEGIN;
DELETE FROM target WHERE partition_date = '2026-03-14';
INSERT INTO target SELECT * FROM staging WHERE partition_date = '2026-03-14';
COMMIT;

# Pattern 3: Checkpoint (incremental load; pseudocode)
last_checkpoint = get_checkpoint('pipeline_x')
# Bind the timestamp as a query parameter rather than interpolating it into SQL
new_data = source.query("WHERE updated_at > %(ts)s", params={"ts": last_checkpoint})
process(new_data)
if not new_data.empty:  # only advance the checkpoint when something was processed
    save_checkpoint('pipeline_x', max(new_data.updated_at))
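The get_checkpoint / save_checkpoint helpers above are left abstract; a minimal sqlite3-backed implementation might look like the following (the table name and sentinel default are assumptions, and a real pipeline would use a durable store):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # swap for a durable database in production
conn.execute(
    "CREATE TABLE IF NOT EXISTS checkpoints (pipeline TEXT PRIMARY KEY, value TEXT)"
)

def get_checkpoint(pipeline, default="1970-01-01T00:00:00"):
    # Fall back to a sentinel timestamp so the first run processes everything.
    row = conn.execute(
        "SELECT value FROM checkpoints WHERE pipeline = ?", (pipeline,)
    ).fetchone()
    return row[0] if row else default

def save_checkpoint(pipeline, value):
    # Upsert, so writing the checkpoint is itself idempotent (Pattern 1).
    conn.execute(
        "INSERT INTO checkpoints (pipeline, value) VALUES (?, ?) "
        "ON CONFLICT (pipeline) DO UPDATE SET value = excluded.value",
        (pipeline, value),
    )
    conn.commit()

save_checkpoint("pipeline_x", "2026-03-14T06:00:00")
checkpoint = get_checkpoint("pipeline_x")
```

One design caveat: saving the checkpoint after processing means a crash between the two can reprocess a batch, so the downstream load must itself be idempotent (Pattern 1 or 2) for the combination to be rerun-safe.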

Data Quality Framework

import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema({
    "user_id": pa.Column(int, pa.Check.gt(0), nullable=False),
    "email": pa.Column(str, pa.Check.str_matches(r'^.+@.+\..+$')),
    "age": pa.Column(int, pa.Check.in_range(0, 150), nullable=True),
    "created_at": pa.Column(pa.DateTime, pa.Check.less_than_or_equal_to(pd.Timestamp.now())),
})

validated_df = schema.validate(df)  # raises SchemaError on invalid data
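When failing the whole load is too strict, invalid rows can instead be routed to a dead letter queue. A dependency-free sketch of that row-level variant (the checks mirror the schema above; the function and field names are illustrative):

```python
import re

EMAIL_RE = re.compile(r"^.+@.+\..+$")

def split_valid_invalid(rows):
    """Route rows that fail quality checks to a dead-letter list
    instead of failing the whole batch."""
    valid, dead_letter = [], []
    for row in rows:
        ok = (
            isinstance(row.get("user_id"), int) and row["user_id"] > 0
            and isinstance(row.get("email"), str) and EMAIL_RE.match(row["email"])
        )
        (valid if ok else dead_letter).append(row)
    return valid, dead_letter

rows = [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": -5, "email": "b@example.com"},   # fails the range check
    {"user_id": 2, "email": "not-an-email"},     # fails the format check
]
valid, dead_letter = split_valid_invalid(rows)
```

The dead-letter rows should be persisted and alerted on, not dropped; silently discarding them is the "error swallowing" anti-pattern listed below.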

Quality Dimensions

| Dimension | Check | Tool |
|---|---|---|
| Completeness | NULL ratio < threshold | Great Expectations |
| Accuracy | Value range checks | pandera |
| Freshness | Last update < SLA | Airflow sensor |
| Uniqueness | Duplicate check | SQL DISTINCT |
| Consistency | Cross-table referential integrity | dbt test |
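The freshness dimension reduces to a single timestamp comparison; a minimal sketch (the 24-hour SLA is an illustrative default):

```python
from datetime import datetime, timedelta, timezone

def is_fresh(last_updated, sla=timedelta(hours=24), now=None):
    """Return True if the dataset was updated within its SLA window."""
    now = now or datetime.now(timezone.utc)
    return now - last_updated <= sla

now = datetime(2026, 3, 14, 12, 0, tzinfo=timezone.utc)
fresh = is_fresh(datetime(2026, 3, 14, 6, 0, tzinfo=timezone.utc), now=now)   # 6h old
stale = is_fresh(datetime(2026, 3, 12, 6, 0, tzinfo=timezone.utc), now=now)   # 54h old
```

In an orchestrator this check typically runs as a sensor or gate task before downstream consumers, so a stale upstream table blocks the run instead of silently propagating old data.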

Pipeline Orchestration

# Airflow DAG (extract_fn, transform_fn, load_fn, validate_fn defined elsewhere)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    'daily_etl',
    schedule='0 6 * * *',
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_fn)
    transform = PythonOperator(task_id='transform', python_callable=transform_fn)
    load = PythonOperator(task_id='load', python_callable=load_fn)
    validate = PythonOperator(task_id='validate', python_callable=validate_fn)

    extract >> transform >> load >> validate

Checklist

  • Pipeline is idempotent (safe to rerun)
  • Data quality checks at every step
  • Dead letter queue for failed records
  • Monitoring and alerting enabled
  • Schema evolution handled
  • Backfill mechanism in place
  • Retry logic with exponential backoff
  • Data lineage tracked
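The retry item on the checklist can be sketched as a small decorator (the attempt count, base delay, and exception types are assumptions to tune per pipeline):

```python
import time
from functools import wraps

def retry(max_attempts=3, base_delay=1.0, exceptions=(Exception,)):
    """Retry a flaky step with exponential backoff: base_delay * 2**attempt."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise  # out of retries: surface the error, never swallow it
                    time.sleep(base_delay * 2 ** attempt)
        return wrapper
    return decorator

calls = {"n": 0}

@retry(max_attempts=3, base_delay=0.01)
def flaky_load():
    # Simulated transient failure: succeeds on the third attempt.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "loaded"

result = flaky_load()
```

Restricting `exceptions` to transient errors (timeouts, connection resets) matters: retrying a deterministic bug just delays the failure, and the retried step must itself be idempotent or each attempt can duplicate data.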

Anti-Patterns

  • Hardcoded credentials in the pipeline
  • Non-idempotent transforms
  • Loads without data quality checks
  • Monolithic pipelines (break them into stages)
  • Silent failures (swallowed errors)