Claude-skill-registry executor-architecture
Executor package architecture for ChainGraph flow execution engine. Use when working on packages/chaingraph-executor, execution services, DBOS workflows, event bus, task queues, tRPC routes, or execution-related database operations. Triggers: executor, execution, service, worker, queue, event bus, dbos, workflow, tRPC execution, execution-api, execution-worker.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/executor-architecture" ~/.claude/skills/majiayu000-claude-skill-registry-executor-architecture && rm -rf "$T"
ChainGraph Executor Architecture
This skill provides architectural guidance for the @badaitech/chaingraph-executor package, the execution engine that runs ChainGraph flows with durable execution via DBOS.
Package Overview
Location: packages/chaingraph-executor/
Purpose: Flow execution engine with DBOS durable execution
Key Feature: Exactly-once execution semantics with automatic recovery
Directory Structure
packages/chaingraph-executor/
├── server/
│   ├── index.ts # Main exports
│   │
│   ├── dbos/ # DBOS durable execution ⭐
│   │   ├── config.ts # DBOS initialization
│   │   ├── DBOSExecutionWorker.ts # Worker lifecycle
│   │   ├── queue.ts # Queue management
│   │   ├── workflows/
│   │   │   └── ExecutionWorkflows.ts # Main orchestration
│   │   └── steps/
│   │       ├── ExecuteFlowAtomicStep.ts # Core execution
│   │       └── UpdateStatusStep.ts # Status updates
│   │
│   ├── services/ # Business logic layer
│   │   ├── ExecutionService.ts # Execution instance management
│   │   ├── RecoveryService.ts # Failure recovery
│   │   └── ServiceFactory.ts # Service initialization
│   │
│   ├── implementations/ # Interface implementations
│   │   ├── dbos/
│   │   │   ├── DBOSEventBus.ts # DBOS event streaming
│   │   │   └── DBOSTaskQueue.ts # DBOS task queue
│   │   └── local/
│   │       ├── InMemoryEventBus.ts # Dev/test event bus
│   │       └── InMemoryTaskQueue.ts
│   │
│   ├── interfaces/ # Abstract interfaces
│   │   ├── IEventBus.ts # Event streaming contract
│   │   └── ITaskQueue.ts # Task queue contract
│   │
│   ├── stores/ # Data access layer
│   │   ├── execution-store.ts # Execution CRUD
│   │   ├── flow-store.ts # Flow loading
│   │   └── postgres/
│   │       ├── schema.ts # Drizzle schema
│   │       └── postgres-execution-store.ts
│   │
│   ├── trpc/ # API layer
│   │   ├── router.ts # tRPC procedures
│   │   └── context.ts # Request context
│   │
│   └── utils/ # Utilities
│       ├── config.ts # Environment config
│       ├── db.ts # Database connection
│       └── logger.ts # Logging
│
├── client/ # tRPC client exports
└── types/ # TypeScript types
Architecture Layers
┌────────────────────────────────────────────────────────────┐
│ Layer 1: API (tRPC)                                        │
│ ├─ create() → Start execution workflow                     │
│ ├─ start() → Send START_SIGNAL                             │
│ ├─ stop() → Cancel workflow                                │
│ ├─ pause() → Send PAUSE command                            │
│ └─ subscribeToExecutionEvents() → Stream events            │
├────────────────────────────────────────────────────────────┤
│ Layer 2: Services                                          │
│ ├─ ExecutionService → Instance management                  │
│ ├─ RecoveryService → Failure recovery                      │
│ └─ ServiceFactory → Dependency injection                   │
├────────────────────────────────────────────────────────────┤
│ Layer 3: DBOS (Durable Execution)                          │
│ ├─ ExecutionWorkflow → Orchestration + child spawning      │
│ └─ ExecuteFlowAtomicStep → Core flow execution             │
├────────────────────────────────────────────────────────────┤
│ Layer 4: Implementations                                   │
│ ├─ DBOSEventBus → PostgreSQL event streaming               │
│ └─ DBOSTaskQueue → PostgreSQL task queue                   │
├────────────────────────────────────────────────────────────┤
│ Layer 5: Stores                                            │
│ ├─ ExecutionStore → Execution row CRUD                     │
│ └─ FlowStore → Flow definition loading                     │
└────────────────────────────────────────────────────────────┘
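The layering means services depend only on the contracts in `interfaces/`, so the DBOS and in-memory implementations can be swapped without touching Layer 2. A minimal sketch of that idea follows. The method names (`enqueue`, `dequeue`, `size`), the `ExecutionTask` fields, and the `Sketch*` classes are assumptions for illustration, not the package's actual `ITaskQueue` contract.

```typescript
// Hypothetical task shape; the real ExecutionTask type lives in the package's types.
interface ExecutionTask {
  executionId: string;
  flowId: string;
}

// Hypothetical queue contract (Layer 4 implements it, Layer 2 consumes it).
interface ITaskQueue {
  enqueue(task: ExecutionTask): Promise<void>;
  dequeue(): Promise<ExecutionTask | undefined>;
  size(): number;
}

// In-memory FIFO in the spirit of InMemoryTaskQueue: no durability at all.
class SketchInMemoryTaskQueue implements ITaskQueue {
  private readonly tasks: ExecutionTask[] = [];

  async enqueue(task: ExecutionTask): Promise<void> {
    this.tasks.push(task);
  }

  async dequeue(): Promise<ExecutionTask | undefined> {
    return this.tasks.shift(); // oldest task first
  }

  size(): number {
    return this.tasks.length;
  }
}

// A service that knows only the interface, never the concrete class.
class SketchExecutionService {
  constructor(private readonly queue: ITaskQueue) {}

  async submit(executionId: string, flowId: string): Promise<void> {
    await this.queue.enqueue({ executionId, flowId });
  }
}
```

Passing a DBOS-backed queue instead of `SketchInMemoryTaskQueue` would leave `SketchExecutionService` unchanged, which is the point of keeping Layer 4 behind interfaces.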
Two Execution Modes
The executor supports two modes, controlled by the ENABLE_DBOS_EXECUTION environment variable:
DBOS Mode (Production)
ENABLE_DBOS_EXECUTION=true

Features:
├─ Exactly-once execution via workflow IDs
├─ Automatic recovery from failures
├─ Real-time event streaming via PostgreSQL
├─ Durable task queue (no Kafka needed)
└─ DBOS Admin UI at localhost:3022
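Exactly-once starts come from keying each workflow by a stable ID: in DBOS, starting a workflow whose ID already exists returns the existing workflow rather than running it again. The in-memory registry below only illustrates that deduplication idea; it is not durable, and `SketchWorkflowRegistry` / `startOnce` are hypothetical names, not DBOS APIs. DBOS provides the durable version through its PostgreSQL-backed system tables.

```typescript
// Illustrative only: deduplicates workflow starts by ID within one process.
// DBOS does this durably (across restarts) using its system database.
class SketchWorkflowRegistry<T> {
  private readonly runs = new Map<string, Promise<T>>();
  private executions = 0;

  // Hypothetical helper: runs `body` at most once per workflowId.
  startOnce(workflowId: string, body: () => Promise<T>): Promise<T> {
    const existing = this.runs.get(workflowId);
    if (existing) {
      return existing; // same ID: return the original run, do not execute again
    }
    this.executions += 1;
    const run = body();
    this.runs.set(workflowId, run);
    return run;
  }

  get executionCount(): number {
    return this.executions;
  }
}
```

Using the execution ID as the workflow ID means a retried `create()` call for the same execution cannot start a second run.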
Legacy/Local Mode (Development)
ENABLE_DBOS_EXECUTION=false

Features:
├─ In-memory event bus
├─ In-memory task queue
├─ Simpler debugging
└─ No durability guarantees
Key Files
| File | Purpose | Critical? |
|---|---|---|
| | Main orchestration | ⭐⭐⭐ |
| | Core execution step | ⭐⭐⭐ |
| | Instance management | ⭐⭐ |
| | Service initialization | ⭐⭐ |
| | Event streaming | ⭐⭐ |
| | API procedures | ⭐⭐ |
| | Database schema | ⭐ |
| | Environment config | ⭐ |
Execution Lifecycle
1. CREATE (tRPC)
   └─ ExecutionRow inserted → Workflow started
   └─ Workflow writes EXECUTION_CREATED event
   └─ Workflow waits for START_SIGNAL

2. SUBSCRIBE (tRPC)
   └─ Client subscribes to DBOS stream
   └─ Immediately receives EXECUTION_CREATED

3. START (tRPC)
   └─ Sends START_SIGNAL via DBOS.send()
   └─ Workflow continues

4. EXECUTE (Workflow)
   └─ Step 1: updateToRunning()
   └─ Step 2: executeFlowAtomic()
   │   └─ Load flow from DB
   │   └─ Create execution instance
   │   └─ Execute flow (up to 30 min)
   │   └─ Stream events in real time
   │   └─ Collect child tasks
   └─ Step 3: Spawn children
   └─ Step 4: updateToCompleted()

5. COMPLETE
   └─ DBOS auto-closes event stream
   └─ Client receives all events
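The lifecycle above implies a small status state machine. The sketch below encodes it as an allowed-transition table. The status names (`created`, `running`, `paused`, `completed`, `failed`, `stopped`) are assumptions inferred from the steps and the pause/resume/stop commands described here, not the actual values of `executionStatusEnum`, and retry paths handled by `RecoveryService` are deliberately left out.

```typescript
// Hypothetical status set; the real executionStatusEnum may differ.
type ExecutionStatus =
  | "created"
  | "running"
  | "paused"
  | "completed"
  | "failed"
  | "stopped";

// Transitions inferred from CREATE → START → EXECUTE → COMPLETE plus the
// control commands. Recovery/retry of failed runs is intentionally omitted.
const transitions: Record<ExecutionStatus, readonly ExecutionStatus[]> = {
  created: ["running", "stopped"], // START_SIGNAL, or stop before start
  running: ["paused", "completed", "failed", "stopped"],
  paused: ["running", "stopped"], // resume or stop
  completed: [], // terminal
  failed: [], // terminal in this sketch
  stopped: [], // terminal
};

function canTransition(from: ExecutionStatus, to: ExecutionStatus): boolean {
  return transitions[from].includes(to);
}

function isTerminal(status: ExecutionStatus): boolean {
  return transitions[status].length === 0;
}
```

A guard like `canTransition` before each status-update step makes an out-of-order signal (for example, a late `resume` after completion) a rejected no-op rather than a corrupted row.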
Service Layer
ExecutionService
Manages execution instances and sets up their event streaming:
// server/services/ExecutionService.ts (signatures only; method bodies omitted)
class ExecutionService {
  // Create an execution instance with event handling
  async createExecutionInstance(params: {
    task: ExecutionTask
    flow: Flow
    executionRow: ExecutionRow
    abortController: AbortController
  }): Promise<ExecutionInstance>

  // Get the event bus (DBOS or InMemory, depending on config)
  getEventBus(): IEventBus

  // Set up event handling (connects engine events → event bus);
  // returns an async cleanup function
  setupEventHandling(instance: ExecutionInstance): () => Promise<void>
}
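`setupEventHandling` returns an async cleanup function, a common pattern for wiring a producer to a sink while guaranteeing teardown. A minimal sketch of that pattern follows, assuming a simple listener-based engine emitter and a `publish` method on the bus; `SketchEngineEmitter`, `SketchEventBus`, and `setupEventHandlingSketch` are hypothetical stand-ins, not the package's `IEventBus` or engine API.

```typescript
// Hypothetical event shape; real engine events carry more data.
interface EngineEvent {
  type: string;
  index: number;
}

// Hypothetical stand-in for the flow engine's event emitter.
class SketchEngineEmitter {
  private readonly listeners = new Set<(e: EngineEvent) => void>();

  on(listener: (e: EngineEvent) => void): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  emit(event: EngineEvent): void {
    this.listeners.forEach((listener) => listener(event));
  }
}

// Hypothetical bus method; not the actual IEventBus signature.
interface SketchEventBus {
  publish(executionId: string, event: EngineEvent): Promise<void>;
}

// Forwards engine events to the bus. The returned cleanup unsubscribes first,
// then waits for publishes that are still in flight (publish errors surface here).
function setupEventHandlingSketch(
  executionId: string,
  engine: SketchEngineEmitter,
  bus: SketchEventBus,
): () => Promise<void> {
  const pending = new Set<Promise<void>>();
  const unsubscribe = engine.on((event) => {
    const publishing: Promise<void> = bus
      .publish(executionId, event)
      .finally(() => {
        pending.delete(publishing);
      });
    pending.add(publishing);
  });
  return async () => {
    unsubscribe();
    await Promise.all(Array.from(pending));
  };
}
```

Awaiting in-flight publishes inside the cleanup avoids dropping the last few events when an execution finishes and its instance is torn down.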
ServiceFactory
Initializes all services with proper dependency injection:
// server/services/ServiceFactory.ts
async function initializeServices(): Promise<Services> {
  // 1. Create event bus (DBOS or InMemory)
  const eventBus = config.dbos.enabled ? new DBOSEventBus() : new InMemoryEventBus()

  // 2. Create task queue
  const taskQueue = config.dbos.enabled ? new DBOSTaskQueue() : new InMemoryTaskQueue()

  // 3. Create execution service
  const executionService = new ExecutionService(eventBus, taskQueue)

  // 4. Initialize DBOS steps (dependency injection).
  //    executionStore is assumed to be created elsewhere (not shown here).
  initializeExecuteFlowStep(executionService, executionStore)

  return { eventBus, taskQueue, executionService }
}
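Because `initializeExecuteFlowStep(...)` is called once during service setup instead of passing dependencies on every call, the step presumably reads them from module-level state. A sketch of that pattern with a fail-fast guard follows; `StepDeps`, `initializeExecuteFlowStepSketch`, and `executeFlowStepSketch` are hypothetical names, and the two dependency methods stand in for the real `ExecutionService` and `ExecutionStore`.

```typescript
// Hypothetical dependency shape standing in for ExecutionService/ExecutionStore.
interface StepDeps {
  loadFlow(flowId: string): Promise<{ id: string }>;
  markRunning(executionId: string): Promise<void>;
}

// Module-level holder, set once during service initialization.
let stepDeps: StepDeps | undefined;

function initializeExecuteFlowStepSketch(deps: StepDeps): void {
  if (stepDeps) {
    throw new Error("execute-flow step already initialized");
  }
  stepDeps = deps;
}

function requireDeps(): StepDeps {
  if (!stepDeps) {
    // Fail fast, e.g. if a step were invoked before initializeServices() ran,
    // instead of hitting undefined dependencies deep inside execution.
    throw new Error("execute-flow step used before initialization");
  }
  return stepDeps;
}

async function executeFlowStepSketch(
  executionId: string,
  flowId: string,
): Promise<string> {
  const deps = requireDeps();
  await deps.markRunning(executionId);
  const flow = await deps.loadFlow(flowId);
  return `executed ${flow.id} for ${executionId}`;
}
```

The guard turns an ordering mistake in startup code into an immediate, descriptive error.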
tRPC Router
File: server/trpc/router.ts
export const executionRouter = router({
  // Create execution (starts workflow immediately)
  create: procedure
    .input(CreateExecutionInput)
    .mutation(async ({ input }) => {
      // 1. Create execution row in DB
      // 2. Start DBOS workflow (writes EXECUTION_CREATED)
      // 3. Return executionId
    }),

  // Start execution (sends START_SIGNAL)
  start: procedure
    .input(z.object({ executionId: z.string() }))
    .mutation(async ({ input }) => {
      // DBOS.send(destinationID, message, topic): message 'API' on topic 'START_SIGNAL'
      await DBOS.send(input.executionId, 'API', 'START_SIGNAL')
    }),

  // Subscribe to execution events (real-time streaming)
  subscribeToExecutionEvents: procedure
    .input(z.object({ executionId: z.string(), fromIndex: z.number() }))
    .subscription(async function* ({ input }) {
      // Yields events from the DBOS stream
      for await (const event of DBOS.readStream(input.executionId, 'events')) {
        yield event
      }
    }),

  // Control commands
  pause: procedure.mutation(...),
  resume: procedure.mutation(...),
  stop: procedure.mutation(...),
})
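The subscription accepts a `fromIndex`, which matters when a client reconnects and wants to resume instead of replaying everything; the excerpt above reads the whole stream and does not apply it. One way offset-based replay over an append-only log could work is sketched below. `SketchEventStream` and its `append`, `close`, and `read` methods are hypothetical and in-memory only, not DBOS APIs.

```typescript
// Append-only, in-memory event log with offset-based reads (illustrative only).
class SketchEventStream<T> {
  private readonly events: T[] = [];
  private closed = false;
  private waiters: Array<() => void> = [];

  append(event: T): void {
    if (this.closed) throw new Error("stream is closed");
    this.events.push(event);
    this.wake();
  }

  close(): void {
    this.closed = true;
    this.wake();
  }

  // Yields stored events starting at fromIndex, then waits for new ones until closed.
  async *read(fromIndex = 0): AsyncGenerator<T> {
    let i = Math.max(0, fromIndex);
    while (true) {
      while (i < this.events.length) {
        yield this.events[i++];
      }
      if (this.closed) return;
      // Park until the next append() or close() wakes all readers.
      await new Promise<void>((resolve) => {
        this.waiters.push(() => resolve());
      });
    }
  }

  private wake(): void {
    const waiters = this.waiters;
    this.waiters = [];
    waiters.forEach((notify) => notify());
  }
}
```

A reconnecting client that has already seen events `0..n-1` would call `read(n)`, which is exactly the role `fromIndex` plays in the subscription input.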
Database Schema
File: server/stores/postgres/schema.ts
import { integer, jsonb, pgTable, text, timestamp } from 'drizzle-orm/pg-core'
// executionStatusEnum is a pgEnum defined elsewhere (not shown)

export const executions = pgTable('executions', {
  id: text('id').primaryKey(), // EX123...
  flowId: text('flow_id').notNull(),
  ownerId: text('owner_id').notNull(),
  status: executionStatusEnum('status').notNull(),

  // Hierarchy
  rootExecutionId: text('root_execution_id'),
  parentExecutionId: text('parent_execution_id'),
  executionDepth: integer('execution_depth').default(0),

  // Timestamps
  createdAt: timestamp('created_at').notNull(),
  startedAt: timestamp('started_at'),
  completedAt: timestamp('completed_at'),

  // Error tracking
  errorMessage: text('error_message'),
  errorNodeId: text('error_node_id'),

  // Recovery
  failureCount: integer('failure_count').default(0),
  lastFailureAt: timestamp('last_failure_at'),

  // Context
  options: jsonb('options'),
  integration: jsonb('integration'), // archai context
  externalEvents: jsonb('external_events'), // events for children
})
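The hierarchy columns let the child executions spawned in step 3 point back to their parent and root, with `executionDepth` bounding recursion (compare `EXECUTION_MAX_DEPTH`). A sketch of how a child row's hierarchy fields could be derived follows; the helper name, the "reject when depth exceeds max" rule, and the convention that a root execution stores `null` in `rootExecutionId` are all assumptions.

```typescript
// Subset of the executions row relevant to hierarchy (field names from schema.ts).
interface ExecutionHierarchy {
  id: string;
  rootExecutionId: string | null;
  parentExecutionId: string | null;
  executionDepth: number;
}

// Hypothetical helper: derive hierarchy fields for a child of `parent`.
function deriveChildHierarchy(
  parent: ExecutionHierarchy,
  childId: string,
  maxDepth: number,
): ExecutionHierarchy {
  const depth = parent.executionDepth + 1;
  if (depth > maxDepth) {
    throw new Error(`max execution depth ${maxDepth} exceeded`);
  }
  return {
    id: childId,
    // Assumption: a root execution stores null here, so its own id becomes the root.
    rootExecutionId: parent.rootExecutionId ?? parent.id,
    parentExecutionId: parent.id,
    executionDepth: depth,
  };
}
```

Propagating `rootExecutionId` unchanged down the tree lets all descendants of one run be queried with a single equality filter.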
Environment Variables
# DBOS Mode
ENABLE_DBOS_EXECUTION=true

# Database
DATABASE_URL_EXECUTIONS=postgres://...

# DBOS Configuration
DBOS_ADMIN_ENABLED=true
DBOS_ADMIN_PORT=3022
DBOS_QUEUE_CONCURRENCY=100
DBOS_WORKER_CONCURRENCY=5

# Execution Limits
EXECUTION_MAX_DEPTH=100
EXECUTION_DEFAULT_TIMEOUT_MS=3600000 # 1 hour
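Environment values arrive as strings, so flags and numeric limits need parsing with defaults. A sketch follows, assuming the defaults equal the values listed above (with DBOS mode off unless enabled), that only `true`/`1` count as enabled, and that invalid numbers should fail at startup rather than silently fall back. The `ExecutorConfig` shape and helper names are hypothetical; the real loader lives in `server/utils/config.ts`.

```typescript
// Hypothetical typed config; the real shape in server/utils/config.ts may differ.
interface ExecutorConfig {
  dbosEnabled: boolean;
  adminEnabled: boolean;
  adminPort: number;
  queueConcurrency: number;
  workerConcurrency: number;
  maxDepth: number;
  defaultTimeoutMs: number;
}

type Env = Record<string, string | undefined>;

// Positive integer with a fallback when unset; rejects garbage instead of guessing.
function intFromEnv(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === "") return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`${key} must be a positive integer, got "${raw}"`);
  }
  return value;
}

// Assumption: only "true" or "1" (case-insensitive) enable a flag.
function boolFromEnv(env: Env, key: string, fallback: boolean): boolean {
  const raw = env[key]?.trim().toLowerCase();
  if (raw === undefined || raw === "") return fallback;
  return raw === "true" || raw === "1";
}

function loadExecutorConfig(env: Env): ExecutorConfig {
  return {
    dbosEnabled: boolFromEnv(env, "ENABLE_DBOS_EXECUTION", false),
    adminEnabled: boolFromEnv(env, "DBOS_ADMIN_ENABLED", true),
    adminPort: intFromEnv(env, "DBOS_ADMIN_PORT", 3022),
    queueConcurrency: intFromEnv(env, "DBOS_QUEUE_CONCURRENCY", 100),
    workerConcurrency: intFromEnv(env, "DBOS_WORKER_CONCURRENCY", 5),
    maxDepth: intFromEnv(env, "EXECUTION_MAX_DEPTH", 100),
    defaultTimeoutMs: intFromEnv(env, "EXECUTION_DEFAULT_TIMEOUT_MS", 3_600_000),
  };
}
```

In a Node process this would be called as `loadExecutorConfig(process.env)`; taking the record as a parameter keeps the parser easy to test.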
Quick Reference
| Need | Where | File |
|---|---|---|
| Add new tRPC procedure | API layer | |
| Modify execution logic | DBOS step | |
| Add orchestration logic | DBOS workflow | |
| Change event streaming | Implementation | |
| Modify schema | Store | |
| Add service | Service layer | |
Related Skills
- dbos-patterns: CRITICAL DBOS constraints and patterns
- chaingraph-concepts: Core domain concepts (Flow, Node, Port)
- subscription-sync: Event streaming architecture
- types-architecture: Execution types and events
- trpc-execution: Execution tRPC procedures (API layer)
- trpc-patterns: General tRPC framework patterns