Claude-skill-registry langgraph-workflows
Design and implement multi-agent workflows with LangGraph 0.2+ - state management, supervisor-worker patterns, conditional routing, and fault-tolerant checkpointing
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/langgraph-workflows" ~/.claude/skills/majiayu000-claude-skill-registry-langgraph-workflows && rm -rf "$T"
skills/data/langgraph-workflows/SKILL.md

LangGraph Workflows
Master multi-agent workflow orchestration with LangGraph 0.2+
Overview
LangGraph is a library for building stateful, multi-agent workflows as directed graphs. This skill covers production patterns for building complex AI workflows with fault tolerance, checkpointing, and observability.
Real-World Use Cases:
- Multi-Agent Code Review: Security, performance, style, and test coverage agents
- E-commerce Product Enrichment: Image classification, attribute extraction, SEO optimization
- Customer Support Routing: Intent classification, priority scoring, agent assignment
- Document Processing Pipeline: OCR, entity extraction, summarization, QA validation
- Research Assistant: Query expansion, retrieval, synthesis, fact-checking
When to use this skill:
- Building multi-step AI workflows with agent coordination
- Implementing supervisor-worker patterns (one agent routes to specialists)
- Creating fault-tolerant workflows with checkpointing
- Managing complex state across multiple LLM calls
- Conditional routing based on workflow state
When NOT to use this skill:
- Single-agent tasks (use simple LangChain chains)
- Stateless API calls (no need for graph complexity)
- Simple sequential pipelines (LangChain LCEL is simpler)
Core Concepts
1. State Management
LangGraph workflows operate on shared state passed between nodes.
Two State Approaches:
```python
# Approach 1: TypedDict (simple, type-safe)
from typing import TypedDict, Annotated
from operator import add

class WorkflowState(TypedDict):
    input: str
    output: str
    agent_responses: Annotated[list[dict], add]  # List accumulates
    metadata: dict

# Approach 2: Pydantic (validation, complex logic)
from pydantic import BaseModel, Field

class WorkflowState(BaseModel):
    input: str = Field(description="User input")
    output: str = ""
    agent_responses: list[dict] = Field(default_factory=list)

    def add_response(self, agent: str, result: str):
        self.agent_responses.append({"agent": agent, "result": result})
```
Real-World Example: Code Review Pipeline
```python
class CodeReviewState(TypedDict):
    repository: str
    pull_request_id: int
    code_diff: str

    # Agent outputs (each agent adds to these)
    security_findings: Annotated[list[SecurityIssue], add]
    performance_issues: Annotated[list[PerformanceWarning], add]
    style_violations: Annotated[list[StyleViolation], add]
    test_coverage_gaps: Annotated[list[CoverageGap], add]

    # Control flow
    current_agent: str
    agents_completed: list[str]
    quality_passed: bool
    requires_human_review: bool
```
Key Pattern: Annotated[list[T], add]
- Without add: Each node replaces the list
- With add: Each node appends to the list
- Critical for multi-agent accumulation!
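To see what the `add` reducer buys you, here is a simplified stand-in for how LangGraph merges a node's partial update into shared state. `merge_state` below is illustrative only, not LangGraph's internal API:

```python
# Conceptual sketch: keys with a reducer accumulate, plain keys are overwritten.
from operator import add

def merge_state(state: dict, update: dict, reducers: dict) -> dict:
    """Merge a node's partial update into state, honoring per-key reducers."""
    merged = dict(state)
    for key, value in update.items():
        if key in reducers:
            # Reducer present: combine old and new (here, list concatenation)
            merged[key] = reducers[key](merged.get(key, []), value)
        else:
            # No reducer: the update replaces the previous value
            merged[key] = value
    return merged

reducers = {"agent_responses": add}  # mirrors Annotated[list[dict], add]
state = {"agent_responses": [{"agent": "security"}], "output": "draft"}
update = {"agent_responses": [{"agent": "style"}], "output": "final"}

merged = merge_state(state, update, reducers)
# agent_responses accumulates both entries; output is simply replaced
```

Without the reducer, the second node's `agent_responses` would silently overwrite the first node's findings.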
2. Supervisor-Worker Pattern
The most common multi-agent pattern: one supervisor routes to specialized workers.
```python
from langgraph.graph import StateGraph, END

# Define nodes
def supervisor(state: WorkflowState) -> WorkflowState:
    """Route to next worker based on state."""
    if state["needs_analysis"]:
        state["next"] = "analyzer"
    elif state["needs_validation"]:
        state["next"] = "validator"
    else:
        state["next"] = END
    return state

def analyzer(state: WorkflowState) -> WorkflowState:
    """Specialized analysis worker."""
    result = analyze(state["input"])
    state["results"].append(result)
    return state

# Build graph
workflow = StateGraph(WorkflowState)
workflow.add_node("supervisor", supervisor)
workflow.add_node("analyzer", analyzer)
workflow.add_node("validator", validator)

# Supervisor routes dynamically
workflow.add_conditional_edges(
    "supervisor",
    lambda s: s["next"],  # Route based on state
    {
        "analyzer": "analyzer",
        "validator": "validator",
        END: END
    }
)

# Workers return to supervisor
workflow.add_edge("analyzer", "supervisor")
workflow.add_edge("validator", "supervisor")

workflow.set_entry_point("supervisor")
app = workflow.compile()
```
Production Example: Code Review Supervisor
```python
# app/workflows/code_review_workflow.py

# Specialist review agents
REVIEW_AGENTS = [
    "security_scanner",       # OWASP Top 10, CVE detection
    "performance_analyzer",   # N+1 queries, algorithmic complexity
    "style_checker",          # ESLint, Prettier, PEP8
    "test_coverage",          # Missing tests, assertions
    "documentation_review",   # Docstrings, READMEs
    "dependency_audit"        # Outdated libs, license compliance
]

def supervisor_node(state: CodeReviewState) -> CodeReviewState:
    """Route to next available review agent."""
    completed = set(state["agents_completed"])
    available_agents = [a for a in REVIEW_AGENTS if a not in completed]

    if not available_agents:
        state["next"] = "quality_gate"
    else:
        # Priority-based routing (security first, then performance, etc.)
        state["next"] = available_agents[0]
    return state

for agent_name in REVIEW_AGENTS:
    workflow.add_node(agent_name, create_review_agent(agent_name))
    workflow.add_edge(agent_name, "supervisor")  # Return to supervisor
```
Benefits:
- Easy to add/remove agents (just modify routing logic)
- Centralized coordination (supervisor sees all state)
- Parallel execution possible (if agents are independent)
3. Conditional Routing
Conditional edges let you route dynamically based on state.
```python
def route_based_on_quality(state: WorkflowState) -> str:
    """Decide next step based on quality score."""
    if state["quality_score"] >= 0.8:
        return "publish"
    elif state["retry_count"] < 3:
        return "retry"
    else:
        return "manual_review"

workflow.add_conditional_edges(
    "quality_check",
    route_based_on_quality,
    {
        "publish": "publish_node",
        "retry": "generator",
        "manual_review": "review_queue"
    }
)
```
Example from this project: Quality Gate
```python
def route_after_quality_gate(state: AnalysisState) -> str:
    """Route based on quality gate result."""
    if state["quality_passed"]:
        return "compress_findings"  # Success path
    elif state["retry_count"] < 2:
        return "supervisor"  # Retry with more agents
    else:
        return END  # Failed, return partial results

workflow.add_conditional_edges(
    "quality_gate",
    route_after_quality_gate,
    {
        "compress_findings": "compress_findings",
        "supervisor": "supervisor",
        END: END
    }
)
```
Routing Patterns:
- Sequential: A -> B -> C (simple edges)
- Branching: A -> (B or C) (conditional edges)
- Looping: A -> B -> A (retry logic)
- Convergence: (A or B) -> C (multiple inputs, one output)
4. Checkpointing & Persistence
Problem: If a workflow crashes mid-execution, you lose all progress.
Solution: LangGraph checkpointing saves state after each node.
```python
# In-memory (development)
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# Persistent (production) - SQLite
from langgraph.checkpoint.sqlite import SqliteSaver

with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    app = workflow.compile(checkpointer=checkpointer)

# Persistent (production) - PostgreSQL
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # Create checkpoint tables on first use
    app = workflow.compile(checkpointer=checkpointer)
```
Using Checkpoints:
```python
# Start new workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(initial_state, config=config)

# Resume interrupted workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(None, config=config)  # Resumes from last checkpoint
```
This project's checkpointing:
```python
# backend/app/workflows/checkpoints.py
from langgraph.checkpoint.postgres import PostgresSaver

def create_checkpointer():
    """Create PostgreSQL checkpointer for production."""
    # State is saved after each node completes by default
    return PostgresSaver.from_conn_string(settings.DATABASE_URL)

# Compile with checkpointing
app = workflow.compile(
    checkpointer=create_checkpointer(),
    interrupt_before=["quality_gate"]  # Manual review point
)

# Resume after crash
result = app.invoke(
    None,
    config={"configurable": {"thread_id": analysis_id}}
)
```
Benefits:
- Fault tolerance: Resume after crashes
- Human-in-the-loop: Pause for approval (interrupt_before)
- Debugging: Inspect state at each checkpoint
- Cost savings: Don't re-run expensive LLM calls
5. Integration with Langfuse
LangGraph + Langfuse = Full Observability
```python
from langfuse.decorators import observe, langfuse_context
from langfuse import Langfuse

langfuse = Langfuse()

@observe()  # Traces entire workflow
def run_analysis_workflow(url: str):
    """Run LangGraph workflow with Langfuse tracing."""
    # Set trace metadata
    langfuse_context.update_current_trace(
        name="content_analysis",
        metadata={"url": url},
        tags=["langgraph", "multi-agent"]
    )

    # Compile workflow
    app = workflow.compile(checkpointer=checkpointer)

    # Each node is automatically traced as a span
    result = app.invoke({"url": url})

    # Log final metrics
    langfuse_context.update_current_observation(
        output=result,
        metadata={"agents_used": len(result["agents_completed"])}
    )
    return result

# Node-level tracing
@observe(as_type="generation")  # Mark as LLM call
def security_agent_node(state: AnalysisState):
    """Security analysis agent."""
    langfuse_context.update_current_observation(
        name="security_agent",
        input=state["raw_content"][:200]  # First 200 chars
    )

    result = security_agent.analyze(state["raw_content"])

    langfuse_context.update_current_observation(
        output=result,
        usage={
            "input_tokens": result["usage"]["input_tokens"],
            "output_tokens": result["usage"]["output_tokens"]
        }
    )

    state["findings"].append(result)
    state["agents_completed"].append("security")
    return state
```
Langfuse Dashboard Shows:
- Full workflow execution graph
- Per-node latency and costs
- Token usage by agent
- Failed nodes and retry attempts
- State at each checkpoint
This Project's 8-Agent Analysis Pipeline
Architecture:
```
User Content
     ↓
[Supervisor] → Routes to 8 specialist agents
     ↓
[Security Agent]   ──┐
[Tech Comparator]  ──┤
[Implementation]   ──┤
[Tutorial]         ──┼→ [Supervisor] → [Quality Gate]
[Depth Analyzer]   ──┤                      ↓
[Prerequisites]    ──┤                 Pass: Compress
[Best Practices]   ──┤                 Fail: Retry or END
[Code Examples]    ──┘
```
State Schema:
```python
class Finding(BaseModel):
    agent: str
    category: str
    content: str
    confidence: float

class AnalysisState(TypedDict):
    # Input
    url: str
    raw_content: str

    # Agent outputs
    findings: Annotated[list[Finding], add]
    embeddings: Annotated[list[Embedding], add]

    # Control flow
    current_agent: str
    agents_completed: list[str]
    next: str

    # Quality control
    quality_score: float
    quality_passed: bool
    retry_count: int

    # Final output
    compressed_summary: str
    artifact: dict
```
Key Design Decisions:
- Supervisor pattern: Centralized routing, easy to modify agent list
- Accumulating state: Annotated[list[T], add] ensures all findings are preserved
- Quality gate: Validates before compression (prevents bad outputs)
- Checkpointing: Resume expensive multi-agent workflows after failures
- Langfuse tracing: Track costs and latency per agent
Common Patterns
Pattern 1: Map-Reduce (Parallel Agents)
```python
from langgraph.graph import StateGraph

def fan_out(state):
    """Split work into parallel tasks."""
    state["tasks"] = [{"id": 1}, {"id": 2}, {"id": 3}]
    return state

def worker(state):
    """Process one task."""
    # LangGraph handles parallel execution
    task = state["current_task"]
    result = process(task)
    return {"results": [result]}

def fan_in(state):
    """Combine parallel results."""
    combined = aggregate(state["results"])
    return {"final": combined}

workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)
workflow.add_edge("fan_out", "worker")
workflow.add_edge("worker", "fan_in")  # Waits for all workers
```
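Worker logic in a fan-out/fan-in flow is easy to unit-test outside the graph. The sketch below mirrors the pattern above using only the standard library; `process`, `fan_out`, and `fan_in` are illustrative stand-ins, not LangGraph APIs:

```python
from concurrent.futures import ThreadPoolExecutor

def process(task: dict) -> dict:
    # Stand-in for an expensive per-task call (e.g. an LLM request)
    return {"id": task["id"], "result": task["id"] * 2}

def fan_out() -> list[dict]:
    """Split work into independent tasks."""
    return [{"id": 1}, {"id": 2}, {"id": 3}]

def fan_in(results: list[dict]) -> dict:
    """Combine parallel results into one value."""
    return {"final": sorted(r["result"] for r in results)}

tasks = fan_out()
with ThreadPoolExecutor() as pool:
    results = list(pool.map(process, tasks))  # workers run concurrently
combined = fan_in(results)
# combined == {"final": [2, 4, 6]}
```

In LangGraph itself the runtime schedules the parallel branches for you; this stand-in is mainly useful for testing `process` in isolation.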
Pattern 2: Human-in-the-Loop
```python
workflow = StateGraph(State)
workflow.add_node("draft", generate_draft)
workflow.add_node("review", human_review)
workflow.add_node("publish", publish_content)

# Interrupt before review (wait for human)
app = workflow.compile(interrupt_before=["review"])

# Step 1: Generate draft (stops at review)
result = app.invoke({"topic": "AI"}, config=config)

# Step 2: Human reviews, then updates state
snapshot = app.get_state(config)       # Inspect current state
app.update_state(config, {"approved": True})  # Human decision

# Step 3: Resume workflow
result = app.invoke(None, config=config)  # Continues to publish
```
Pattern 3: Retry with Backoff
```python
def llm_call_with_retry(state):
    """Retry failed LLM calls."""
    try:
        result = call_llm(state["input"])
        state["output"] = result
        state["retry_count"] = 0
        return state
    except Exception as e:
        state["retry_count"] += 1
        state["error"] = str(e)
        return state

def should_retry(state) -> str:
    if state["retry_count"] == 0:
        return "success"
    elif state["retry_count"] < 3:
        return "retry"
    else:
        return "failed"

workflow.add_conditional_edges(
    "llm_call",
    should_retry,
    {
        "success": "next_step",
        "retry": "llm_call",  # Loop back
        "failed": "error_handler"
    }
)
```
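The routing above retries immediately; the "backoff" part is a small helper that a retry node would call before looping back (a sketch; in production the `time.sleep` would typically be replaced by a scheduler or task queue delay):

```python
def backoff_delay(retry_count: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff: 1s, 2s, 4s, ... capped at `cap` seconds."""
    return min(cap, base * (2 ** (retry_count - 1)))

# Inside a retry node, before returning to "llm_call":
#   time.sleep(backoff_delay(state["retry_count"]))
delays = [backoff_delay(n) for n in (1, 2, 3, 4, 5, 6)]
# delays == [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

Capping the delay keeps worst-case latency bounded; adding random jitter on top is a common refinement when many workflows retry against the same API.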
Best Practices
1. State Design
- Keep state flat: Avoid deeply nested dicts (hard to debug)
- Use TypedDict: Type safety catches errors early
- Annotated accumulators: Use Annotated[list, add] for multi-agent outputs
- Immutable inputs: Don't modify input fields (helps with checkpointing)
2. Node Design
- Pure functions: Nodes should not have side effects (except I/O)
- Idempotent: Safe to re-run (important for checkpointing)
- Single responsibility: One agent = one node
- Return new state: Don't mutate in place (use state.copy())
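A node written in this spirit reads state and returns only the keys it changes, leaving the input untouched (names like `analyzer_node` and `findings` are illustrative):

```python
def analyzer_node(state: dict) -> dict:
    """Pure, idempotent node: no side effects, returns a partial update."""
    finding = {"agent": "analyzer", "result": state["input"].upper()}
    # Return a fresh update dict; never state["findings"].append(...)
    return {"findings": [finding], "agents_completed": ["analyzer"]}

state = {"input": "check this", "findings": [], "agents_completed": []}
update = analyzer_node(state)
# `state` is unchanged; the graph runtime merges `update` into it
```

Because the node never mutates its input, re-running it after a checkpoint restore produces the same update, which is exactly the idempotence checkpointing relies on.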
3. Error Handling
- Wrap nodes: Try/catch to prevent workflow crash
- Dead letter queue: Send failed items to error handler
- Retry logic: Exponential backoff for transient errors
- Checkpoints: Enable recovery without losing progress
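One way to combine the "wrap nodes" and "dead letter queue" points is a small decorator that converts exceptions into state updates instead of crashes (a sketch; the `dead_letter` key is an assumed convention, not a LangGraph feature):

```python
import functools

def safe_node(fn):
    """Wrap a node so a failure becomes a dead-letter entry, not a crash."""
    @functools.wraps(fn)
    def wrapper(state: dict) -> dict:
        try:
            return fn(state)
        except Exception as exc:
            # Record the failure so a downstream error handler can act on it
            return {"dead_letter": [{"node": fn.__name__, "error": str(exc)}]}
    return wrapper

@safe_node
def flaky_node(state: dict) -> dict:
    raise RuntimeError("upstream API timed out")

update = flaky_node({})
# update routes the failure into state instead of raising
```

A conditional edge can then check for a non-empty `dead_letter` list and route to an error handler node.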
4. Performance
- Parallel execution: Use the Send API for independent tasks
- Lazy loading: Don't load heavy data until needed
- Streaming: Stream LLM responses for better UX
- Caching: Cache expensive operations (embeddings, API calls)
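As a minimal sketch of the caching point, an expensive call can be memoized by its input; `embed` here is a hypothetical stand-in for an embedding API, and production code would usually key on a content hash and use a shared cache rather than in-process memory:

```python
from functools import lru_cache

calls = 0  # Counts how many "API calls" actually happen

@lru_cache(maxsize=1024)
def embed(text: str) -> tuple[float, ...]:
    """Stand-in for an expensive embedding API call."""
    global calls
    calls += 1
    return tuple(float(ord(c)) for c in text[:4])

embed("same chunk")
embed("same chunk")  # served from cache; no second call
```

Tuples (not lists) are returned so results stay hashable and safe to share between callers.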
5. Observability
- Trace everything: Use @observe() on all nodes
- Log state changes: Before/after state for debugging
- Cost tracking: Record token usage per node
- Alerting: Set up alerts for workflow failures
Debugging LangGraph Workflows
Visualize the Graph
```python
from IPython.display import Image

# Generate graph visualization
image = app.get_graph().draw_mermaid_png()
Image(image)
```
Inspect Checkpoints
```python
# Get all checkpoints for a workflow
checkpoints = app.get_state_history(config)

for checkpoint in checkpoints:
    print(f"Step: {checkpoint.metadata['step']}")
    print(f"Node: {checkpoint.metadata['source']}")
    print(f"State: {checkpoint.values}")
```
Step-by-Step Execution
```python
# Execute one node at a time
for step in app.stream(initial_state, config):
    # Each chunk maps the node name to its state update
    for node, update in step.items():
        print(f"After {node}: {update}")
    input("Press Enter to continue...")
```
Migration from LangChain Chains
Old Way (LCEL Chain):
```python
chain = (
    load_content
    | analyze
    | summarize
    | format_output
)
result = chain.invoke({"url": url})
```
New Way (LangGraph):
```python
workflow = StateGraph(State)
workflow.add_node("load", load_content)
workflow.add_node("analyze", analyze)
workflow.add_node("summarize", summarize)
workflow.add_node("format", format_output)

workflow.add_edge("load", "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", "format")

app = workflow.compile()
result = app.invoke({"url": url})
```
When to use LangGraph over LCEL:
- Need state persistence (checkpointing)
- Conditional routing based on results
- Multi-agent coordination
- Human-in-the-loop approval
- Fault tolerance required
References
LangGraph Documentation
this project Examples
- backend/app/workflows/content_analysis_workflow.py: Main analysis pipeline
- backend/app/workflows/nodes/: Individual agent nodes
- backend/app/workflows/state.py: State schema definitions
Related Skills
- ai-native-development: LLM integration patterns
- langfuse-observability: Workflow tracing and monitoring
- performance-optimization: Optimize multi-agent execution
Version: 1.0.0 (December 2025) Status: Production-ready patterns from this project's multi-agent pipeline