Claude-code-plugins-plus-skills anth-webhooks-events
install
source · Clone the upstream repo
```shell
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
```
Claude Code · Install into ~/.claude/skills/
```shell
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/anthropic-pack/skills/anth-webhooks-events" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-anth-webhooks-events && rm -rf "$T"
```
manifest: plugins/saas-packs/anthropic-pack/skills/anth-webhooks-events/SKILL.md
Anthropic Events & Async Processing
Overview
The Claude API does not use traditional webhooks. Instead, it provides two event-driven patterns: Server-Sent Events (SSE) for real-time streaming and the Message Batches API for asynchronous bulk processing. This skill covers both.
SSE Streaming Events
```python
import anthropic

client = anthropic.Anthropic()

# Process each SSE event type
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain microservices."}],
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                print(f"Started: {event.message.id}")
            case "content_block_start":
                if event.content_block.type == "tool_use":
                    print(f"Tool call: {event.content_block.name}")
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    print(event.delta.partial_json, end="")
            case "message_delta":
                print(f"\nStop: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("[Complete]")
```
SSE Event Reference
| Event | When | Key Data |
|---|---|---|
| `message_start` | Stream begins | `message.id`, `model`, `usage.input_tokens` |
| `content_block_start` | New block begins | `content_block.type` (text or tool_use), `index` |
| `content_block_delta` | Incremental content | `delta.text` or `delta.partial_json` |
| `content_block_stop` | Block finishes | `index` |
| `message_delta` | Message-level update | `stop_reason`, `usage.output_tokens` |
| `message_stop` | Stream complete | (empty) |
| `ping` | Keepalive | (empty) |
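Because content arrives as incremental deltas, a common pattern is to fold the event stream back into the full response text. A minimal sketch, using plain dicts to stand in for the SDK's event objects (the event shapes mirror the table above; `accumulate_text` is an illustrative helper, not part of the SDK):

```python
def accumulate_text(events: list[dict]) -> str:
    """Fold text_delta events from an SSE stream into the full response text."""
    parts = []
    for event in events:
        if event["type"] == "content_block_delta" and event["delta"]["type"] == "text_delta":
            parts.append(event["delta"]["text"])
    return "".join(parts)

# Simulated event sequence, in the order the API emits them
events = [
    {"type": "message_start"},
    {"type": "content_block_start"},
    {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hello, "}},
    {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "world."}},
    {"type": "content_block_stop"},
    {"type": "message_stop"},
]
print(accumulate_text(events))  # Hello, world.
```

The SDK's streaming helper does this accumulation for you, but the pattern matters whenever you consume raw SSE events yourself.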
Async Batch Processing
```python
import time

# Submit batch (up to 100K requests, 50% cheaper)
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": f"Summarize: {doc}"}],
            },
        }
        for i, doc in enumerate(documents)
    ]
)

# Poll for completion
while True:
    status = client.messages.batches.retrieve(batch.id)
    if status.processing_status == "ended":
        break
    counts = status.request_counts
    print(f"Processing: {counts.processing} | Done: {counts.succeeded} | Errors: {counts.errored}")
    time.sleep(30)

# Stream results
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        print(f"[{result.custom_id}]: {result.result.message.content[0].text[:100]}")
    else:
        print(f"[{result.custom_id}] ERROR: {result.result.error}")
```
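Batch results are not guaranteed to come back in submission order, so index them by `custom_id` before use. A hedged sketch with plain dicts standing in for the SDK's result objects (`index_results` is an illustrative helper, not an SDK function):

```python
def index_results(results: list[dict]) -> tuple[dict, dict]:
    """Split batch results into successes and errors, keyed by custom_id."""
    succeeded, errored = {}, {}
    for r in results:
        if r["result"]["type"] == "succeeded":
            succeeded[r["custom_id"]] = r["result"]["text"]
        else:
            errored[r["custom_id"]] = r["result"]["error"]
    return succeeded, errored

# Results may arrive in any order
results = [
    {"custom_id": "doc-1", "result": {"type": "errored", "error": "invalid_request"}},
    {"custom_id": "doc-0", "result": {"type": "succeeded", "text": "Summary of doc 0"}},
]
ok, bad = index_results(results)
print(ok["doc-0"], "|", bad["doc-1"])  # Summary of doc 0 | invalid_request
```

Keying on `custom_id` also makes partial resubmission easy: re-enqueue only the ids that landed in the error map.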
Event-Driven Architecture Pattern
```python
# Use queues to decouple Claude requests from user-facing endpoints
import anthropic
import requests
from redis import Redis
from rq import Queue

redis = Redis()
queue = Queue(connection=redis)

def process_with_claude(prompt: str, callback_url: str):
    """Background job for async Claude processing."""
    client = anthropic.Anthropic()
    msg = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    # Notify your system via internal callback
    requests.post(callback_url, json={
        "text": msg.content[0].text,
        "usage": {"input": msg.usage.input_tokens, "output": msg.usage.output_tokens},
    })

# Enqueue from your API handler
job = queue.enqueue(process_with_claude, prompt="...", callback_url="https://internal/callback")
```
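Queue workers may retry a failed job, so it helps to make the callback delivery idempotent. A minimal sketch deduplicating by job id with an in-memory set (a real system would persist the set in Redis; `deliver_once` and its parameters are illustrative, not part of RQ):

```python
_delivered: set[str] = set()

def deliver_once(job_id: str, payload: dict, post) -> bool:
    """Post the callback at most once per job_id; returns True if delivered."""
    if job_id in _delivered:
        return False
    post(payload)            # e.g. requests.post(callback_url, json=payload)
    _delivered.add(job_id)   # mark only after the post succeeds
    return True

sent = []
deliver_once("job-1", {"text": "ok"}, sent.append)
deliver_once("job-1", {"text": "ok"}, sent.append)  # retried job is a no-op
print(len(sent))  # 1
```

Marking the id only after a successful post means a crash mid-delivery leads to a retry rather than a lost callback, at the cost of a possible duplicate; the receiving endpoint should tolerate that.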
Error Handling
| Issue | Cause | Fix |
|---|---|---|
| Stream disconnects | Network timeout | Reconnect and re-request (responses are not resumable) |
| Batch `expired` | Not processed within 24h | Resubmit the batch |
| `errored` results | Individual request was invalid | Check `result.error` per request |
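Because a dropped stream cannot be resumed, the standard fix is to re-issue the entire request with exponential backoff. A sketch with a generic `make_request` callable standing in for the SDK call (the helper and its parameters are illustrative):

```python
import time

def with_retries(make_request, max_attempts: int = 3, base_delay: float = 1.0):
    """Re-issue a non-resumable request with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return make_request()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)  # base, 2x, 4x, ...

# Simulated flaky stream: fails twice, then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("stream dropped")
    return "full response"

print(with_retries(flaky, base_delay=0.01))  # full response
```

Re-requesting means you pay for the input tokens again, so cap `max_attempts` and surface the final failure rather than retrying forever.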
Next Steps
For performance optimization, see `anth-performance-tuning`.