Claude-skill-registry dbos-patterns
DBOS durable execution patterns and CRITICAL constraints for ChainGraph executor. Use when working on workflows, steps, execution, or any DBOS-related code. Contains MUST-FOLLOW constraints about what can be called from workflows vs steps. Triggers: dbos, workflow, step, durable, execution, startWorkflow, writeStream, recv, send, runStep, atomic, checkpoint, WorkflowQueue, queue, cancelWorkflow, Promise.allSettled. (project)
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/dbos-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-dbos-patterns && rm -rf "$T"
DBOS Patterns for ChainGraph
This skill covers DBOS (Database-Oriented Operating System) patterns used in the ChainGraph executor. CRITICAL: Contains constraints that agents MUST follow to avoid runtime errors.
CRITICAL Constraints
The Most Important Rule
DBOS context methods have strict calling restrictions based on WHERE you are:
```typescript
// ============================================================
// WORKFLOW FUNCTIONS: All DBOS methods allowed
// ============================================================
async function myWorkflow(task: Task): Promise<Result> {
  await DBOS.send(...)          // ✅ Allowed
  await DBOS.recv(...)          // ✅ Allowed
  await DBOS.startWorkflow(...) // ✅ Allowed
  await DBOS.writeStream(...)   // ✅ Allowed
  await DBOS.setEvent(...)      // ✅ Allowed
  await DBOS.sleep(...)         // ✅ Allowed
  const result = await DBOS.runStep(() => myStep(task)) // ✅ Allowed
  return result
}

// ============================================================
// STEP FUNCTIONS: ONLY writeStream() allowed!
// ============================================================
async function myStep(task: Task): Promise<StepResult> {
  await DBOS.writeStream(...) // ✅ ONLY THIS ONE!

  // ❌ NOT ALLOWED - Will throw runtime error:
  // await DBOS.send(...)          // ❌ Error!
  // await DBOS.recv(...)          // ❌ Error!
  // await DBOS.startWorkflow(...) // ❌ Error!
  // await DBOS.setEvent(...)      // ❌ Error!
  // await DBOS.sleep(...)         // ❌ Error!

  return { data: ... }
}
```
Constraint Reference Table
| DBOS Method | From Workflow | From Step |
|---|---|---|
|  | ✅ | ❌ |
|  | ✅ | ❌ |
|  | ✅ | ❌ |
| / | ✅ | ❌ |
|  | ✅ | ❌ |
|  | ✅ | ❌ |
|  | ✅ | ❌ |
| `DBOS.writeStream()` | ✅ | ✅ |
|  | ✅ | ❌ |
Promise Handling
NEVER use `Promise.all()`: it fails fast and leaves the remaining promises unresolved, risking unhandled rejections.
```typescript
// ❌ BAD: Promise.all() fails fast, other promises left dangling
const results = await Promise.all([step1(), step2(), step3()])

// ✅ GOOD: Promise.allSettled() waits for all, reports outcomes
const results = await Promise.allSettled([step1(), step2(), step3()])
```
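`Promise.allSettled()` never rejects, so callers have to inspect each outcome themselves. Below is a minimal sketch of that bookkeeping in plain TypeScript. It does not use DBOS, and `partitionSettled` is a hypothetical helper, not part of ChainGraph or the DBOS SDK.

```typescript
// Split the outcomes of Promise.allSettled() into successes and failures.
// Hypothetical helper for illustration; not part of ChainGraph or DBOS.
function partitionSettled<T>(results: PromiseSettledResult<T>[]): {
  values: T[]
  errors: unknown[]
} {
  const values: T[] = []
  const errors: unknown[] = []
  for (const r of results) {
    if (r.status === 'fulfilled') {
      values.push(r.value)
    } else {
      errors.push(r.reason)
    }
  }
  return { values, errors }
}

// Every promise is awaited, even though one of them rejects.
async function demo(): Promise<{ values: number[]; errors: unknown[] }> {
  const tasks: Promise<number>[] = [
    Promise.resolve(1),
    Promise.reject(new Error('step2 failed')),
    Promise.resolve(3),
  ]
  const results = await Promise.allSettled(tasks)
  return partitionSettled(results)
}
```

Inside a workflow, the same helper would receive the settled results of the `DBOS.runStep(...)` promises.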
Memory Isolation
Workflows and steps should NOT have side effects outside their own scope:
- ✅ Can READ global variables
- ❌ Must NOT create or update global variables
- ❌ Must NOT modify shared state outside return values
Queue Initialization Order
CRITICAL: `WorkflowQueue` MUST be created before `DBOS.launch()` is called!
```typescript
// File: server/dbos/queue.ts:17-35
import { WorkflowQueue } from '@dbos-inc/dbos-sdk'

// Queue is created at module level BEFORE DBOS.launch()
export const executionQueue = new WorkflowQueue(QUEUE_NAME, {
  workerConcurrency: config.dbos.workerConcurrency ?? 5,
  concurrency: config.dbos.queueConcurrency ?? 100,
})

// If created AFTER DBOS.launch(), the queue will NOT dequeue tasks!
```
Design Patterns
Pattern 1: Signal Pattern (Race Condition Fix)
Problem: Client subscribes to events before the stream exists.
Solution: Workflow writes initialization event BEFORE waiting for start signal.
File: `packages/chaingraph-executor/server/dbos/workflows/ExecutionWorkflows.ts`
Timeline:

```text
1. create execution (tRPC)
   └─ Workflow starts → writes EXECUTION_CREATED → stream exists! ✅
   └─ Workflow waits for START_SIGNAL... ⏸️
2. subscribe events (tRPC)
   └─ Stream already exists → immediately receives EXECUTION_CREATED ✅
3. start execution (tRPC)
   └─ Sends START_SIGNAL → workflow continues ▶️
```
Implementation Pattern:
```typescript
async function executionWorkflow(task: ExecutionTask): Promise<ExecutionResult> {
  // Write event BEFORE waiting - stream now exists!
  await DBOS.writeStream('events', {
    executionId: task.executionId,
    event: 'EXECUTION_CREATED',
    timestamp: Date.now(),
  })

  // Now safe to wait - clients can subscribe
  const signal = await DBOS.recv<string>('START_SIGNAL', 300)
  if (!signal) {
    throw new Error('Execution start timeout')
  }

  // Continue with execution...
}
```
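The following self-contained, in-memory simulation shows why the ordering matters. `EventStream`, `Mailbox` and `simulatedWorkflow` are hypothetical stand-ins for a durable DBOS stream and for `DBOS.recv()`. They are not the DBOS API. The point is only that the creation event is written before the workflow blocks, so a client that subscribes in between already finds it.

```typescript
// In-memory stand-ins for a durable stream and a message mailbox.
// Hypothetical classes for illustration only; they are not the DBOS API.
class EventStream<T> {
  private events: T[] = []
  write(event: T): void {
    this.events.push(event)
  }
  // A reader that arrives late still sees everything written so far.
  readAll(): T[] {
    return [...this.events]
  }
}

// Supports a single waiting receiver, which is all this sketch needs.
class Mailbox<T> {
  private waiter: ((msg: T) => void) | null = null
  private pending: T[] = []
  send(msg: T): void {
    if (this.waiter) {
      const deliver = this.waiter
      this.waiter = null
      deliver(msg)
    } else {
      this.pending.push(msg)
    }
  }
  // Resolves with the next message, or null after timeoutMs.
  recv(timeoutMs: number): Promise<T | null> {
    const queued = this.pending.shift()
    if (queued !== undefined) return Promise.resolve(queued)
    return new Promise<T | null>((resolve) => {
      const timer = setTimeout(() => {
        this.waiter = null
        resolve(null)
      }, timeoutMs)
      this.waiter = (msg) => {
        clearTimeout(timer)
        resolve(msg)
      }
    })
  }
}

// Signal Pattern: write the creation event BEFORE blocking on the start signal.
async function simulatedWorkflow(
  stream: EventStream<string>,
  mailbox: Mailbox<string>,
): Promise<string> {
  stream.write('EXECUTION_CREATED') // the stream exists from here on
  const signal = await mailbox.recv(1000)
  if (!signal) throw new Error('Execution start timeout')
  stream.write('EXECUTION_STARTED')
  return signal
}
```

A client that reads the stream after the workflow starts but before sending `START_SIGNAL` already sees `EXECUTION_CREATED`. Moving the `write` below the `recv` would reintroduce the race.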
Pattern 2: Shared State Pattern (Command System)
Problem: Cannot call `DBOS.recv()` from steps, but the step still needs to check for commands.
Solution: The workflow polls for messages and updates a shared state object that the step reads.
Files:
- Workflow: `server/dbos/workflows/ExecutionWorkflows.ts`
- Step: `server/dbos/steps/ExecuteFlowAtomicStep.ts`
```typescript
// Shared state object (passed from workflow to step)
interface CommandController {
  currentCommand: 'PAUSE' | 'RESUME' | 'STEP' | null
}

// WORKFLOW LEVEL: Poll DBOS.recv() every 500ms
async function executionWorkflow(task: ExecutionTask) {
  const commandController: CommandController = { currentCommand: null }
  const abortController = new AbortController()

  // Polling loop (runs concurrently with the step)
  const pollCommands = async () => {
    while (!abortController.signal.aborted) {
      const cmd = await DBOS.recv<{ command: string }>('COMMAND', 0.5)
      if (cmd) {
        if (cmd.command === 'STOP') {
          abortController.abort()
        } else {
          commandController.currentCommand = cmd.command as CommandController['currentCommand']
        }
      }
    }
  }

  // Start polling without awaiting it, so it runs alongside the step
  const polling = pollCommands()
  try {
    // Run step with shared state
    return await DBOS.runStep(() =>
      executeFlowAtomic(task, abortController, commandController)
    )
  } finally {
    // Stop command polling once the step has finished (or failed)
    abortController.abort()
    await polling
  }
}

// STEP LEVEL: Check shared state every 100ms (no DBOS calls!)
async function executeFlowAtomic(
  task: ExecutionTask,
  abortController: AbortController,
  commandController: CommandController
) {
  const checkCommands = setInterval(() => {
    // `debugger` is a reserved word in JS/TS, so the flow debugger handle
    // is called `flowDebugger` here (illustrative name)
    if (commandController.currentCommand === 'PAUSE') {
      flowDebugger.pause()
    } else if (commandController.currentCommand === 'RESUME') {
      flowDebugger.continue()
    }
    commandController.currentCommand = null
  }, 100)

  try {
    // Execute flow...
    // Step reads shared state, never calls DBOS.recv()
  } finally {
    clearInterval(checkCommands)
  }
}
```
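The step-side half of this pattern is just a timer callback over a plain object. Below is a minimal, DBOS-free sketch of one tick. It assumes a debugger with `pause()` and `continue()` methods. `FlowDebugger` and `processCommandTick` are hypothetical names, and, like the step above, the sketch ignores the `STEP` command.

```typescript
// Shared state written by the workflow's polling loop and read by the step.
interface CommandController {
  currentCommand: 'PAUSE' | 'RESUME' | 'STEP' | null
}

// Minimal stand-in for the flow debugger; the real ChainGraph interface may differ.
interface FlowDebugger {
  pause(): void
  continue(): void
}

// One tick of the step-side check: act on the latest command, then clear it
// so the same command is not applied twice.
function processCommandTick(controller: CommandController, dbg: FlowDebugger): void {
  if (controller.currentCommand === 'PAUSE') {
    dbg.pause()
  } else if (controller.currentCommand === 'RESUME') {
    dbg.continue()
  }
  controller.currentCommand = null
}

// Inside the step this would be driven by a timer, e.g.:
//   const timer = setInterval(() => processCommandTick(controller, dbg), 100)
//   try { /* execute flow */ } finally { clearInterval(timer) }
```

Keeping the tick as a pure function over the shared object makes the step logic testable without a timer or a DBOS runtime.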
Pattern 3: Collect & Spawn Pattern (Child Executions)
Problem: Cannot call `DBOS.startWorkflow()` from steps, but Event Emitter nodes need to spawn children.
Solution: Step collects child tasks and returns them, workflow spawns them.
Files:
- Step: `server/dbos/steps/ExecuteFlowAtomicStep.ts:346-401`
- Workflow: `server/dbos/workflows/ExecutionWorkflows.ts`
```typescript
// STEP: Collect child tasks (don't spawn!)
async function executeFlowAtomic(task: ExecutionTask): Promise<ExecutionResult> {
  const collectedChildTasks: ExecutionTask[] = []

  // Execute flow, capture emitted events
  await engine.execute()

  // After execution, collect child tasks from emitted events
  for (const event of context.emittedEvents.filter(e => !e.processed)) {
    event.processed = true
    // Create child execution row in DB (allowed in step)
    const childTask = await createChildTask(instance, event, store)
    collectedChildTasks.push(childTask)
  }

  // Return child tasks for workflow-level spawning
  return {
    status: 'completed',
    childTasks: collectedChildTasks, // ← Workflow will spawn these
  }
}

// WORKFLOW: Spawn collected children (DBOS.startWorkflow allowed here!)
async function executionWorkflow(task: ExecutionTask) {
  const result = await DBOS.runStep(() => executeFlowAtomic(task))

  // Spawn children at workflow level (childTasks may be undefined)
  for (const childTask of result.childTasks ?? []) {
    await DBOS.startWorkflow(executionWorkflow, { workflowID: childTask.executionId })(childTask)
  }

  return result
}
```
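The useful property of the collect step is that marking events as processed makes a second pass produce nothing, so the workflow never receives the same child twice. Below is a dependency-free sketch of both halves. `EmittedEvent`, `ChildTask`, `collectChildTasks` and `spawnAll` are hypothetical, and the injected `spawn` callback stands in for `DBOS.startWorkflow(...)(child)`.

```typescript
// Shapes assumed for illustration; the real ChainGraph types carry more fields.
interface EmittedEvent {
  type: string
  payload: unknown
  processed: boolean
}

interface ChildTask {
  executionId: string
  parentExecutionId: string
  eventType: string
}

// Step side: turn unprocessed events into child tasks, marking each event so
// that a second pass never produces duplicates. Nothing is spawned here.
function collectChildTasks(
  parentExecutionId: string,
  events: EmittedEvent[],
  makeId: () => string,
): ChildTask[] {
  const tasks: ChildTask[] = []
  for (const event of events.filter((e) => !e.processed)) {
    event.processed = true
    tasks.push({ executionId: makeId(), parentExecutionId, eventType: event.type })
  }
  return tasks
}

// Workflow side: the only place where spawning happens.
async function spawnAll(
  tasks: ChildTask[],
  spawn: (task: ChildTask) => Promise<void>,
): Promise<number> {
  for (const task of tasks) {
    await spawn(task)
  }
  return tasks.length
}
```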
Pattern 4: Auto-Start Pattern (Child Execution Lifecycle)
Problem: Children would otherwise need a manual start call, which slows down the execution tree.
Solution: Children skip the signal wait entirely and start immediately.
File: `server/dbos/workflows/ExecutionWorkflows.ts:192-214`
```typescript
async function executionWorkflow(task: ExecutionTask) {
  const executionRow = await store.get(task.executionId)
  const isChildExecution = !!executionRow.parentExecutionId

  // Write EXECUTION_CREATED first (Signal Pattern)
  await DBOS.writeStream('events', { event: 'EXECUTION_CREATED', ... })

  // Auto-start for children!
  if (!isChildExecution) {
    // Parents: wait for signal from tRPC (timeout: 5 minutes)
    const startSignal = await DBOS.recv<string>('START_SIGNAL', 300)
    if (!startSignal) {
      throw new Error('Execution start timeout')
    }
  } else {
    // Children: skip waiting, start immediately
    DBOS.logger.info(`Child execution auto-start, beginning execution`)
  }

  // Continue execution...
}
```
Child Execution Lifecycle:
```text
Parent spawns child via DBOS.startWorkflow()
└─ Child workflow starts
   ├─ Writes EXECUTION_CREATED event
   ├─ Detects parentExecutionId
   ├─ Skips signal wait (auto-start)
   └─ Executes flow immediately
```
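The start decision can be isolated into a small function that is easy to test. The sketch below assumes the execution row exposes an optional `parentExecutionId`. `awaitStart` and the injected `recvStartSignal`, which stands in for `DBOS.recv<string>('START_SIGNAL', 300)`, are hypothetical.

```typescript
// Assumed shape of the execution row; only the parent link matters here.
interface ExecutionRow {
  executionId: string
  parentExecutionId?: string | null
}

type StartMode = 'auto-start' | 'signaled'

async function awaitStart(
  row: ExecutionRow,
  recvStartSignal: (timeoutSeconds: number) => Promise<string | null>,
  timeoutSeconds = 300,
): Promise<StartMode> {
  // Children have nobody to send them START_SIGNAL, so they never wait.
  if (row.parentExecutionId) {
    return 'auto-start'
  }
  // Parents block until the tRPC start call delivers the signal.
  const signal = await recvStartSignal(timeoutSeconds)
  if (!signal) {
    throw new Error('Execution start timeout')
  }
  return 'signaled'
}
```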
Pattern 5: WorkflowQueue Pattern (Managed Concurrency)
Problem: Need to manage concurrency and ensure idempotent workflow spawning.
Solution: Use WorkflowQueue with concurrency limits and deduplication.
File: `server/dbos/queue.ts`
```typescript
import { DBOS, WorkflowQueue } from '@dbos-inc/dbos-sdk'

// Create at module level BEFORE DBOS.launch()
export const executionQueue = new WorkflowQueue('chaingraph-executions', {
  workerConcurrency: 5, // Max concurrent per worker process
  concurrency: 100,     // Max concurrent globally
})

// Use with deduplication to prevent duplicate workflows
await DBOS.startWorkflow(ExecutionWorkflows, {
  queueName: executionQueue.name,
  workflowID: childTask.executionId, // Unique ID
  enqueueOptions: {
    deduplicationID: childTask.executionId, // Idempotency key
  },
}).executeChainGraph(childTask)
```
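To build intuition for what a concurrency cap and a deduplication ID provide, the in-memory queue below limits how many tasks run at once and refuses an ID that is still queued or running. It is an illustration only. DBOS keeps queue state in Postgres, so its limits apply across processes. `LocalQueue` is a hypothetical class, and its boolean return value simplifies however the real SDK reports duplicates.

```typescript
// In-memory illustration of a concurrency cap plus ID-based deduplication.
// Hypothetical class: DBOS persists queue state in Postgres instead.
class LocalQueue {
  private readonly concurrency: number
  private readonly inFlight = new Set<string>() // IDs queued or running
  private readonly waiting: Array<{ id: string; task: () => Promise<void> }> = []
  private readonly idleWaiters: Array<() => void> = []
  private active = 0
  maxActive = 0 // highest number of tasks seen running at the same time

  constructor(concurrency: number) {
    this.concurrency = concurrency
  }

  // Returns false if a task with this deduplication ID is still queued or running.
  enqueue(dedupId: string, task: () => Promise<void>): boolean {
    if (this.inFlight.has(dedupId)) return false
    this.inFlight.add(dedupId)
    this.waiting.push({ id: dedupId, task })
    this.pump()
    return true
  }

  // Resolves once nothing is queued or running.
  whenIdle(): Promise<void> {
    if (this.isIdle()) return Promise.resolve()
    return new Promise<void>((resolve) => {
      this.idleWaiters.push(() => resolve())
    })
  }

  private isIdle(): boolean {
    return this.active === 0 && this.waiting.length === 0
  }

  // Start queued tasks until the concurrency cap is reached.
  private pump(): void {
    while (this.active < this.concurrency && this.waiting.length > 0) {
      const { id, task } = this.waiting.shift()!
      this.active++
      this.maxActive = Math.max(this.maxActive, this.active)
      task()
        .catch(() => undefined) // a failed task still frees its slot
        .finally(() => {
          this.active--
          this.inFlight.delete(id) // the ID may be reused from now on
          this.pump()
          if (this.isIdle()) {
            this.idleWaiters.splice(0).forEach((notify) => notify())
          }
        })
    }
  }
}
```

With a cap of 2, three enqueued tasks never run more than two at a time, and a second `enqueue` with an ID that is still in flight is refused.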
Pattern 6: Parent Monitoring Pattern (Child Stops if Parent Dies)
Problem: Child executions should stop if their parent completes or fails.
Solution: Background checker monitors parent workflow status.
File: `server/dbos/workflows/ExecutionWorkflows.ts`
```typescript
async function monitorParentWorkflow(
  parentExecutionId: string,
  abortController: AbortController
) {
  while (!abortController.signal.aborted) {
    const parentStatus = await DBOS.getWorkflowStatus(parentExecutionId)
    // DBOS reports a successfully finished workflow as 'SUCCESS'
    if (parentStatus?.status === 'SUCCESS'
      || parentStatus?.status === 'ERROR'
      || parentStatus?.status === 'CANCELLED') {
      abortController.abort('Parent workflow has ended')
      break
    }
    await DBOS.sleep(5000) // DBOS.sleep takes milliseconds: check every 5 seconds
  }
}
```
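Injecting the status lookup and the sleep makes the monitor loop testable without a DBOS runtime. The sketch below works under that assumption. `MonitorDeps` and `monitorParent` are hypothetical, and the terminal statuses mirror the ones checked in the loop above.

```typescript
// In the real workflow, getStatus would wrap DBOS.getWorkflowStatus and
// sleep would be a durable DBOS sleep; here both are injected.
interface MonitorDeps {
  getStatus: (workflowId: string) => Promise<string | null>
  sleep: () => Promise<void>
}

const TERMINAL_STATUSES = new Set(['SUCCESS', 'ERROR', 'CANCELLED'])

async function monitorParent(
  parentId: string,
  abortController: AbortController,
  deps: MonitorDeps,
): Promise<void> {
  while (!abortController.signal.aborted) {
    const status = await deps.getStatus(parentId)
    if (status !== null && TERMINAL_STATUSES.has(status)) {
      // Parent has ended: abort the child's shared controller and stop polling.
      abortController.abort('Parent workflow has ended')
      return
    }
    await deps.sleep()
  }
}
```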
Three-Phase Workflow Structure
ChainGraph executions follow a three-phase structure:
```text
┌──────────────────────────────────────────────────────────────┐
│ PHASE 1: Stream Initialization (Lines 148-214)               │
│ ├─ Create CommandController                                  │
│ ├─ Write EXECUTION_CREATED event (stream exists!)            │
│ ├─ Auto-start children (send START_SIGNAL to self)           │
│ └─ Wait for START_SIGNAL                                     │
├──────────────────────────────────────────────────────────────┤
│ PHASE 2: Execution (Lines 216-374)                           │
│ ├─ Step 1: updateToRunning()                                 │
│ ├─ Step 2: executeFlowAtomic() ← Core execution              │
│ └─ Spawn children via DBOS.startWorkflow()                   │
├──────────────────────────────────────────────────────────────┤
│ PHASE 3: Cleanup (Lines 376-423)                             │
│ ├─ Step 3: updateToCompleted()                               │
│ ├─ Stop command polling                                      │
│ └─ DBOS auto-closes event stream                             │
└──────────────────────────────────────────────────────────────┘
```
Key Files
| File | Purpose | Critical? |
|---|---|---|
| `server/dbos/workflows/ExecutionWorkflows.ts` | Main orchestration workflow | ⭐⭐⭐ |
| `server/dbos/steps/ExecuteFlowAtomicStep.ts` | Core execution step | ⭐⭐⭐ |
| `server/dbos/queue.ts` | Queue initialization (MUST be before DBOS.launch) | ⭐⭐⭐ |
|  | DBOS initialization | ⭐⭐ |
|  | Worker lifecycle | ⭐⭐ |
|  | Status updates | ⭐ |
|  | Event streaming via DBOS.writeStream() | ⭐⭐ |
|  | Environment config | ⭐⭐ |
Environment Variables
```bash
# Enable DBOS mode (default: false)
ENABLE_DBOS_EXECUTION=true

# DBOS Admin UI
DBOS_ADMIN_ENABLED=true
DBOS_ADMIN_PORT=3022  # Access at http://localhost:3022

# Concurrency Limits
DBOS_QUEUE_CONCURRENCY=100  # Global across all workers
DBOS_WORKER_CONCURRENCY=5   # Per worker process

# DBOS Conductor (optional, for production monitoring)
DBOS_CONDUCTOR_URL=https://conductor.dbos.dev
DBOS_APPLICATION_NAME=chaingraph-executor
DBOS_CONDUCTOR_KEY=your-api-key-here
```
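Here is a sketch of turning these variables into a typed config object. `loadDbosEnv` and `DbosEnvConfig` are hypothetical, and the real ChainGraph config module may parse them differently. The fallbacks of 100 and 5 follow the `??` defaults in `server/dbos/queue.ts`, and `ENABLE_DBOS_EXECUTION` defaults to false as stated above.

```typescript
type Env = Record<string, string | undefined>

interface DbosEnvConfig {
  enabled: boolean
  queueConcurrency: number
  workerConcurrency: number
  conductor: { url: string; appName: string; key: string } | undefined
}

function parseBool(value: string | undefined, fallback: boolean): boolean {
  if (value === undefined) return fallback
  return value.toLowerCase() === 'true'
}

// Missing, empty, non-numeric or non-positive values fall back to the default.
function parsePositiveInt(value: string | undefined, fallback: number): number {
  const n = Number(value)
  return Number.isInteger(n) && n > 0 ? n : fallback
}

function loadDbosEnv(env: Env): DbosEnvConfig {
  const { DBOS_CONDUCTOR_URL: url, DBOS_APPLICATION_NAME: appName, DBOS_CONDUCTOR_KEY: key } = env
  return {
    enabled: parseBool(env.ENABLE_DBOS_EXECUTION, false),
    queueConcurrency: parsePositiveInt(env.DBOS_QUEUE_CONCURRENCY, 100),
    workerConcurrency: parsePositiveInt(env.DBOS_WORKER_CONCURRENCY, 5),
    // Conductor is optional: only configure it when all three values are present.
    conductor: url && appName && key ? { url, appName, key } : undefined,
  }
}
```

In a Node process this would be called as `loadDbosEnv(process.env)`.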
Anti-Patterns
Anti-Pattern #1: Calling DBOS methods from steps
```typescript
// ❌ BAD: Will throw runtime error
async function myStep(data: string) {
  await DBOS.send('other-workflow', 'message', 'TOPIC') // ❌ Error!
}

// ✅ GOOD: Return data, let workflow send
interface Message {
  target: string
  message: string
}

async function myStep(data: string): Promise<{ toSend: Message }> {
  return { toSend: { target: 'other-workflow', message: 'hello' } }
}

async function myWorkflow(data: string) {
  const result = await DBOS.runStep(() => myStep(data))
  await DBOS.send(result.toSend.target, result.toSend.message, 'TOPIC') // ✅
}
```
Anti-Pattern #2: Splitting atomic execution
```typescript
// ❌ BAD: State lost between steps
await DBOS.runStep(() => loadFlow())
await DBOS.runStep(() => executeFlow()) // ❌ Flow state lost!

// ✅ GOOD: Single atomic step
await DBOS.runStep(() => executeFlowAtomic(task)) // ✅ All in one step
```
Anti-Pattern #3: Making children wait for START_SIGNAL
```typescript
// ❌ BAD: Children timeout waiting for signal that never comes
async function executionWorkflow(task: ExecutionTask) {
  const isChild = !!executionRow.parentExecutionId

  // Always waiting - children have no one to send them the signal!
  await DBOS.recv('START_SIGNAL', 300) // ❌ Times out for children
}

// ✅ GOOD: Children skip signal wait
async function executionWorkflow(task: ExecutionTask) {
  const isChild = !!executionRow.parentExecutionId

  if (!isChild) {
    // Only parents wait for signal (from tRPC start() call)
    await DBOS.recv('START_SIGNAL', 300)
  }
  // Children start immediately - no signal wait!
}
```
Anti-Pattern #4: Using Promise.all() for parallel steps
```typescript
// ❌ BAD: Promise.all() fails fast, leaving other promises dangling
const results = await Promise.all([
  DBOS.runStep(() => step1()),
  DBOS.runStep(() => step2()),
  DBOS.runStep(() => step3()),
])

// ✅ GOOD: Promise.allSettled() waits for all, handles all outcomes
const results = await Promise.allSettled([
  DBOS.runStep(() => step1()),
  DBOS.runStep(() => step2()),
  DBOS.runStep(() => step3()),
])
```
Anti-Pattern #5: Memory side effects in workflows/steps
```typescript
// ❌ BAD: Modifying global state
let globalCounter = 0
async function myWorkflow() {
  globalCounter++ // ❌ Side effect outside scope!
}

// ✅ GOOD: Return values instead of mutating globals
async function myWorkflow(): Promise<{ count: number }> {
  const count = calculateCount()
  return { count } // ✅ Pure function, no side effects
}
```
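This matters for DBOS specifically. On recovery, a workflow function is re-executed from the top, and completed steps return their recorded outputs instead of running again. Code in the workflow body itself does run again, so a global mutation happens twice. Below is a toy model of that replay in plain TypeScript, with no DBOS involved. `runWithReplay` is a hypothetical helper.

```typescript
// Toy model of recovery: the workflow body is executed again from the top,
// so anything it does outside its return value happens twice.
let globalCounter = 0

function badWorkflowBody(): void {
  globalCounter++ // side effect outside the workflow's own scope
}

function goodWorkflowBody(input: number[]): { count: number } {
  return { count: input.length } // result depends only on inputs
}

// Run a body once normally and once more as a simulated recovery replay.
function runWithReplay<T>(body: () => T): [T, T] {
  const first = body()
  const replayed = body()
  return [first, replayed]
}
```

After one normal run plus one replay, `globalCounter` has drifted to 2, while the pure body returns the same `{ count }` both times.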
Anti-Pattern #6: Creating queue after DBOS.launch()
```typescript
// ❌ BAD: Queue created after DBOS is initialized
await DBOS.launch()
const queue = new WorkflowQueue('my-queue') // ❌ Won't dequeue!

// ✅ GOOD: Queue created at module level BEFORE DBOS.launch()
const queue = new WorkflowQueue('my-queue') // ✅ Module level

// ... later in main()
await DBOS.launch()
```
Quick Reference
| Need | Pattern | Where |
|---|---|---|
| Stream exists before subscribe | Signal Pattern | Write event before recv() |
| Commands during step execution | Shared State | Workflow polls, step reads object |
| Spawn child workflows | Collect & Spawn | Step collects, workflow spawns |
| Children start immediately | Auto-Start | Skip signal wait |
| Real-time events from step | `DBOS.writeStream()` | Only stream method allowed in steps |
| Managed concurrency | WorkflowQueue | Queue with workerConcurrency/concurrency |
| Child stops if parent dies | Parent Monitoring | Background status checker |
| Parallel steps safely | Promise.allSettled() | Never use Promise.all() |
DBOS Workflow Architecture
```text
┌─────────────────────────────────────────────────────────────┐
│ WORKFLOW (can call ALL DBOS methods)                        │
│                                                             │
│   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│   │ DBOS.send() │  │ DBOS.recv() │  │startWorkflow│         │
│   └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
│   ┌───────────────────────────────────────────────────┐     │
│   │ DBOS.runStep(() => ...)                           │     │
│   │                                                   │     │
│   │   ┌────────────────────────────────────────────┐  │     │
│   │   │ STEP (ONLY writeStream allowed)            │  │     │
│   │   │                                            │  │     │
│   │   │ ✅ DBOS.writeStream()                      │  │     │
│   │   │ ❌ DBOS.send/recv/startWorkflow/sleep/...  │  │     │
│   │   │                                            │  │     │
│   │   │ return { childTasks: [...] }               │  │     │
│   │   └────────────────────────────────────────────┘  │     │
│   └───────────────────────────────────────────────────┘     │
│                                                             │
│   // After step completes:                                  │
│   for (child of result.childTasks) {                        │
│     await DBOS.startWorkflow(...)(child) // ✅ Allowed here  │
│   }                                                         │
└─────────────────────────────────────────────────────────────┘
```
Advanced DBOS Features
For advanced DBOS features not currently used in ChainGraph (Debouncer, forkWorkflow, versioning, rate limiting, partitioned queues), see `dbos-advanced.md` in this skill directory.
Related Skills
- `executor-architecture` - Package overview
- `chaingraph-concepts` - Core domain concepts
- `subscription-sync` - Event streaming patterns
- `trpc-execution` - Execution tRPC procedures
- `trpc-patterns` - General tRPC framework patterns