# claude-skill-registry · etl-incremental-patterns
Incremental data loading patterns including backfill strategies, CDC, timestamp-based loads, and pipeline orchestration.
## Install

**Source** · clone the upstream repo:

```bash
git clone https://github.com/majiayu000/claude-skill-registry
```

**Claude Code** · install into `~/.claude/skills/`:

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/etl-incremental-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-etl-incremental-patterns && rm -rf "$T"
```
**Manifest:** `skills/data/etl-incremental-patterns/SKILL.md`
## ETL Incremental Patterns
Patterns for incremental data loading and backfill operations.
### Backfill Strategy
```python
from datetime import date, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed


def backfill_date_range(
    start: date,
    end: date,
    process_fn: callable,
    parallel: int = 4
) -> None:
    """Backfill data for a date range."""
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += timedelta(days=1)

    # Process in parallel with controlled concurrency
    with ThreadPoolExecutor(max_workers=parallel) as executor:
        futures = {executor.submit(process_fn, d): d for d in dates}
        for future in as_completed(futures):
            d = futures[future]
            try:
                future.result()
                print(f"Completed: {d}")
            except Exception as e:
                print(f"Failed: {d} - {e}")


# Usage
backfill_date_range(
    start=date(2024, 1, 1),
    end=date(2024, 3, 31),
    process_fn=process_daily_data,
    parallel=4
)
```
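The usage block assumes a `process_daily_data` callable; any function that handles one date idempotently works. A minimal sketch, assuming a SQLAlchemy `engine` pointed at PostgreSQL (the table names here are illustrative, not part of the registry):

```python
import pandas as pd

def process_daily_data(day: date) -> None:
    # Hypothetical per-day job: pull one day's rows and rewrite that partition.
    df = pd.read_sql(
        "SELECT * FROM raw_events WHERE event_date = %(day)s",
        engine,
        params={"day": day},
    )
    # Replacing the whole day keeps the job idempotent, so the backfill
    # only needs to re-run the dates that printed "Failed".
    df.to_sql(f"events_{day:%Y%m%d}", engine, if_exists="replace", index=False)
```

Because each date is independent, failed dates can simply be re-submitted without touching the ones that completed.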
### Incremental Load Patterns

#### Timestamp-Based Incremental
```python
def incremental_by_timestamp(table: str, timestamp_col: str) -> pd.DataFrame:
    last_run = get_last_run_timestamp(table)

    query = f"""
        SELECT * FROM {table}
        WHERE {timestamp_col} > :last_run
        ORDER BY {timestamp_col}
    """
    df = pd.read_sql(query, engine, params={'last_run': last_run})

    if not df.empty:
        set_last_run_timestamp(table, df[timestamp_col].max())

    return df
```
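The helpers `get_last_run_timestamp` and `set_last_run_timestamp` are left abstract above. A common implementation keeps a high-water mark per table in a small state table; a minimal sketch, assuming a SQLAlchemy `engine` and a hypothetical `etl_watermarks(table_name, last_run)` table (the upsert syntax is PostgreSQL-specific):

```python
from datetime import datetime
from sqlalchemy import text

def get_last_run_timestamp(table: str) -> datetime:
    # Read the stored high-water mark; fall back to the epoch on first run.
    with engine.begin() as conn:
        row = conn.execute(
            text("SELECT last_run FROM etl_watermarks WHERE table_name = :t"),
            {"t": table},
        ).fetchone()
    return row[0] if row else datetime(1970, 1, 1)

def set_last_run_timestamp(table: str, ts: datetime) -> None:
    # Upsert the new high-water mark.
    with engine.begin() as conn:
        conn.execute(
            text(
                "INSERT INTO etl_watermarks (table_name, last_run) "
                "VALUES (:t, :ts) "
                "ON CONFLICT (table_name) DO UPDATE SET last_run = EXCLUDED.last_run"
            ),
            {"t": table, "ts": ts},
        )
```

Note that the pattern advances the mark to the maximum value actually loaded, not to the wall clock, which avoids skipping rows that commit mid-run. Rows that later arrive with timestamps older than the watermark are still missed; a common mitigation is to re-read a small overlap window and deduplicate on load.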
#### Change Data Capture (CDC)
```python
def process_cdc_events(events: list[dict]) -> None:
    for event in events:
        op = event['operation']  # INSERT, UPDATE, DELETE
        data = event['data']

        if op == 'DELETE':
            soft_delete(data['id'])
        else:
            upsert(data)
```
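`upsert` and `soft_delete` are placeholders. Against PostgreSQL they might look like the sketch below; the `customers` table and its columns are assumptions for illustration:

```python
from sqlalchemy import text

def upsert(data: dict) -> None:
    # Insert-or-update keyed on the primary key, so replayed events stay idempotent.
    with engine.begin() as conn:
        conn.execute(
            text(
                "INSERT INTO customers (id, name, updated_at) "
                "VALUES (:id, :name, :updated_at) "
                "ON CONFLICT (id) DO UPDATE SET "
                "name = EXCLUDED.name, updated_at = EXCLUDED.updated_at"
            ),
            data,
        )

def soft_delete(row_id: int) -> None:
    # Tombstone instead of hard-deleting, so downstream consumers see the removal.
    with engine.begin() as conn:
        conn.execute(
            text("UPDATE customers SET is_deleted = TRUE WHERE id = :id"),
            {"id": row_id},
        )
```

Soft deletes pair naturally with CDC: a hard delete would erase the fact that the row ever existed, which breaks downstream re-aggregation and auditing.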
#### Full Refresh with Swap
```python
def full_refresh_with_swap(df: pd.DataFrame, table: str) -> None:
    temp_table = f"{table}_temp"
    df.to_sql(temp_table, engine, if_exists='replace', index=False)

    with engine.begin() as conn:
        conn.execute(text(f"DROP TABLE IF EXISTS {table}_old"))
        conn.execute(text(f"ALTER TABLE {table} RENAME TO {table}_old"))
        conn.execute(text(f"ALTER TABLE {temp_table} RENAME TO {table}"))
        conn.execute(text(f"DROP TABLE {table}_old"))
```
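Because the renames run inside a single `engine.begin()` transaction, the swap is atomic on databases with transactional DDL such as PostgreSQL; on MySQL, DDL statements auto-commit, so readers can briefly observe the table missing between renames. A usage sketch, where `build_daily_summary` is a hypothetical transform step:

```python
# Rebuild the table off to the side, then swap it in without downtime.
df = build_daily_summary()
full_refresh_with_swap(df, "daily_summary")
```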
### Pipeline Orchestration
```python
from enum import Enum
from dataclasses import dataclass, field


class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class PipelineStep:
    name: str
    func: callable
    dependencies: list[str] = field(default_factory=list)
    status: StepStatus = StepStatus.PENDING
    error: str | None = None


class Pipeline:
    def __init__(self, name: str):
        self.name = name
        self.steps: dict[str, PipelineStep] = {}

    def add_step(self, name: str, func: callable, depends_on: list[str] | None = None):
        self.steps[name] = PipelineStep(name, func, depends_on or [])

    def run(self) -> bool:
        for step in self._topological_sort():
            # Skip if any dependency failed or was itself skipped,
            # so failures cascade down the dependency chain
            if any(self.steps[d].status in (StepStatus.FAILED, StepStatus.SKIPPED)
                   for d in step.dependencies):
                step.status = StepStatus.SKIPPED
                continue

            step.status = StepStatus.RUNNING
            try:
                step.func()
                step.status = StepStatus.SUCCESS
            except Exception as e:
                step.status = StepStatus.FAILED
                step.error = str(e)

        return all(s.status == StepStatus.SUCCESS for s in self.steps.values())

    def _topological_sort(self) -> list[PipelineStep]:
        # Implementation of topological sort for dependency ordering
        ...
```
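The registry leaves `_topological_sort` as a stub. One way to fill it is Kahn's algorithm; the sketch below is a drop-in method body, assuming every declared dependency is itself a registered step (the cycle check is an addition, not part of the source):

```python
def _topological_sort(self) -> list[PipelineStep]:
    # Kahn's algorithm: repeatedly emit steps whose dependencies have all been emitted.
    in_degree = {name: len(step.dependencies) for name, step in self.steps.items()}
    dependents: dict[str, list[str]] = {name: [] for name in self.steps}
    for name, step in self.steps.items():
        for dep in step.dependencies:
            dependents[dep].append(name)

    ready = [name for name, deg in in_degree.items() if deg == 0]
    ordered: list[PipelineStep] = []
    while ready:
        name = ready.pop()
        ordered.append(self.steps[name])
        for child in dependents[name]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                ready.append(child)

    if len(ordered) != len(self.steps):
        raise ValueError("dependency cycle detected in pipeline steps")
    return ordered
```

Wiring it together might look like this, with hypothetical step functions:

```python
pipe = Pipeline("daily_load")
pipe.add_step("extract", extract_orders)
pipe.add_step("transform", transform_orders, depends_on=["extract"])
pipe.add_step("load", load_orders, depends_on=["transform"])
success = pipe.run()
```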
### Load Strategy Decision Matrix
| Scenario | Pattern | When to Use |
|---|---|---|
| Small tables (<100K rows) | Full refresh | Daily/hourly loads |
| Large tables with timestamps | Timestamp incremental | Continuous sync |
| Source supports CDC | CDC events | Real-time updates |
| One-time historical load | Parallel backfill | Initial migration |
| Critical tables | Swap pattern | Zero-downtime refresh |
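Read as code, the matrix might collapse to a selector like the one below; the precedence order and the 100K threshold are illustrative choices, not prescribed by the registry:

```python
def choose_load_strategy(
    row_count: int,
    has_timestamp_col: bool,
    source_supports_cdc: bool,
    zero_downtime_required: bool,
) -> str:
    # One possible precedence over the matrix above; tune to your environment.
    if source_supports_cdc:
        return "cdc_events"
    if zero_downtime_required:
        return "full_refresh_with_swap"
    if row_count < 100_000:
        return "full_refresh"
    if has_timestamp_col:
        return "timestamp_incremental"
    return "parallel_backfill"
```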