Claude-skill-registry dagster-orchestration
Install

Source: clone the upstream repo

```bash
git clone https://github.com/majiayu000/claude-skill-registry
```

Claude Code: install into ~/.claude/skills/

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/dagster-skill" ~/.claude/skills/majiayu000-claude-skill-registry-dagster-orchestration && rm -rf "$T"
```
Manifest: skills/data/dagster-skill/SKILL.md
Dagster Orchestration with dbt Integration
Constitution Alignment
This skill enforces project principles:
- I. Technology Ownership: Dagster owns orchestration, dbt owns SQL
- II. Plugin-First Architecture: Orchestrator is pluggable (Dagster default, Airflow 3.x alternative)
- III. Enforced vs Pluggable: Iceberg format ENFORCED, compute engine PLUGGABLE
- VIII. Observability By Default: All operations emit OpenTelemetry traces and OpenLineage events
Related ADRs
| ADR | Decision | Relevance |
|---|---|---|
| ADR-0011 | Pluggable Orchestration | Dagster as default, Airflow 3.x as alternative |
| ADR-0009 | dbt Owns SQL | NEVER parse SQL in Python - dbt handles all transformations |
| ADR-0005 | Apache Iceberg Enforced | All tables MUST be Iceberg format |
| ADR-0010 | Target-Agnostic Compute | DuckDB default, Snowflake/Spark via dbt |
| ADR-0033 | Target Airflow 3.x | Alternative orchestrator option |
Critical Architecture: Catalog-as-Control-Plane
⚠️ NEVER write directly to storage. ALL table operations MUST flow through catalog:
```
┌─────────────────────────────────────────────────────────────┐
│                    DAGSTER ORCHESTRATION                     │
│     (Schedule → Sensor → Asset Graph → Materialization)      │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                     dbt TRANSFORMATIONS                      │
│    (SQL owns transformations - NEVER parse SQL in Python)    │
└─────────────────────────────────────────────────────────────┘
                              │
              ┌───────────────┼───────────────┐
              │               │               │
              ▼               ▼               ▼
         ┌─────────┐     ┌─────────┐     ┌─────────┐
         │ DuckDB  │     │  Spark  │     │Snowflake│
         │(default)│     │ (scale) │     │(analytic)│
         └────┬────┘     └────┬────┘     └────┬────┘
              │               │               │
              └───────────────┼───────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│             POLARIS / GLUE CATALOG (REST API)                │
│    ⚡ CONTROL PLANE - ACID, Schema, Access, Governance ⚡    │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                        ICEBERG TABLES                        │
│                      (S3 / Azure / GCS)                      │
└─────────────────────────────────────────────────────────────┘
```
Why Catalog-as-Control-Plane?
- ACID transactions across ALL compute engines
- Schema evolution coordination (engines see same schema)
- Access control and governance (row/column masking)
- Multi-engine interoperability (DuckDB + Spark + dbt query same tables)
- See: references/catalog-control-plane.md
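For instance, a minimal sketch using PyIceberg's REST catalog client (the endpoint, credential string, and warehouse name below are illustrative, mirroring the IO manager config later in this skill): every table read and write is mediated by the catalog, which coordinates ACID commits and schema.

```python
from pyiceberg.catalog import load_catalog

# All table operations flow through the REST catalog (control plane),
# never through direct object-store paths
catalog = load_catalog(
    "polaris",
    **{
        "type": "rest",
        "uri": "http://polaris:8181/api/catalog",
        "credential": "client_id:client_secret",
        "warehouse": "demo_catalog",
    },
)

table = catalog.load_table("silver.orders")  # catalog-mediated, ACID-safe
```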
Pluggable Orchestration Design
Design assets as pure functions runnable in ANY orchestrator:
| Concept | Dagster | Airflow | Prefect |
|---|---|---|---|
| Unit of Work | @asset | Task/Operator | @task / @flow |
| Dependencies | Asset deps | Task deps | Task deps |
| Scheduling | ScheduleDefinition | DAG schedule | Deployment |
| Event-driven | @sensor | Sensor | Event handlers |
| Configuration | ConfigurableResource | Connection/Variable | Block |
See: references/orchestration-abstraction.md
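To make the table concrete, a minimal sketch (the function, asset, and column names are illustrative): keep the business logic in a pure function with no orchestrator imports, then wrap it per orchestrator, so swapping Dagster for Airflow or Prefect touches only the thin wrapper.

```python
import pyarrow as pa
from dagster import asset


def build_daily_metrics(orders: pa.Table) -> pa.Table:
    """Pure function: no orchestrator imports, trivially portable."""
    return orders.group_by("order_date").aggregate([("amount", "sum")])


# Dagster wrapper; an Airflow @task or Prefect @task would wrap the same function
@asset
def daily_metrics(silver_orders: pa.Table) -> pa.Table:
    return build_daily_metrics(silver_orders)
```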
Pre-Implementation Checklist
Step 1: Verify Runtime Environment
```bash
# ALWAYS run first
python -c "import dagster; print(f'Dagster {dagster.__version__}')"
python -c "import dagster_dbt; print(f'dagster-dbt {dagster_dbt.__version__}')"
python -c "import dagster_iceberg; print('dagster-iceberg installed')"
dbt --version
```
Step 2: Discover Existing Patterns
```bash
# Find Dagster definitions
rg "@asset|@multi_asset|@dbt_assets" --type py
rg "ConfigurableResource|IOManager" --type py
rg "dg.Definitions|Definitions\(" --type py

# Find dbt project
find . -name "dbt_project.yml"
find . -name "manifest.json" -path "*/target/*"

# Check catalog configuration
cat platform.yaml | grep -A 20 "catalogs:"
```
Step 3: Understand Platform Configuration
```bash
# Two-tier config: platform.yaml (credentials) + floe.yaml (logical refs)
cat platform.yaml   # Engineers NEVER see credentials in code
cat floe.yaml       # Data engineers reference: catalog: default
```
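A hedged sketch of how a runtime might resolve the two tiers (the key layout below is an assumption for illustration; only the `catalogs:` key is confirmed by the grep above):

```python
from pathlib import Path

import yaml

# platform.yaml: credentialed catalog definitions, owned by platform engineers
platform = yaml.safe_load(Path("platform.yaml").read_text())
# floe.yaml: only logical references such as `catalog: default`
floe = yaml.safe_load(Path("floe.yaml").read_text())

# Resolve the logical name to concrete catalog settings at runtime;
# credentials never appear in the data engineer's code
catalog_cfg = platform["catalogs"][floe["catalog"]]
```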
dbt Integration (Primary Pattern)
Pattern 1: Load dbt Assets from Manifest
```python
from pathlib import Path

from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

dbt_project = DbtProject(
    project_dir=Path(__file__).parent / "dbt",
    packaged_project_dir=Path(__file__).parent / "dbt-project",
)
dbt_project.prepare_if_dev()  # Hot-reload in dev


@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Execute dbt build - yields Dagster events for each model."""
    yield from dbt.cli(["build"], context=context).stream()


defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project)},
)
```
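Optionally, the asset graph can be scoped to a dbt selection via the `select` argument of `@dbt_assets` (the tag name here is illustrative):

```python
@dbt_assets(manifest=dbt_project.manifest_path, select="tag:daily")
def daily_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Only models matching the dbt selection become Dagster assets
    yield from dbt.cli(["build"], context=context).stream()
```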
Pattern 2: Custom DagsterDbtTranslator
```python
from typing import Any, Mapping, Optional

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings


class FloeDbTranslator(DagsterDbtTranslator):
    """Translator for floe-runtime architecture."""

    def __init__(self):
        super().__init__(
            settings=DagsterDbtTranslatorSettings(
                enable_code_references=True,
                enable_source_tests_as_checks=True,
            )
        )

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Map dbt models to Dagster asset keys with namespace."""
        schema = dbt_resource_props.get("schema", "default")
        name = dbt_resource_props["name"]
        return AssetKey([schema, name])

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """Group by dbt folder structure."""
        fqn = dbt_resource_props.get("fqn", [])
        return fqn[1] if len(fqn) > 2 else None

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Extract governance metadata from dbt model meta."""
        meta = dbt_resource_props.get("meta", {})
        return {
            "classification": meta.get("classification"),
            "owner": meta.get("owner"),
            "sla": meta.get("sla"),
        }
```
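Usage sketch (reusing the manifest from Pattern 1): pass the translator to `@dbt_assets` via the `dagster_dbt_translator` argument so asset keys, groups, and metadata all come from `FloeDbTranslator`.

```python
@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=FloeDbTranslator(),  # custom keys, groups, metadata
)
def translated_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```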
See: references/dbt-integration.md for complete patterns
Pattern 3: Partitioned dbt Assets
```python
from dagster import DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")


@dbt_assets(
    manifest=dbt_project.manifest_path,
    partitions_def=daily_partitions,
)
def partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    partition_date = context.partition_key
    yield from dbt.cli(
        ["build", "--vars", f'{{"run_date": "{partition_date}"}}'],
        context=context,
    ).stream()
```
Compute Target Integration
DuckDB (Default - Ephemeral Compute via dbt-duckdb)
DuckDB reads/writes via catalog ATTACH:
```sql
-- dbt-duckdb plugin automatically executes:
ATTACH 'demo_catalog' AS polaris_catalog (
    TYPE ICEBERG,
    CLIENT_ID '{{ env_var("POLARIS_CLIENT_ID") }}',
    CLIENT_SECRET '{{ env_var("POLARIS_CLIENT_SECRET") }}',
    ENDPOINT '{{ env_var("POLARIS_URI") }}'
);
```
Snowflake (Analytical Compute)
```sql
-- External Iceberg via Polaris integration
CREATE OR REPLACE ICEBERG TABLE gold.metrics
    CATALOG = 'polaris_catalog'
    EXTERNAL_VOLUME = 'iceberg_volume'
AS SELECT * FROM silver.orders;
```
Spark (Distributed Compute)
```python
from dagster import asset


# SparkResource is assumed to be a project-defined resource exposing a
# SparkSession (e.g. built on dagster_pyspark)
@asset(kinds={"spark"})
def spark_asset(spark: SparkResource):
    spark.spark_session.sql("""
        INSERT INTO polaris_catalog.gold.metrics
        SELECT * FROM polaris_catalog.silver.orders
    """)
```
See: references/compute-abstraction.md
IO Manager Patterns
Iceberg IO Manager (Catalog-Controlled)
```python
import os

from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

# Credentials come from the environment, never from code
client_id = os.environ["POLARIS_CLIENT_ID"]
client_secret = os.environ["POLARIS_CLIENT_SECRET"]

iceberg_io_manager = PyArrowIcebergIOManager(
    name="polaris_catalog",
    config=IcebergCatalogConfig(
        properties={
            "type": "rest",
            "uri": "http://polaris:8181/api/catalog",
            "credential": f"{client_id}:{client_secret}",
            "warehouse": "demo_catalog",
        }
    ),
    namespace="default",
)
```
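A minimal usage sketch (the asset name and columns are illustrative, and it assumes the IO manager above is bound under the `io_manager` key): the asset returns a PyArrow table, and the IO manager persists it to Iceberg through the catalog rather than writing to storage directly.

```python
import pyarrow as pa
from dagster import Definitions, asset


@asset
def silver_orders() -> pa.Table:
    # The IO manager writes this table to Iceberg via the Polaris catalog
    return pa.table({"order_id": [1, 2, 3], "amount": [10.0, 25.5, 7.9]})


defs = Definitions(
    assets=[silver_orders],
    resources={"io_manager": iceberg_io_manager},
)
```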
See: references/io-managers.md
Environment-Based Resource Switching
```python
import os


def get_resources_for_env() -> dict:
    """Select resources per deployment (local_iceberg_io/prod_iceberg_io defined elsewhere)."""
    env = os.getenv("DAGSTER_DEPLOYMENT", "local")
    base_resources = {"dbt": DbtCliResource(project_dir=dbt_project)}
    if env == "local":
        return {**base_resources, "io_manager": local_iceberg_io()}
    elif env == "production":
        return {**base_resources, "io_manager": prod_iceberg_io()}
    raise ValueError(f"Unknown DAGSTER_DEPLOYMENT: {env!r}")
```
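Wiring sketch: pass the resolved resources into `Definitions` so the same code object serves every deployment.

```python
defs = Definitions(
    assets=[my_dbt_assets],
    resources=get_resources_for_env(),
)
```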
Validation Workflow
Before Implementation
- Verified Dagster + dagster-dbt versions
- Located dbt project and manifest.json
- Understood catalog configuration (Polaris/Glue)
- Identified compute targets (DuckDB/Snowflake/Spark)
- Read /docs/ for the CompiledArtifacts contract
During Implementation
- Using @dbt_assets for dbt models
- Custom DagsterDbtTranslator for metadata
- IO manager uses catalog (NOT direct storage writes)
- Resources configured per environment
- Partitions aligned with dbt vars
After Implementation
- Run dagster dev - verify assets appear
- Materialize assets manually
- Verify data lineage in UI
- Check Polaris catalog for table metadata
- Test schedules/sensors
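A minimal schedule sketch to exercise during the last step (the job name and cron expression are illustrative):

```python
from dagster import ScheduleDefinition, define_asset_job

# Job selecting all assets; narrow the selection for targeted runs
daily_dbt_job = define_asset_job("daily_dbt_job", selection="*")

daily_schedule = ScheduleDefinition(
    job=daily_dbt_job,
    cron_schedule="0 6 * * *",  # 06:00 daily
)
```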
Anti-Patterns to Avoid
❌ Don't write to Iceberg without going through catalog
❌ Don't hardcode compute logic (use dbt for SQL transforms)
❌ Don't mix Dagster partitions with dbt incremental without alignment
❌ Don't use deprecated load_assets_from_dbt_manifest()
❌ Don't bypass DbtCliResource for dbt execution
❌ Don't store credentials in code (use EnvVar or secret_ref)
❌ Don't parse SQL in Python (dbt owns SQL)
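For example, a hedged sketch of the credentials rule (`PolarisCredentials` is an illustrative resource, not a library class): `EnvVar` defers resolution to launch time, so the secret value never appears in code or in Dagster's stored configuration.

```python
from dagster import ConfigurableResource, EnvVar


class PolarisCredentials(ConfigurableResource):
    """Illustrative resource; fields resolve from the environment at runtime."""

    client_id: str
    client_secret: str


creds = PolarisCredentials(
    client_id=EnvVar("POLARIS_CLIENT_ID"),
    client_secret=EnvVar("POLARIS_CLIENT_SECRET"),
)
```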
Reference Documentation
| Document | Purpose |
|---|---|
| references/dbt-integration.md | Complete dbt-Dagster patterns |
| references/compute-abstraction.md | DuckDB, Spark, Snowflake patterns |
| references/io-managers.md | Iceberg IO managers, storage layer |
| references/orchestration-abstraction.md | Pluggable Airflow/Prefect patterns |
| references/catalog-control-plane.md | CRITICAL architecture doc |
| | Dagster SDK quick reference |
Quick Reference: Research Queries
When uncertain, search:
- "Dagster dbt_assets decorator examples 2025"
- "DagsterDbtTranslator custom implementation 2025"
- "dagster-iceberg PyArrowIcebergIOManager 2025"
- "DuckDB Iceberg REST catalog ATTACH 2025"
Remember: Design for abstraction. Dagster orchestrates, dbt owns SQL, catalog controls storage.