# claude-skill-registry: dlt

dlt (data load tool) patterns for SignalRoom ETL pipelines. Use when creating sources, debugging pipeline failures, understanding schema evolution, or implementing incremental loading.

## Install

Source · Clone the upstream repo:

```bash
git clone https://github.com/majiayu000/claude-skill-registry
```

Claude Code · Install into `~/.claude/skills/`:

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/dlt" ~/.claude/skills/majiayu000-claude-skill-registry-dlt && rm -rf "$T"
```

Manifest: `skills/data/dlt/SKILL.md`

## Source content
# dlt Data Load Tool

## Core Concepts
dlt covers the extract, normalize, and load steps: you define sources and resources, and dlt handles schema inference, table creation, and loading.
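For orientation, here is a minimal end-to-end sketch; the resource, pipeline names, and sample rows are illustrative, not SignalRoom code:

```python
import dlt


@dlt.resource(write_disposition="append", primary_key="id")
def example_events():
    # A real resource would call an API or read files
    yield {"id": 1, "event": "click"}
    yield {"id": 2, "event": "conversion"}


pipeline = dlt.pipeline(
    pipeline_name="example",
    destination="duckdb",
    dataset_name="example_data",
)
# Schema inference, table creation, and loading all happen inside run()
print(pipeline.run(example_events()))
```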
## Source Structure

```
src/signalroom/sources/{source_name}/
└── __init__.py   # Contains @dlt.source and @dlt.resource
```
## Creating a New Source

```python
import dlt

from signalroom.common import settings


@dlt.source(name="my_source")
def my_source():
    """Source docstring appears in dlt metadata."""

    @dlt.resource(write_disposition="append", primary_key="id")
    def my_resource():
        yield from fetch_data()

    return [my_resource]
```
Register in Pipeline Runner
Add to
src/signalroom/pipelines/runner.py:
SOURCES = { "my_source": "signalroom.sources.my_source:my_source", }
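The runner presumably resolves these `module:attribute` strings at run time; a sketch of one way to do that (the `resolve_source` helper is hypothetical, not SignalRoom code):

```python
import importlib


def resolve_source(entry: str):
    """Turn a 'package.module:attribute' entry into the decorated source function."""
    module_path, attr_name = entry.split(":", 1)
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# my_source = resolve_source(SOURCES["my_source"])
# pipeline.run(my_source())
```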
## Write Dispositions

| Mode | Use Case | Behavior |
|---|---|---|
| `append` | Immutable events (clicks, conversions) | Always insert new rows |
| `merge` | Mutable entities (campaigns, contacts) | Upsert by `primary_key` |
| `replace` | Full refresh (feature flags, config) | Drop and recreate table |
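Each mode is declared on the resource decorator. A sketch; the `fetch_*` helpers are placeholders:

```python
import dlt


@dlt.resource(write_disposition="append")  # immutable event stream
def clicks():
    yield from fetch_clicks()


@dlt.resource(write_disposition="merge", primary_key="id")  # upsert by key
def campaigns():
    yield from fetch_campaigns()


@dlt.resource(write_disposition="replace")  # dropped and rebuilt every run
def feature_flags():
    yield from fetch_flags()
```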
## Incremental Loading

Only fetch new data since the last run:

```python
@dlt.resource(write_disposition="append", primary_key="id")
def events(
    updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
        "updated_at", initial_value="2024-01-01"
    )
):
    # Only fetches records after the last loaded timestamp
    yield from api.get_events(since=updated_at.last_value)
```
## WARNING: High-Volume Sources

`dlt.sources.incremental` tracks every row for deduplication. If many rows share the same cursor value, this causes O(n²) performance.
| Rows per cursor value | Overhead | Recommendation |
|---|---|---|
| < 100 | Negligible | Use incremental |
| 100 - 1,000 | Noticeable | Monitor performance |
| > 1,000 | Severe | Use file-level state instead |
For high-volume sources (like S3 CSV imports), use `dlt.current.resource_state()` for file-level tracking:
@dlt.resource(write_disposition="merge", primary_key=["file_name", "row_id"]) def csv_resource(): state = dlt.current.resource_state() last_date = state.get("last_file_date", "2024-01-01") for file in get_files_since(last_date): yield from process_file(file) state["last_file_date"] = file.date # Manual state update
## Primary Keys

Required for the `merge` disposition:

```python
# Single key
@dlt.resource(primary_key="id")

# Composite key
@dlt.resource(primary_key=["date", "affiliate_id"])
```
## Schema Evolution

dlt auto-evolves schemas: new columns are added automatically. To see recent loads against the current schema:

```sql
SELECT * FROM {schema}._dlt_loads ORDER BY inserted_at DESC LIMIT 5;
```
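You can also inspect the inferred schema from Python; a sketch, assuming the pipeline has already run at least once under this name:

```python
import dlt

# Attaching by an existing pipeline name restores its local state and schema
pipeline = dlt.pipeline(pipeline_name="my_pipeline")
print(pipeline.default_schema.to_pretty_yaml())  # full inferred schema as YAML
```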
## Debugging Failed Loads

### Check dlt metadata tables

```sql
-- Recent loads
SELECT load_id, schema_name, status, inserted_at
FROM {schema}._dlt_loads
ORDER BY inserted_at DESC
LIMIT 10;

-- Pipeline state
SELECT * FROM {schema}._dlt_pipeline_state;
```
## Common Errors

### "Primary key violation"

- Using `append` when you need `merge`
- Duplicate records in source data
"Column type mismatch"
- Schema evolved incompatibly
- Fix: Drop table or add explicit column hints
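A sketch of pinning a column type with an explicit hint so inference cannot drift; the column name and fetcher are illustrative:

```python
import dlt


@dlt.resource(
    write_disposition="append",
    columns={"amount": {"data_type": "double"}},  # force DOUBLE regardless of sampled values
)
def transactions():
    yield from fetch_transactions()  # placeholder fetcher
```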
"Connection refused"
- Check Supabase pooler settings (port 6543, user format)
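A hedged sketch of pointing dlt at the Supabase transaction pooler; every credential below is a placeholder, and real values belong in `.dlt/secrets.toml` or environment variables:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="signalroom",
    destination=dlt.destinations.postgres(
        # Pooler username includes the project ref; 6543 is the pooler port
        credentials="postgresql://postgres.<project-ref>:<password>@<pooler-host>:6543/postgres"
    ),
    dataset_name="raw",
)
```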
## Drop Pending Packages

If the pipeline is stuck:

```bash
dlt pipeline {pipeline_name} drop-pending-packages
```
## SignalRoom Sources

| Source | Write Mode | Primary Key | State Tracking |
|---|---|---|---|
|  | merge |  | File-level (`dlt.current.resource_state()`) |
|  | merge |  | Row-level (`dlt.sources.incremental`) |
|  | merge |  | Row-level (`dlt.sources.incremental`) |
## Testing Locally

Use DuckDB for fast local testing:

```python
pipeline = dlt.pipeline(
    pipeline_name="test",
    destination="duckdb",
    dataset_name="test",
)
```
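A quick round trip, assuming the `my_source` example from earlier in this document:

```python
info = pipeline.run(my_source())
print(info)  # load package summary

# Query the local DuckDB file through dlt's SQL client
with pipeline.sql_client() as client:
    rows = client.execute_sql("SELECT * FROM my_resource LIMIT 5")
    print(rows)
```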
## Resources

- dlt Documentation
- Write Dispositions
- Schema Evolution
- SignalRoom API Reference: `docs/API_REFERENCE.md` (live docs, auth, request/response examples)