claude-skills · flowing

Lightweight DAG workflow runner with checkpoint resume and detachable tasks. Use it to batch 3+ sequential or parallel tool calls into a single invocation, or when a pipeline needs resume-from-failure without re-running succeeded steps.

install
source · Clone the upstream repo
git clone https://github.com/oaustegard/claude-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/oaustegard/claude-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/flowing" ~/.claude/skills/oaustegard-claude-skills-flowing && rm -rf "$T"
manifest: flowing/SKILL.md
source content

Flowing — DAG Workflow Runner

Batch independent operations into one python3 invocation. Declare steps, wire dependencies, run once.

Quick Start

from flowing import task, Flow

@task
def fetch_data():
    return {"items": [1, 2, 3]}

@task(depends_on=[fetch_data])
def process(fetch_data):
    return sum(fetch_data["items"])

@task(depends_on=[process])
def store(process):
    print(f"Result: {process}")

Flow(store).run()
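
The same wiring covers the parallel fan-out this skill is built for: tasks with no edge between them run concurrently, then join downstream. A minimal sketch, assuming only the task/Flow API shown above (recall and web_search are hypothetical stand-ins for real tool calls):

from flowing import task, Flow

@task
def recall():
    return ["memory hit"]        # no dependency on web_search

@task
def web_search():
    return ["search hit"]        # runs in parallel with recall

@task(depends_on=[recall, web_search])
def combine(recall, web_search):
    # Param names match dependency task names, as in Quick Start
    return recall + web_search

Flow(combine).run()              # one invocation, two parallel branches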

Core API

@task decorator

@task(
    depends_on=[other_task],  # DAG edges
    retry=2,                  # retry count (0 = no retry)
    retry_backoff_base_ms=1000,
    retry_max_backoff_ms=30_000,
    timeout_s=60.0,
    detached=True,            # non-blocking side-effect
    name="custom_name",       # override function name
)
def my_step(other_task):      # param name = dependency task name
    return result
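
Putting the retry knobs together, a hedged sketch of a flaky step; the failure here is simulated, and the parameter semantics are only what the listing above states:

import random

from flowing import task, Flow

@task(retry=2, retry_backoff_base_ms=500, timeout_s=10.0)
def flaky_fetch():
    # Fails roughly half the time; retried up to 2 times with backoff
    if random.random() < 0.5:
        raise RuntimeError("transient network error")
    return {"status": "ok"}

Flow(flaky_fetch).run()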

Flow class

flow = Flow(terminal_task, max_workers=5, fail_fast=True)
results = flow.run()          # execute full DAG
flow.summary()                # human-readable status
flow.value(some_task)         # get succeeded task's return value
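
Applied to the Quick Start DAG, reading an intermediate result might look like this; assuming value() accepts the task function itself, as the signature above suggests:

flow = Flow(store)
flow.run()
flow.summary()                   # human-readable per-task status
assert flow.value(process) == 6  # sum of [1, 2, 3] from fetch_data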

Resume from failure

When a step fails mid-pipeline, fix the issue and continue without re-running succeeded steps:

flow = Flow(terminal)
results = flow.run()                    # step_3 fails
flow.override(step_3, corrected_value)  # inject fix
results = flow.resume()                 # step_1, step_2 cached; step_4+ runs
  • flow.resume(): resets FAILED/SKIPPED tasks, keeps SUCCEEDED results cached
  • flow.override(task_def, value): manually inject a succeeded result
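
End to end, a runnable sketch of the fail/fix/resume cycle, using a module-level flag as a stand-in for whatever actually gets fixed, and assuming run() records the failure rather than raising:

from flowing import task, Flow

FIXED = False

@task
def load():
    return [1, 2, 3]

@task(depends_on=[load])
def transform(load):
    if not FIXED:
        raise ValueError("bad config")   # fails on the first run
    return [x * 2 for x in load]

@task(depends_on=[transform])
def save(transform):
    print(transform)

flow = Flow(save)
flow.run()       # load SUCCEEDED, transform FAILED, save SKIPPED
FIXED = True     # fix the underlying issue
flow.resume()    # load stays cached; transform and save now run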

Detached tasks (non-blocking side-effects)

@task(depends_on=[create_issue], detached=True)
def store_memory(create_issue):
    remember(create_issue["url"], ...)
  • Run in a final layer after the main DAG completes
  • Failures are collected in flow.detached_failures and never trigger fail_fast
  • Dependencies must all be SUCCEEDED (same as normal tasks)
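
A sketch of surfacing those failures after a run; the docs only say they are collected in flow.detached_failures, so treating it as an iterable is an assumption:

# given a flow whose DAG includes store_memory above
flow.run()                          # main DAG first, then detached layer
for failure in flow.detached_failures:
    print("detached task failed:", failure)   # assumed iterable of failure records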

When to use

  • 3+ independent operations (recall, SQL, web search) that can parallelize
  • Multi-step pipelines where late failures shouldn't waste early work
  • Side-effects (memory storage, notifications) that shouldn't block the critical path

When NOT to use

  • Next step depends on reasoning about the prior result (use a think loop)
  • Single sequential operation
  • Async/distributed workflows (this runs single-container on a ThreadPoolExecutor)