Claude-skills flowing
Lightweight DAG workflow runner with checkpoint resume and detachable tasks. Use when orchestrating 3+ sequential or parallel tool calls into a single invocation, or when pipelines need resume-from-failure without re-running succeeded steps.
install
source · Clone the upstream repo
git clone https://github.com/oaustegard/claude-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/oaustegard/claude-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/flowing" ~/.claude/skills/oaustegard-claude-skills-flowing && rm -rf "$T"
manifest:
flowing/SKILL.md
source content
Flowing — DAG Workflow Runner
Batch independent operations into one python3 invocation. Declare steps, wire dependencies, run once.
Quick Start
    from flowing import task, Flow

    @task
    def fetch_data():
        return {"items": [1, 2, 3]}

    @task(depends_on=[fetch_data])
    def process(fetch_data):
        return sum(fetch_data["items"])

    @task(depends_on=[process])
    def store(process):
        print(f"Result: {process}")

    Flow(store).run()
Core API
@task decorator
    @task(
        depends_on=[other_task],       # DAG edges
        retry=2,                       # retry count (0 = no retry)
        retry_backoff_base_ms=1000,
        retry_max_backoff_ms=30_000,
        timeout_s=60.0,
        detached=True,                 # non-blocking side-effect
        name="custom_name",            # override function name
    )
    def my_step(other_task):           # param name = dependency task name
        return result
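The parameter list above does not say how retry_backoff_base_ms and retry_max_backoff_ms interact. One common reading is exponential backoff with a ceiling: the delay doubles on each retry until it reaches the maximum. The sketch below illustrates that reading only; `backoff_delays_ms` is a hypothetical helper, not part of flowing, and the doubling rule is an assumption that the actual schedule may not follow.

```python
def backoff_delays_ms(retry, base_ms=1000, max_ms=30_000):
    """Delay before each retry, assuming base * 2**attempt capped at max_ms.

    Hypothetical helper for illustration; flowing's real schedule may differ.
    """
    return [min(base_ms * 2 ** attempt, max_ms) for attempt in range(retry)]


print(backoff_delays_ms(2))  # [1000, 2000]
print(backoff_delays_ms(7))  # [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

Under this assumption, retry=2 with the defaults shown adds at most three seconds of waiting, while the cap only starts to matter from the sixth retry onward.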
Flow class
    flow = Flow(terminal_task, max_workers=5, fail_fast=True)
    results = flow.run()     # execute full DAG
    flow.summary()           # human-readable status
    flow.value(some_task)    # get succeeded task's return value
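To make the role of max_workers concrete, here is a minimal standalone sketch of layer-by-layer DAG execution on a thread pool. It is not flowing's implementation: `run_layers`, `deps`, and `funcs` are hypothetical names, and the sketch leaves out retries, timeouts, and fail_fast. It only shows the general idea of running every task whose dependencies are satisfied in parallel, then moving on to the next layer.

```python
from concurrent.futures import ThreadPoolExecutor


def run_layers(deps, funcs, max_workers=5):
    """Run a DAG in layers (hypothetical sketch, not flowing's code).

    deps:  task name -> list of dependency names
    funcs: task name -> callable taking dependency results as keyword arguments
    """
    results = {}
    remaining = set(deps)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while remaining:
            # A task is ready once every dependency has a result.
            ready = [n for n in remaining if all(d in results for d in deps[n])]
            if not ready:
                raise ValueError("cycle or missing dependency")
            # Submit the whole layer, then wait for each future.
            futures = {
                n: pool.submit(funcs[n], **{d: results[d] for d in deps[n]})
                for n in ready
            }
            for name, future in futures.items():
                results[name] = future.result()
            remaining -= set(ready)
    return results


deps = {"a": [], "b": [], "total": ["a", "b"]}
funcs = {"a": lambda: 1, "b": lambda: 2, "total": lambda a, b: a + b}
layer_results = run_layers(deps, funcs)
print(layer_results["total"])  # 3
```

Here "a" and "b" form the first layer and can run concurrently; "total" runs alone in the second layer and receives their results by parameter name, mirroring the convention that a parameter name equals the dependency task name.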
Resume from failure
When a step fails mid-pipeline, fix the issue and continue without re-running succeeded steps:
    flow = Flow(terminal)
    results = flow.run()                    # step_3 fails
    flow.override(step_3, corrected_value)  # inject fix
    results = flow.resume()                 # step_1, step_2 cached; step_4+ runs
- flow.resume(): Resets FAILED/SKIPPED tasks, keeps SUCCEEDED results cached
- flow.override(task_def, value): Manually inject a succeeded result
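The resume and override semantics can be sketched with a toy sequential pipeline that keeps a cache of succeeded results. This is an illustration under simplifying assumptions, not flowing's code: `MiniFlow` is a hypothetical class, steps run in list order instead of as a DAG, and each step receives the whole results dict.

```python
class MiniFlow:
    """Toy pipeline that caches succeeded step results (hypothetical sketch)."""

    def __init__(self, steps):
        self.steps = steps   # list of (name, fn); fn takes the results dict
        self.results = {}    # name -> value for succeeded or overridden steps
        self.calls = []      # names of steps that actually executed

    def run(self):
        for name, fn in self.steps:
            if name in self.results:
                continue     # already succeeded or overridden: do not re-run
            self.calls.append(name)
            try:
                self.results[name] = fn(self.results)
            except Exception as exc:
                return f"failed at {name}: {exc}"
        return "ok"

    # In this toy model, resuming is running again with the cache intact.
    resume = run

    def override(self, name, value):
        self.results[name] = value   # treat the step as succeeded


def flaky(results):
    raise RuntimeError("upstream unavailable")


mini = MiniFlow([
    ("step_1", lambda r: 10),
    ("step_2", flaky),
    ("step_3", lambda r: r["step_1"] + r["step_2"]),
])
first = mini.run()            # step_2 raises, so step_3 never runs
mini.override("step_2", 5)    # inject a corrected value
second = mini.resume()        # only step_3 executes this time
print(first, "|", second, "|", mini.results["step_3"])
```

After the resume, `mini.calls` lists each step exactly once, which is the property that matters: step_1 is not executed a second time.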
Detached tasks (non-blocking side-effects)
    @task(depends_on=[create_issue], detached=True)
    def store_memory(create_issue):
        remember(create_issue["url"], ...)
- Run in a final layer after the main DAG completes
- Failures collected in flow.detached_failures, never trigger fail_fast
- Dependencies must all be SUCCEEDED (same as normal tasks)
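The behaviour described in this list can be sketched as a two-phase run: main steps first, then detached steps whose exceptions are collected instead of raised. This is a simplified standalone model, not flowing's implementation; `run_with_detached`, `broken_notify`, and the example URL are hypothetical, and dependency checks are omitted.

```python
def run_with_detached(main_steps, detached_steps):
    """Run main steps, then detached steps, collecting detached failures.

    Hypothetical sketch: each step is (name, fn) and fn takes the results dict.
    """
    results, detached_failures = {}, {}
    for name, fn in main_steps:
        results[name] = fn(results)          # a failure here propagates
    for name, fn in detached_steps:          # final layer, after the main run
        try:
            fn(results)
        except Exception as exc:
            detached_failures[name] = exc    # recorded, never re-raised
    return results, detached_failures


def broken_notify(results):
    raise ConnectionError("notification service down")


stored = []
main_results, failures = run_with_detached(
    main_steps=[("create_issue", lambda r: {"url": "https://example.invalid/issue/1"})],
    detached_steps=[
        ("store_memory", lambda r: stored.append(r["create_issue"]["url"])),
        ("notify", broken_notify),
    ],
)
print(stored, list(failures))
```

The main result is returned even though one side-effect failed, and the failure stays available for inspection afterwards.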
When to use
- 3+ independent operations (recall, SQL, web search) that can parallelize
- Multi-step pipelines where late failures shouldn't waste early work
- Side-effects (memory storage, notifications) that shouldn't block the critical path
When NOT to use
- Next step depends on reasoning about prior result (use a think loop)
- Single sequential operation
- Async/distributed workflows (this is single-container, ThreadPoolExecutor)