claude-skill-registry · etl-patterns
Production ETL patterns orchestrator. Routes to core reliability patterns and incremental load strategies.
install

source · Clone the upstream repo

```bash
git clone https://github.com/majiayu000/claude-skill-registry
```

Claude Code · Install into ~/.claude/skills/

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && \
  mkdir -p ~/.claude/skills && \
  cp -r "$T/skills/data/etl-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-etl-patterns && \
  rm -rf "$T"
```
manifest: skills/data/etl-patterns/SKILL.md
ETL Patterns
Orchestrator for production-grade Extract-Transform-Load patterns.
Skill Routing
| Need | Content |
|---|---|
| Reliability patterns | Idempotency, checkpointing, error handling, chunking, retry, logging |
| Load strategies | Backfill, timestamp-based, CDC, pipeline orchestration |
Pattern Selection Guide
By Reliability Need
| Need | Pattern |
|---|---|
| Repeatable runs | Idempotency |
| Resume after failure | Checkpointing |
| Handle bad records | Error handling + DLQ |
| Memory management | Chunked processing |
| Network resilience | Retry with backoff (sketch below) |
| Observability | Structured logging |
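The retry row, for instance, reduces to a small amount of code. A minimal sketch of retry with exponential backoff and jitter; the retried exception types and tuning defaults are illustrative assumptions, not the skill's reference implementation:

```python
import random
import time

def retry_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0):
    """Call fn(), retrying transient failures with exponential backoff + jitter.
    Exception types and defaults here are illustrative assumptions."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except (ConnectionError, TimeoutError):
            if attempt == max_attempts:
                raise  # retries exhausted; surface the original error
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(delay + random.uniform(0, delay * 0.1))  # jitter spreads out retries
```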
By Load Strategy
| Scenario | Pattern |
|---|---|
| Small tables (<100K rows) | Full refresh |
| Large tables | Timestamp incremental |
| Real-time sync | CDC events |
| Historical migration | Parallel backfill |
| Zero-downtime refresh | Swap pattern (sketch below) |
| Multi-step pipelines | Pipeline orchestration |
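The swap pattern flagged above is worth spelling out: load a full refresh into a staging table, then rename it into place so readers never see a partial table. A minimal sketch assuming PostgreSQL (where DDL is transactional); `conn` is a hypothetical open DB-API connection and the table names are illustrative:

```python
def swap_refresh(conn, load_into_staging):
    """Zero-downtime full refresh via staging table + atomic rename.
    Assumes PostgreSQL; connection and table names are illustrative."""
    with conn.cursor() as cur:
        cur.execute("DROP TABLE IF EXISTS dim_users_staging")
        cur.execute("CREATE TABLE dim_users_staging (LIKE dim_users INCLUDING ALL)")
        load_into_staging(cur)  # full reload lands in staging; readers untouched
        cur.execute("ALTER TABLE dim_users RENAME TO dim_users_old")
        cur.execute("ALTER TABLE dim_users_staging RENAME TO dim_users")
        cur.execute("DROP TABLE dim_users_old")
    conn.commit()  # both renames commit together, so the cutover is atomic
```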
Quick Reference
Idempotency Options
```python
# Small datasets:   delete-then-insert
# Large datasets:   UPSERT on conflict
# Change detection: row hash comparison
```
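For the UPSERT option, a hedged sketch using PostgreSQL's `INSERT ... ON CONFLICT` via psycopg2; the target table, column set, and `rows` payload are illustrative assumptions:

```python
from psycopg2.extras import execute_values

def upsert_rows(conn, rows):
    """Idempotent load: re-running with the same rows leaves the table unchanged.
    Table and column names are illustrative assumptions."""
    with conn.cursor() as cur:
        execute_values(
            cur,
            """
            INSERT INTO target (id, name, updated_at)
            VALUES %s
            ON CONFLICT (id) DO UPDATE
              SET name = EXCLUDED.name, updated_at = EXCLUDED.updated_at
            """,
            [(r["id"], r["name"], r["updated_at"]) for r in rows],
        )
    conn.commit()
```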
Load Strategy Decision
```text
Is table < 100K rows?            → Full refresh
Has reliable timestamp column?   → Timestamp incremental
Source supports CDC?             → CDC event processing
Need zero downtime?              → Swap pattern (temp table → rename)
One-time historical load?        → Parallel backfill with date ranges
```
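The timestamp branch usually reduces to one filtered query against a high-water mark. A minimal sketch with pandas; the caller persists the new high-water mark only after the load succeeds (see the pipeline below), and `conn` plus the `since` bookkeeping are illustrative assumptions:

```python
import pandas as pd

def extract_incremental(conn, table, ts_col, since):
    """Pull only rows newer than the last checkpoint (high-water mark).
    `conn` is any DB-API/SQLAlchemy connection; identifiers are assumed trusted."""
    query = f"SELECT * FROM {table} WHERE {ts_col} > %(since)s ORDER BY {ts_col}"
    return pd.read_sql(query, conn, params={"since": since})
```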
Common Pipeline Structure
```python
import pandas as pd
# Checkpoint, ETLProcessor, incremental_by_timestamp, and upsert_records
# come from the skill's reference modules.

# 1. Setup
checkpoint = Checkpoint('.etl_checkpoint.json')
processor = ETLProcessor()

# 2. Extract (incremental)
df = incremental_by_timestamp(source_table, 'updated_at')

# 3. Transform (with error handling)
transformed = processor.process_batch(df.to_dict('records'))

# 4. Load (with idempotency)
upsert_records(pd.DataFrame(transformed))

# 5. Checkpoint
checkpoint.set_last_processed('sync', df['updated_at'].max())

# 6. Handle failures
processor.save_failures('failures/')
```
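The `Checkpoint` object above persists the high-water mark between runs. The skill ships its own implementation; as an assumption, a minimal file-backed version matching the interface used above might look like this:

```python
import json
from pathlib import Path

class Checkpoint:
    """Minimal file-backed checkpoint store (an assumed implementation;
    the skill's reference code is authoritative)."""

    def __init__(self, path):
        self.path = Path(path)
        self.state = json.loads(self.path.read_text()) if self.path.exists() else {}

    def get_last_processed(self, key, default=None):
        return self.state.get(key, default)

    def set_last_processed(self, key, value):
        self.state[key] = str(value)  # pandas Timestamps serialize as ISO strings
        self.path.write_text(json.dumps(self.state))
```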
Related Skills
- data-validation · Validate data quality during ETL
- data-quality · Monitor data quality metrics
- pandas-coder · DataFrame transformations