Claude-code-plugins-plus-skills langchain-webhooks-events
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/langchain-pack/skills/langchain-webhooks-events" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-langchain-webhooks-events && rm -rf "$T"
manifest:
plugins/saas-packs/langchain-pack/skills/langchain-webhooks-events/SKILL.mdsource content
LangChain Webhooks & Events
Overview
Event-driven patterns for LangChain: custom callback handlers for lifecycle hooks, webhook dispatching, Server-Sent Events (SSE) for streaming, WebSocket integration, and event aggregation for tracing.
Callback Handler Architecture
LangChain emits events at every stage of chain/agent execution. Custom handlers can observe, log, stream, or dispatch these events.
chain.invoke() ├── handleChainStart() │ ├── handleLLMStart() │ │ ├── handleLLMNewToken() // streaming │ │ └── handleLLMEnd() │ ├── handleToolStart() │ │ └── handleToolEnd() │ └── handleRetrieverStart() │ └── handleRetrieverEnd() └── handleChainEnd()
Custom Callback Handler
import { BaseCallbackHandler } from "@langchain/core/callbacks/base"; class WebhookHandler extends BaseCallbackHandler { name = "WebhookHandler"; constructor(private webhookUrl: string) { super(); } async handleLLMStart(llm: any, prompts: string[]) { await this.send("llm_start", { model: llm?.id?.[2], promptCount: prompts.length, }); } async handleLLMEnd(output: any) { await this.send("llm_end", { tokenUsage: output.llmOutput?.tokenUsage, }); } async handleLLMError(error: Error) { await this.send("llm_error", { error: error.message, }); } async handleToolStart(_tool: any, input: string) { await this.send("tool_start", { input: input.slice(0, 200) }); } async handleToolEnd(output: string) { await this.send("tool_end", { output: output.slice(0, 200) }); } private async send(event: string, data: Record<string, any>) { try { await fetch(this.webhookUrl, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ event, data, timestamp: new Date().toISOString(), }), signal: AbortSignal.timeout(5000), }); } catch (e) { // Don't let webhook failures break the chain console.warn(`Webhook error: ${e}`); } } } // Attach to model const model = new ChatOpenAI({ model: "gpt-4o-mini", callbacks: [new WebhookHandler("https://api.example.com/webhook")], });
Server-Sent Events (SSE) Endpoint
import express from "express"; import { ChatOpenAI } from "@langchain/openai"; import { ChatPromptTemplate } from "@langchain/core/prompts"; import { StringOutputParser } from "@langchain/core/output_parsers"; const app = express(); app.use(express.json()); const chain = ChatPromptTemplate.fromTemplate("{input}") .pipe(new ChatOpenAI({ model: "gpt-4o-mini", streaming: true })) .pipe(new StringOutputParser()); app.post("/api/chat/stream", async (req, res) => { res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); try { const stream = await chain.stream({ input: req.body.input }); for await (const chunk of stream) { if (res.destroyed) break; // client disconnected res.write(`data: ${JSON.stringify({ text: chunk })}\n\n`); } res.write(`data: ${JSON.stringify({ done: true })}\n\n`); } catch (error: any) { res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`); } res.end(); });
Client-Side SSE Consumer
// Browser JavaScript async function streamChat(input: string) { const response = await fetch("/api/chat/stream", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ input }), }); const reader = response.body!.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; const text = decoder.decode(value); const lines = text.split("\n").filter((l) => l.startsWith("data: ")); for (const line of lines) { const data = JSON.parse(line.slice(6)); if (data.done) return; if (data.text) document.getElementById("output")!.textContent += data.text; } } }
WebSocket Streaming
import { WebSocketServer } from "ws"; import { ChatOpenAI } from "@langchain/openai"; import { ChatPromptTemplate } from "@langchain/core/prompts"; import { StringOutputParser } from "@langchain/core/output_parsers"; const wss = new WebSocketServer({ port: 8080 }); const chain = ChatPromptTemplate.fromTemplate("{input}") .pipe(new ChatOpenAI({ model: "gpt-4o-mini", streaming: true })) .pipe(new StringOutputParser()); wss.on("connection", (ws) => { ws.on("message", async (raw) => { const { input } = JSON.parse(raw.toString()); try { const stream = await chain.stream({ input }); for await (const chunk of stream) { if (ws.readyState !== ws.OPEN) break; ws.send(JSON.stringify({ type: "token", text: chunk })); } ws.send(JSON.stringify({ type: "done" })); } catch (error: any) { ws.send(JSON.stringify({ type: "error", message: error.message })); } }); });
Event Aggregation (Trace Collection)
interface TraceEvent { event: string; timestamp: number; data: Record<string, any>; runId: string; } class TraceAggregator extends BaseCallbackHandler { name = "TraceAggregator"; events: TraceEvent[] = []; handleChainStart(_chain: any, inputs: any, runId: string) { this.log("chain_start", runId, { inputKeys: Object.keys(inputs) }); } handleChainEnd(outputs: any, runId: string) { this.log("chain_end", runId, { outputKeys: Object.keys(outputs) }); } handleLLMStart(llm: any, _prompts: string[], runId: string) { this.log("llm_start", runId, { model: llm?.id?.[2] }); } handleLLMEnd(output: any, runId: string) { this.log("llm_end", runId, { tokens: output.llmOutput?.tokenUsage?.totalTokens, }); } private log(event: string, runId: string, data: Record<string, any>) { this.events.push({ event, timestamp: Date.now(), data, runId }); } getTrace() { return { events: this.events, totalEvents: this.events.length, durationMs: this.events.length > 1 ? this.events[this.events.length - 1].timestamp - this.events[0].timestamp : 0, }; } } // Usage const tracer = new TraceAggregator(); await chain.invoke({ input: "test" }, { callbacks: [tracer] }); console.log(tracer.getTrace());
Error Handling
| Error | Cause | Fix |
|---|---|---|
| Webhook timeout | Slow endpoint | Use , make async |
| WebSocket disconnect | Client closed | Check before sending |
| SSE connection reset | Proxy timeout | Add keep-alive pings every 15s |
| Events not captured | Callback not passed | Add to array in |
Resources
Next Steps
Use
langchain-observability for comprehensive production monitoring.