Claude-skill-registry cva-patterns-workflows
Multi-agent workflow orchestration patterns for Clojure+Vertex AI. Covers sequential/parallel/DAG topologies, state management (stateless/stateful/hybrid), error handling (retry/circuit breaker), and monitoring. Includes production metrics from a healthcare pipeline (99.4% latency reduction, 92.4% cost reduction). Use when designing multi-agent systems, implementing complex pipelines, optimizing workflow performance, or building production orchestration.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/cva-patterns-workflows" ~/.claude/skills/majiayu000-claude-skill-registry-cva-patterns-workflows && rm -rf "$T"
skills/data/cva-patterns-workflows/SKILL.md
Workflow Pipeline Orchestration
Pattern Type: Architectural
Complexity: High
Best For: Multi-agent systems with 3+ agents, production pipelines requiring reliability and observability
🎯 Overview
Workflow pipeline patterns enable orchestration of multiple AI agents into cohesive, production-ready systems. This pattern addresses three critical challenges:
- Topology Design: How to connect agents (sequential, parallel, DAG)
- State Management: How to handle intermediate results and failures
- Reliability: How to ensure robustness with retries, fallbacks, and monitoring
When to Use:
- Pipeline has 3+ agents with dependencies
- Need retry/recovery capabilities
- Production environment requiring observability
- Latency optimization through parallelization (cost per run is unchanged)
Trade-offs:
- Complexity: Higher code overhead (+40% for DAG vs sequential)
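- Performance: Parallelizing independent agents cut latency by 32% in the healthcare pipeline (32.2s → 21.8s)
- Reliability: State management adds overhead (+250ms with hybrid checkpoints, +1250ms fully stateful) but enables recovery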
Production ROI: Healthcare pipeline achieved 99.4% latency reduction (4h 15m → 1m 30s) and 92.4% cost reduction (R$ 192.50 → R$ 14.70).
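The headline percentages can be reproduced from the before/after figures. Below is a minimal Clojure check. The `pct-reduction` helper is illustrative and not part of the skill.

```clojure
(defn pct-reduction
  "Percentage reduction from `before` to `after`, rounded to one decimal place."
  [before after]
  (/ (Math/round (* 1000.0 (- 1.0 (/ (double after) before)))) 10.0))

(pct-reduction 15300 90)     ; latency: 4h 15m = 15300 s → 1m 30s = 90 s ⇒ 99.4
(pct-reduction 192.50 14.70) ; cost in R$ ⇒ 92.4
(pct-reduction 32.2 21.8)    ; sequential → parallel latency in seconds ⇒ 32.3
```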
📊 Pattern Explanation
Core Concept
Multi-agent pipelines follow three topology patterns:
```
1. SEQUENTIAL (Linear)
   Input → A → B → C → D → Output
   - Simple, predictable
   - Total latency = sum of all agent latencies
   - Use: Strong dependencies between agents

2. PARALLEL (Fork-Join)
             ┌→ B ┐
   Input → A ┤    ├→ D → Output
             └→ C ┘
   - 32% latency reduction
   - Same cost, better performance
   - Use: Independent agents after A

3. DAG (Directed Acyclic Graph)
               ┌→ B → D ┐
   Input → A ┬─┤        ├→ E → Output
             │ └→ C ────┘
             └→ Validation (optional)
   - Maximum flexibility
   - Supports optional nodes, validations
   - Use: Complex dependencies, 5+ agents
```
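Below is a minimal sketch of the fork-join shape. It uses plain Clojure futures rather than core.async, and hypothetical stub functions stand in for real agents. It shows why parallel latency tracks the slowest branch rather than the sum, while each agent still runs (and costs) exactly once.

```clojure
(defn run-fork-join
  "Fork-join sketch: run `a` on the input, then `b` and `c` concurrently on
  A's output, then `d` on both results. Each argument is a one-argument
  function standing in for an agent call."
  [a b c d input]
  (let [a-out (a input)
        b-fut (future (b a-out))     ; fork: B and C start together
        c-fut (future (c a-out))]
    (d {:b @b-fut, :c @c-fut})))     ; join: deref blocks until each finishes

(defn slow
  "Wrap `f` so it first sleeps `ms` milliseconds (simulated agent latency)."
  [ms f]
  (fn [x] (Thread/sleep (long ms)) (f x)))

;; B and C each take ~300 ms. Run in parallel, the whole call takes ~300 ms
;; instead of ~600 ms, and the result here is [11 9].
(time (run-fork-join identity
                     (slow 300 inc)
                     (slow 300 dec)
                     (fn [{:keys [b c]}] [b c])
                     10))
```

Futures run on Clojure's agent thread pool. A standalone script that uses them should call `(shutdown-agents)` before exiting so the JVM does not linger.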
Implementation Approach
Step 1: Define agent dependencies
- Map each agent's input requirements
- Identify independent agents (parallelizable)
- Determine optional vs critical nodes
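Step 1 can be made mechanical. Given a dependency map, group agents into "waves" whose members depend only on earlier waves; each wave can then run in parallel. This is a sketch: the function name is hypothetical, and the example dependency map is an assumption read off `execute-hybrid-pipeline` in the Production Example.

```clojure
(defn execution-waves
  "Group agents into waves from a dependency map {agent #{deps}}.
  Agents in the same wave do not depend on each other and can run in
  parallel. Every dependency must itself be a key of the map.
  Throws if no agent is ready, i.e. the graph has a cycle (not a DAG)."
  [deps]
  (loop [remaining deps
         done      #{}
         waves     []]
    (if (empty? remaining)
      waves
      (let [ready (set (for [[agent ds] remaining
                             :when (every? done ds)]
                         agent))]
        (when (empty? ready)
          (throw (ex-info "Cycle detected" {:remaining (keys remaining)})))
        (recur (apply dissoc remaining ready)
               (into done ready)
               (conj waves ready))))))

(execution-waves {:s11  #{}
                  :s12  #{:s11}
                  :s32  #{:s11}
                  :s212 #{:s12}
                  :s4   #{:s11 :s12 :s212 :s32}})
;; => [#{:s11} #{:s12 :s32} #{:s212} #{:s4}]  (order inside a set may print differently)
```

Waves are a conservative schedule. An event-driven DAG runner could start S.2-1.2 as soon as S.1.2 finishes, without waiting for S.3-2.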
Step 2: Choose state management strategy
- Stateless: In-memory only (dev/simple pipelines)
- Stateful: Database-backed (production, long pipelines)
- Hybrid: Checkpoints for expensive agents only
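The hybrid strategy comes down to a load-or-compute wrapper around the expensive agents only. In this sketch an atom stands in for the `pipeline_checkpoints` table. The helper name, the in-memory store and the stub agent are assumptions for illustration.

```clojure
(defn run-with-checkpoint
  "Hybrid state sketch. When `checkpoint?` is true, reuse a stored result for
  [pipeline-id agent-id] from `store` (an atom standing in for a database
  table) and call `f` only when none exists. Only successful results are
  stored, so a failed step is retried on resume. When `checkpoint?` is false
  the agent is stateless: `f` simply runs every time."
  [store pipeline-id agent-id checkpoint? f]
  (if-not checkpoint?
    (f)
    (let [k [pipeline-id agent-id]]
      (if-let [saved (get @store k)]
        saved
        (let [result (f)]
          (when (:success? result)
            (swap! store assoc k result))
          result)))))

;; Example: S.2-1.2 is checkpointed, so a second (resumed) run reuses it.
(def store (atom {}))
(def calls (atom 0))
(defn search-references-stub []          ; hypothetical stand-in for S.2-1.2
  (swap! calls inc)
  {:success? true, :references ["ref-1"]})

(run-with-checkpoint store "pipeline-1" :s212 true search-references-stub)
(run-with-checkpoint store "pipeline-1" :s212 true search-references-stub)
@calls ;; => 1
```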
Step 3: Add error handling
- Per-system retry with exponential backoff
- Circuit breaker for external APIs
- Fallback strategies (cache, skip, human review)
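Retry with exponential backoff and a per-agent fallback can be combined in one small wrapper. This sketch assumes agents signal failure either by throwing or by returning `{:success? false}`, as in the examples in this skill. The option names are hypothetical. `sleep-fn` is injectable so the backoff can be tested without actually sleeping.

```clojure
(defn with-retry
  "Call `f` up to :max-attempts times. Between attempts sleep :base-ms,
  then 2x, 4x ... (exponential backoff). `f` fails by throwing or by
  returning a map with :success? false. If every attempt fails, return
  (fallback last-result) when a :fallback fn is given, else the last result."
  [{:keys [max-attempts base-ms sleep-fn fallback]
    :or   {max-attempts 3
           base-ms      500
           sleep-fn     #(Thread/sleep (long %))}}
   f]
  (loop [attempt 1]
    (let [result (try (f)
                      (catch Exception e
                        {:success? false, :error (ex-message e)}))]
      (cond
        (:success? result)       result
        (< attempt max-attempts) (do (sleep-fn (* base-ms (bit-shift-left 1 (dec attempt))))
                                     (recur (inc attempt)))
        fallback                 (fallback result)
        :else                    result))))

;; "skip" fallback for a non-critical agent such as S.3-2 (SEO)
(def skip-fallback (fn [_] {:success? true, :skipped? true}))
;; "human review" fallback for a critical agent such as S.1.1 (LGPD)
(def review-fallback (fn [r] {:success? false, :needs-review? true, :cause r}))
```

A circuit breaker, when used, would wrap the call inside `f` so that an open breaker fails fast instead of consuming retries.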
Step 4: Instrument for observability
- Track latency, cost, tokens per agent
- Export metrics to Prometheus/CloudWatch
- Alert on budget thresholds
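Per-agent tracking and a budget alert need little more than an atom. This is a sketch with hypothetical names. Exporting to Prometheus or CloudWatch is left out because it depends on the client library you choose. The costs and latencies below come from the pipeline code in this skill; the token counts are made-up illustrative values.

```clojure
(defn record-agent-metrics!
  "Add one call's latency, cost and token count to the running totals for
  `agent-id` in the `metrics` atom. All three keys are required.
  Returns the updated totals for that agent."
  [metrics agent-id {:keys [latency-ms cost tokens]}]
  (-> (swap! metrics update agent-id
             (fn [totals]
               (merge-with + (or totals {:calls 0, :latency-ms 0, :cost 0.0, :tokens 0})
                           {:calls 1, :latency-ms latency-ms, :cost cost, :tokens tokens})))
      (get agent-id)))

(defn total-cost
  "Summed cost across all agents recorded in `metrics`."
  [metrics]
  (reduce + 0.0 (map :cost (vals @metrics))))

(defn over-budget?
  "True when the summed cost across all agents exceeds `budget`."
  [metrics budget]
  (> (total-cost metrics) budget))

(def metrics (atom {}))
(record-agent-metrics! metrics :s12  {:latency-ms 2100, :cost 0.021, :tokens 900})   ; tokens illustrative
(record-agent-metrics! metrics :s212 {:latency-ms 8400, :cost 0.067, :tokens 1500})  ; tokens illustrative
(when (over-budget? metrics 0.05)
  (println "ALERT: pipeline cost" (total-cost metrics) "exceeds budget"))
```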
💻 Clojure Implementation
Basic Example: Sequential Pipeline
```clojure
(ns lab.workflows.sequential
  "Sequential pipeline with error propagation"
  (:require [lab.agents.data-extraction :as s11]
            [lab.agents.claims-identification :as s12]
            [lab.agents.reference-search :as s212]))

(defn execute-sequential-pipeline
  "Execute 3-agent pipeline sequentially.

  Agent flow:
  1. S.1.1 extracts structured data from input
  2. S.1.2 identifies claims requiring evidence
  3. S.2-1.2 searches scientific references for claims

  Args:
    agents-map - Map {:s11 agent1, :s12 agent2, :s212 agent3}
    db-spec    - Database connection spec
    input      - Map {:texto \"...\", :requisitos \"...\"}

  Returns:
    {:success? boolean
     :results  {:s11 {...}, :s12 {...}, :s212 {...}}
     :metrics  {:total-time-ms int, :total-cost float}}"
  [agents-map db-spec input]
  (let [start-time   (System/currentTimeMillis)
        cost-tracker (atom 0.0)]
    (try
      ;; System 1.1: Data extraction (LGPD-compliant)
      (let [s11-result (s11/extract-data (:s11 agents-map) db-spec input)]
        (when-not (:success? s11-result)
          (throw (ex-info "S.1.1 failed" {:system :s11, :error (:errors s11-result)})))
        (swap! cost-tracker + 0.045)

        ;; System 1.2: Claims identification
        (let [s12-result (s12/identify-claims (:s12 agents-map) (:data s11-result))]
          (when-not (:success? s12-result)
            (throw (ex-info "S.1.2 failed" {:system :s12, :error (:errors s12-result)})))
          (swap! cost-tracker + 0.021)

          ;; System 2-1.2: Reference search
          (let [s212-result (s212/search-references (:s212 agents-map) (:claims s12-result))]
            (when-not (:success? s212-result)
              (throw (ex-info "S.2-1.2 failed" {:system :s212, :error (:errors s212-result)})))
            (swap! cost-tracker + 0.067)

            ;; Return successful result
            {:success? true
             :results  {:s11  (:data s11-result)
                        :s12  (:claims s12-result)
                        :s212 (:references s212-result)}
             :metrics  {:total-time-ms (- (System/currentTimeMillis) start-time)
                        :total-cost    @cost-tracker
                        :systems-count 3}})))
      (catch Exception e
        ;; Centralized error handling
        {:success?   false
         :error      (.getMessage e)
         :error-data (ex-data e)
         :metrics    {:total-time-ms (- (System/currentTimeMillis) start-time)
                      :total-cost    @cost-tracker
                      :partial?      true}}))))

(comment
  ;; Usage example
  (def agents {:s11  (create-gemini-agent "gemini-2.0-flash")
               :s12  (create-gemini-agent "gemini-2.0-flash")
               :s212 (create-claude-agent "claude-3-haiku")})

  (def result
    (execute-sequential-pipeline
     agents db-spec
     {:texto      "Criar post sobre TCC para ansiedade" ; "Create a post about CBT for anxiety"
      :requisitos "Tom profissional, 800 palavras"}))  ; "Professional tone, 800 words"

  (:metrics result)
  ;; => {:total-time-ms 14300, :total-cost 0.133, :systems-count 3}

  ;; Production metrics (5-agent healthcare pipeline):
  ;; - Latency: 32.2s average
  ;; - Cost: $0.391 per execution
  ;; - Success rate: 87.8%
  )
```
Production Example: Parallel Pipeline with State Management
```clojure
(ns lab.workflows.parallel
  "Parallel pipeline with hybrid state management"
  (:require [clojure.core.async :as async]
            [next.jdbc :as jdbc]
            [next.jdbc.result-set :as rs]
            [cheshire.core :as json]
            [lab.agents.data-extraction :as s11]
            [lab.agents.claims-identification :as s12]
            [lab.agents.reference-search :as s212]
            ;; Hypothetical namespace names: the code calls s32/optimize-seo and
            ;; s4/consolidate-final-text, whose namespaces are not named here.
            [lab.agents.seo-optimization :as s32]
            [lab.agents.final-consolidation :as s4]))

(defn save-checkpoint!
  "Save expensive agent result to database for recovery.

  Checkpoints enable resume-from-failure without re-running expensive
  agents (e.g., external API calls, long-running LLMs).

  Args:
    db-spec     - Database connection spec
    pipeline-id - UUID of pipeline execution
    agent-id    - Keyword (:s212, :s4, etc.)
    result      - Agent result map
    metadata    - Map {:cost float, :latency-ms int, :model string}

  Returns:
    The inserted row as a map containing the generated id"
  [db-spec pipeline-id agent-id result metadata]
  (jdbc/execute-one!
   (jdbc/get-datasource db-spec)
   ["INSERT INTO pipeline_checkpoints
       (pipeline_id, agent_id, result_json, metadata_json, created_at)
     VALUES (?::uuid, ?, ?::jsonb, ?::jsonb, NOW())
     RETURNING id"
    (str pipeline-id)
    (name agent-id)
    (json/generate-string result)
    (json/generate-string metadata)]
   {:builder-fn rs/as-unqualified-maps}))

(defn load-checkpoint
  "Load the latest checkpoint for an agent, if one exists.

  Returns:
    {:result map, :metadata map} or nil if not found"
  [db-spec pipeline-id agent-id]
  ;; jsonb is cast to text so cheshire receives strings; the unqualified
  ;; builder yields :result_json rather than a table-qualified key.
  (when-let [row (jdbc/execute-one!
                  (jdbc/get-datasource db-spec)
                  ["SELECT result_json::text AS result_json,
                           metadata_json::text AS metadata_json
                    FROM pipeline_checkpoints
                    WHERE pipeline_id = ?::uuid AND agent_id = ?
                    ORDER BY created_at DESC
                    LIMIT 1"
                   (str pipeline-id)
                   (name agent-id)]
                  {:builder-fn rs/as-unqualified-maps})]
    {:result   (json/parse-string (:result_json row) true)
     :metadata (json/parse-string (:metadata_json row) true)}))

(defn- run-timed
  "Run agent thunk `f` on a dedicated thread (agent calls do blocking I/O,
  so async/thread is used rather than a go block). The returned channel
  yields {:result r, :elapsed-ms t}; exceptions become a failed result."
  [f]
  (async/thread
    (let [t0     (System/currentTimeMillis)
          result (try (f)
                      (catch Exception e
                        {:success? false, :error (.getMessage e)}))]
      {:result result, :elapsed-ms (- (System/currentTimeMillis) t0)})))

(defn execute-parallel-subsystems
  "Execute independent agents in parallel using core.async.

  Parallelization strategy:
  - S.1.2 and S.3-2 are independent after S.1.1
  - Execute concurrently to reduce total latency
  - Both must succeed for pipeline to continue

  Args:
    agents-map - Map of agent instances
    db-spec    - Database spec
    prof-id    - Professional UUID
    s11-output - Output from S.1.1 (already executed)

  Returns:
    {:s12-result map
     :s32-result map
     :parallel-time-ms int
     :time-saved-ms int}"
  [agents-map db-spec prof-id s11-output]
  (let [start-time (System/currentTimeMillis)
        ;; Start both agents before waiting on either
        s12-chan (run-timed #(s12/identify-claims (:s12 agents-map) s11-output))
        s32-chan (run-timed #(s32/optimize-seo (:s32 agents-map) db-spec s11-output prof-id))
        ;; Wait for both results (blocking take)
        s12-out  (async/<!! s12-chan)
        s32-out  (async/<!! s32-chan)
        parallel-time   (- (System/currentTimeMillis) start-time)
        ;; Time the two calls would take back to back
        ;; (≈ 2100 + 5200 ms in the healthcare pipeline)
        sequential-time (+ (:elapsed-ms s12-out) (:elapsed-ms s32-out))]
    {:s12-result       (:result s12-out)
     :s32-result       (:result s32-out)
     :parallel-time-ms parallel-time
     :time-saved-ms    (- sequential-time parallel-time)}))

(defn execute-hybrid-pipeline
  "Production pipeline with parallel execution + selective checkpoints.

  Optimization strategy:
  - Fast agents (S.1.1, S.1.2, S.3-2): In-memory only
  - Expensive agents (S.2-1.2, S.4): Checkpoint to database
  - Independent agents (S.1.2, S.3-2): Execute in parallel

  Recovery scenario: If S.4 fails, call again with the same :pipeline-id
  and :resume-from :s4. S.2-1.2 is then loaded from its checkpoint instead
  of being re-run; S.1.1, S.1.2 and S.3-2 are not persisted, so they run again.

  Args:
    agents-map - Map of agent instances
    db-spec    - Database spec
    prof-id    - Professional UUID
    input      - Initial input map
    opts       - Keyword args :pipeline-id UUID, :resume-from keyword
                 (:s4 reuses the S.2-1.2 checkpoint; :done also reuses S.4)

  Returns:
    {:success? boolean :pipeline-id UUID :results map :metrics map}
    On failure: {:success? false :pipeline-id UUID :error string ...}"
  [agents-map db-spec prof-id input & {:keys [pipeline-id resume-from]}]
  (let [pipeline-id (or pipeline-id (java.util.UUID/randomUUID))
        start-time  (System/currentTimeMillis)
        reuse-s212? (contains? #{:s4 :done} resume-from)
        reuse-s4?   (= :done resume-from)]
    (println "🚀 Pipeline" pipeline-id "started")
    (try
      (let [;; S.1.1: Always execute (fast, unique input)
            s11-result (s11/extract-data (:s11 agents-map) db-spec input)
            _ (when-not (:success? s11-result)
                (throw (ex-info "S.1.1 failed" {:system :s11})))

            ;; Parallel execution: S.1.2 + S.3-2
            parallel-results (execute-parallel-subsystems
                              agents-map db-spec prof-id (:data s11-result))
            s12-result (:s12-result parallel-results)
            s32-result (:s32-result parallel-results)
            _ (when-not (and (:success? s12-result) (:success? s32-result))
                (throw (ex-info "Parallel execution failed"
                                {:s12 (:success? s12-result)
                                 :s32 (:success? s32-result)})))

            ;; S.2-1.2: CHECKPOINT (expensive external API)
            s212-result (or (when reuse-s212?
                              (:result (load-checkpoint db-spec pipeline-id :s212)))
                            (let [result (s212/search-references
                                          (:s212 agents-map) (:claims s12-result))]
                              ;; Only successful results are worth resuming from
                              (when (:success? result)
                                (save-checkpoint! db-spec pipeline-id :s212 result
                                                  {:cost 0.067, :latency-ms 8400}))
                              result))
            _ (when-not (:success? s212-result)
                (throw (ex-info "S.2-1.2 failed" {:system :s212})))

            ;; S.4: CHECKPOINT (expensive consolidation)
            ;; Assumes S.4 reports :success? like the other agents.
            s4-result (or (when reuse-s4?
                            (:result (load-checkpoint db-spec pipeline-id :s4)))
                          (let [result (s4/consolidate-final-text
                                        (:s4 agents-map) db-spec prof-id
                                        {:s11  (:data s11-result)
                                         :s12  (:claims s12-result)
                                         :s212 (:references s212-result)
                                         :s32  (:seo s32-result)}
                                        "query")]
                            (when (:success? result)
                              (save-checkpoint! db-spec pipeline-id :s4 result
                                                {:cost 0.18, :latency-ms 12700}))
                            result))
            _ (when-not (:success? s4-result)
                (throw (ex-info "S.4 failed" {:system :s4})))
            end-time (System/currentTimeMillis)]
        {:success?     true
         :pipeline-id  pipeline-id
         :results      {:s11  (:data s11-result)
                        :s12  (:claims s12-result)
                        :s212 (:references s212-result)
                        :s32  (:seo s32-result)
                        :s4   (:final-text s4-result)}
         :final-output (:final-text s4-result)
         :metrics      {:total-time-ms           (- end-time start-time)
                        :total-cost              0.391 ; profiled average, not measured here
                        :parallelization-gain-ms (:time-saved-ms parallel-results)
                        :checkpoint-overhead-ms  250}}) ; profiled average
      (catch Exception e
        ;; Keep the pipeline-id so the caller can resume from checkpoints
        {:success?    false
         :pipeline-id pipeline-id
         :error       (.getMessage e)
         :error-data  (ex-data e)
         :metrics     {:total-time-ms (- (System/currentTimeMillis) start-time)}}))))

(comment
  ;; Execute production pipeline
  (def result (execute-hybrid-pipeline agents db-spec prof-id input))

  ;; If S.4 fails, resume from checkpoint
  (def resumed
    (execute-hybrid-pipeline agents db-spec prof-id input
                             :pipeline-id (:pipeline-id result)
                             :resume-from :s4))

  ;; Savings on resume:
  ;; - Cost: -$0.144 (no re-run of S.1.1, S.1.2, S.2-1.2, S.3-2)
  ;; - Time: -10.1s (skip 4 systems)
  ;; This code re-runs S.1.1, S.1.2 and S.3-2 on resume; skipping them
  ;; as well requires checkpointing their outputs too.

  ;; Production metrics (healthcare pipeline, hybrid mode):
  ;; - Latency: 21.8s (vs 32.2s sequential, -32%)
  ;; - Cost: $0.391 (same as sequential)
  ;; - Checkpoint overhead: +250ms (vs +1250ms full stateful)
  ;; - Recovery cost savings: -75% on failure
  )
```
💡 Best Practices
- Start Sequential, Optimize to Parallel
  - Rationale: Sequential is simpler to debug and reason about. Profile first to identify bottlenecks, then parallelize independent agents.
  - Example: Healthcare pipeline started sequential (87.8% success rate), then parallelized S.1.2+S.3-2 for 32% latency reduction with zero quality impact.
- Checkpoint Expensive Agents Only
  - Rationale: Database I/O adds overhead (+50ms per checkpoint). Only checkpoint agents with high cost/latency (>$0.05 or >5s).
  - Example: Checkpoint S.2-1.2 (external API, 8.4s, $0.067) but not S.1.2 (fast, 2.1s, $0.021). Saves 75% on recovery cost.
- Use Circuit Breakers for External APIs
  - Rationale: Prevent cascading failures when external services are down. Circuit breaker fails fast, avoiding wasted API calls.
  - Example: PubMed API circuit breaker (3 failures → open for 60s) saved $0.30/min during outage by avoiding 5 retry attempts per request.
- Instrument Everything
  - Rationale: Production issues are invisible without metrics. Track latency, cost, tokens per agent for optimization insights.
  - Example: Discovered S.4 consumed 46% of total cost ($0.18/$0.391). Optimized with multi-model routing, reduced to $0.14 (-22%).
- Define Fallback Strategies Per Agent
  - Rationale: Not all agent failures should halt the pipeline. SEO optimization can be skipped, but LGPD extraction cannot.
  - Example: S.3-2 (SEO) fallback is "skip", allowing pipeline to complete. S.1.1 (LGPD) fallback is "human review" (critical compliance).
- Test Recovery Paths
  - Rationale: Stateful pipelines have complex recovery logic. Untested recovery can corrupt data or lose intermediate results.
  - Example: Integration test suite simulates failures at each checkpoint, validates resume-from-checkpoint produces identical output.
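The circuit-breaker practice can be sketched with an atom that holds the consecutive-failure count and the time the breaker opened. Defaults mirror the PubMed example (3 failures, open for 60 s). The function names are hypothetical. This simplified version does not limit how many calls pass through during the half-open trial.

```clojure
(defn make-breaker
  "Circuit-breaker state: consecutive failures and when the breaker opened."
  ([] (make-breaker {}))
  ([{:keys [failure-threshold open-ms] :or {failure-threshold 3, open-ms 60000}}]
   (atom {:failures 0, :opened-at nil, :threshold failure-threshold, :open-ms open-ms})))

(defn call-with-breaker
  "Call `f` unless the breaker is open. While open (less than :open-ms since
  it tripped) return a fast failure without calling `f`. After :open-ms the
  next call is let through as a trial (half-open): success closes the
  breaker, failure re-opens it. `now-fn` is injectable for testing."
  ([breaker f] (call-with-breaker breaker f #(System/currentTimeMillis)))
  ([breaker f now-fn]
   (let [{:keys [opened-at open-ms]} @breaker
         now (now-fn)]
     (if (and opened-at (< (- now opened-at) open-ms))
       {:success? false, :error :circuit-open}
       (let [result (try (f)
                         (catch Exception e {:success? false, :error (ex-message e)}))]
         (if (:success? result)
           (swap! breaker assoc :failures 0 :opened-at nil)
           (swap! breaker (fn [{:keys [failures threshold] :as s}]
                            (let [n (inc failures)]
                              (assoc s :failures n
                                       :opened-at (when (>= n threshold) now))))))
         result)))))

(def pubmed-breaker (make-breaker {:failure-threshold 3, :open-ms 60000}))
;; (call-with-breaker pubmed-breaker #(search-pubmed query)) ; search-pubmed is hypothetical
```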
🔗 Related Skills
- ⭐ cva-concepts-agent-types: Agent architecture (Types A/B/C/D)
- ⭐ cva-healthcare-pipeline: Real 5-agent production application
- cva-patterns-context: Context management for pipeline state
- cva-patterns-cost: Cost optimization strategies
- cva-basics-agent-creation: Creating Gemini/Claude agents
- cva-basics-prompts: Prompt engineering for agents
📘 Additional Resources
Pattern Variations
Streaming Pipelines: Use core.async channels for continuous data flow (e.g., real-time event processing). Not covered here as healthcare pipeline is batch-oriented.
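Saga Pattern: Distributed transactions built from local steps, each paired with a compensating action that undoes it if a later step fails. Useful for multi-database pipelines that need all-or-nothing outcomes without distributed ACID transactions.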
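The compensating-action idea can be shown in-process: run the steps in order and, on failure, undo the completed steps in reverse order. This is a minimal sketch with an assumed step-map shape. A real multi-database saga would also persist its progress so compensation survives a crash.

```clojure
(defn run-saga
  "Run `steps` in order. Each step is {:name kw, :action (fn [ctx] new-ctx),
  :compensate (fn [ctx] ...)} (:compensate optional). If a step throws,
  call :compensate for the already-completed steps in reverse order and
  return a failure map; otherwise return {:success? true, :ctx final-ctx}."
  [steps ctx]
  (loop [remaining steps, done [], ctx ctx]
    (if-let [{:keys [action] :as step} (first remaining)]
      (let [outcome (try {:ctx (action ctx)}
                         (catch Exception e {:error e}))]
        (if-let [e (:error outcome)]
          (do (doseq [{:keys [compensate]} (rseq done)]
                (when compensate (compensate ctx)))
              {:success?    false
               :failed-step (:name step)
               :compensated (mapv :name (rseq done))
               :error       (ex-message e)})
          (recur (rest remaining) (conj done step) (:ctx outcome))))
      {:success? true, :ctx ctx})))

(run-saga
 [{:name :save-draft, :action #(assoc % :draft-id 1), :compensate (fn [_] (println "delete draft"))}
  {:name :publish,    :action (fn [_] (throw (ex-info "publish failed" {})))}]
 {})
;; prints "delete draft" and returns
;; {:success? false, :failed-step :publish, :compensated [:save-draft], :error "publish failed"}
```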
Event-Driven Orchestration: Pub/sub messaging between agents (vs direct calls). Better for loosely-coupled systems, adds messaging infrastructure complexity.
Advanced Topics
Dynamic DAG Construction: Build pipeline topology at runtime based on input characteristics. Example: Skip S.3-2 (SEO) if content type is "internal documentation".
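Below is a sketch of that example. It assumes the pipeline's dependencies are kept as a map from each agent to the set of agents it depends on. The structure and names are illustrative, and the dependencies are read off `execute-hybrid-pipeline`.

```clojure
(def base-dag
  ;; {agent #{dependencies}}; assumed from execute-hybrid-pipeline
  {:s11  #{}
   :s12  #{:s11}
   :s32  #{:s11}
   :s212 #{:s12}
   :s4   #{:s11 :s12 :s212 :s32}})

(defn build-dag
  "Return a copy of `dag` tailored to `input`. Skipped agents are removed
  both as nodes and from every remaining agent's dependency set."
  [dag {:keys [content-type]}]
  (let [skip (cond-> #{}
               (= content-type "internal documentation") (conj :s32))]
    (into {}
          (for [[agent deps] dag
                :when (not (skip agent))]
            [agent (apply disj deps skip)]))))

(build-dag base-dag {:content-type "internal documentation"})
;; :s32 is gone, and :s4 now depends only on #{:s11 :s12 :s212}
```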
Multi-Tenant Isolation: Ensure pipeline executions for different tenants don't interfere (separate database schemas, resource quotas).
A/B Testing Framework: Run parallel pipeline variants to compare quality/cost. Example: Gemini Flash vs Haiku for S.2-1.2 (73% cost reduction with Haiku, 2% quality drop).
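A common way to split traffic for such an experiment is to hash a stable run id into buckets. A given run, including a resumed one, then always lands in the same arm. This is a sketch: the function name and the 50/50 split are illustrative, and the model labels are taken from the example above, not real model IDs.

```clojure
(defn assign-variant
  "Deterministically assign `run-id` to :treatment for roughly
  `treatment-pct` percent of runs, else :control."
  [run-id treatment-pct]
  (if (< (mod (hash (str run-id)) 100) treatment-pct)
    :treatment
    :control))

(def s212-model
  ;; Arms of the Gemini Flash vs Haiku comparison (labels only)
  {:control :gemini-flash, :treatment :haiku})

(s212-model (assign-variant "3f1c9a6e-pipeline-run" 50))
```

Per-arm quality and cost still have to be recorded separately, for example keyed by arm in the same per-agent metrics.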
Performance Benchmarks
| Topology | Latency | Cost | Complexity | Recovery | Use Case |
|---|---|---|---|---|---|
| Sequential | 32.2s | $0.391 | ⭐ | ❌ | MVP, prototype |
| Parallel | 21.8s | $0.391 | ⭐⭐ | ❌ | Production, medium volume |
| Hybrid | 21.8s | $0.391 | ⭐⭐⭐ | ✅ | Recommended production |
| DAG | 22.5s | $0.391 | ⭐⭐⭐⭐ | ✅ | Complex (5+ agents) |
Recommendation: Start with Parallel + Hybrid state for production. Migrate to DAG only if you have 5+ agents with complex dependencies.