Claude-skill-registry dagster-local

Interact with Dagster data orchestration platform running locally or on Kubernetes. Use when Claude needs to monitor Dagster runs, get run logs, list assets/jobs, materialize assets, launch jobs, or debug pipeline failures. Supports both local Dagster dev server and Kubernetes deployments where each job run is a separate pod.

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/dagster-local" ~/.claude/skills/majiayu000-claude-skill-registry-dagster-local && rm -rf "$T"
manifest: skills/data/dagster-local/SKILL.md
source content

Dagster Local Skill

Programmatic interaction with Dagster via GraphQL API and Kubernetes for pod-level logs.

Quick Start

from scripts.dagster_client import DagsterClient

client = DagsterClient()  # defaults to http://localhost:3000/graphql

# List all assets
assets = client.list_assets()

# Get recent runs
runs = client.get_recent_runs(limit=10)

# Get logs for a specific run
logs = client.get_run_logs(run_id="abc123")

Configuration

client = DagsterClient(graphql_url="http://localhost:3000/graphql")

Available Operations

Query Operations

FunctionPurpose
list_repositories()
List all code locations/repositories
list_jobs(repo_location, repo_name)
List jobs in a repository
list_assets(repo_location, repo_name)
List assets in a repository
get_recent_runs(limit)
Get recent run history
get_run_info(run_id)
Get detailed run info and status
get_run_logs(run_id)
Get event logs for a run
get_asset_info(asset_key)
Get asset details and dependencies

Mutation Operations

FunctionPurpose
launch_job(repo_location, repo_name, job_name, config)
Launch a job run
materialize_asset(asset_key, repo_location, repo_name)
Materialize an asset
terminate_run(run_id)
Terminate an in-progress run

Kubernetes Integration

When Dagster runs on K8s (each run = separate pod):

from scripts.k8s_logs import get_pod_logs_for_run, get_dagster_pods

# Get pod logs for a specific run
logs = get_pod_logs_for_run(run_id="abc123", namespace="dagster")

# List all Dagster-related pods
pods = get_dagster_pods(namespace="dagster")

Debugging Workflow

  1. Check recent runs:
    get_recent_runs()
    to find failed runs
  2. Get run details:
    get_run_info(run_id)
    for status and error summary
  3. Get Dagster logs:
    get_run_logs(run_id)
    for step-level events
  4. Get pod logs (K8s):
    get_pod_logs_for_run(run_id)
    for stdout/stderr

Common Patterns

Wait for Run Completion

import time

def wait_for_run(client, run_id, timeout=300):
    start = time.time()
    while time.time() - start < timeout:
        info = client.get_run_info(run_id)
        status = info.get("status")
        if status in ["SUCCESS", "FAILURE", "CANCELED"]:
            return info
        time.sleep(5)
    raise TimeoutError(f"Run {run_id} did not complete")

Get Failure Reason

def get_failure_reason(client, run_id):
    logs = client.get_run_logs(run_id)
    failures = [e for e in logs.get("events", []) if "error" in e]
    return failures[-1] if failures else None

GraphQL Reference

For advanced queries, see

references/graphql_queries.md
.