Claude-code-plugins perplexity-webhooks-events

install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/perplexity-pack/skills/perplexity-webhooks-events" ~/.claude/skills/jeremylongshore-claude-code-plugins-perplexity-webhooks-events && rm -rf "$T"
manifest: plugins/saas-packs/perplexity-pack/skills/perplexity-webhooks-events/SKILL.md
source content

Perplexity Events & Async Patterns

Overview

Build event-driven architectures around Perplexity Sonar API. Perplexity does not have webhooks -- all interactions are request/response. Event patterns are built using streaming SSE, job queues for batch processing, and cron-triggered monitoring.

Event Patterns

PatternTriggerUse Case
Streaming SSEClient requestReal-time search with progressive rendering
Batch queueJob submissionResearch automation, report generation
Scheduled searchCron jobNews monitoring, trend alerts, competitive intel
Citation pipelinePost-processingSource verification, link validation

Prerequisites

  • openai
    package installed
  • PERPLEXITY_API_KEY
    set
  • Queue system (BullMQ, SQS) for batch patterns
  • Cron scheduler for monitoring patterns

Instructions

Step 1: Streaming Search (Server-Sent Events)

import OpenAI from "openai";
import express from "express";

const perplexity = new OpenAI({
  apiKey: process.env.PERPLEXITY_API_KEY!,
  baseURL: "https://api.perplexity.ai",
});

const app = express();
app.use(express.json());

app.post("/api/search/stream", async (req, res) => {
  const { query, model = "sonar" } = req.body;

  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  try {
    const stream = await perplexity.chat.completions.create({
      model,
      messages: [{ role: "user", content: query }],
      stream: true,
      max_tokens: 2048,
    });

    let fullText = "";
    for await (const chunk of stream) {
      const text = chunk.choices[0]?.delta?.content || "";
      fullText += text;

      res.write(`data: ${JSON.stringify({ type: "text", content: text })}\n\n`);

      // Citations arrive in the final chunk
      const citations = (chunk as any).citations;
      if (citations) {
        res.write(`data: ${JSON.stringify({ type: "citations", urls: citations })}\n\n`);
      }
    }

    res.write(`data: ${JSON.stringify({ type: "done", totalLength: fullText.length })}\n\n`);
  } catch (err: any) {
    res.write(`data: ${JSON.stringify({ type: "error", message: err.message })}\n\n`);
  }

  res.end();
});

Step 2: Batch Research Pipeline

import { Queue, Worker } from "bullmq";

const searchQueue = new Queue("perplexity-research", {
  connection: { host: "localhost", port: 6379 },
});

async function submitResearchBatch(
  queries: string[],
  callbackUrl: string,
  model: string = "sonar-pro"
) {
  const batchId = crypto.randomUUID();

  for (const query of queries) {
    await searchQueue.add("search", { batchId, query, callbackUrl, model }, {
      attempts: 3,
      backoff: { type: "exponential", delay: 2000 },
    });
  }

  return { batchId, totalQueries: queries.length };
}

const worker = new Worker("perplexity-research", async (job) => {
  const { query, callbackUrl, batchId, model } = job.data;

  const response = await perplexity.chat.completions.create({
    model,
    messages: [{ role: "user", content: query }],
    max_tokens: 2048,
  });

  const result = {
    event: "perplexity.search.completed",
    batchId,
    query,
    answer: response.choices[0].message.content,
    citations: (response as any).citations || [],
    model: response.model,
    tokens: response.usage?.total_tokens,
  };

  // Deliver result via callback
  await fetch(callbackUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(result),
  });
}, {
  connection: { host: "localhost", port: 6379 },
  concurrency: 3,  // Stay within rate limits
  limiter: { max: 40, duration: 60000 },  // 40 RPM safety margin
});

Step 3: Scheduled News Monitor

// Run via cron: every 6 hours
async function monitorTopics(
  topics: string[],
  webhookUrl: string
) {
  for (const topic of topics) {
    const response = await perplexity.chat.completions.create({
      model: "sonar",
      messages: [{
        role: "system",
        content: "Summarize the latest developments. Be concise. Include only new information.",
      }, {
        role: "user",
        content: `Latest developments about "${topic}" in the past 24 hours`,
      }],
      search_recency_filter: "day",
      max_tokens: 500,
    } as any);

    const answer = response.choices[0].message.content || "";
    const citations = (response as any).citations || [];

    // Only notify if there are actual developments
    if (citations.length > 0 && answer.length > 100) {
      await fetch(webhookUrl, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          event: "perplexity.monitor.update",
          topic,
          summary: answer,
          citations,
          timestamp: new Date().toISOString(),
        }),
      });
    }

    // Rate limit protection
    await new Promise((r) => setTimeout(r, 2000));
  }
}

Step 4: Client-Side SSE Consumer

// Browser client consuming the streaming endpoint
function consumeSearchStream(
  query: string,
  onText: (text: string) => void,
  onCitations: (urls: string[]) => void,
  onDone: () => void
) {
  fetch("/api/search/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ query }),
  }).then(async (response) => {
    const reader = response.body!.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const lines = decoder.decode(value).split("\n");
      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        const event = JSON.parse(line.slice(6));

        if (event.type === "text") onText(event.content);
        if (event.type === "citations") onCitations(event.urls);
        if (event.type === "done") onDone();
      }
    }
  });
}

Error Handling

IssueCauseSolution
Stream stallsComplex search taking too longSet per-chunk timeout (10s)
429 in batchToo many concurrent workersReduce concurrency, add rate limiter
Empty monitor alertsTopic too nicheBroaden topic or reduce recency filter
Callback failsWebhook URL downRetry with exponential backoff

Output

  • Streaming SSE endpoint for real-time search
  • Batch research pipeline with queue-based processing
  • Scheduled news monitoring with alerting
  • Client-side stream consumer

Resources

Next Steps

For deployment setup, see

perplexity-deploy-integration
.