Orchestrator-supaconductor parallel-dispatch
Parallel execution engine for dispatching worker agents. Used by conductor-orchestrator to spawn multiple workers simultaneously from DAG parallel groups. Handles dispatch, monitoring, aggregation, and failure recovery.
install
source · Clone the upstream repo
git clone https://github.com/Ibrahim-3d/orchestrator-supaconductor
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/Ibrahim-3d/orchestrator-supaconductor "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/parallel-dispatch" ~/.claude/skills/ibrahim-3d-orchestrator-supaconductor-parallel-dispatch && rm -rf "$T"
manifest:
skills/parallel-dispatch/SKILL.md
Parallel Dispatch Protocol
Engine for executing DAG tasks in parallel using worker agents.
Core Concepts
Parallel Groups
Tasks from the DAG that can execute simultaneously (a sketch of the assumed structure follows this list):
- Same topological level (no dependencies between them)
- Either conflict-free (no shared files) or with coordination strategy
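The dispatch code below reads the DAG as plain dictionaries. A minimal sketch of the assumed shape (field names taken from the code; IDs and values are illustrative only):

# Illustrative only: the exact schema comes from the plan's DAG; the field names
# ("nodes", "depends_on", "parallel_groups", "tasks") match what the code reads.
dag = {
    "nodes": [
        {"id": "T1", "depends_on": []},
        {"id": "T2", "depends_on": ["T1"]},
        {"id": "T3", "depends_on": ["T1"]},
        {"id": "T4", "depends_on": ["T2", "T3"]},
    ],
    "parallel_groups": [
        # T2 and T3 share a topological level and touch disjoint files
        {"id": "PG1", "tasks": ["T2", "T3"]},
    ],
}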
Worker Pool
Maximum 5 concurrent workers to prevent context overflow:
- Each worker is an ephemeral agent created by agent-factory
- Workers coordinate via message bus
- 30-minute timeout with heartbeat monitoring (see the stale-worker check sketch below)
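The monitoring loop relies on a check_stale_workers helper that is not defined in this file. A minimal sketch, assuming workers post HEARTBEAT messages as JSON files under the bus directory (this layout is an assumption, not confirmed by the skill):

import json
from datetime import datetime
from pathlib import Path

def check_stale_workers(bus_path: str, threshold_minutes: int = 10) -> list:
    """Sketch: flag workers whose last HEARTBEAT is older than the threshold.
    Assumes heartbeat messages are JSON files under <bus_path>/messages with
    'type', 'sender', 'timestamp', and a payload containing 'task_id'."""
    last_seen = {}  # worker_id -> (timestamp, task_id)
    msg_dir = Path(bus_path, "messages")
    if msg_dir.exists():
        for msg_file in msg_dir.glob("*.json"):
            msg = json.loads(msg_file.read_text())
            if msg.get("type") != "HEARTBEAT":
                continue
            ts = datetime.fromisoformat(msg["timestamp"].replace("Z", ""))
            worker_id = msg["sender"]
            if worker_id not in last_seen or ts > last_seen[worker_id][0]:
                last_seen[worker_id] = (ts, msg.get("payload", {}).get("task_id"))

    stale = []
    now = datetime.utcnow()
    for worker_id, (ts, task_id) in last_seen.items():
        minutes = (now - ts).total_seconds() / 60
        if minutes > threshold_minutes:
            stale.append({
                "worker_id": worker_id,
                "task_id": task_id,
                "minutes_stale": round(minutes),
            })
    return stale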
Dispatch Protocol
1. Parse DAG for Parallel Groups
def get_executable_parallel_groups(dag: dict, completed: set) -> list:
    """
    Get parallel groups that are ready to execute.
    A group is ready if all dependencies are completed.
    """
    ready_groups = []
    for pg in dag.get("parallel_groups", []):
        # Check if all tasks in the group have met their dependencies
        all_ready = True
        for task_id in pg["tasks"]:
            task = next((n for n in dag["nodes"] if n["id"] == task_id), None)
            if not task:
                continue
            # Check if all dependencies completed
            for dep in task.get("depends_on", []):
                if dep not in completed:
                    all_ready = False
                    break
            if not all_ready:
                break
        if all_ready:
            # Skip groups whose tasks are already completed
            if not any(t in completed for t in pg["tasks"]):
                ready_groups.append(pg)
    return ready_groups
2. Create Workers for Parallel Group
def dispatch_parallel_group(
    parallel_group: dict,
    dag: dict,
    track_id: str,
    bus_path: str
) -> list:
    """
    Dispatch all workers for a parallel group.
    Returns list of dispatched worker handles.
    """
    from agent_factory import create_workers_for_parallel_group, dispatch_workers

    # 1. Create worker agents
    workers = create_workers_for_parallel_group(
        parallel_group, dag, track_id, bus_path
    )

    # 2. Check pool capacity
    active_workers = count_active_workers(bus_path)
    if active_workers + len(workers) > 5:
        # Split into batches
        batch_size = 5 - active_workers
        workers = workers[:batch_size]

    # 3. Dispatch workers via parallel Task calls
    handles = dispatch_workers(workers)

    # 4. Log dispatch
    for worker in workers:
        post_message(bus_path, "WORKER_DISPATCHED", "orchestrator", {
            "worker_id": worker["worker_id"],
            "task_id": worker["task_id"],
            "parallel_group": parallel_group["id"]
        })

    return handles
3. Monitor Worker Progress
async def monitor_parallel_group(
    parallel_group: dict,
    workers: list,
    bus_path: str,
    timeout_minutes: int = 60
) -> dict:
    """
    Monitor workers until all complete or fail.
    Returns aggregated results.
    """
    import asyncio
    import os
    from datetime import datetime, timedelta

    start_time = datetime.utcnow()
    timeout = timedelta(minutes=timeout_minutes)
    pending_tasks = set(parallel_group["tasks"])
    completed_tasks = set()
    failed_tasks = {}

    while pending_tasks and (datetime.utcnow() - start_time) < timeout:
        # Check for completions
        for task_id in list(pending_tasks):
            event_file = f"{bus_path}/events/TASK_COMPLETE_{task_id}.event"
            if os.path.exists(event_file):
                pending_tasks.remove(task_id)
                completed_tasks.add(task_id)
                # Get completion details
                msgs = read_messages(bus_path, msg_type="TASK_COMPLETE")
                for msg in msgs:
                    if msg["payload"]["task_id"] == task_id:
                        # Log success
                        break

        # Check for failures
        for task_id in list(pending_tasks):
            event_file = f"{bus_path}/events/TASK_FAILED_{task_id}.event"
            if os.path.exists(event_file):
                pending_tasks.remove(task_id)
                # Get failure details
                msgs = read_messages(bus_path, msg_type="TASK_FAILED")
                for msg in msgs:
                    if msg["payload"]["task_id"] == task_id:
                        failed_tasks[task_id] = msg["payload"]["error"]
                        break

        # Check for stale workers (no heartbeat)
        stale = check_stale_workers(bus_path, threshold_minutes=10)
        for stale_worker in stale:
            task_id = stale_worker["task_id"]
            if task_id in pending_tasks:
                failed_tasks[task_id] = (
                    f"Worker stale: no heartbeat for {stale_worker['minutes_stale']} min"
                )
                pending_tasks.remove(task_id)

        # Check for deadlocks
        deadlock_cycle = detect_deadlock(bus_path)
        if deadlock_cycle:
            for worker_id in deadlock_cycle:
                # Find task for this worker
                status = get_worker_status(bus_path, worker_id)
                if status and status["task_id"] in pending_tasks:
                    failed_tasks[status["task_id"]] = (
                        f"Deadlock detected in cycle: {deadlock_cycle}"
                    )
                    pending_tasks.remove(status["task_id"])

        await asyncio.sleep(5)

    # Handle timeout
    for task_id in pending_tasks:
        failed_tasks[task_id] = "Timeout: task did not complete within time limit"

    return {
        "completed": list(completed_tasks),
        "failed": failed_tasks,
        "success": len(failed_tasks) == 0
    }
Failure Handling
Failure Isolation
When one worker fails, isolate the failure:
def handle_worker_failure(
    failed_task_id: str,
    dag: dict,
    bus_path: str
) -> dict:
    """
    Handle a failed worker. Isolate the failure and continue with independent tasks.
    Returns impact analysis.
    """
    # 1. Find tasks that depend directly on the failed task
    blocked_tasks = []
    for node in dag["nodes"]:
        if failed_task_id in node.get("depends_on", []):
            blocked_tasks.append(node["id"])

    # 2. Recursively find all downstream tasks
    def find_all_downstream(task_id, visited=None):
        if visited is None:
            visited = set()
        if task_id in visited:
            return []
        visited.add(task_id)
        downstream = []
        for node in dag["nodes"]:
            if task_id in node.get("depends_on", []):
                downstream.append(node["id"])
                downstream.extend(find_all_downstream(node["id"], visited))
        return downstream

    all_blocked = set(blocked_tasks)
    for task in blocked_tasks:
        all_blocked.update(find_all_downstream(task))

    # 3. Mark blocked tasks
    for task_id in all_blocked:
        post_message(bus_path, "TASK_BLOCKED", "orchestrator", {
            "task_id": task_id,
            "blocked_by": failed_task_id,
            "reason": "Upstream task failed"
        })

    # 4. Find tasks that can still proceed
    all_tasks = set(n["id"] for n in dag["nodes"])
    can_proceed = all_tasks - all_blocked - {failed_task_id}

    return {
        "failed_task": failed_task_id,
        "blocked_tasks": list(all_blocked),
        "can_proceed": list(can_proceed),
        "needs_fix": True
    }
Recovery Strategy
def attempt_recovery(
    failure_result: dict,
    dag: dict,
    track_id: str,
    bus_path: str,
    max_retries: int = 2
) -> dict:
    """
    Attempt to recover from a failure.
    """
    from datetime import datetime

    failed_task = failure_result["failed_task"]

    # 1. Check retry count
    retry_key = f"retry_{failed_task}"
    retries = get_coordination_log_count(bus_path, retry_key)
    if retries >= max_retries:
        return {
            "action": "ESCALATE",
            "reason": f"Task {failed_task} failed {retries} times, needs manual intervention"
        }

    # 2. Log retry attempt
    log_coordination(bus_path, {
        "type": retry_key,
        "attempt": retries + 1,
        "timestamp": datetime.utcnow().isoformat() + "Z"
    })

    # 3. Re-dispatch the failed task
    task = next((n for n in dag["nodes"] if n["id"] == failed_task), None)
    if task:
        worker = create_worker_agent(task, track_id, bus_path)
        dispatch_workers([worker])
        return {
            "action": "RETRY",
            "task": failed_task,
            "attempt": retries + 1
        }

    return {"action": "SKIP", "reason": "Task not found in DAG"}
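Retry bookkeeping uses log_coordination and get_coordination_log_count, which are not defined in this file. A minimal sketch, assuming the coordination log is an append-only JSONL file inside the bus directory (file name and format are assumptions):

import json
from pathlib import Path

def log_coordination(bus_path: str, entry: dict) -> None:
    """Sketch: append one JSON line to <bus_path>/coordination.jsonl (path assumed)."""
    log_file = Path(bus_path, "coordination.jsonl")
    with log_file.open("a") as f:
        f.write(json.dumps(entry) + "\n")

def get_coordination_log_count(bus_path: str, entry_type: str) -> int:
    """Sketch: count logged entries of a given type, e.g. 'retry_<task_id>'."""
    log_file = Path(bus_path, "coordination.jsonl")
    if not log_file.exists():
        return 0
    with log_file.open() as f:
        return sum(1 for line in f if json.loads(line).get("type") == entry_type)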
Deadlock Detection & Resolution
def resolve_deadlock(
    deadlock_cycle: list,
    bus_path: str
) -> dict:
    """
    Resolve a detected deadlock by releasing locks from the oldest worker.
    """
    from datetime import datetime

    if not deadlock_cycle:
        return {"resolved": True, "action": "none"}

    # Find the oldest worker in the cycle (longest waiting)
    oldest_worker = None
    oldest_time = None
    for worker_id in deadlock_cycle:
        status = get_worker_status(bus_path, worker_id)
        if status:
            started = datetime.fromisoformat(status.get("started_at", "").replace("Z", ""))
            if oldest_time is None or started < oldest_time:
                oldest_time = started
                oldest_worker = worker_id

    if oldest_worker:
        # Release all locks held by this worker
        release_all_locks_for_worker(bus_path, oldest_worker)

        # Post resolution message
        post_message(bus_path, "DEADLOCK_RESOLVED", "orchestrator", {
            "cycle": deadlock_cycle,
            "victim": oldest_worker,
            "action": "released_locks"
        })

        return {
            "resolved": True,
            "action": "released_locks",
            "victim": oldest_worker
        }

    return {"resolved": False, "action": "manual_intervention_needed"}
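The monitor calls detect_deadlock, which is also not defined in this file. One possible sketch builds a wait-for graph from BLOCKED messages and lock ownership and returns the first cycle of worker IDs it finds; the lock-file layout and the message 'sender' field are assumptions, not confirmed by this skill:

from pathlib import Path

def detect_deadlock(bus_path: str) -> list:
    """Sketch: build a wait-for graph (worker -> worker holding the resource it
    is waiting on) and return one cycle of worker IDs, or [] if none."""
    # Assumption: a held lock is a file under <bus_path>/locks/ whose content is
    # the holder's worker ID, with '/' in resource paths flattened to '_'.
    holders = {}
    locks_dir = Path(bus_path, "locks")
    if locks_dir.exists():
        for lock_file in locks_dir.glob("*.lock"):
            holders[lock_file.stem] = lock_file.read_text().strip()

    # Assumption: read_messages returns dicts with a 'sender' field, and BLOCKED
    # payloads carry the 'resource' being waited on (as in the BLOCKED message
    # shown under File Lock Coordination below).
    waits_on = {}
    for msg in read_messages(bus_path, msg_type="BLOCKED"):
        resource = msg["payload"].get("resource", "").replace("/", "_")
        holder = holders.get(resource)
        if holder and holder != msg["sender"]:
            waits_on[msg["sender"]] = holder

    # Follow wait-for edges from each worker; revisiting a node means a cycle
    for start in waits_on:
        path, current = [], start
        while current in waits_on and current not in path:
            path.append(current)
            current = waits_on[current]
        if current in path:
            return path[path.index(current):]
    return []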
Aggregating Results
def aggregate_parallel_group_results(
    parallel_group: dict,
    bus_path: str
) -> dict:
    """
    Aggregate results from a completed parallel group.
    """
    results = {
        "parallel_group_id": parallel_group["id"],
        "tasks": {},
        "files_modified": [],
        "commits": []
    }

    for task_id in parallel_group["tasks"]:
        # Get completion message
        msgs = read_messages(bus_path, msg_type="TASK_COMPLETE")
        for msg in msgs:
            if msg["payload"]["task_id"] == task_id:
                results["tasks"][task_id] = {
                    "status": "completed",
                    "commit_sha": msg["payload"].get("commit_sha"),
                    "files": msg["payload"].get("files_modified", [])
                }
                results["files_modified"].extend(msg["payload"].get("files_modified", []))
                if msg["payload"].get("commit_sha"):
                    results["commits"].append(msg["payload"]["commit_sha"])
                break
        else:
            # No completion message found; check for a failure
            fail_msgs = read_messages(bus_path, msg_type="TASK_FAILED")
            for msg in fail_msgs:
                if msg["payload"]["task_id"] == task_id:
                    results["tasks"][task_id] = {
                        "status": "failed",
                        "error": msg["payload"].get("error")
                    }
                    break

    results["all_succeeded"] = all(
        t.get("status") == "completed" for t in results["tasks"].values()
    )
    return results
Full Parallel Execution Loop
async def execute_parallel_phase(
    dag: dict,
    track_id: str,
    bus_path: str,
    metadata: dict
) -> dict:
    """
    Execute all parallel groups from a DAG phase.
    Main entry point for parallel execution.
    """
    completed_tasks = set(metadata.get("completed_tasks", []))
    phase_results = {
        "parallel_groups_executed": [],
        "all_tasks_completed": [],
        "failed_tasks": {},
        "success": True
    }

    while True:
        # Get the next ready parallel groups
        ready_groups = get_executable_parallel_groups(dag, completed_tasks)
        if not ready_groups:
            # No more groups to execute
            break

        for pg in ready_groups:
            # Skip if all tasks already completed
            if all(t in completed_tasks for t in pg["tasks"]):
                continue

            # Dispatch workers
            workers = dispatch_parallel_group(pg, dag, track_id, bus_path)

            # Monitor until completion
            result = await monitor_parallel_group(pg, workers, bus_path)

            # Update completed set
            completed_tasks.update(result["completed"])
            phase_results["all_tasks_completed"].extend(result["completed"])

            # Handle failures
            if result["failed"]:
                phase_results["failed_tasks"].update(result["failed"])
                phase_results["success"] = False

                # Attempt recovery or continue with independent tasks
                for failed_task, error in result["failed"].items():
                    impact = handle_worker_failure(failed_task, dag, bus_path)
                    recovery = attempt_recovery(impact, dag, track_id, bus_path)
                    if recovery["action"] == "ESCALATE":
                        phase_results["escalate"] = True
                        phase_results["escalate_reason"] = recovery["reason"]

            phase_results["parallel_groups_executed"].append(pg["id"])

            # Cleanup workers
            for worker in workers:
                cleanup_worker(worker["worker_id"])

        # Update metadata
        metadata["parallel_state"]["parallel_groups_completed"].extend(
            [pg["id"] for pg in ready_groups]
        )
        save_metadata(track_id, metadata)

    return phase_results
Usage in Orchestrator
# In conductor-orchestrator PARALLEL_EXECUTE step:
async def step_parallel_execute(track_id: str, metadata: dict):
    # 1. Parse DAG from plan.md
    dag = parse_dag_from_plan(track_id)

    # 2. Initialize message bus
    bus_path = init_message_bus(f"conductor/tracks/{track_id}")

    # 3. Execute all parallel groups
    result = await execute_parallel_phase(dag, track_id, bus_path, metadata)

    # 4. Update metadata
    metadata["loop_state"]["parallel_state"]["total_workers_spawned"] = ...
    metadata["loop_state"]["parallel_state"]["completed_workers"] = len(result["all_tasks_completed"])
    metadata["loop_state"]["parallel_state"]["failed_workers"] = len(result["failed_tasks"])

    # 5. Determine next step
    if result["success"]:
        return "EVALUATE_EXECUTION"
    elif result.get("escalate"):
        return "COMPLETE_WITH_WARNINGS"
    else:
        return "FIX"
Worker Coordination Patterns
File Lock Coordination
For parallel groups with shared files:
# Worker before modifying a shared file:
if not acquire_lock(bus_path, "src/shared/file.ts", worker_id):
    # Post blocked message and wait
    post_message(bus_path, "BLOCKED", worker_id, {
        "task_id": task_id,
        "waiting_for": "FILE_UNLOCK_src/shared/file.ts",
        "resource": "src/shared/file.ts"
    })

    # Poll for unlock
    if wait_for_event(bus_path, "FILE_UNLOCK_*.event", timeout=300):
        # Retry lock
        acquire_lock(bus_path, "src/shared/file.ts", worker_id)
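acquire_lock and wait_for_event are bus helpers that are not shown in this file. A minimal sketch, assuming advisory locks are files under a locks/ directory in the bus (names and layout assumed), uses atomic create-if-absent so two workers cannot grab the same file:

import os
from pathlib import Path

def acquire_lock(bus_path: str, resource: str, worker_id: str) -> bool:
    """Sketch: advisory lock via atomic create-if-absent.
    Assumes locks live under <bus_path>/locks/ with '/' in the resource path
    flattened to '_'; the file content records the holder."""
    locks_dir = Path(bus_path, "locks")
    locks_dir.mkdir(parents=True, exist_ok=True)
    lock_path = locks_dir / f"{resource.replace('/', '_')}.lock"
    try:
        # O_CREAT | O_EXCL fails if the file already exists, so creation is atomic
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # someone else holds the lock
    with os.fdopen(fd, "w") as f:
        f.write(worker_id)
    return True

def release_lock(bus_path: str, resource: str) -> None:
    """Sketch: drop the lock file and signal waiters with an unlock event
    (event file name assumed to mirror the flattened resource path)."""
    flat = resource.replace("/", "_")
    Path(bus_path, "locks", f"{flat}.lock").unlink(missing_ok=True)
    Path(bus_path, "events", f"FILE_UNLOCK_{flat}.event").touch()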
Dependency Notification
Workers notify dependents when complete:
# Worker on completion:
from pathlib import Path

unblocked_tasks = find_tasks_unblocked_by(task_id, dag)

post_message(bus_path, "TASK_COMPLETE", worker_id, {
    "task_id": task_id,
    "commit_sha": commit_sha,
    "files_modified": files,
    "unblocks": unblocked_tasks
})

# Create event files for each unblocked task
for unblocked in unblocked_tasks:
    Path(f"{bus_path}/events/DEP_READY_{unblocked}.event").touch()
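find_tasks_unblocked_by is likewise assumed rather than defined in this file. A minimal sketch that returns the dependents of the finished task whose remaining dependencies are already done (the optional completed argument is an assumption, not part of the documented call):

def find_tasks_unblocked_by(task_id: str, dag: dict, completed: set = None) -> list:
    """Sketch: dependents of task_id whose other dependencies are all complete.
    'completed' is the set of already-finished task IDs known to the worker;
    task_id itself is treated as complete."""
    done = set(completed or set()) | {task_id}
    unblocked = []
    for node in dag["nodes"]:
        deps = node.get("depends_on", [])
        if task_id in deps and all(d in done for d in deps):
            unblocked.append(node["id"])
    return unblocked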