Claude-skill-registry data-flow

Rooms as pipeline nodes, exits as edges, objects as messages

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/data-flow" ~/.claude/skills/majiayu000-claude-skill-registry-data-flow && rm -rf "$T"
manifest: skills/data/data-flow/SKILL.md

Data Flow

"Rooms are nodes. Exits are edges. Thrown objects are messages."

MOOLLM's approach to building processing pipelines using rooms and objects. The filesystem IS the data flow network.

The Pattern

  • Rooms are processing stages (nodes)
  • Exits connect stages (edges)
  • Objects flow through as messages
  • THROW sends objects through exits
  • INBOX receives incoming objects
  • OUTBOX stages outgoing objects

Commands

Command          Effect
THROW obj exit   Send object through exit to destination
INBOX            List items waiting to be processed
NEXT             Get next item from inbox (FIFO)
PEEK             Look at next item without removing
STAGE obj exit   Add object to outbox for later throw
FLUSH            Throw all staged objects
FLUSH exit       Throw staged objects for specific exit
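
The commands above can be sketched as plain file moves between directories. This is a minimal illustration, not MOOLLM's actual implementation: the `Room` class, its method names, the per-exit subfolders inside outbox/, and the use of sorted file names as a stand-in for FIFO order are all assumptions made for the example.

```python
import shutil
from pathlib import Path


class Room:
    """Hypothetical model of one pipeline room; not MOOLLM's real API."""

    def __init__(self, path, exits=None):
        self.path = Path(path)
        self.inbox = self.path / "inbox"
        self.outbox = self.path / "outbox"
        self.exits = dict(exits or {})  # exit name -> destination Room
        self.inbox.mkdir(parents=True, exist_ok=True)
        self.outbox.mkdir(parents=True, exist_ok=True)

    def inbox_items(self):
        # INBOX: list waiting items. Assumption: sorted names approximate FIFO.
        return sorted(p for p in self.inbox.iterdir() if p.is_file())

    def peek(self):
        # PEEK: look at the next item without removing it.
        items = self.inbox_items()
        return items[0] if items else None

    def next(self):
        # NEXT: take the next item out of the inbox into the room itself.
        item = self.peek()
        if item is None:
            return None
        work = self.path / item.name
        shutil.move(str(item), str(work))
        return work

    def throw(self, obj, exit_name):
        # THROW: move the object into the destination room's inbox.
        target = self.exits[exit_name].inbox / Path(obj).name
        shutil.move(str(obj), str(target))
        return target

    def stage(self, obj, exit_name):
        # STAGE: park the object in outbox/<exit>/ for a later FLUSH.
        slot = self.outbox / exit_name
        slot.mkdir(parents=True, exist_ok=True)
        target = slot / Path(obj).name
        shutil.move(str(obj), str(target))
        return target

    def flush(self, exit_name=None):
        # FLUSH: throw everything staged, or only what is staged for one exit.
        if exit_name:
            names = [exit_name]
        else:
            names = sorted(d.name for d in self.outbox.iterdir() if d.is_dir())
        thrown = []
        for name in names:
            slot = self.outbox / name
            if slot.is_dir():
                for obj in sorted(slot.iterdir()):
                    thrown.append(self.throw(obj, name))
        return thrown
```

With two rooms wired as `Room("parser", exits={"door-analyzer": Room("analyzer")})`, calling `next()`, then `stage(item, "door-analyzer")`, then `flush()` moves one file from parser/inbox/ to analyzer/inbox/.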

Room Structure

stage/
├── ROOM.yml       # Config and processor definition
├── inbox/         # Incoming queue (FIFO)
├── outbox/        # Staged for batch throwing
└── door-next/     # Exit to next stage
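
As a rough illustration of this layout, the sketch below creates a room directory and discovers its exits by the `door-` prefix. The helper names `scaffold_room` and `list_exits` are hypothetical, and ROOM.yml is only created as an empty placeholder because its full schema is not specified here.

```python
from pathlib import Path


def scaffold_room(root, name, exits=("next",)):
    """Create <root>/<name>/ with inbox/, outbox/, ROOM.yml and door-* exits."""
    room = Path(root) / name
    (room / "inbox").mkdir(parents=True, exist_ok=True)
    (room / "outbox").mkdir(exist_ok=True)
    for exit_name in exits:
        (room / f"door-{exit_name}").mkdir(exist_ok=True)
    # Empty placeholder; the processor definition would go here.
    (room / "ROOM.yml").touch()
    return room


def list_exits(room):
    """Return the names of all door-* directories in a room, sorted."""
    return sorted(
        p.name for p in Path(room).iterdir()
        if p.is_dir() and p.name.startswith("door-")
    )
```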

Processor Types

Script (Deterministic)

processor:
  type: script
  command: "python parse.py ${input}"

LLM (Semantic)

processor:
  type: llm
  prompt: |
    Analyze this document:
    - Extract key entities
    - Summarize in 3 sentences

Hybrid

processor:
  type: hybrid
  pre_process: "extract.py ${input}"
  llm_prompt: "Analyze extracted data"
  post_process: "format.py ${output}"

Mix and match. LLM for reasoning, scripts for transformation.
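
One way to picture how the three processor types could be dispatched is sketched below. It is an assumption-laden illustration rather than the skill's actual runner: `run_script` and `run_processor` are hypothetical names, the `llm` argument is any caller-supplied function taking `(prompt, text)`, `${input}` and `${output}` are filled with `string.Template`, and `${output}` is assumed to carry the LLM's answer text.

```python
import shlex
import subprocess
from string import Template


def run_script(command, **variables):
    """Run a command template such as 'python parse.py ${input}'; return stdout."""
    # Split first, substitute second, so a value with spaces stays one argument.
    argv = [Template(part).safe_substitute(variables)
            for part in shlex.split(command)]
    done = subprocess.run(argv, capture_output=True, text=True, check=True)
    return done.stdout


def run_processor(config, input_value, llm=None):
    """Dispatch on the 'type' key of a processor config (already parsed to a dict)."""
    kind = config["type"]
    if kind == "script":
        return run_script(config["command"], input=input_value)
    if kind == "llm":
        # llm is a hypothetical callable: llm(prompt, text) -> str
        return llm(config["prompt"], input_value)
    if kind == "hybrid":
        extracted = run_script(config["pre_process"], input=input_value)
        answer = llm(config["llm_prompt"], extracted)
        return run_script(config["post_process"], output=answer)
    raise ValueError(f"unknown processor type: {kind!r}")
```

Passing the command as an argument list rather than through a shell keeps file names with spaces or shell metacharacters from being reinterpreted.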

Example Pipeline

uploads/              # Raw files land here
├── inbox/
│   ├── doc-001.pdf
│   └── doc-002.pdf
└── door-parser/

parser/               # Extract text
├── script: parse.py
└── door-analyzer/

analyzer/             # LLM analyzes
├── prompt: "Summarize..."
├── door-output/
└── door-errors/

output/               # Final results
└── inbox/
    ├── doc-001-summary.yml
    └── doc-002-summary.yml

Processing Loop

> ENTER parser
Inbox: 2 items waiting.

> NEXT
Processing doc-001.pdf...
Text extracted.

> STAGE doc-001.txt door-analyzer
Staged.

> FLUSH
Throwing 1 item through door-analyzer...

Fan-Out (one-to-many)

routing_rules:
  - if: "priority == 'high'"
    throw_to: door-fast-track
  - if: "type == 'archive'"
    throw_to: door-archive
  - default: door-standard
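
The routing rules above could be evaluated along these lines. This is a sketch under stated assumptions: the rules are already parsed into a list of dicts, conditions are limited to the `field == 'value'` form shown, the first matching rule wins, and the function name `route` is hypothetical.

```python
import re

# Matches only conditions of the form: field == 'value'
_RULE = re.compile(r"^\s*(\w+)\s*==\s*'([^']*)'\s*$")


def route(obj, rules):
    """Return the exit name for obj (a dict of metadata), or the default exit."""
    default = None
    for rule in rules:
        if "default" in rule:
            default = rule["default"]
            continue
        match = _RULE.match(rule["if"])
        if match is None:
            raise ValueError(f"unsupported condition: {rule['if']!r}")
        field, expected = match.groups()
        if obj.get(field) == expected:
            return rule["throw_to"]  # first matching rule wins (assumption)
    return default
```

Parsing the condition with a narrow pattern avoids calling `eval` on text that comes from a config file.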

Fan-In (many-to-one)

batch_size: 10
on_batch_complete: |
  Combine all results
  Generate summary report
  THROW report.yml door-output
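
The batching behaviour can be sketched as a small collector that fires a callback once `batch_size` results have arrived. The `FanIn` class and its `receive` method are hypothetical names for illustration; what "combine" and "summary report" mean is left to the callback.

```python
class FanIn:
    """Collect results and call on_batch_complete once per full batch."""

    def __init__(self, batch_size, on_batch_complete):
        self.batch_size = batch_size
        self.on_batch_complete = on_batch_complete
        self.pending = []

    def receive(self, result):
        """Add one result; return the callback's report when a batch fills."""
        self.pending.append(result)
        if len(self.pending) >= self.batch_size:
            # Hand off the full batch and start a fresh one.
            batch, self.pending = self.pending, []
            return self.on_batch_complete(batch)
        return None
```

In a room, the value returned by the callback would be written to a file such as report.yml and thrown through door-output.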

Kilroy Mapping

MOOLLM             Kilroy
Room               Node
Exit               Edge
THROW              Message passing
inbox/             Input queue
Script processor   Deterministic module
LLM processor      LLM node