Claude-skill-registry dagster-local
Interact with Dagster data orchestration platform running locally or on Kubernetes. Use when Claude needs to monitor Dagster runs, get run logs, list assets/jobs, materialize assets, launch jobs, or debug pipeline failures. Supports both local Dagster dev server and Kubernetes deployments where each job run is a separate pod.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/dagster-local" ~/.claude/skills/majiayu000-claude-skill-registry-dagster-local && rm -rf "$T"
manifest:
skills/data/dagster-local/SKILL.mdsource content
Dagster Local Skill
Programmatic interaction with Dagster via GraphQL API and Kubernetes for pod-level logs.
Quick Start
from scripts.dagster_client import DagsterClient client = DagsterClient() # defaults to http://localhost:3000/graphql # List all assets assets = client.list_assets() # Get recent runs runs = client.get_recent_runs(limit=10) # Get logs for a specific run logs = client.get_run_logs(run_id="abc123")
Configuration
client = DagsterClient(graphql_url="http://localhost:3000/graphql")
Available Operations
Query Operations
| Function | Purpose |
|---|---|
| List all code locations/repositories |
| List jobs in a repository |
| List assets in a repository |
| Get recent run history |
| Get detailed run info and status |
| Get event logs for a run |
| Get asset details and dependencies |
Mutation Operations
| Function | Purpose |
|---|---|
| Launch a job run |
| Materialize an asset |
| Terminate an in-progress run |
Kubernetes Integration
When Dagster runs on K8s (each run = separate pod):
from scripts.k8s_logs import get_pod_logs_for_run, get_dagster_pods # Get pod logs for a specific run logs = get_pod_logs_for_run(run_id="abc123", namespace="dagster") # List all Dagster-related pods pods = get_dagster_pods(namespace="dagster")
Debugging Workflow
- Check recent runs:
to find failed runsget_recent_runs() - Get run details:
for status and error summaryget_run_info(run_id) - Get Dagster logs:
for step-level eventsget_run_logs(run_id) - Get pod logs (K8s):
for stdout/stderrget_pod_logs_for_run(run_id)
Common Patterns
Wait for Run Completion
import time def wait_for_run(client, run_id, timeout=300): start = time.time() while time.time() - start < timeout: info = client.get_run_info(run_id) status = info.get("status") if status in ["SUCCESS", "FAILURE", "CANCELED"]: return info time.sleep(5) raise TimeoutError(f"Run {run_id} did not complete")
Get Failure Reason
def get_failure_reason(client, run_id): logs = client.get_run_logs(run_id) failures = [e for e in logs.get("events", []) if "error" in e] return failures[-1] if failures else None
GraphQL Reference
For advanced queries, see
references/graphql_queries.md.