**claude-skill-registry: langgraph-python-expert**

Expert guidance for the LangGraph Python library. Build stateful, multi-actor applications with LLMs using nodes, edges, and state management. Use when working with LangGraph, building agent workflows, state machines, or complex multi-step LLM applications. Requires the `langgraph` and `langchain-core` packages.

Installation:

```bash
# Clone the full registry
git clone https://github.com/majiayu000/claude-skill-registry

# Or install only this skill into ~/.claude/skills
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/langgraph-python-expert" ~/.claude/skills/majiayu000-claude-skill-registry-langgraph-python-expert && rm -rf "$T"
```
`skills/data/langgraph-python-expert/SKILL.md`:

# LangGraph Python Expert
Comprehensive expert for building sophisticated stateful applications with LangGraph, focusing on production-ready workflows, state management, and agent orchestration.
## 📚 Official Source Documentation
This skill includes access to the official LangGraph source code through the `source/langgraph/` directory (managed as a git submodule with sparse-checkout), which contains:

- **Core Libraries**: `libs/langgraph/`, `libs/prebuilt/`, `libs/checkpoint*/`
- **Official Examples**: `examples/` with up-to-date examples and tutorials
- **Complete Documentation**: `docs/docs/` with the latest documentation and API references
### Source Structure (66MB with sparse-checkout)
```
source/langgraph/
├── libs/
│   ├── langgraph/            # Core StateGraph, nodes, edges
│   ├── prebuilt/             # create_react_agent, ToolNode
│   ├── checkpoint/           # Base checkpoint classes
│   ├── checkpoint-sqlite/    # SQLite persistence
│   └── checkpoint-postgres/  # PostgreSQL persistence
├── examples/                 # Official examples and tutorials
├── docs/docs/                # Documentation (concepts, how-tos, reference)
├── README.md                 # Project overview
├── CLAUDE.md                 # Claude Code instructions
└── AGENTS.md                 # Agent development guide
```
### Updating Source Code
```bash
cd source/langgraph
git pull origin main
```
For detailed structure, see `SOURCE_STRUCTURE.md`.
## Quick Start
### Installation
```bash
pip install langgraph langchain-core langchain-openai
```
### Basic Concepts
- **StateGraph**: The core component for building workflows with state persistence
- **Nodes**: Functions that process the state and return updates
- **Edges**: Define the flow between nodes (conditional or direct)
- **State**: A TypedDict that holds conversation/application state
- **Persistence**: Checkpointing for memory and conversation history
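To see how these pieces fit together, here is a minimal, LLM-free sketch; the `CounterState` schema and `increment` node are illustrative names, not part of the LangGraph API:

```python
from typing import TypedDict
from langgraph.graph import StateGraph, END

class CounterState(TypedDict):
    count: int

def increment(state: CounterState) -> dict:
    # Nodes return only the keys they update
    return {"count": state["count"] + 1}

workflow = StateGraph(CounterState)
workflow.add_node("increment", increment)
workflow.set_entry_point("increment")
workflow.add_edge("increment", END)

app = workflow.compile()
print(app.invoke({"count": 0}))  # {'count': 1}
```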
## Core Components
### 1. State Definition
```python
from typing import TypedDict, List, Optional
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    messages: List[BaseMessage]
    current_user: Optional[str]
    step_count: int
    requires_action: bool
```
### 2. Node Functions
```python
from langchain_core.messages import HumanMessage, AIMessage

# Assumes a chat model instance is configured elsewhere,
# e.g. llm = ChatOpenAI(model="gpt-4o-mini")

def llm_node(state: AgentState) -> AgentState:
    """Process messages with the LLM and return updated state."""
    messages = state["messages"]
    response = llm.invoke(messages)
    return {
        "messages": messages + [response],
        "step_count": state["step_count"] + 1,
    }

def router_node(state: AgentState) -> str:
    """Decide the next node based on state."""
    last_message = state["messages"][-1]
    if "tool_call" in last_message.additional_kwargs:
        return "tool_executor"
    return "end"
```
### 3. Graph Construction
```python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# Create graph
workflow = StateGraph(AgentState)

# Add nodes. Note: router_node is a routing function, so it is passed
# to add_conditional_edges below rather than registered with add_node.
workflow.add_node("agent", agent_node)
workflow.add_node("tool_executor", tool_node)

# Add edges
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    router_node,
    {
        "tool_executor": "tool_executor",
        "end": END,
    },
)
workflow.add_edge("tool_executor", "agent")

# Memory
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
```
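A minimal usage sketch for the compiled graph above: because it was compiled with a checkpointer, each call must carry a `thread_id` so LangGraph knows which conversation's state to load and save.

```python
from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "conversation-1"}}
result = app.invoke(
    {"messages": [HumanMessage(content="Hello")], "step_count": 0},
    config,
)
# Invoking again with the same thread_id resumes from the saved checkpoint
```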
## Advanced Patterns
### 1. Multi-Agent Collaboration
```python
from typing import List
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, MessagesState
from langgraph.prebuilt import create_react_agent

class MultiAgentState(MessagesState):
    researcher_notes: str
    writer_content: str
    reviewer_feedback: List[str]

def researcher_node(state: MultiAgentState) -> MultiAgentState:
    """Research agent that gathers information."""
    researcher_agent = create_react_agent(llm, research_tools)
    result = researcher_agent.invoke({
        "messages": state["messages"][-2:]  # Last two messages
    })
    # MessagesState appends via its add_messages reducer, so return
    # only the new messages rather than the whole history again
    return {
        "researcher_notes": result["messages"][-1].content,
        "messages": result["messages"],
    }

def writer_node(state: MultiAgentState) -> MultiAgentState:
    """Writer agent that creates content based on the research."""
    writer_agent = create_react_agent(llm, writing_tools)
    prompt = f"Research notes: {state['researcher_notes']}"
    result = writer_agent.invoke({
        "messages": [HumanMessage(content=prompt)]
    })
    return {
        "writer_content": result["messages"][-1].content,
        "messages": result["messages"],
    }
```
### 2. Dynamic Tool Selection
```python
from typing import Dict, Any
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool

class DynamicToolNode:
    def __init__(self, tool_registry: Dict[str, BaseTool]):
        self.tool_registry = tool_registry

    def __call__(self, state: AgentState) -> AgentState:
        last_message = state["messages"][-1]
        if not last_message.tool_calls:
            return state

        # Dynamically select tools based on context
        selected_tools = self.select_tools_by_context(state)

        # Execute tool calls
        tool_messages = []
        for tool_call in last_message.tool_calls:
            if tool_call["name"] in selected_tools:
                tool = selected_tools[tool_call["name"]]
                result = tool.invoke(tool_call["args"])
                tool_messages.append(
                    ToolMessage(
                        tool_call_id=tool_call["id"],
                        content=str(result),
                    )
                )

        return {"messages": state["messages"] + tool_messages}

    def select_tools_by_context(self, state: AgentState) -> Dict[str, BaseTool]:
        """Select tools based on recent conversation context."""
        context = " ".join(msg.content for msg in state["messages"][-5:])
        available_tools = {}
        if "code" in context.lower():
            available_tools["code_executor"] = code_tool
        if "search" in context.lower():
            available_tools["web_search"] = search_tool
        if "math" in context.lower():
            available_tools["calculator"] = math_tool
        return available_tools
```
### 3. State Persistence and Recovery
```python
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver

# Production-ready persistence
def create_production_app():
    # Use PostgreSQL for production.
    # NOTE: in recent langgraph-checkpoint-postgres releases,
    # from_conn_string is a context manager
    # ("with PostgresSaver.from_conn_string(...) as cp: ...");
    # check the version you have installed.
    connection_string = "postgresql://user:pass@localhost/langgraph"
    checkpointer = PostgresSaver.from_conn_string(connection_string)

    # Build workflow
    workflow = StateGraph(AgentState)
    # ... add nodes and edges

    # Compile with persistence
    app = workflow.compile(checkpointer=checkpointer)
    return app

# Thread-based conversation management
def manage_conversation(app, thread_id: str):
    """Manage persistent conversations across sessions."""
    config = {"configurable": {"thread_id": thread_id}}

    # Continue the existing conversation for this thread
    result = app.invoke(
        {"messages": [HumanMessage(content="Continue our discussion")]},
        config,
    )
    return result
```
### 4. Error Handling and Retry Logic
```python
import time
from typing import List, TypedDict
from langchain_core.messages import AIMessage, BaseMessage

class RobustAgentState(TypedDict):
    messages: List[BaseMessage]
    retry_count: int
    max_retries: int
    error_history: List[str]

def error_handling_node(state: RobustAgentState) -> RobustAgentState:
    """Node with built-in retry tracking and exponential backoff."""
    try:
        # Attempt the primary operation (application-specific)
        result = perform_operation(state)
        # Reset retry count on success
        return {**result, "retry_count": 0, "error_history": []}
    except Exception as e:
        new_retry_count = state["retry_count"] + 1
        # Back off exponentially before the next attempt
        time.sleep(2 ** new_retry_count)
        return {
            "retry_count": new_retry_count,
            "error_history": state["error_history"] + [str(e)],
        }

def retry_router(state: RobustAgentState) -> str:
    """Conditional edge: route to the fallback once retries are exhausted.
    (Routing decisions belong in edge functions, not node return values.)"""
    if state["retry_count"] >= state["max_retries"]:
        return "fallback"
    if state["retry_count"] > 0:
        return "error_handling"  # try again
    return "continue"

def fallback_node(state: RobustAgentState) -> RobustAgentState:
    """Fallback strategy when the primary operation keeps failing."""
    last_error = state["error_history"][-1] if state["error_history"] else "Unknown error"
    fallback_message = AIMessage(
        content=f"I encountered an error: {last_error}. "
                f"Let me try a different approach."
    )
    return {
        "messages": state["messages"] + [fallback_message],
        "retry_count": 0,
    }
```
## Integration Examples
### 1. RAG with LangGraph
```python
def create_rag_graph():
    class RAGState(TypedDict):
        question: str
        context: List[str]
        answer: str
        sources: List[str]

    def retrieve_node(state: RAGState) -> RAGState:
        # Retrieve relevant documents
        docs = retriever.invoke(state["question"])
        return {
            "context": [doc.page_content for doc in docs],
            "sources": [doc.metadata.get("source", "unknown") for doc in docs],
        }

    def generate_node(state: RAGState) -> RAGState:
        # Generate an answer using the retrieved context
        prompt = f"""
        Question: {state['question']}
        Context: {state['context']}

        Generate a comprehensive answer based on the context.
        """
        response = llm.invoke([HumanMessage(content=prompt)])
        return {"answer": response.content}

    # Build RAG workflow
    workflow = StateGraph(RAGState)
    workflow.add_node("retrieve", retrieve_node)
    workflow.add_node("generate", generate_node)
    workflow.set_entry_point("retrieve")
    workflow.add_edge("retrieve", "generate")
    workflow.add_edge("generate", END)
    return workflow.compile()
```
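A short usage sketch, assuming `retriever` and `llm` are configured elsewhere; partial input is fine, since the nodes fill in the remaining state keys:

```python
rag_app = create_rag_graph()
result = rag_app.invoke({"question": "What is LangGraph?"})
print(result["answer"])
print(result["sources"])
```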
### 2. Sequential Task Processing
```python
def create_sequential_processor():
    class TaskState(TypedDict):
        tasks: List[Dict[str, Any]]
        current_task_index: int
        results: List[Any]
        status: str

    def task_executor(state: TaskState) -> TaskState:
        idx = state["current_task_index"]
        if idx >= len(state["tasks"]):
            return {"status": "completed"}

        current_task = state["tasks"][idx]
        result = execute_task(current_task)
        return {
            "current_task_index": idx + 1,
            "results": state["results"] + [result],
            "status": "processing" if idx + 1 < len(state["tasks"]) else "completed",
        }

    def task_router(state: TaskState) -> str:
        if state["status"] == "completed":
            return END
        return "execute_task"  # loop back for the next task

    workflow = StateGraph(TaskState)
    workflow.add_node("execute_task", task_executor)
    workflow.set_entry_point("execute_task")
    workflow.add_conditional_edges("execute_task", task_router)
    return workflow.compile()
```
## Best Practices
### 1. State Design
- Keep state minimal and focused
- Use TypedDict for type safety
- Avoid storing large objects in state
- Use references/IDs instead of full objects when possible (see the sketch below)
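A minimal sketch of the reference-based approach; `DOCUMENT_STORE` is a stand-in for whatever external database, blob store, or vector store you use:

```python
from typing import List, TypedDict

# Hypothetical external store keyed by document ID
DOCUMENT_STORE = {
    "doc-1": "LangGraph builds stateful, multi-actor workflows.",
    "doc-2": "Checkpointers persist graph state between runs.",
}

class LeanState(TypedDict):
    question: str
    document_ids: List[str]  # checkpoint small IDs, not full documents
    answer: str

def answer_node(state: LeanState) -> dict:
    # Load the heavy payload on demand instead of carrying it in state
    docs = [DOCUMENT_STORE[doc_id] for doc_id in state["document_ids"]]
    return {"answer": f"Answered using {len(docs)} documents"}
```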
### 2. Node Design
- Make nodes pure functions when possible
- Handle errors gracefully
- Return only the state keys that need updating (see the sketch after this list)
- Use descriptive names for clarity
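A sketch of partial updates: LangGraph merges whatever dict a node returns into the existing state, so returning a single key leaves everything else untouched.

```python
from typing import List, TypedDict
from langchain_core.messages import BaseMessage

class ChatState(TypedDict):
    messages: List[BaseMessage]
    step_count: int
    current_user: str

def count_step(state: ChatState) -> dict:
    # Only step_count is updated; messages and current_user are preserved
    return {"step_count": state["step_count"] + 1}
```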
### 3. Graph Architecture
- Break complex workflows into smaller, reusable subgraphs (see the subgraph sketch after this list)
- Use conditional edges for intelligent routing
- Implement proper error handling paths
- Design for testability and debugging
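A compiled graph is itself a runnable, so it can be mounted as a node in a parent graph when the two share state keys. A minimal sketch:

```python
from typing import TypedDict
from langgraph.graph import StateGraph, END

class SharedState(TypedDict):
    text: str

# Build a small, reusable subgraph...
sub = StateGraph(SharedState)
sub.add_node("shout", lambda s: {"text": s["text"].upper()})
sub.set_entry_point("shout")
sub.add_edge("shout", END)
subgraph = sub.compile()

# ...and mount the compiled subgraph as a single node in a parent graph
parent = StateGraph(SharedState)
parent.add_node("preprocess", lambda s: {"text": s["text"].strip()})
parent.add_node("shouter", subgraph)  # compiled graphs can be nodes
parent.set_entry_point("preprocess")
parent.add_edge("preprocess", "shouter")
parent.add_edge("shouter", END)
app = parent.compile()

print(app.invoke({"text": "  hello  "}))  # {'text': 'HELLO'}
```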
### 4. Performance Optimization
- Use streaming for long-running operations
- Implement proper caching strategies
- Consider async/await for I/O operations (see the async sketch after this list)
- Monitor and optimize checkpoint sizes
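A sketch of an async node: LangGraph runs coroutine nodes natively when you use `ainvoke`/`astream` instead of `invoke`/`stream`. The `fetch_node` body is a stand-in for real async I/O such as an httpx or aiohttp call.

```python
import asyncio
from typing import TypedDict
from langgraph.graph import StateGraph, END

class IOState(TypedDict):
    url: str
    body: str

async def fetch_node(state: IOState) -> dict:
    # Simulate non-blocking I/O
    await asyncio.sleep(0.1)
    return {"body": f"contents of {state['url']}"}

workflow = StateGraph(IOState)
workflow.add_node("fetch", fetch_node)
workflow.set_entry_point("fetch")
workflow.add_edge("fetch", END)
app = workflow.compile()

# Async nodes are driven with ainvoke / astream
result = asyncio.run(app.ainvoke({"url": "https://example.com", "body": ""}))
```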
## Testing and Debugging
### 1. Unit Testing Nodes
```python
import pytest
from unittest.mock import patch
from langchain_core.messages import AIMessage, HumanMessage

def test_llm_node():
    # Mock state
    test_state = {
        "messages": [HumanMessage(content="Test message")],
        "step_count": 0,
    }

    # Mock the LLM so the test runs without network access
    with patch("your_module.llm") as mock_llm:
        mock_llm.invoke.return_value = AIMessage(content="Test response")
        result = llm_node(test_state)

        assert result["step_count"] == 1
        assert len(result["messages"]) == 2
        mock_llm.invoke.assert_called_once()
```
### 2. Integration Testing
```python
def test_full_workflow():
    app = create_test_workflow()
    initial_state = {
        "messages": [HumanMessage(content="Hello")],
        "step_count": 0,
    }

    result = app.invoke(initial_state)

    assert "messages" in result
    assert result["messages"][-1].type == "ai"
```
### 3. Debugging Tools
```python
# Enable step-by-step execution logs at compile time
app = workflow.compile(debug=True)

# Or print state transitions from inside a node
def debug_node(state: AgentState) -> AgentState:
    print(f"Node input: {state}")
    result = your_node_logic(state)
    print(f"Node output: {result}")
    return result

from langgraph.graph import StateGraph

def create_debug_workflow():
    workflow = StateGraph(AgentState)
    workflow.add_node("debug_step", debug_node)
    # ... rest of workflow
    return workflow.compile(debug=True)
```
## Common Patterns and Solutions
### 1. Human-in-the-Loop
```python
def human_approval_node(state: AgentState) -> AgentState:
    """Wait for human approval before proceeding.

    NOTE: blocking on input() only suits local demos; in production,
    pause the graph with interrupts instead (see the sketch below).
    """
    last_message = state["messages"][-1]

    if state.get("awaiting_approval"):
        # Check whether approval was received
        user_input = input(f"Approve this action? {last_message.content} (y/n): ")
        verdict = "approved" if user_input.lower() == "y" else "rejected"
        return {
            "awaiting_approval": False,
            "messages": state["messages"] + [
                AIMessage(content=f"Action {verdict} by human")
            ],
        }
    else:
        # Request approval
        return {
            "awaiting_approval": True,
            "messages": state["messages"],
        }
```
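For production, the checkpoint-based interrupt pattern avoids blocking on `input()`: compile with `interrupt_before`, let the graph pause, record the human's decision with `update_state`, and resume. A sketch, assuming the workflow contains a `human_approval` node:

```python
from langgraph.checkpoint.memory import MemorySaver

# Interrupts require a checkpointer so the paused state can be saved
app = workflow.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["human_approval"],
)

config = {"configurable": {"thread_id": "review-1"}}
app.invoke({"messages": []}, config)  # runs until it pauses at the node

snapshot = app.get_state(config)      # inspect what is awaiting approval
print(snapshot.next)                  # ('human_approval',)
app.update_state(config, {"awaiting_approval": False})  # record the decision
app.invoke(None, config)              # passing None resumes from the checkpoint
```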
### 2. Parallel Processing
```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict

def parallel_processor(state: Dict[str, Any]) -> Dict[str, Any]:
    """Process multiple items in parallel."""
    input_data = state["input_items"]

    # Define parallel tasks
    def task_1(data):
        return process_type_1(data)

    def task_2(data):
        return process_type_2(data)

    # Execute in parallel using a thread pool
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_1 = executor.submit(task_1, input_data)
        future_2 = executor.submit(task_2, input_data)
        result_1 = future_1.result()
        result_2 = future_2.result()

    return {"result_1": result_1, "result_2": result_2}
```
## Production Deployment
### 1. Environment Setup
```python
import os
from langgraph.graph import StateGraph
from langgraph.checkpoint.postgres import PostgresSaver

def create_production_app():
    # Load configuration
    db_url = os.getenv("DATABASE_URL")
    openai_api_key = os.getenv("OPENAI_API_KEY")

    # Initialize components
    checkpointer = PostgresSaver.from_conn_string(db_url)

    # Build workflow with production settings
    workflow = StateGraph(ProductionState)
    # ... add nodes and edges

    app = workflow.compile(
        checkpointer=checkpointer,
        # Enable interrupts for human-in-the-loop
        interrupt_before=["human_approval"],
        interrupt_after=["critical_action"],
    )
    return app
```
### 2. Monitoring and Logging
```python
import logging
from datetime import datetime

class LoggingMiddleware:
    def __init__(self, logger_name="langgraph"):
        self.logger = logging.getLogger(logger_name)

    def __call__(self, func):
        def wrapper(state):
            start_time = datetime.now()
            self.logger.info(f"Starting {func.__name__} at {start_time}")
            try:
                result = func(state)
                duration = datetime.now() - start_time
                self.logger.info(
                    f"Completed {func.__name__} in {duration.total_seconds():.2f}s"
                )
                return result
            except Exception as e:
                self.logger.error(f"Error in {func.__name__}: {str(e)}")
                raise
        return wrapper

# Apply to nodes
@LoggingMiddleware()
def production_node(state: AgentState) -> AgentState:
    # Your node logic here
    pass
```
## Troubleshooting
### Common Issues and Solutions
1. **State Size Too Large**
   - Problem: Checkpoint files become too large
   - Solution: Store large data externally and keep references in state
2. **Memory Leaks**
   - Problem: Memory usage increases over time
   - Solution: Clean up unused state and dispose of objects properly
3. **Concurrency Issues**
   - Problem: Race conditions in multi-threaded execution
   - Solution: Use proper locking mechanisms and avoid shared mutable state
4. **Tool Execution Failures**
   - Problem: Tools fail or time out
   - Solution: Implement error handling and retry logic (see the pattern above)
## Requirements
Ensure these packages are installed in your environment:
```bash
pip install "langgraph>=0.2.0"
pip install "langchain-core>=0.3.0"
pip install "langchain-openai>=0.1.0"
pip install "langchain-anthropic>=0.1.0"
pip install langgraph-checkpoint-postgres  # PostgresSaver
pip install langgraph-checkpoint-sqlite    # SqliteSaver
pip install psycopg2-binary                # For PostgreSQL persistence
pip install sqlalchemy                     # Alternative persistence options
```

Note: version specifiers are quoted so the shell does not interpret `>=` as a redirect.
## Source Code Access
The LangGraph source code is managed as a git submodule with sparse-checkout to reduce size (66MB vs full repo):
```bash
# Update to latest version
cd source/langgraph
git pull origin main

# View sparse-checkout configuration
git sparse-checkout list

# Temporarily access full repo (if needed)
git sparse-checkout disable
# ... do work ...
git sparse-checkout reapply
```
Key locations:
- `source/langgraph/libs/langgraph/langgraph/`: Core API (StateGraph, nodes, edges)
- `source/langgraph/libs/prebuilt/langgraph/`: Prebuilt components (create_react_agent)
- `source/langgraph/examples/`: Official examples and tutorials
- `source/langgraph/docs/docs/`: Documentation (concepts, how-tos, reference)
See `SOURCE_STRUCTURE.md` for a detailed navigation guide.
## Performance Tips
- Use streaming for long-running operations (see the streaming sketch after this list)
- Optimize state size - avoid storing large objects
- Cache effectively - implement proper caching strategies
- Monitor checkpoints - keep checkpoint sizes reasonable
- Use async/await for I/O-bound operations
- Batch operations when possible to reduce overhead
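A streaming sketch, reusing any compiled `app` from the sections above:

```python
from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "stream-demo"}}

# stream_mode="updates" yields each node's state delta as it completes;
# use stream_mode="values" to receive the full state after every step.
for chunk in app.stream(
    {"messages": [HumanMessage(content="Summarize our discussion")]},
    config,
    stream_mode="updates",
):
    print(chunk)
```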