Skills · dlt
install
source · Clone the upstream repo
git clone https://github.com/TerminalSkills/skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/TerminalSkills/skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/dlt" ~/.claude/skills/terminalskills-skills-dlt && rm -rf "$T"
manifest: skills/dlt/SKILL.md
safety · automated scan (low risk)
This is a pattern-based risk scan, not a security review. Our crawler flagged:
- pip install
Always read a skill's source content before installing. Patterns alone don't mean the skill is malicious — but they warrant attention.
source content
dlt (Data Load Tool) — Python-First Data Ingestion
You are an expert in dlt, the open-source Python library for building data pipelines. You help developers load data from any API, file, or database into warehouses and lakes using simple Python decorators — with automatic schema inference, incremental loading, and built-in data contracts. dlt is the "requests library for data pipelines."
Core Capabilities
Basic Pipeline
import dlt
import requests

# Simplest pipeline: Python generator → warehouse
@dlt.resource(write_disposition="append")
def github_events():
    """Load GitHub events for a repository."""
    response = requests.get("https://api.github.com/repos/org/repo/events")
    yield from response.json()

# Run pipeline
pipeline = dlt.pipeline(
    pipeline_name="github_events",
    destination="bigquery",  # or: postgres, snowflake, duckdb, motherduck
    dataset_name="raw_github",
)
load_info = pipeline.run(github_events())
print(load_info)  # Schema inferred automatically
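For local development the same resource can target DuckDB, so nothing needs warehouse credentials. A minimal sketch, assuming the github_events resource above and dlt's sql_client() helper (pipeline and dataset names are illustrative):

import dlt

# Same resource, local destination: a .duckdb file next to the script.
dev_pipeline = dlt.pipeline(
    pipeline_name="github_events_dev",
    destination="duckdb",
    dataset_name="raw_github",
)
dev_pipeline.run(github_events())

# Inspect what landed; the SQL client defaults to the pipeline's dataset.
with dev_pipeline.sql_client() as client:
    print(client.execute_sql("SELECT count(*) FROM github_events"))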
Incremental Loading
import requests

import dlt

@dlt.resource(
    write_disposition="merge",  # Upsert: update existing, insert new
    primary_key="id",
)
def orders(
    updated_at=dlt.sources.incremental(
        "updated_at", initial_value="2025-01-01T00:00:00Z"
    )
):
    """Load orders incrementally — only new/changed rows since the last run.

    dlt tracks the cursor automatically between runs; no need to store
    state manually.
    """
    page = 1
    while True:
        response = requests.get(
            "https://api.shop.com/orders",
            params={
                "updated_after": updated_at.last_value,
                "page": page,
                "per_page": 100,
            },
        )
        data = response.json()
        if not data:
            break
        yield from data
        page += 1
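The cursor lives in pipeline state, so repeated runs are cheap. A minimal sketch of two consecutive runs, assuming the orders resource above (pipeline and dataset names are illustrative):

import dlt

pipeline = dlt.pipeline(
    pipeline_name="shop_orders",
    destination="duckdb",
    dataset_name="raw_shop",
)

# First run: fetches everything since initial_value.
pipeline.run(orders())

# Second run: dlt restores the cursor from state, so updated_at.last_value
# is the max updated_at seen so far and only newer rows are requested.
pipeline.run(orders())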
REST API Source (Declarative)
import dlt
from dlt.sources.rest_api import rest_api_source

# Declarative API source — no code needed for standard REST APIs
source = rest_api_source({
    "client": {
        "base_url": "https://api.hubspot.com/crm/v3/",
        "auth": {"type": "bearer", "token": dlt.secrets["hubspot_token"]},
        "paginator": {"type": "offset", "limit": 100, "offset_param": "offset"},
    },
    "resources": [
        {
            "name": "contacts",
            "endpoint": {"path": "objects/contacts"},
            "write_disposition": "merge",
            "primary_key": "id",
        },
        {
            "name": "deals",
            "endpoint": {"path": "objects/deals"},
            "write_disposition": "merge",
            "primary_key": "id",
        },
    ],
})

pipeline = dlt.pipeline(destination="bigquery", dataset_name="raw_hubspot")
pipeline.run(source)
Data Contracts
import dlt

# Enforce schema contracts — fail loudly on unexpected changes
@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    columns={
        "id": {"data_type": "bigint", "nullable": False},
        "email": {"data_type": "text", "nullable": False},
        "plan": {"data_type": "text", "nullable": False},
        "mrr_cents": {"data_type": "bigint"},
    },
    schema_contract="freeze",  # "freeze" | "evolve" | "discard_value" | "discard_row"
)
def customers():
    # If the API returns unexpected fields or types, dlt handles them
    # per the contract setting; "freeze" raises instead of loading.
    yield from fetch_customers()
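With schema_contract="freeze", upstream drift fails the load instead of silently widening the table. A minimal sketch of surfacing that failure, assuming the customers resource above; the exact exception class varies by dlt version, so this catches broadly:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="crm",
    destination="duckdb",
    dataset_name="raw_crm",
)

try:
    pipeline.run(customers())
except Exception as exc:  # dlt raises a schema/data validation error on contract breaks
    print(f"Contract violation, inspect the upstream API: {exc}")
    raise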
Installation
pip install "dlt[bigquery]"  # dlt plus the destination adapter (quoted for zsh)
# Other destinations: dlt[snowflake], dlt[postgres], dlt[duckdb], dlt[motherduck]
Best Practices
- Start with DuckDB — Develop locally with destination="duckdb", switch to BigQuery/Snowflake for production
- Incremental for APIs — Use dlt.sources.incremental for stateful loading; dlt tracks the cursor between runs
- REST API source — Use the declarative rest_api_source for standard REST APIs; write custom resources only for complex APIs
- Merge for entities — Use write_disposition="merge" with primary_key for entity tables; append for event streams
- Schema contracts — Set schema_contract="freeze" in production to catch breaking API changes immediately
- Secrets management — Use dlt.secrets["key"] backed by environment variables or .dlt/secrets.toml (sketched after this list)
- Transformations — Use add_map() for row-level transforms during loading; heavier transforms belong in dbt (sketched after this list)
- Deploy anywhere — dlt is a library, not a service; run it from cron, Airflow, Dagster, GitHub Actions, or Lambda
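A minimal sketch of the secrets practice above: the same dlt.secrets lookup resolves from .dlt/secrets.toml locally and from environment variables in CI. The key name is illustrative; check dlt's configuration docs for the exact resolution rules:

import dlt

# Locally, .dlt/secrets.toml holds:
#   hubspot_token = "pat-..."
# In CI/production, export HUBSPOT_TOKEN instead; dlt checks environment
# variables before falling back to the toml file.
token = dlt.secrets["hubspot_token"]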
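And a minimal sketch of the add_map() practice, a per-row transform applied during extraction; the customers_raw resource and normalize_email helper are hypothetical:

import dlt

@dlt.resource(write_disposition="merge", primary_key="id")
def customers_raw():
    yield {"id": 1, "email": "  Ada@Example.COM "}
    yield {"id": 2, "email": "grace@example.com"}

def normalize_email(row):
    # Runs once per yielded row, before schema inference and load.
    row["email"] = row["email"].strip().lower()
    return row

pipeline = dlt.pipeline(
    pipeline_name="crm_clean",
    destination="duckdb",
    dataset_name="raw_crm",
)
pipeline.run(customers_raw().add_map(normalize_email))

Heavier joins and aggregations still belong downstream in dbt; add_map() is for cheap cleanup on the way in.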