# Claude-skill-registry: distributed-workers

Use when working on worker implementation, ServiceOrchestrator patterns, WorkerAPIBase, operation dispatch, progress tracking, cancellation, backend-to-worker communication, or adding new worker types.

Install:

```bash
# Clone the full registry
git clone https://github.com/majiayu000/claude-skill-registry

# Or install just this skill into ~/.claude/skills
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/distributed-workers" ~/.claude/skills/majiayu000-claude-skill-registry-distributed-workers && rm -rf "$T"
```

Source: `skills/data/distributed-workers/SKILL.md`

# Distributed Workers Architecture
Load this skill when working on:
- Worker implementation or debugging
- ServiceOrchestrator or WorkerAPIBase patterns
- Operation dispatch, progress tracking, or cancellation
- Backend-to-worker communication
- Adding new worker types
## Architecture Overview
KTRDR uses a distributed workers architecture where the backend orchestrates operations across worker nodes:
```
┌─────────────────────────────────────────────────────────────────┐
│ Backend (Docker Container, Port 8000)                           │
│ ├─ API Layer (FastAPI)                                          │
│ ├─ Service Orchestrators (NEVER execute operations)             │
│ ├─ WorkerRegistry (tracks all workers)                          │
│ └─ OperationsService (tracks all operations)                    │
└─────────────────────────────────────────────────────────────────┘
          │
          ├─ HTTP (Worker Registration & Operation Dispatch)
          │
     ┌────┴────┬──────────┬──────────┬─────────────┐
     │         │          │          │             │
     ▼         ▼          ▼          ▼             ▼
┌────────┐┌────────┐┌────────┐┌────────┐ ┌───────────────┐
│Backtest││Backtest││Training││Training│ │IB Host Service│
│Worker 1││Worker 2││Worker 1││Worker 2│ │(Port 5001)    │
│:5003   ││:5003   ││:5004   ││:5004   │ │Direct IB TCP  │
└────────┘└────────┘└────────┘└────────┘ └───────────────┘
 CPU-only  CPU-only  CPU-only  CPU-only   Direct IB Gateway

┌─────────────────────┐
│Training Host Service│
│(Port 5002)          │
│GPU Access (CUDA/MPS)│
└─────────────────────┘
 10x-100x faster training
```
## Key Principles
- Backend as Orchestrator Only: Backend does not execute operations locally — it only selects workers and dispatches operations
- Distributed-Only Execution: All backtesting and training operations execute on workers (no local fallback)
- Self-Registering Workers: Workers push-register with backend on startup (infrastructure-agnostic)
- GPU-First Routing: Training operations prefer GPU workers (10x-100x faster) with CPU worker fallback (see the selection sketch after this list)
- Horizontal Scalability: Add more workers = more concurrent operations
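
To make the orchestrator-only and GPU-first principles concrete, here is a minimal selection sketch. The `WorkerInfo` shape and `select_worker` helper are illustrative assumptions, not the actual WorkerRegistry API.

```python
# Hypothetical sketch of GPU-first worker selection; WorkerInfo and its
# fields are illustrative, not the actual WorkerRegistry API.
from dataclasses import dataclass, field


@dataclass
class WorkerInfo:
    url: str
    worker_type: str  # e.g. "training" or "backtesting"
    available: bool = True
    capabilities: dict = field(default_factory=dict)  # e.g. {"gpu": True}


def select_worker(workers: list[WorkerInfo], worker_type: str) -> WorkerInfo | None:
    """Prefer an available GPU worker; fall back to any available CPU worker."""
    candidates = [w for w in workers if w.worker_type == worker_type and w.available]
    gpu_workers = [w for w in candidates if w.capabilities.get("gpu")]
    return (gpu_workers or candidates or [None])[0]
```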
## Worker Types
| Worker | Location | Purpose | Scalability |
|---|---|---|---|
| Backtest Workers | Containerized, CPU | Execute backtesting | Horizontal |
| Training Workers | Containerized, CPU | Training fallback | Horizontal |
| Training Host Service | Native, GPU | GPU training (priority) | Limited by hardware |
| IB Host Service | Native | IB Gateway access | Single instance |
## ServiceOrchestrator Pattern

Location: `ktrdr/async_infrastructure/service_orchestrator.py`
All service managers inherit from ServiceOrchestrator:
```python
class DataAcquisitionService(ServiceOrchestrator):
    def __init__(self):
        # Reads USE_IB_HOST_SERVICE env var
        self.provider = self._initialize_provider()

    async def download_data(self, ...):
        # Unified async pattern with progress tracking
        return await self._execute_with_progress(...)
```
Features provided:
- Environment-based configuration
- Adapter initialization (local vs. host service routing)
- Unified async operations with progress tracking
- Cancellation token support
- Operations service integration
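
As a rough illustration of the last three features, here is a sketch of the unified pattern. The function name and argument shapes are assumptions; only `token.is_cancelled()` matches the documented cancellation API.

```python
# Illustrative sketch only: execute_with_progress and its arguments are
# assumptions; token.is_cancelled() is the documented cancellation check.
import asyncio


async def execute_with_progress(steps, report_progress, token):
    """Run steps in order, reporting progress and honoring cancellation."""
    results = []
    for i, step in enumerate(steps, start=1):
        if token.is_cancelled():  # cooperative cancellation check
            raise asyncio.CancelledError()
        results.append(await step())
        report_progress(i / len(steps), f"step {i}/{len(steps)} complete")
    return results
```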
## WorkerAPIBase Pattern

Location: `ktrdr/workers/base.py`
All workers inherit from WorkerAPIBase and get these features for free:
- OperationsService singleton — Worker-local operation tracking
- Operations proxy endpoints:
  - `GET /api/v1/operations/{id}` — Get operation status
  - `GET /api/v1/operations/{id}/metrics` — Get operation metrics
  - `GET /api/v1/operations` — List operations
  - `DELETE /api/v1/operations/{id}/cancel` — Cancel operation
- Health endpoint (`GET /health`) — Reports busy/idle status
- FastAPI app with CORS — Ready for Docker communication
- Self-registration — Automatic registration with backend on startup
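
A minimal sketch of the health endpoint and CORS setup these features imply, assuming a module-level FastAPI app; the `busy` flag is a stand-in for real operation tracking.

```python
# Minimal sketch (not the real WorkerAPIBase): health endpoint + CORS.
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"])  # permissive CORS for Docker networking

busy = False  # stand-in for real operation tracking


@app.get("/health")
async def health():
    # Backend routes new operations to workers reporting "idle"
    return {"status": "busy" if busy else "idle"}
```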
### Key Pattern Elements
- Operation ID Synchronization: Accepts optional `task_id` from backend, returns the same `operation_id`
- Progress Tracking: Workers register progress bridges in their OperationsService
- Remote Queryability: Backend can query a worker's operations endpoints directly (1s cache TTL)
- Push-Based Registration: Workers call `POST /workers/register` on startup
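
For example, remote queryability means the backend can hit a worker's proxy endpoints directly. A hedged sketch using `httpx`; the URL path matches the endpoint list above, but the JSON shape is an assumption.

```python
# Sketch: backend querying a worker's operations proxy endpoint directly.
# The URL path matches the endpoint list above; the JSON shape is assumed.
import httpx


async def get_worker_operation(worker_url: str, operation_id: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{worker_url}/api/v1/operations/{operation_id}")
        resp.raise_for_status()
        return resp.json()
```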
### Example Implementation
```python
class BacktestWorker(WorkerAPIBase):
    def __init__(self, worker_port=5003, backend_url="http://backend:8000"):
        super().__init__(
            worker_type=WorkerType.BACKTESTING,
            operation_type=OperationType.BACKTESTING,
            worker_port=worker_port,
            backend_url=backend_url,
        )

        # Register domain-specific endpoint
        @self.app.post("/backtests/start")
        async def start_backtest(request: BacktestStartRequest):
            operation_id = request.task_id or f"worker_backtest_{uuid.uuid4().hex[:12]}"
            result = await self._execute_backtest_work(operation_id, request)
            return {"success": True, "operation_id": operation_id, **result}
```
## Worker Implementations
- BacktestWorker (`ktrdr/backtesting/backtest_worker.py`):
  - Adds `/backtests/start` endpoint
  - Calls BacktestingEngine directly via `asyncio.to_thread`
  - Registers BacktestProgressBridge
- TrainingWorker (`ktrdr/training/training_worker.py`):
  - Adds `/training/start` endpoint
  - Calls TrainingManager directly (async)
  - Simplified progress tracking
## Host Service Integration

### IB Host Service (uses environment variables)

```bash
USE_IB_HOST_SERVICE=true
IB_HOST_SERVICE_URL=http://localhost:5001  # default
```

Why separate: IB Gateway requires a direct TCP connection (a Docker networking limitation).
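A sketch of what the environment-based routing could look like inside a ServiceOrchestrator subclass; the adapter classes and function name are assumptions, while the env vars are the documented ones.

```python
# Sketch of env-based provider selection; HostServiceAdapter/LocalAdapter
# are illustrative. USE_IB_HOST_SERVICE and IB_HOST_SERVICE_URL are the
# documented variables.
import os


class HostServiceAdapter:
    """Talks to the native IB Host Service over HTTP."""
    def __init__(self, base_url: str):
        self.base_url = base_url


class LocalAdapter:
    """In-process fallback when the host service is disabled."""


def initialize_provider():
    if os.getenv("USE_IB_HOST_SERVICE", "false").lower() == "true":
        return HostServiceAdapter(os.getenv("IB_HOST_SERVICE_URL", "http://localhost:5001"))
    return LocalAdapter()
```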
### Training & Backtesting (uses WorkerRegistry)

Environment flags REMOVED in Phase 5.3:

- ❌ `USE_TRAINING_HOST_SERVICE`
- ❌ `REMOTE_BACKTEST_SERVICE_URL`
Now uses WorkerRegistry:
- Workers self-register with backend on startup
- Backend selects available workers automatically
- GPU workers register with the `gpu: true` capability (prioritized)
- CPU workers register as fallback
## Starting Workers

```bash
# Docker Compose (development)
docker-compose up -d --scale backtest-worker=5 --scale training-worker=3

# Training Host Service (GPU, runs natively)
cd training-host-service && ./start.sh

# Workers self-register at:
# - Backtest: http://localhost:5003
# - Training (CPU): http://localhost:5004
# - Training (GPU): http://localhost:5002
```
## Verification

```bash
# Check registered workers
curl http://localhost:8000/api/v1/workers | jq

# Expected: all workers show as AVAILABLE with proper capabilities
```
## Cancellation Tokens

Location: `ktrdr/async_infrastructure/cancellation.py`
All long-running operations support cancellation:
```python
import asyncio

from ktrdr.async_infrastructure.cancellation import create_cancellation_token

token = create_cancellation_token()

# In operation loop
if token.is_cancelled():
    raise asyncio.CancelledError()
```
- Create tokens with `create_cancellation_token()`
- Check with `token.is_cancelled()`
- Operations service manages tokens globally
- CLI displays cancellation status
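
To connect the pieces, a sketch of how a worker's cancel endpoint might flip a token that a running loop then observes; the `TOKENS` registry and the `token.cancel()` method are assumptions.

```python
# Sketch: wiring the documented cancel endpoint to a token. The TOKENS
# registry and token.cancel() are assumptions; in KTRDR the operations
# service manages tokens globally.
from fastapi import FastAPI

app = FastAPI()
TOKENS: dict[str, object] = {}  # operation_id -> cancellation token


@app.delete("/api/v1/operations/{operation_id}/cancel")
async def cancel_operation(operation_id: str):
    token = TOKENS.get(operation_id)
    if token is not None:
        token.cancel()  # assumed method that makes is_cancelled() return True
    return {"operation_id": operation_id, "cancelled": token is not None}
```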
## Async Operations Pattern (CLI)

All CLI commands use `AsyncCLIClient` for API communication:

```python
from ktrdr.cli.helpers.async_cli_client import AsyncCLIClient


async def some_command(symbol: str):
    async with AsyncCLIClient() as client:
        result = await client.post("/endpoint", json={"symbol": symbol})
```
Progress display: use `GenericProgressManager` with a `ProgressRenderer` for live updates.
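
Putting the pattern together, a hedged sketch of a command that starts an operation and polls the documented operations endpoint; `client.get` returning parsed JSON (mirroring `post` above) and the terminal status names are assumptions.

```python
# Hedged sketch: client.get returning parsed JSON and the terminal status
# names are assumptions; the operations endpoint path is documented above.
import asyncio

from ktrdr.cli.helpers.async_cli_client import AsyncCLIClient


async def run_backtest(symbol: str) -> dict:
    async with AsyncCLIClient() as client:
        started = await client.post("/backtests/start", json={"symbol": symbol})
        operation_id = started["operation_id"]
        while True:
            status = await client.get(f"/api/v1/operations/{operation_id}")
            if status.get("status") in ("completed", "failed", "cancelled"):
                return status
            await asyncio.sleep(1.0)  # matches the 1s cache TTL noted earlier
```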
## Documentation

- Architecture: `docs/architecture-overviews/distributed-workers.md`
- Developer Guide: `docs/developer/distributed-workers-guide.md`
- Deployment: `docs/user-guides/deployment.md`