Claude-skill-registry dagster-development

Expert guidance for Dagster data orchestration including assets, resources, schedules, sensors, partitions, testing, and ETL patterns. Use when building or extending Dagster projects, writing assets, configuring automation, or integrating with dbt/dlt/Sling.

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/dagster-development" ~/.claude/skills/majiayu000-claude-skill-registry-dagster-development && rm -rf "$T"
manifest: skills/data/dagster-development/SKILL.md
source content

Dagster Development Expert

Quick Reference

If you're writing...Check this section/reference
@dg.asset
Assets or
references/assets.md
ConfigurableResource
Resources or
references/resources.md
@dg.schedule
or
ScheduleDefinition
Automation or
references/automation.md
@dg.sensor
Sensors or
references/automation.md
PartitionsDefinition
Partitions or
references/automation.md
Tests with
dg.materialize()
Testing or
references/testing.md
@asset_check
references/testing.md#asset-checks
@dlt_assets
or
@sling_assets
references/etl-patterns.md
@dbt_assets
dbt Integration or
dbt-development
skill
Definitions
or code locations
references/project-structure.md

Core Concepts

Asset: A persistent object (table, file, model) that your pipeline produces. Define with

@dg.asset
.

Resource: External services/tools (databases, APIs) shared across assets. Define with

ConfigurableResource
.

Job: A selection of assets to execute together. Create with

dg.define_asset_job()
.

Schedule: Time-based automation for jobs. Create with

dg.ScheduleDefinition
.

Sensor: Event-driven automation that watches for changes. Define with

@dg.sensor
.

Partition: Logical divisions of data (by date, category). Define with

PartitionsDefinition
.

Definitions: The container for all Dagster objects in a code location.


Assets Quick Reference

Basic Asset

import dagster as dg

@dg.asset
def my_asset() -> None:
    """Asset description appears in the UI."""
    # Your computation logic here
    pass

Asset with Dependencies

@dg.asset
def downstream_asset(upstream_asset) -> dict:
    """Depends on upstream_asset by naming it as a parameter."""
    return {"processed": upstream_asset}

Asset with Metadata

@dg.asset(
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="Cleaned customer data",
)
def customers() -> None:
    pass

Naming: Use nouns describing what is produced (

customers
,
daily_revenue
), not verbs (
load_customers
).


Resources Quick Reference

Define a Resource

from dagster import ConfigurableResource

class DatabaseResource(ConfigurableResource):
    connection_string: str
    
    def query(self, sql: str) -> list:
        # Implementation here
        pass

Use in Assets

@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")

Register in Definitions

dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)

Automation Quick Reference

Schedule

import dagster as dg
from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
)

Common Cron Patterns

PatternMeaning
0 * * * *
Every hour
0 0 * * *
Daily at midnight
0 0 * * 1
Weekly on Monday
0 0 1 * *
Monthly on the 1st
0 0 5 * *
Monthly on the 5th

Sensors Quick Reference

Basic Sensor Pattern

@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # 1. Read cursor (previous state)
    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []
    
    # 2. Check for changes
    for item in get_items_to_check():
        current_state[item.id] = item.modified_at
        if item.id not in previous_state or previous_state[item.id] != item.modified_at:
            runs_to_request.append(dg.RunRequest(
                run_key=f"run_{item.id}_{item.modified_at}",
                run_config={...}
            ))
    
    # 3. Return result with updated cursor
    return dg.SensorResult(
        run_requests=runs_to_request,
        cursor=json.dumps(current_state)
    )

Key: Use cursors to track state between sensor evaluations.


Partitions Quick Reference

Time-Based Partition

weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")

@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
    partition_key = context.partition_key  # e.g., "2023-01-01"
    # Process data for this partition

Static Partition

region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])

@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key

Partition Types

TypeUse Case
DailyPartitionsDefinition
One partition per day
WeeklyPartitionsDefinition
One partition per week
MonthlyPartitionsDefinition
One partition per month
StaticPartitionsDefinition
Fixed set of partitions
MultiPartitionsDefinition
Combine multiple partition dimensions

Testing Quick Reference

Direct Function Testing

def test_my_asset():
    result = my_asset()
    assert result == expected_value

Testing with Materialization

def test_asset_graph():
    result = dg.materialize(
        assets=[asset_a, asset_b],
        resources={"database": mock_database},
    )
    assert result.success
    assert result.output_for_node("asset_b") == expected

Mocking Resources

from unittest.mock import Mock

def test_with_mocked_resource():
    mocked_resource = Mock()
    mocked_resource.query.return_value = [{"id": 1}]
    
    result = dg.materialize(
        assets=[my_asset],
        resources={"database": mocked_resource},
    )
    assert result.success

