Claude-code-plugins perplexity-webhooks-events
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/perplexity-pack/skills/perplexity-webhooks-events" ~/.claude/skills/jeremylongshore-claude-code-plugins-perplexity-webhooks-events && rm -rf "$T"
manifest:
plugins/saas-packs/perplexity-pack/skills/perplexity-webhooks-events/SKILL.mdsource content
Perplexity Events & Async Patterns
Overview
Build event-driven architectures around Perplexity Sonar API. Perplexity does not have webhooks -- all interactions are request/response. Event patterns are built using streaming SSE, job queues for batch processing, and cron-triggered monitoring.
Event Patterns
| Pattern | Trigger | Use Case |
|---|---|---|
| Streaming SSE | Client request | Real-time search with progressive rendering |
| Batch queue | Job submission | Research automation, report generation |
| Scheduled search | Cron job | News monitoring, trend alerts, competitive intel |
| Citation pipeline | Post-processing | Source verification, link validation |
Prerequisites
package installedopenai
setPERPLEXITY_API_KEY- Queue system (BullMQ, SQS) for batch patterns
- Cron scheduler for monitoring patterns
Instructions
Step 1: Streaming Search (Server-Sent Events)
import OpenAI from "openai"; import express from "express"; const perplexity = new OpenAI({ apiKey: process.env.PERPLEXITY_API_KEY!, baseURL: "https://api.perplexity.ai", }); const app = express(); app.use(express.json()); app.post("/api/search/stream", async (req, res) => { const { query, model = "sonar" } = req.body; res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", Connection: "keep-alive", }); try { const stream = await perplexity.chat.completions.create({ model, messages: [{ role: "user", content: query }], stream: true, max_tokens: 2048, }); let fullText = ""; for await (const chunk of stream) { const text = chunk.choices[0]?.delta?.content || ""; fullText += text; res.write(`data: ${JSON.stringify({ type: "text", content: text })}\n\n`); // Citations arrive in the final chunk const citations = (chunk as any).citations; if (citations) { res.write(`data: ${JSON.stringify({ type: "citations", urls: citations })}\n\n`); } } res.write(`data: ${JSON.stringify({ type: "done", totalLength: fullText.length })}\n\n`); } catch (err: any) { res.write(`data: ${JSON.stringify({ type: "error", message: err.message })}\n\n`); } res.end(); });
Step 2: Batch Research Pipeline
import { Queue, Worker } from "bullmq"; const searchQueue = new Queue("perplexity-research", { connection: { host: "localhost", port: 6379 }, }); async function submitResearchBatch( queries: string[], callbackUrl: string, model: string = "sonar-pro" ) { const batchId = crypto.randomUUID(); for (const query of queries) { await searchQueue.add("search", { batchId, query, callbackUrl, model }, { attempts: 3, backoff: { type: "exponential", delay: 2000 }, }); } return { batchId, totalQueries: queries.length }; } const worker = new Worker("perplexity-research", async (job) => { const { query, callbackUrl, batchId, model } = job.data; const response = await perplexity.chat.completions.create({ model, messages: [{ role: "user", content: query }], max_tokens: 2048, }); const result = { event: "perplexity.search.completed", batchId, query, answer: response.choices[0].message.content, citations: (response as any).citations || [], model: response.model, tokens: response.usage?.total_tokens, }; // Deliver result via callback await fetch(callbackUrl, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(result), }); }, { connection: { host: "localhost", port: 6379 }, concurrency: 3, // Stay within rate limits limiter: { max: 40, duration: 60000 }, // 40 RPM safety margin });
Step 3: Scheduled News Monitor
// Run via cron: every 6 hours async function monitorTopics( topics: string[], webhookUrl: string ) { for (const topic of topics) { const response = await perplexity.chat.completions.create({ model: "sonar", messages: [{ role: "system", content: "Summarize the latest developments. Be concise. Include only new information.", }, { role: "user", content: `Latest developments about "${topic}" in the past 24 hours`, }], search_recency_filter: "day", max_tokens: 500, } as any); const answer = response.choices[0].message.content || ""; const citations = (response as any).citations || []; // Only notify if there are actual developments if (citations.length > 0 && answer.length > 100) { await fetch(webhookUrl, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ event: "perplexity.monitor.update", topic, summary: answer, citations, timestamp: new Date().toISOString(), }), }); } // Rate limit protection await new Promise((r) => setTimeout(r, 2000)); } }
Step 4: Client-Side SSE Consumer
// Browser client consuming the streaming endpoint function consumeSearchStream( query: string, onText: (text: string) => void, onCitations: (urls: string[]) => void, onDone: () => void ) { fetch("/api/search/stream", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ query }), }).then(async (response) => { const reader = response.body!.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; const lines = decoder.decode(value).split("\n"); for (const line of lines) { if (!line.startsWith("data: ")) continue; const event = JSON.parse(line.slice(6)); if (event.type === "text") onText(event.content); if (event.type === "citations") onCitations(event.urls); if (event.type === "done") onDone(); } } }); }
Error Handling
| Issue | Cause | Solution |
|---|---|---|
| Stream stalls | Complex search taking too long | Set per-chunk timeout (10s) |
| 429 in batch | Too many concurrent workers | Reduce concurrency, add rate limiter |
| Empty monitor alerts | Topic too niche | Broaden topic or reduce recency filter |
| Callback fails | Webhook URL down | Retry with exponential backoff |
Output
- Streaming SSE endpoint for real-time search
- Batch research pipeline with queue-based processing
- Scheduled news monitoring with alerting
- Client-side stream consumer
Resources
Next Steps
For deployment setup, see
perplexity-deploy-integration.