Claude-skill-registry langfuse-webhooks-events
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/langfuse-webhooks-events" ~/.claude/skills/majiayu000-claude-skill-registry-langfuse-webhooks-events && rm -rf "$T"
manifest:
skills/data/langfuse-webhooks-events/SKILL.mdsource content
Langfuse Webhooks & Events
Overview
Configure webhooks and event callbacks to receive real-time notifications from Langfuse.
Prerequisites
- Langfuse account with webhook access
- HTTPS endpoint to receive webhooks
- Understanding of event-driven architecture
Instructions
Step 1: Create Webhook Endpoint
// api/webhooks/langfuse/route.ts (Next.js App Router) import { NextRequest, NextResponse } from "next/server"; import crypto from "crypto"; const WEBHOOK_SECRET = process.env.LANGFUSE_WEBHOOK_SECRET!; interface LangfuseWebhookPayload { event: string; timestamp: string; data: { traceId?: string; observationId?: string; scoreId?: string; projectId: string; [key: string]: any; }; } function verifySignature(payload: string, signature: string): boolean { const expectedSignature = crypto .createHmac("sha256", WEBHOOK_SECRET) .update(payload) .digest("hex"); return crypto.timingSafeEqual( Buffer.from(signature), Buffer.from(expectedSignature) ); } export async function POST(request: NextRequest) { const payload = await request.text(); const signature = request.headers.get("x-langfuse-signature"); // Verify webhook signature if (!signature || !verifySignature(payload, signature)) { console.error("Invalid webhook signature"); return NextResponse.json({ error: "Invalid signature" }, { status: 401 }); } const event: LangfuseWebhookPayload = JSON.parse(payload); console.log(`Received Langfuse event: ${event.event}`); // Handle different event types switch (event.event) { case "trace.created": await handleTraceCreated(event.data); break; case "trace.updated": await handleTraceUpdated(event.data); break; case "score.created": await handleScoreCreated(event.data); break; case "generation.created": await handleGenerationCreated(event.data); break; default: console.log(`Unhandled event type: ${event.event}`); } return NextResponse.json({ received: true }); } async function handleTraceCreated(data: any) { console.log(`New trace created: ${data.traceId}`); // Trigger downstream actions } async function handleTraceUpdated(data: any) { // Check for errors if (data.level === "ERROR") { await sendAlertNotification({ title: "Langfuse Error Trace", traceId: data.traceId, message: data.statusMessage, }); } } async function handleScoreCreated(data: any) { // Alert on low scores if (data.value < 0.5) { await sendAlertNotification({ title: "Low Score Alert", traceId: data.traceId, score: data.name, value: data.value, }); } } async function handleGenerationCreated(data: any) { // Track token usage if (data.usage) { await trackTokenUsage({ model: data.model, promptTokens: data.usage.promptTokens, completionTokens: data.usage.completionTokens, }); } }
Step 2: Configure Webhook in Langfuse Dashboard
1. Go to Langfuse Dashboard -> Settings -> Webhooks 2. Click "Add Webhook" 3. Configure: - URL: https://your-domain.com/api/webhooks/langfuse - Events: Select events to subscribe to - Secret: Generate and save webhook secret 4. Test webhook with "Send Test Event"
Step 3: Implement Event Processing Queue
// lib/webhook-queue.ts import { Queue, Worker } from "bullmq"; import Redis from "ioredis"; const connection = new Redis(process.env.REDIS_URL!); // Queue for processing webhook events export const langfuseQueue = new Queue("langfuse-events", { connection }); // Webhook handler adds to queue export async function queueWebhookEvent(event: LangfuseWebhookPayload) { await langfuseQueue.add(event.event, event, { removeOnComplete: 1000, removeOnFail: 5000, attempts: 3, backoff: { type: "exponential", delay: 1000, }, }); } // Worker processes events const worker = new Worker( "langfuse-events", async (job) => { const event = job.data as LangfuseWebhookPayload; switch (job.name) { case "trace.created": await processTraceCreated(event); break; case "score.created": await processScoreCreated(event); break; // ... other handlers } }, { connection } ); worker.on("failed", (job, error) => { console.error(`Job ${job?.id} failed:`, error); });
Step 4: Real-Time Event Streaming (Alternative)
// For real-time updates without webhooks // Use Langfuse API polling with caching import { Langfuse } from "langfuse"; class LangfuseEventStream { private langfuse: Langfuse; private lastChecked: Date; private pollInterval: number; constructor(pollIntervalMs: number = 5000) { this.langfuse = new Langfuse(); this.lastChecked = new Date(); this.pollInterval = pollIntervalMs; } async start(handlers: { onTrace?: (trace: any) => void; onScore?: (score: any) => void; }) { setInterval(async () => { await this.pollForUpdates(handlers); }, this.pollInterval); } private async pollForUpdates(handlers: { onTrace?: (trace: any) => void; onScore?: (score: any) => void; }) { try { const traces = await this.langfuse.fetchTraces({ fromTimestamp: this.lastChecked, }); for (const trace of traces.data) { handlers.onTrace?.(trace); } this.lastChecked = new Date(); } catch (error) { console.error("Failed to poll Langfuse:", error); } } } // Usage const stream = new LangfuseEventStream(10000); // 10 second poll stream.start({ onTrace: (trace) => { if (trace.level === "ERROR") { sendSlackAlert(`Error in trace ${trace.id}`); } }, });
Step 5: Integration with External Services
// Slack notification on errors async function sendSlackNotification(event: LangfuseWebhookPayload) { if (event.data.level !== "ERROR") return; await fetch(process.env.SLACK_WEBHOOK_URL!, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ blocks: [ { type: "header", text: { type: "plain_text", text: "Langfuse Error Alert", }, }, { type: "section", fields: [ { type: "mrkdwn", text: `*Trace ID:*\n${event.data.traceId}`, }, { type: "mrkdwn", text: `*Error:*\n${event.data.statusMessage}`, }, ], }, { type: "actions", elements: [ { type: "button", text: { type: "plain_text", text: "View Trace" }, url: `https://cloud.langfuse.com/trace/${event.data.traceId}`, }, ], }, ], }), }); } // PagerDuty for critical alerts async function sendPagerDutyAlert(event: LangfuseWebhookPayload) { await fetch("https://events.pagerduty.com/v2/enqueue", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ routing_key: process.env.PAGERDUTY_ROUTING_KEY, event_action: "trigger", payload: { summary: `Langfuse: ${event.event}`, severity: "critical", source: "langfuse", custom_details: event.data, }, }), }); }
Output
- Secure webhook endpoint with signature verification
- Event processing queue for reliability
- Real-time polling alternative
- External service integrations (Slack, PagerDuty)
Webhook Event Types
| Event | Description | Use Case |
|---|---|---|
| New trace started | Real-time monitoring |
| Trace modified | Error detection |
| LLM call logged | Token tracking |
| Score added | Quality alerts |
| Score modified | Evaluation updates |
Error Handling
| Issue | Cause | Solution |
|---|---|---|
| Invalid signature | Wrong secret | Verify webhook secret |
| Missed events | Handler failure | Use queue with retries |
| Duplicate events | No idempotency | Track processed event IDs |
| Timeout | Slow handler | Queue events, respond fast |
Examples
Idempotent Event Processing
const processedEvents = new Set<string>(); async function handleWebhook(event: LangfuseWebhookPayload) { const eventId = `${event.event}-${event.timestamp}-${event.data.traceId}`; if (processedEvents.has(eventId)) { console.log(`Skipping duplicate event: ${eventId}`); return; } processedEvents.add(eventId); // Process event... // Clean up old events periodically if (processedEvents.size > 10000) { processedEvents.clear(); } }
Resources
Next Steps
For performance optimization, see
langfuse-performance-tuning.