Asset Checks

@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
    return dg.AssetCheckResult(
        passed=len(my_asset) > 0,
        metadata={"row_count": len(my_asset)},
    )

dbt Integration

For dbt integration, use the minimal pattern below. For comprehensive dbt patterns, see the

dbt-development
skill.

Basic dbt Assets

from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path

dbt_project_dir = Path(__file__).parent / "dbt_project"

@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

dbt Resource

dg.Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)

Full patterns: See Dagster dbt docs


When to Load References

Load
references/assets.md
when:

  • Defining complex asset dependencies
  • Adding metadata, groups, or key prefixes
  • Working with asset factories
  • Understanding asset materialization patterns

Load
references/resources.md
when:

  • Creating custom
    ConfigurableResource
    classes
  • Integrating with databases, APIs, or cloud services
  • Understanding resource scoping and lifecycle

Load
references/automation.md
when:

  • Creating schedules with complex cron patterns
  • Building sensors with cursors and state management
  • Implementing partitions and backfills
  • Automating dbt or other integration runs

Load
references/testing.md
when:

  • Writing unit tests for assets
  • Mocking resources and dependencies
  • Using
    dg.materialize()
    for integration tests
  • Creating asset checks for data validation

Load
references/etl-patterns.md
when:

  • Using dlt for embedded ETL
  • Using Sling for database replication
  • Loading data from files or APIs
  • Integrating external ETL tools

Load
references/project-structure.md
when:

  • Setting up a new Dagster project
  • Configuring
    Definitions
    and code locations
  • Using
    dg
    CLI for scaffolding
  • Organizing large projects with Components

Project Structure

Recommended Layout

my_project/
├── pyproject.toml
├── src/
│   └── my_project/
│       ├── definitions.py     # Main Definitions
│       └── defs/
│           ├── assets/
│           │   ├── __init__.py
│           │   └── my_assets.py
│           ├── jobs.py
│           ├── schedules.py
│           ├── sensors.py
│           └── resources.py
└── tests/
    └── test_assets.py

Definitions Pattern (Modern)

# src/my_project/definitions.py
from pathlib import Path
from dagster import definitions, load_from_defs_folder

@definitions
def defs():
    return load_from_defs_folder(project_root=Path(__file__).parent.parent.parent)

Scaffolding with dg CLI

# Create new project
uvx create-dagster my_project

# Scaffold new asset file
dg scaffold defs dagster.asset assets/new_asset.py

# Scaffold schedule
dg scaffold defs dagster.schedule schedules.py

# Scaffold sensor
dg scaffold defs dagster.sensor sensors.py

# Validate definitions
dg check defs

Common Patterns

Job Definition

trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    selection=["taxi_trips", "taxi_zones"],
)

Run Configuration

from dagster import Config

class MyAssetConfig(Config):
    filename: str
    limit: int = 100

@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
    print(f"Processing {config.filename} with limit {config.limit}")

Asset Dependencies with External Sources

@dg.asset(deps=["external_table"])
def derived_asset() -> None:
    """Depends on external_table which isn't managed by Dagster."""
    pass

Anti-Patterns to Avoid

Anti-PatternBetter Approach
Hardcoding credentials in assetsUse
ConfigurableResource
with env vars
Giant assets that do everythingSplit into focused, composable assets
Ignoring asset return typesUse type annotations for clarity
Skipping tests for assetsTest assets like regular Python functions
Not using partitions for time-seriesUse
DailyPartitionsDefinition
etc.
Putting all assets in one fileOrganize by domain in separate modules

CLI Quick Reference

# Development
dg dev                          # Start Dagster UI
dg check defs                   # Validate definitions

# Scaffolding
dg scaffold defs dagster.asset assets/file.py
dg scaffold defs dagster.schedule schedules.py
dg scaffold defs dagster.sensor sensors.py

# Production
dagster job execute -j my_job   # Execute a job
dagster asset materialize -a my_asset  # Materialize an asset

References

  • Assets:
    references/assets.md
    - Detailed asset patterns
  • Resources:
    references/resources.md
    - Resource configuration
  • Automation:
    references/automation.md
    - Schedules, sensors, partitions
  • Testing:
    references/testing.md
    - Testing patterns and asset checks
  • ETL Patterns:
    references/etl-patterns.md
    - dlt, Sling, file/API ingestion
  • Project Structure:
    references/project-structure.md
    - Definitions, Components
  • Official Docs: https://docs.dagster.io
  • API Reference: https://docs.dagster.io/api/dagster