```bash
# Clone the full repository
git clone https://github.com/MacPhobos/research-mind

# Or copy only this skill into ~/.claude/skills
T=$(mktemp -d) && git clone --depth=1 https://github.com/MacPhobos/research-mind "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/toolchains-ai-frameworks-langgraph" ~/.claude/skills/macphobos-research-mind-toolchains-ai-frameworks-langgraph && rm -rf "$T"
```
.claude/skills/toolchains-ai-frameworks-langgraph/skill.md

LangGraph Workflows
Summary
LangGraph is a framework for building stateful, multi-agent applications with LLMs. It implements state machines and directed graphs for orchestration, enabling complex workflows with persistent state management, human-in-the-loop support, and time-travel debugging.
Key Innovation: Transforms agent coordination from sequential chains into cyclic graphs with persistent state, conditional branching, and production-grade debugging capabilities.
When to Use
✅ Use LangGraph When:
- Multi-agent coordination required
- Complex state management needs
- Human-in-the-loop workflows (approval gates, reviews)
- Need debugging/observability (time-travel, replay)
- Conditional branching based on outputs
- Building production agent systems
- State persistence across sessions
❌ Don't Use LangGraph When:
- Simple single-agent tasks
- No state persistence needed
- Prototyping/experimentation phase (use simple chains)
- Team lacks graph/state machine expertise
- Stateless request-response patterns
Quick Start
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

# 1. Define state schema
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    current_step: str

# 2. Create graph
workflow = StateGraph(AgentState)

# 3. Add nodes (agents)
def researcher(state):
    return {"messages": ["Research complete"], "current_step": "research"}

def writer(state):
    return {"messages": ["Article written"], "current_step": "writing"}

workflow.add_node("researcher", researcher)
workflow.add_node("writer", writer)

# 4. Add edges (transitions)
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", END)

# 5. Set entry point and compile
workflow.set_entry_point("researcher")
app = workflow.compile()

# 6. Execute
result = app.invoke({"messages": [], "current_step": "start"})
print(result)
```
Core Concepts
StateGraph
The fundamental building block representing a directed graph of agents with shared state.
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class AgentState(TypedDict):
    """State schema shared across all nodes."""
    messages: list
    user_input: str
    final_output: str
    metadata: dict

# Create graph with state schema
workflow = StateGraph(AgentState)
```
Key Properties (see the inspection sketch below):
- Nodes: Agent functions that transform state
- Edges: Transitions between nodes (static or conditional)
- State: Shared data structure passed between nodes
- Entry Point: Starting node of execution
- END: Terminal node signaling completion
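The compiled graph can also be inspected programmatically, which is handy for verifying wiring before running anything. A minimal sketch, assuming a `workflow` with nodes and edges already added (`get_graph()` and `draw_mermaid()` are available on compiled graphs in recent LangGraph versions):

```python
# Sketch: inspect a compiled graph's structure before execution.
app = workflow.compile()

graph = app.get_graph()
print(sorted(graph.nodes))   # node names, including __start__ / __end__
print(graph.draw_mermaid())  # Mermaid source for a diagram of the graph
```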
Nodes
Nodes are functions that receive current state and return state updates.
```python
def research_agent(state: AgentState) -> dict:
    """Node function: receives state, returns updates."""
    query = state["user_input"]

    # Perform research (simplified)
    results = search_web(query)

    # Return state updates (partial state)
    return {
        "messages": state["messages"] + [f"Research: {results}"],
        "metadata": {"research_complete": True}
    }

# Add node to graph
workflow.add_node("researcher", research_agent)
```
Node Behavior:
- Receives full state as input
- Returns partial state (only fields to update)
- Can be sync or async functions (see the async sketch below)
- Can invoke LLMs, call APIs, run computations
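An async variant of the researcher looks like the sketch below (`fetch_results` is a hypothetical async helper standing in for real I/O):

```python
async def async_research_agent(state: AgentState) -> dict:
    """Async node: awaited automatically when run via ainvoke/astream."""
    results = await fetch_results(state["user_input"])  # hypothetical async helper
    return {"messages": state["messages"] + [f"Research: {results}"]}

workflow.add_node("async_researcher", async_research_agent)
# Async nodes require the async entry points: await app.ainvoke(...)
```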
Edges
Edges define transitions between nodes.
Static Edges
```python
# Direct transition: researcher → writer
workflow.add_edge("researcher", "writer")

# Transition to END
workflow.add_edge("writer", END)
```
Conditional Edges
```python
def should_continue(state: AgentState) -> str:
    """Routing function: decides next node based on state."""
    last_message = state["messages"][-1]

    if "APPROVED" in last_message:
        return END
    elif "NEEDS_REVISION" in last_message:
        return "writer"
    else:
        return "reviewer"

workflow.add_conditional_edges(
    "reviewer",       # Source node
    should_continue,  # Routing function
    {
        END: END,
        "writer": "writer",
        "reviewer": "reviewer"
    }
)
```
Graph Construction
Complete Workflow Example
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_anthropic import ChatAnthropic

# State schema with reducer
class ResearchState(TypedDict):
    topic: str
    research_notes: Annotated[list, operator.add]  # Reducer: appends to list
    draft: str
    revision_count: int
    approved: bool

# Initialize LLM
llm = ChatAnthropic(model="claude-sonnet-4")

# Node 1: Research
def research_node(state: ResearchState) -> dict:
    """Research the topic and gather information."""
    topic = state["topic"]
    prompt = f"Research key points about: {topic}"
    response = llm.invoke(prompt)
    return {
        "research_notes": [response.content]
    }

# Node 2: Write
def write_node(state: ResearchState) -> dict:
    """Write draft based on research."""
    notes = "\n".join(state["research_notes"])
    prompt = f"Write article based on:\n{notes}"
    response = llm.invoke(prompt)
    return {
        "draft": response.content,
        "revision_count": state.get("revision_count", 0)
    }

# Node 3: Review
def review_node(state: ResearchState) -> dict:
    """Review the draft and decide if approved."""
    draft = state["draft"]
    prompt = f"Review this draft. Reply APPROVED or NEEDS_REVISION:\n{draft}"
    response = llm.invoke(prompt)
    approved = "APPROVED" in response.content
    return {
        "approved": approved,
        "revision_count": state["revision_count"] + 1,
        "research_notes": [f"Review feedback: {response.content}"]
    }

# Routing logic
def should_continue_writing(state: ResearchState) -> str:
    """Decide next step after review."""
    if state["approved"]:
        return END
    elif state["revision_count"] >= 3:
        return END  # Max revisions reached
    else:
        return "writer"

# Build graph
workflow = StateGraph(ResearchState)
workflow.add_node("researcher", research_node)
workflow.add_node("writer", write_node)
workflow.add_node("reviewer", review_node)

# Static edges
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")

# Conditional edge from reviewer
workflow.add_conditional_edges(
    "reviewer",
    should_continue_writing,
    {
        END: END,
        "writer": "writer"
    }
)

workflow.set_entry_point("researcher")

# Compile
app = workflow.compile()

# Execute
result = app.invoke({
    "topic": "AI Safety",
    "research_notes": [],
    "draft": "",
    "revision_count": 0,
    "approved": False
})

print(f"Final draft: {result['draft']}")
print(f"Revisions: {result['revision_count']}")
```
State Management
State Schema with TypedDict
```python
from typing import TypedDict, Annotated, Literal
import operator

class WorkflowState(TypedDict):
    # Simple fields (replaced on update)
    user_id: str
    request_id: str
    status: Literal["pending", "processing", "complete", "failed"]

    # List with reducer (appends instead of replacing)
    messages: Annotated[list, operator.add]

    # Dict with custom reducer
    metadata: Annotated[dict, lambda x, y: {**x, **y}]

    # Optional fields
    error: str | None
    result: dict | None
```
State Reducers
Reducers control how state updates are merged.
```python
import operator
from typing import Annotated, TypedDict  # TypedDict import was missing

# Built-in reducers
Annotated[list, operator.add]  # Append to list
Annotated[set, operator.or_]   # Union of sets
Annotated[int, operator.add]   # Sum integers

# Custom reducer
def merge_dicts(existing: dict, update: dict) -> dict:
    """Deep merge dictionaries."""
    result = existing.copy()
    for key, value in update.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result

class State(TypedDict):
    config: Annotated[dict, merge_dicts]
```
State Updates
Nodes return partial state - only fields to update:
```python
def node_function(state: State) -> dict:
    """Return only fields that should be updated."""
    return {
        "messages": ["New message"],  # Will be appended
        "status": "processing"        # Will be replaced
    }

# Other fields remain unchanged
```
Conditional Routing
Basic Routing Function
```python
def route_based_on_score(state: State) -> str:
    """Route to different nodes based on state."""
    score = state["quality_score"]

    if score >= 0.9:
        return "publish"
    elif score >= 0.6:
        return "review"
    else:
        return "revise"

workflow.add_conditional_edges(
    "evaluator",
    route_based_on_score,
    {
        "publish": "publisher",
        "review": "reviewer",
        "revise": "reviser"
    }
)
```
Dynamic Routing with LLM
```python
from langchain_anthropic import ChatAnthropic

def llm_router(state: State) -> str:
    """Use LLM to decide next step."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Current state: {state['current_step']}
    User request: {state['user_input']}

    Which agent should handle this next?
    Options: researcher, coder, writer, FINISH

    Return only one word.
    """

    response = llm.invoke(prompt)
    next_agent = response.content.strip().lower()

    if next_agent == "finish":
        return END
    else:
        return next_agent

workflow.add_conditional_edges(
    "supervisor",
    llm_router,
    {
        "researcher": "researcher",
        "coder": "coder",
        "writer": "writer",
        END: END
    }
)
```
Multi-Condition Routing
```python
def complex_router(state: State) -> str:
    """Route based on multiple conditions."""
    # Check multiple conditions
    has_errors = bool(state.get("errors"))
    is_approved = state.get("approved", False)
    iteration_count = state.get("iterations", 0)

    # Priority-based routing
    if has_errors:
        return "error_handler"
    elif is_approved:
        return END
    elif iteration_count >= 5:
        return "escalation"
    else:
        return "processor"

workflow.add_conditional_edges(
    "validator",
    complex_router,
    {
        "error_handler": "error_handler",
        "processor": "processor",
        "escalation": "escalation",
        END: END
    }
)
```
Multi-Agent Patterns
Supervisor Pattern
One supervisor coordinates multiple specialized agents.
```python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict

class SupervisorState(TypedDict):
    task: str
    agent_history: list
    result: dict
    next_agent: str

# Specialized agents
class ResearchAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Research: {state['task']}")
        return {
            "agent_history": [f"Research: {response.content}"],
            "result": {"research": response.content}
        }

class CodingAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Code: {state['task']}")
        return {
            "agent_history": [f"Code: {response.content}"],
            "result": {"code": response.content}
        }

class SupervisorAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def route(self, state: SupervisorState) -> dict:
        """Decide which agent to use next."""
        history = "\n".join(state.get("agent_history", []))
        prompt = f"""
        Task: {state['task']}
        Progress: {history}

        Which agent should handle the next step?
        Options: researcher, coder, FINISH

        Return only one word.
        """
        response = self.llm.invoke(prompt)
        next_agent = response.content.strip().lower()
        return {"next_agent": next_agent}

# Build supervisor workflow
def create_supervisor_workflow():
    workflow = StateGraph(SupervisorState)

    # Initialize agents
    research_agent = ResearchAgent()
    coding_agent = CodingAgent()
    supervisor = SupervisorAgent()

    # Add nodes
    workflow.add_node("supervisor", supervisor.route)
    workflow.add_node("researcher", research_agent.run)
    workflow.add_node("coder", coding_agent.run)

    # Conditional routing from supervisor
    def route_from_supervisor(state: SupervisorState) -> str:
        # route() lowercases its output, so the default must be lowercase too
        next_agent = state.get("next_agent", "finish")
        if next_agent == "finish":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "supervisor",
        route_from_supervisor,
        {
            "researcher": "researcher",
            "coder": "coder",
            END: END
        }
    )

    # Loop back to supervisor after each agent
    workflow.add_edge("researcher", "supervisor")
    workflow.add_edge("coder", "supervisor")

    workflow.set_entry_point("supervisor")
    return workflow.compile()

# Execute
app = create_supervisor_workflow()
result = app.invoke({
    "task": "Build a REST API for user management",
    "agent_history": [],
    "result": {},
    "next_agent": ""
})
```
Hierarchical Multi-Agent
Nested supervisor pattern with sub-teams.
```python
class TeamState(TypedDict):
    task: str
    team_results: dict

def create_backend_team():
    """Sub-graph for backend development."""
    workflow = StateGraph(TeamState)
    workflow.add_node("api_designer", design_api)
    workflow.add_node("database_designer", design_db)
    workflow.add_node("implementer", implement_backend)

    workflow.add_edge("api_designer", "database_designer")
    workflow.add_edge("database_designer", "implementer")
    workflow.add_edge("implementer", END)

    workflow.set_entry_point("api_designer")
    return workflow.compile()

def create_frontend_team():
    """Sub-graph for frontend development."""
    workflow = StateGraph(TeamState)
    workflow.add_node("ui_designer", design_ui)
    workflow.add_node("component_builder", build_components)
    workflow.add_node("integrator", integrate_frontend)

    workflow.add_edge("ui_designer", "component_builder")
    workflow.add_edge("component_builder", "integrator")
    workflow.add_edge("integrator", END)

    workflow.set_entry_point("ui_designer")
    return workflow.compile()

# Top-level coordinator
def create_project_workflow():
    workflow = StateGraph(TeamState)

    # Add team sub-graphs as nodes
    backend_team = create_backend_team()
    frontend_team = create_frontend_team()

    workflow.add_node("backend_team", backend_team)
    workflow.add_node("frontend_team", frontend_team)
    workflow.add_node("integrator", integrate_teams)

    # Parallel execution of teams
    workflow.add_edge("backend_team", "integrator")
    workflow.add_edge("frontend_team", "integrator")
    workflow.add_edge("integrator", END)

    workflow.set_entry_point("backend_team")
    return workflow.compile()
```
Swarm Pattern (2025)
Dynamic agent hand-offs with collaborative decision-making.
```python
class SwarmState(TypedDict):
    task: str
    messages: list
    current_agent: str
    handoff_reason: str

def create_swarm():
    """Agents can dynamically hand off to each other."""

    def research_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")
        # Perform research
        response = llm.invoke(f"Research: {state['task']}")
        # Decide if handoff needed
        needs_code = "implementation" in response.content.lower()
        if needs_code:
            return {
                "messages": [response.content],
                "current_agent": "coder",
                "handoff_reason": "Need implementation"
            }
        else:
            return {
                "messages": [response.content],
                "current_agent": "writer",
                "handoff_reason": "Ready for documentation"
            }

    def coding_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")
        response = llm.invoke(f"Implement: {state['task']}")
        return {
            "messages": [response.content],
            "current_agent": "tester",
            "handoff_reason": "Code complete, needs testing"
        }

    def tester_agent(state: SwarmState) -> dict:
        # Test the code
        tests_pass = True  # Simplified
        if tests_pass:
            return {
                "messages": ["Tests passed"],
                "current_agent": "DONE",
                "handoff_reason": "All tests passing"
            }
        else:
            return {
                "messages": ["Tests failed"],
                "current_agent": "coder",
                "handoff_reason": "Fix failing tests"
            }

    def writer_agent(state: SwarmState) -> dict:
        # Added so the researcher's "writer" hand-off has a target node
        llm = ChatAnthropic(model="claude-sonnet-4")
        response = llm.invoke(f"Document: {state['task']}")
        return {
            "messages": [response.content],
            "current_agent": "DONE",
            "handoff_reason": "Documentation complete"
        }

    # Build graph
    workflow = StateGraph(SwarmState)
    workflow.add_node("researcher", research_agent)
    workflow.add_node("coder", coding_agent)
    workflow.add_node("tester", tester_agent)
    workflow.add_node("writer", writer_agent)

    # Dynamic routing based on current_agent
    def route_swarm(state: SwarmState) -> str:
        next_agent = state.get("current_agent", "DONE")
        if next_agent == "DONE":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "researcher", route_swarm,
        {"coder": "coder", "writer": "writer"}
    )
    workflow.add_conditional_edges(
        "coder", route_swarm,
        {"tester": "tester"}
    )
    workflow.add_conditional_edges(
        "tester", route_swarm,
        {"coder": "coder", END: END}
    )
    workflow.add_edge("writer", END)

    workflow.set_entry_point("researcher")
    return workflow.compile()
```
Human-in-the-Loop
Interrupt for Approval
```python
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END

# Enable checkpointing for interrupts
memory = SqliteSaver.from_conn_string(":memory:")

def create_approval_workflow():
    workflow = StateGraph(dict)

    workflow.add_node("draft", create_draft)
    workflow.add_node("approve", human_approval)  # Interrupt point
    workflow.add_node("publish", publish_content)

    workflow.add_edge("draft", "approve")
    workflow.add_edge("approve", "publish")
    workflow.add_edge("publish", END)

    workflow.set_entry_point("draft")

    # Compile with interrupt before "approve"
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approve"]  # Pause here
    )

# Usage
app = create_approval_workflow()
config = {"configurable": {"thread_id": "1"}}

# Step 1: Run until interrupt
for event in app.stream({"content": "Draft article..."}, config):
    print(event)
# Workflow pauses at "approve" node

# Human reviews draft
print("Draft ready for review. Approve? (y/n)")
approval = input()

# Step 2: Resume with approval decision
if approval == "y":
    # Continue execution
    result = app.invoke(None, config)  # Resume from checkpoint
    print("Published:", result)
else:
    # Optionally update state and retry
    app.update_state(config, {"needs_revision": True})
    result = app.invoke(None, config)
```
Interactive Approval Gates
```python
class ApprovalState(TypedDict):
    draft: str
    approved: bool
    feedback: str

def approval_node(state: ApprovalState) -> dict:
    """This node requires human interaction."""
    # This is where workflow pauses
    # Human provides input through update_state
    return state  # No automatic changes

def create_interactive_workflow():
    workflow = StateGraph(ApprovalState)

    workflow.add_node("writer", write_draft)
    workflow.add_node("approval_gate", approval_node)
    workflow.add_node("reviser", revise_draft)
    workflow.add_node("publisher", publish_final)

    workflow.add_edge("writer", "approval_gate")

    # Route based on approval
    def route_after_approval(state: ApprovalState) -> str:
        if state.get("approved"):
            return "publisher"
        else:
            return "reviser"

    workflow.add_conditional_edges(
        "approval_gate",
        route_after_approval,
        {"publisher": "publisher", "reviser": "reviser"}
    )

    workflow.add_edge("reviser", "approval_gate")  # Loop back
    workflow.add_edge("publisher", END)

    workflow.set_entry_point("writer")

    memory = SqliteSaver.from_conn_string("approvals.db")
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approval_gate"]
    )

# Execute with human intervention
app = create_interactive_workflow()
config = {"configurable": {"thread_id": "article_123"}}

# Step 1: Generate draft (pauses at approval_gate)
for event in app.stream({"draft": "", "approved": False, "feedback": ""}, config):
    print(event)

# Step 2: Human reviews and provides feedback
current_state = app.get_state(config)
print(f"Review this draft: {current_state.values['draft']}")

# Human decides
app.update_state(config, {
    "approved": False,
    "feedback": "Add more examples in section 2"
})

# Step 3: Resume (goes to reviser due to approved=False)
result = app.invoke(None, config)
```
Checkpointing and Persistence
MemorySaver (In-Memory)
```python
from langgraph.checkpoint.memory import MemorySaver

# In-memory checkpointing (lost on restart)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "conversation_1"}}

# State automatically saved at each step
result = app.invoke(initial_state, config)
```
SQLite Persistence
```python
from langgraph.checkpoint.sqlite import SqliteSaver

# Persistent checkpointing with SQLite
checkpointer = SqliteSaver.from_conn_string("./workflow_state.db")
app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "user_session_456"}}

# State persists across restarts
result = app.invoke(initial_state, config)

# Later: Resume from same thread
result2 = app.invoke(None, config)  # Continues from last state
```
PostgreSQL for Production
```python
from langgraph.checkpoint.postgres import PostgresSaver

# Production-grade persistence
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph_db"
)
app = workflow.compile(checkpointer=checkpointer)

# Supports concurrent workflows with isolation
config1 = {"configurable": {"thread_id": "user_1"}}
config2 = {"configurable": {"thread_id": "user_2"}}

# Each thread maintains independent state
result1 = app.invoke(state1, config1)
result2 = app.invoke(state2, config2)
```
Redis for Distributed Systems
```python
from langgraph.checkpoint.redis import RedisSaver

# Distributed checkpointing
checkpointer = RedisSaver.from_conn_string(
    "redis://localhost:6379",
    ttl=3600  # 1 hour TTL
)
app = workflow.compile(checkpointer=checkpointer)

# Supports horizontal scaling
# Multiple workers can process different threads
```
Time-Travel Debugging
View State History
```python
from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "debug_session"}}

# Run workflow
result = app.invoke(initial_state, config)

# Retrieve full history
history = app.get_state_history(config)

for i, state_snapshot in enumerate(history):
    print(f"Step {i}:")
    print(f"  Node: {state_snapshot.next}")
    print(f"  State: {state_snapshot.values}")
    print(f"  Metadata: {state_snapshot.metadata}")
    print()
```
Replay from Checkpoint
```python
# Get state at specific step
history = list(app.get_state_history(config))
checkpoint_3 = history[3]  # Get state at step 3

# Replay from that checkpoint
config_replay = {
    "configurable": {
        "thread_id": "debug_session",
        "checkpoint_id": checkpoint_3.config["configurable"]["checkpoint_id"]
    }
}

# Resume from step 3
result = app.invoke(None, config_replay)
```
Rewind and Modify
```python
# Rewind to step 5
history = list(app.get_state_history(config))
step_5 = history[5]

# Update state at that point
app.update_state(
    config,
    {"messages": ["Modified message"]},
    as_node="researcher"  # Apply update as if from this node
)

# Continue execution with modified state
result = app.invoke(None, config)
```
Debug Workflow
```python
def debug_workflow():
    """Debug workflow step-by-step."""
    checkpointer = SqliteSaver.from_conn_string("debug.db")
    app = workflow.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "debug"}}

    # Execute step-by-step (pass the input on the first step, then resume with None)
    for step in range(10):  # Max 10 steps
        print(f"\n=== Step {step} ===")

        # Get current state
        current = app.get_state(config)
        print(f"Current state: {current.values}")
        print(f"Next node: {current.next}")

        if step > 0 and not current.next:
            print("Workflow complete")
            break

        # Execute one step
        for event in app.stream(initial_state if step == 0 else None, config):
            print(f"Event: {event}")

        # Pause for inspection
        input("Press Enter to continue...")

    # View full history
    print("\n=== Full History ===")
    for i, snapshot in enumerate(app.get_state_history(config)):
        print(f"Step {i}: {snapshot.next} -> {snapshot.values}")

debug_workflow()
```
Streaming
Token Streaming
```python
from langchain_anthropic import ChatAnthropic

def streaming_node(state: dict) -> dict:
    """Node that streams LLM output."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    # Use streaming
    response = ""
    for chunk in llm.stream(state["prompt"]):
        content = chunk.content
        print(content, end="", flush=True)
        response += content

    return {"response": response}

# Stream events from workflow
for event in app.stream(initial_state):
    print(event)
```
Event Streaming
```python
# Stream all events (node execution, state updates)
for event in app.stream(initial_state, stream_mode="updates"):
    node_name = next(iter(event))  # dict keys aren't indexable; take the first key
    state_update = event[node_name]
    print(f"Node '{node_name}' updated state: {state_update}")
```
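Besides `updates`, LangGraph also supports `stream_mode="values"`, which emits the full state after each step rather than per-node deltas; a minimal sketch (exact event shapes can vary by version):

```python
# "updates" emits per-node deltas; "values" emits the full state after each step
for snapshot in app.stream(initial_state, stream_mode="values"):
    print(f"State keys so far: {list(snapshot)}")
```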
Custom Streaming
```python
import json
from typing import AsyncGenerator

async def streaming_workflow(input_data: dict) -> AsyncGenerator[str, None]:
    """Stream workflow progress in real-time as SSE-formatted strings."""
    config = {"configurable": {"thread_id": "stream"}}

    async for event in app.astream(input_data, config):
        # Stream each state update; StreamingResponse expects str/bytes chunks
        yield f"data: {json.dumps({'type': 'state_update', 'data': str(event)})}\n\n"

    # Stream final result
    final_state = app.get_state(config)
    yield f"data: {json.dumps({'type': 'complete', 'data': str(final_state.values)})}\n\n"

# Usage in FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app_api = FastAPI()

@app_api.post("/workflow/stream")
async def stream_workflow(input: dict):
    return StreamingResponse(
        streaming_workflow(input),
        media_type="text/event-stream"
    )
```
Tool Integration
LangChain Tools
```python
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agent

# Define tools
search = DuckDuckGoSearchRun()
tools = [
    Tool(
        name="Search",
        func=search.run,
        description="Search the web for information"
    )
]

def agent_with_tools(state: dict) -> dict:
    """Node that uses LangChain tools."""
    from langchain_anthropic import ChatAnthropic

    llm = ChatAnthropic(model="claude-sonnet-4")

    # Create agent with tools
    agent = create_react_agent(llm, tools, prompt_template)
    agent_executor = AgentExecutor(agent=agent, tools=tools)

    # Execute with tools
    result = agent_executor.invoke({"input": state["query"]})

    return {"result": result["output"]}

# Add to graph
workflow.add_node("agent_with_tools", agent_with_tools)
```
Custom Tools
```python
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field

class CalculatorInput(BaseModel):
    expression: str = Field(description="Mathematical expression to evaluate")

def calculate(expression: str) -> str:
    """Evaluate mathematical expression."""
    try:
        result = eval(expression)  # Simplified
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

calculator_tool = StructuredTool.from_function(
    func=calculate,
    name="Calculator",
    description="Evaluate mathematical expressions",
    args_schema=CalculatorInput
)

tools = [calculator_tool]

def tool_using_node(state: dict) -> dict:
    """Use custom tools."""
    result = calculator_tool.invoke({"expression": state["math_problem"]})
    return {"solution": result}
```
Anthropic Tool Use
```python
from anthropic import Anthropic

def node_with_anthropic_tools(state: dict) -> dict:
    """Use Anthropic's tool use feature."""
    client = Anthropic()

    # Define tools
    tools = [
        {
            "name": "get_weather",
            "description": "Get weather for a location",
            "input_schema": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    ]

    # First call: Request tool use
    response = client.messages.create(
        model="claude-sonnet-4",
        max_tokens=1024,
        tools=tools,
        messages=[{"role": "user", "content": state["query"]}]
    )

    # Execute tool if requested
    if response.stop_reason == "tool_use":
        tool_use = next(
            block for block in response.content
            if block.type == "tool_use"
        )

        # Execute tool
        tool_result = execute_tool(tool_use.name, tool_use.input)

        # Second call: Provide tool result
        final_response = client.messages.create(
            model="claude-sonnet-4",
            max_tokens=1024,
            tools=tools,
            messages=[
                {"role": "user", "content": state["query"]},
                {"role": "assistant", "content": response.content},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_use.id,
                            "content": str(tool_result)
                        }
                    ]
                }
            ]
        )

        return {"response": final_response.content[0].text}

    return {"response": response.content[0].text}
```
Error Handling
Try-Catch in Nodes
```python
def robust_node(state: dict) -> dict:
    """Node with error handling."""
    try:
        # Main logic
        result = risky_operation(state["input"])
        return {
            "result": result,
            "error": None,
            "status": "success"
        }
    except ValueError as e:
        # Handle specific error
        return {
            "error": f"Validation error: {e}",
            "status": "validation_failed"
        }
    except Exception as e:
        # Handle unexpected errors
        return {
            "error": f"Unexpected error: {e}",
            "status": "failed"
        }

# Route based on error status
def error_router(state: dict) -> str:
    status = state.get("status")
    if status == "success":
        return "next_step"
    elif status == "validation_failed":
        return "validator"
    else:
        return "error_handler"

workflow.add_conditional_edges(
    "robust_node",
    error_router,
    {
        "next_step": "next_step",
        "validator": "validator",
        "error_handler": "error_handler"
    }
)
```
Retry Logic
```python
from typing import TypedDict

class RetryState(TypedDict):
    input: str
    result: str
    error: str | None
    retry_count: int
    max_retries: int

def node_with_retry(state: RetryState) -> dict:
    """Node that can be retried on failure."""
    try:
        result = unstable_operation(state["input"])
        return {
            "result": result,
            "error": None,
            "retry_count": 0
        }
    except Exception as e:
        return {
            "error": str(e),
            "retry_count": state.get("retry_count", 0) + 1
        }

def should_retry(state: RetryState) -> str:
    """Decide whether to retry or fail."""
    if state.get("error") is None:
        return "success"

    retry_count = state.get("retry_count", 0)
    max_retries = state.get("max_retries", 3)

    if retry_count < max_retries:
        return "retry"
    else:
        return "failed"

# Build retry loop
workflow.add_node("operation", node_with_retry)
workflow.add_node("success_handler", handle_success)
workflow.add_node("failure_handler", handle_failure)

workflow.add_conditional_edges(
    "operation",
    should_retry,
    {
        "success": "success_handler",
        "retry": "operation",  # Loop back
        "failed": "failure_handler"
    }
)
```
Fallback Strategies
```python
def node_with_fallback(state: dict) -> dict:
    """Try primary method, fall back to secondary."""
    # Try primary LLM
    try:
        result = primary_llm(state["prompt"])
        return {
            "result": result,
            "method": "primary"
        }
    except Exception as e_primary:
        # Fallback to secondary LLM
        try:
            result = secondary_llm(state["prompt"])
            return {
                "result": result,
                "method": "fallback",
                "primary_error": str(e_primary)
            }
        except Exception as e_secondary:
            # Both failed
            return {
                "error": f"Both methods failed: {e_primary}, {e_secondary}",
                "method": "failed"
            }
```
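When both models are LangChain runnables, the same pattern is available as a built-in combinator via `with_fallbacks()`; a sketch, assuming two configured chat models:

```python
from langchain_anthropic import ChatAnthropic

primary = ChatAnthropic(model="claude-sonnet-4")
backup = ChatAnthropic(model="claude-sonnet-4")  # stand-in; any Runnable works

# with_fallbacks() tries `primary`, then `backup` if it raises
resilient_llm = primary.with_fallbacks([backup])

def fallback_node(state: dict) -> dict:
    response = resilient_llm.invoke(state["prompt"])
    return {"result": response.content}
```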
Subgraphs and Composition
Subgraphs as Nodes
```python
def create_subgraph():
    """Create reusable subgraph."""
    subgraph = StateGraph(dict)
    subgraph.add_node("step1", step1_func)
    subgraph.add_node("step2", step2_func)
    subgraph.add_edge("step1", "step2")
    subgraph.add_edge("step2", END)
    subgraph.set_entry_point("step1")
    return subgraph.compile()

# Use subgraph in main graph
def create_main_graph():
    workflow = StateGraph(dict)

    # Add subgraph as a node
    subgraph_compiled = create_subgraph()
    workflow.add_node("subgraph", subgraph_compiled)

    workflow.add_node("before", before_func)
    workflow.add_node("after", after_func)

    workflow.add_edge("before", "subgraph")
    workflow.add_edge("subgraph", "after")
    workflow.add_edge("after", END)

    workflow.set_entry_point("before")
    return workflow.compile()
```
Parallel Subgraphs
```python
from langgraph.graph import StateGraph, END

def create_parallel_workflow():
    """Execute multiple subgraphs in parallel."""
    # Create subgraphs
    backend_graph = create_backend_subgraph()
    frontend_graph = create_frontend_subgraph()
    database_graph = create_database_subgraph()

    # Main graph
    workflow = StateGraph(dict)

    # Add subgraphs as parallel nodes
    workflow.add_node("backend", backend_graph)
    workflow.add_node("frontend", frontend_graph)
    workflow.add_node("database", database_graph)

    # Merge results
    workflow.add_node("merge", merge_results)

    # All subgraphs lead to merge
    workflow.add_edge("backend", "merge")
    workflow.add_edge("frontend", "merge")
    workflow.add_edge("database", "merge")
    workflow.add_edge("merge", END)

    # Set all as entry points (parallel execution)
    workflow.set_entry_point("backend")
    workflow.set_entry_point("frontend")
    workflow.set_entry_point("database")

    return workflow.compile()
```
Map-Reduce Patterns
Map-Reduce with LangGraph
```python
from typing import TypedDict, List

class MapReduceState(TypedDict):
    documents: List[str]
    summaries: List[str]
    final_summary: str

def map_node(state: MapReduceState) -> dict:
    """Map: Summarize each document."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    summaries = []
    for doc in state["documents"]:
        summary = llm.invoke(f"Summarize: {doc}")
        summaries.append(summary.content)

    return {"summaries": summaries}

def reduce_node(state: MapReduceState) -> dict:
    """Reduce: Combine summaries into final summary."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    combined = "\n".join(state["summaries"])
    final = llm.invoke(f"Combine these summaries:\n{combined}")

    return {"final_summary": final.content}

# Build map-reduce workflow
workflow = StateGraph(MapReduceState)
workflow.add_node("map", map_node)
workflow.add_node("reduce", reduce_node)
workflow.add_edge("map", "reduce")
workflow.add_edge("reduce", END)
workflow.set_entry_point("map")

app = workflow.compile()

# Execute
result = app.invoke({
    "documents": ["Doc 1...", "Doc 2...", "Doc 3..."],
    "summaries": [],
    "final_summary": ""
})
```
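Recent LangGraph releases also provide a `Send` primitive that fans out one worker task per item instead of looping inside a single node; a sketch, assuming `Send` is importable from `langgraph.constants` in your installed version:

```python
import operator
from typing import Annotated, TypedDict
from langgraph.constants import Send
from langgraph.graph import StateGraph, START, END

class FanOutState(TypedDict):
    documents: list
    summaries: Annotated[list, operator.add]  # reducer merges parallel writes

def summarize_one(state: dict) -> dict:
    # Each task receives only the payload passed via Send(..., {"doc": ...})
    return {"summaries": [f"summary of {state['doc']}"]}

def fan_out(state: FanOutState):
    # One Send per document, so one parallel "summarize" task each
    return [Send("summarize", {"doc": d}) for d in state["documents"]]

wf = StateGraph(FanOutState)
wf.add_node("summarize", summarize_one)
wf.add_conditional_edges(START, fan_out, ["summarize"])
wf.add_edge("summarize", END)
fan_app = wf.compile()
```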
Parallel Map with Batching
```python
import asyncio
from typing import List

async def parallel_map_node(state: dict) -> dict:
    """Map: Process documents in parallel."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    async def summarize(doc: str) -> str:
        response = await llm.ainvoke(f"Summarize: {doc}")
        return response.content

    # Process all documents in parallel
    summaries = await asyncio.gather(
        *[summarize(doc) for doc in state["documents"]]
    )

    return {"summaries": list(summaries)}

# Use async workflow
app = workflow.compile()

# Execute with asyncio
result = asyncio.run(app.ainvoke({
    "documents": ["Doc 1", "Doc 2", "Doc 3"],
    "summaries": [],
    "final_summary": ""
}))
```
Production Deployment
LangGraph Cloud
```python
# Deploy to LangGraph Cloud (managed service)

# 1. Install LangGraph CLI
# pip install langgraph-cli

# 2. Create langgraph.json config
langgraph_config = {
    "dependencies": ["langchain", "langchain-anthropic"],
    "graphs": {
        "agent": "./graph.py:app"
    },
    "env": {
        "ANTHROPIC_API_KEY": "$ANTHROPIC_API_KEY"
    }
}

# 3. Deploy
# langgraph deploy

# 4. Use deployed graph via API
import requests

response = requests.post(
    "https://your-deployment.langgraph.cloud/invoke",
    json={
        "input": {"messages": ["Hello"]},
        "config": {"thread_id": "user_123"}
    },
    headers={"Authorization": "Bearer YOUR_API_KEY"}
)
result = response.json()
```
Self-Hosted with FastAPI
```python
import json

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import uvicorn

# Create FastAPI app
app_api = FastAPI()

# Initialize workflow
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
)
workflow_app = workflow.compile(checkpointer=checkpointer)

class WorkflowRequest(BaseModel):
    input: dict
    thread_id: str

class WorkflowResponse(BaseModel):
    output: dict
    thread_id: str

@app_api.post("/invoke", response_model=WorkflowResponse)
async def invoke_workflow(request: WorkflowRequest):
    """Invoke workflow synchronously."""
    try:
        config = {"configurable": {"thread_id": request.thread_id}}
        result = workflow_app.invoke(request.input, config)
        return WorkflowResponse(
            output=result,
            thread_id=request.thread_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app_api.post("/stream")
async def stream_workflow(request: WorkflowRequest):
    """Stream workflow execution."""
    from fastapi.responses import StreamingResponse

    config = {"configurable": {"thread_id": request.thread_id}}

    async def generate():
        async for event in workflow_app.astream(request.input, config):
            yield f"data: {json.dumps(event, default=str)}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

# Run server
if __name__ == "__main__":
    uvicorn.run(app_api, host="0.0.0.0", port=8000)
```
Docker Deployment
```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app_api", "--host", "0.0.0.0", "--port", "8000"]
```
```yaml
# docker-compose.yml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - POSTGRES_URL=postgresql://user:pass@db:5432/langgraph
    depends_on:
      - db
      - redis

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=langgraph
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - "6379:6379"

volumes:
  postgres_data:
```
LangSmith Integration
Automatic Tracing
```python
import os
from langchain_anthropic import ChatAnthropic

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-langgraph-project"

# All LangGraph executions automatically traced
app = workflow.compile()
result = app.invoke(initial_state)

# View trace in LangSmith UI: https://smith.langchain.com
```
Custom Metadata
```python
from langsmith import traceable

@traceable(
    run_type="chain",
    name="research_agent",
    metadata={"version": "1.0.0"}
)
def research_agent(state: dict) -> dict:
    """Node with custom LangSmith metadata."""
    # Function execution automatically traced
    return {"research": "results"}

workflow.add_node("researcher", research_agent)
```
Evaluation with LangSmith
```python
from langsmith import Client
from langsmith.evaluation import evaluate

client = Client()

# Create test dataset
examples = [
    {
        "inputs": {"topic": "AI Safety"},
        "outputs": {"quality_score": 0.9}
    }
]

dataset = client.create_dataset("langgraph_tests")
for example in examples:
    client.create_example(
        dataset_id=dataset.id,
        inputs=example["inputs"],
        outputs=example["outputs"]
    )

# Evaluate workflow
def workflow_wrapper(inputs: dict) -> dict:
    result = app.invoke(inputs)
    return result

def quality_evaluator(run, example):
    """Custom evaluator."""
    score = calculate_quality(run.outputs)
    return {"key": "quality", "score": score}

results = evaluate(
    workflow_wrapper,
    data="langgraph_tests",
    evaluators=[quality_evaluator],
    experiment_prefix="langgraph_v1"
)

print(f"Average quality: {results['results']['quality']:.2f}")
```
Real-World Examples
Customer Support Agent
```python
from typing import TypedDict, Literal
from langchain_anthropic import ChatAnthropic

class SupportState(TypedDict):
    customer_query: str
    category: Literal["billing", "technical", "general"]
    priority: Literal["low", "medium", "high"]
    resolution: str
    escalated: bool

def categorize(state: SupportState) -> dict:
    """Categorize customer query."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    prompt = f"""
    Categorize this support query: "{state['customer_query']}"

    Categories: billing, technical, general
    Priority: low, medium, high

    Return format: category|priority
    """
    response = llm.invoke(prompt)
    category, priority = response.content.strip().split("|")
    return {
        "category": category,
        "priority": priority
    }

def handle_billing(state: SupportState) -> dict:
    """Handle billing issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(f"Resolve billing issue: {state['customer_query']}")
    return {"resolution": response.content}

def handle_technical(state: SupportState) -> dict:
    """Handle technical issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(f"Resolve technical issue: {state['customer_query']}")
    return {"resolution": response.content}

def escalate(state: SupportState) -> dict:
    """Escalate to human agent."""
    return {
        "escalated": True,
        "resolution": "Escalated to senior support team"
    }

def route_by_category(state: SupportState) -> str:
    """Route based on category and priority."""
    if state["priority"] == "high":
        return "escalate"

    category = state["category"]
    if category == "billing":
        return "billing"
    elif category == "technical":
        return "technical"
    else:
        return "general"

# Build support workflow
workflow = StateGraph(SupportState)
workflow.add_node("categorize", categorize)
workflow.add_node("billing", handle_billing)
workflow.add_node("technical", handle_technical)
workflow.add_node("general", handle_technical)  # Reuse
workflow.add_node("escalate", escalate)

# Routing from categorize happens via the conditional edges below
workflow.add_conditional_edges(
    "categorize",
    route_by_category,
    {
        "billing": "billing",
        "technical": "technical",
        "general": "general",
        "escalate": "escalate"
    }
)

workflow.add_edge("billing", END)
workflow.add_edge("technical", END)
workflow.add_edge("general", END)
workflow.add_edge("escalate", END)

workflow.set_entry_point("categorize")
support_app = workflow.compile()

# Usage
result = support_app.invoke({
    "customer_query": "I was charged twice for my subscription",
    "category": "",
    "priority": "",
    "resolution": "",
    "escalated": False
})
```
Research Assistant
```python
class ResearchState(TypedDict):
    topic: str
    research_notes: list
    outline: str
    draft: str
    final_article: str
    revision_count: int

def research(state: ResearchState) -> dict:
    """Gather information on topic."""
    # Use search tools
    results = search_web(state["topic"])
    return {
        "research_notes": [f"Research: {results}"]
    }

def create_outline(state: ResearchState) -> dict:
    """Create article outline."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    notes = "\n".join(state["research_notes"])
    response = llm.invoke(f"Create outline for: {notes}")
    return {"outline": response.content}

def write_draft(state: ResearchState) -> dict:
    """Write article draft."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    prompt = f"""
    Topic: {state['topic']}
    Outline: {state['outline']}
    Research: {state['research_notes']}

    Write comprehensive article.
    """
    response = llm.invoke(prompt)
    return {"draft": response.content}

def review_and_revise(state: ResearchState) -> dict:
    """Review draft quality."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(f"Review and improve:\n{state['draft']}")
    return {
        "final_article": response.content,
        "revision_count": state.get("revision_count", 0) + 1
    }

# Build research workflow
workflow = StateGraph(ResearchState)
workflow.add_node("research", research)
workflow.add_node("outline", create_outline)
workflow.add_node("write", write_draft)
workflow.add_node("review", review_and_revise)

workflow.add_edge("research", "outline")
workflow.add_edge("outline", "write")
workflow.add_edge("write", "review")

def check_quality(state: ResearchState) -> str:
    if state["revision_count"] >= 2:
        return END
    # Simple quality check
    if len(state.get("final_article", "")) > 1000:
        return END
    else:
        return "write"  # Revise

workflow.add_conditional_edges(
    "review",
    check_quality,
    {END: END, "write": "write"}
)

workflow.set_entry_point("research")
research_app = workflow.compile()
```
Code Review Workflow
```python
class CodeReviewState(TypedDict):
    code: str
    language: str
    lint_results: list
    test_results: dict
    security_scan: dict
    review_comments: list
    approved: bool

def lint_code(state: CodeReviewState) -> dict:
    """Run linters on code."""
    # Run appropriate linter
    issues = run_linter(state["code"], state["language"])
    return {"lint_results": issues}

def run_tests(state: CodeReviewState) -> dict:
    """Execute test suite."""
    results = execute_tests(state["code"])
    return {"test_results": results}

def security_scan(state: CodeReviewState) -> dict:
    """Scan for security vulnerabilities."""
    scan_results = scan_for_vulnerabilities(state["code"])
    return {"security_scan": scan_results}

def ai_review(state: CodeReviewState) -> dict:
    """AI-powered code review."""
    llm = ChatAnthropic(model="claude-sonnet-4")
    prompt = f"""
    Review this {state['language']} code:
    {state['code']}

    Lint issues: {state['lint_results']}
    Test results: {state['test_results']}
    Security: {state['security_scan']}

    Provide review comments.
    """
    response = llm.invoke(prompt)
    return {"review_comments": [response.content]}

def approve_or_reject(state: CodeReviewState) -> dict:
    """Final approval decision."""
    # Auto-approve if all checks pass
    lint_ok = len(state["lint_results"]) == 0
    tests_ok = state["test_results"].get("passed", 0) == state["test_results"].get("total", 0)
    security_ok = len(state["security_scan"].get("vulnerabilities", [])) == 0

    approved = lint_ok and tests_ok and security_ok
    return {"approved": approved}

# Build code review workflow
workflow = StateGraph(CodeReviewState)
workflow.add_node("lint", lint_code)
workflow.add_node("test", run_tests)
workflow.add_node("security", security_scan)
workflow.add_node("ai_review", ai_review)
workflow.add_node("decision", approve_or_reject)

# Parallel execution of checks
workflow.add_edge("lint", "ai_review")
workflow.add_edge("test", "ai_review")
workflow.add_edge("security", "ai_review")
workflow.add_edge("ai_review", "decision")
workflow.add_edge("decision", END)

workflow.set_entry_point("lint")
workflow.set_entry_point("test")
workflow.set_entry_point("security")

code_review_app = workflow.compile()
```
Performance Optimization
Parallel Execution
```python
# Multiple nodes with same parent execute in parallel
workflow.add_edge("start", "task_a")
workflow.add_edge("start", "task_b")
workflow.add_edge("start", "task_c")

# task_a, task_b, task_c run concurrently
```
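One caveat: when parallel branches write to the same state key, that key needs a reducer, otherwise LangGraph rejects the concurrent update. A minimal runnable sketch of fan-out/fan-in with a list reducer:

```python
import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, END

class FanState(TypedDict):
    results: Annotated[list, operator.add]  # reducer merges concurrent writes

def start(state: FanState) -> dict:
    return {}

def task_a(state: FanState) -> dict:
    return {"results": ["a"]}

def task_b(state: FanState) -> dict:
    return {"results": ["b"]}

def merge(state: FanState) -> dict:
    return {"results": [f"merged: {state['results']}"]}

wf = StateGraph(FanState)
for name, fn in [("start", start), ("task_a", task_a), ("task_b", task_b), ("merge", merge)]:
    wf.add_node(name, fn)
wf.add_edge("start", "task_a")
wf.add_edge("start", "task_b")
wf.add_edge("task_a", "merge")  # merge runs once both branches finish
wf.add_edge("task_b", "merge")
wf.add_edge("merge", END)
wf.set_entry_point("start")

print(wf.compile().invoke({"results": []}))
```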
Caching with Checkpointers
```python
from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpoint results for reuse
checkpointer = SqliteSaver.from_conn_string("cache.db")
app = workflow.compile(checkpointer=checkpointer)

# Same thread_id reuses cached results
config = {"configurable": {"thread_id": "cached_session"}}

# First run: executes all nodes
result1 = app.invoke(input_data, config)

# Second run: uses cached state
result2 = app.invoke(None, config)  # Instant
```
Batching
```python
def batch_processing_node(state: dict) -> dict:
    """Process items in batches."""
    items = state["items"]
    batch_size = 10

    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_result = process_batch(batch)
        results.extend(batch_result)

    return {"results": results}
```
Async Execution
```python
from langgraph.graph import StateGraph
import asyncio

async def async_node(state: dict) -> dict:
    """Async node for I/O-bound operations."""
    results = await asyncio.gather(
        fetch_api_1(state["input"]),
        fetch_api_2(state["input"]),
        fetch_api_3(state["input"])
    )
    return {"results": results}

# Use async invoke
result = await app.ainvoke(input_data)
```
Comparison with Simple Chains
LangChain LCEL (Simple Chain)
```python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

# Simple linear chain
chain = (
    RunnablePassthrough()
    | llm
    | {"output": lambda x: x.content}
)

result = chain.invoke("Hello")
```
Limitations:
- ❌ No state persistence
- ❌ No conditional branching
- ❌ No cycles/loops
- ❌ No human-in-the-loop
- ❌ Limited debugging
LangGraph (Cyclic Workflow)
```python
from langgraph.graph import StateGraph, END

workflow = StateGraph(dict)
workflow.add_node("step1", step1_func)
workflow.add_node("step2", step2_func)

# Conditional loop
workflow.add_conditional_edges(
    "step2",
    should_continue,
    {"step1": "step1", END: END}
)

app = workflow.compile(checkpointer=memory)
```
Advantages:
- ✅ Persistent state across steps
- ✅ Conditional branching
- ✅ Cycles and loops
- ✅ Human-in-the-loop support
- ✅ Time-travel debugging
- ✅ Production-ready
Migration from LangChain LCEL
Before: LCEL Chain
```python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

chain = (
    {"input": RunnablePassthrough()}
    | llm
    | {"output": lambda x: x.content}
)

result = chain.invoke("Question")
```
After: LangGraph
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class State(TypedDict):
    input: str
    output: str

def llm_node(state: State) -> dict:
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(state["input"])
    return {"output": response.content}

workflow = StateGraph(State)
workflow.add_node("llm", llm_node)
workflow.add_edge("llm", END)
workflow.set_entry_point("llm")

app = workflow.compile()
result = app.invoke({"input": "Question", "output": ""})
```
Migration Checklist
- Identify stateful vs stateless operations
- Convert chain steps to graph nodes
- Define state schema (TypedDict)
- Add edges between nodes
- Implement conditional routing if needed
- Add checkpointing for state persistence
- Test with same inputs
- Verify output matches original chain (see the parity-test sketch below)
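A parity test for the last two items might look like the sketch below (pytest-style; assumes the `chain` and `app` objects from the before/after snippets above, and that exact-text equality only holds for deterministic models):

```python
def test_graph_matches_chain():
    question = "Question"
    chain_result = chain.invoke(question)
    graph_result = app.invoke({"input": question, "output": ""})

    # With a deterministic LLM (temperature=0) the texts should align;
    # for nondeterministic models, compare structure instead of exact text.
    assert isinstance(chain_result["output"], str)
    assert graph_result["output"]  # graph produced a non-empty answer
```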
Best Practices
State Design
```python
# ✅ GOOD: Flat, explicit state
class GoodState(TypedDict):
    user_id: str
    messages: list
    current_step: str
    result: dict

# ❌ BAD: Nested, ambiguous state
class BadState(TypedDict):
    data: dict    # Too generic
    context: Any  # No type safety
```
Node Granularity
```python
# ✅ GOOD: Single responsibility per node
def validate_input(state): ...
def process_data(state): ...
def format_output(state): ...

# ❌ BAD: Monolithic node
def do_everything(state): ...
```
Error Handling
```python
# ✅ GOOD: Explicit error states
class State(TypedDict):
    result: dict | None
    error: str | None
    status: Literal["pending", "success", "failed"]

def robust_node(state):
    try:
        result = operation()
        return {"result": result, "status": "success"}
    except Exception as e:
        return {"error": str(e), "status": "failed"}

# ❌ BAD: Silent failures
def fragile_node(state):
    try:
        return operation()
    except:
        return {}  # Error lost
```
Testing
```python
# Test individual nodes
def test_node():
    state = {"input": "test"}
    result = my_node(state)
    assert result["output"] == "expected"

# Test full workflow
def test_workflow():
    app = workflow.compile()
    result = app.invoke(initial_state)
    assert result["final_output"] == "expected"

# Test error paths
def test_error_handling():
    state = {"input": "invalid"}
    result = my_node(state)
    assert result["error"] is not None
```
Monitoring
```python
import logging

logger = logging.getLogger(__name__)

def monitored_node(state: dict) -> dict:
    """Node with logging."""
    logger.info(f"Executing node with state: {state.keys()}")

    try:
        result = operation()
        logger.info("Node succeeded")
        return {"result": result}
    except Exception as e:
        logger.error(f"Node failed: {e}")
        raise
```
Summary
LangGraph transforms agent development from linear chains into cyclic, stateful workflows with production-grade capabilities:
Core Strengths:
- 🔄 Cyclic workflows with loops and branches
- 💾 Persistent state across sessions
- 🕐 Time-travel debugging and replay
- 👤 Human-in-the-loop approval gates
- 🔧 Tool integration (LangChain, Anthropic, custom)
- 📊 LangSmith integration for tracing
- 🏭 Production deployment (cloud or self-hosted)
When to Choose LangGraph:
- Multi-agent coordination required
- Complex branching logic
- State persistence needed
- Human approvals in workflow
- Production systems with debugging needs
Learning Path:
- Start with StateGraph basics
- Add conditional routing
- Implement checkpointing
- Build multi-agent patterns
- Deploy to production
Production Checklist:
- State schema with TypedDict
- Error handling in all nodes
- Checkpointing configured
- LangSmith tracing enabled
- Monitoring and logging
- Testing (unit + integration)
- Deployment strategy (cloud/self-hosted)
For simple request-response patterns, use LangChain LCEL; for complex stateful workflows, LangGraph is the better-suited, production-proven choice.
Related Skills
When using LangGraph, these skills enhance your workflow:
- dspy: DSPy for LLM prompt optimization (complementary to LangGraph orchestration)
- test-driven-development: Testing LangGraph workflows and state machines
- systematic-debugging: Debugging multi-agent workflows and state transitions
[Full documentation available in these skills if deployed in your bundle]