Claude-skill-registry interactor-webhooks
Receive real-time updates from Interactor via webhooks (push) or Server-Sent Events (pull). Use when building real-time UIs, monitoring credential changes, tracking workflow progress, or streaming AI chat responses.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/interactor-webhooks" ~/.claude/skills/majiayu000-claude-skill-registry-interactor-webhooks && rm -rf "$T"
skills/data/interactor-webhooks/SKILL.mdInteractor Webhooks and Streaming Skill
Receive real-time updates from Interactor via webhooks (push to your server) or Server-Sent Events (pull from browser/client).
When to Use
- Credential Monitoring: React to credential status changes (expired, revoked)
- Workflow Notifications: Get notified when workflows complete or need input
- Real-time Chat: Stream AI assistant responses to users
- Live Dashboards: Display real-time workflow progress
- Event-Driven Architecture: Trigger actions based on Interactor events
Prerequisites
- Interactor authentication configured (see
skill)interactor-auth - HTTPS endpoint for webhooks (required for production)
- Understanding of webhook security (signature verification)
Webhooks vs SSE: When to Use Each
| Use Case | Recommended | Reason |
|---|---|---|
| Backend notifications | Webhooks | Server-to-server, reliable delivery |
| Credential status changes | Webhooks | Background processing, no UI needed |
| Workflow completion | Webhooks | Trigger backend actions |
| Real-time chat UI | SSE | Low latency, browser-native |
| Live workflow progress | SSE | Visual feedback for users |
| Streaming AI responses | SSE | Token-by-token display |
General Rule: Use webhooks for backend-to-backend, SSE for frontend real-time updates.
Webhooks
Webhooks push events to your server when things happen in Interactor.
Available Event Types
curl https://core.interactor.com/api/v1/webhooks/event-types \ -H "Authorization: Bearer <token>"
Response:
{ "data": { "event_types": [ "credential.created", "credential.refreshed", "credential.expired", "credential.revoked", "workflow.instance.created", "workflow.instance.completed", "workflow.instance.failed", "workflow.instance.halted", "agent.room.message", "agent.room.closed" ] } }
Note: Additional events like
and tool invocation events are available via SSE streams only. See the SSE section for details.workflow.instance.resumed
Event Categories
| Category | Webhook Events | Description |
|---|---|---|
| Credentials | , , , | OAuth token lifecycle |
| Workflows | , , , | Workflow execution status |
| Agents | , | AI chat events |
SSE-Only Events:
,workflow.instance.resumed,tool_useare available via Server-Sent Events streams only.tool_result
Schema Versioning Policy
Interactor follows these principles for webhook payload changes:
| Change Type | Versioning | Example |
|---|---|---|
| New optional fields | Non-breaking, no version bump | Adding field to events |
| New event types | Non-breaking, subscribe to receive | |
| Field type changes | Breaking, announced 90 days ahead | from string to number |
| Field removal | Breaking, announced 90 days ahead | Removing deprecated fields |
| Payload restructure | New API version (v2) | Complete payload format change |
Best practices for forward compatibility:
- Ignore unknown fields (don't fail on extra properties)
- Use optional types for new fields:
metadata?: Record<string, unknown> - Subscribe to Interactor changelog for breaking change announcements
- Test against the
endpoint after updates/webhooks/:id/test
Complete Event Mapping Table
| Event | Trigger | Delivery | Typical Handler Action |
|---|---|---|---|
| User completes OAuth flow | Webhook | Log for audit, update UI state |
| Token auto-refreshed | Webhook | Log for audit (usually no action needed) |
| Refresh token failed | Webhook | Notify user to reconnect, disable features |
| User revoked via provider | Webhook | Notify user to reconnect, disable features |
| Workflow started | Webhook | Track in analytics, show in dashboard |
| Workflow needs user input | Webhook | Notify user, show input form |
| Workflow finished successfully | Webhook | Process results, update records |
| Workflow error | Webhook | Alert ops, log error details |
| AI sent complete message | Webhook | Forward to push notification or websocket |
| Chat session ended | Webhook | Log analytics, cleanup resources |
| Workflow state transition | SSE | Update progress UI |
| Workflow data modified | SSE | Refresh displayed data |
| Workflow needs input | SSE | Show input form |
| User provided input | SSE | Update UI, show processing |
| Workflow finished | SSE | Show completion, redirect |
| Complete message received | SSE | Display in chat |
| AI started responding | SSE | Show typing indicator |
| Token received | SSE | Append to streaming message |
| AI finished message | SSE | Finalize message, enable input |
| AI invoked a tool | SSE | Show tool activity indicator |
| Tool returned result | SSE | Display tool result (optional) |
| Connection keepalive | SSE | Reset connection health timer |
Permissions & RBAC
Webhook management requires specific permissions in Interactor:
| Action | Required Permission | Who Has It |
|---|---|---|
| List webhooks | | Admin, Developer |
| Create webhook | | Admin, Developer |
| Update webhook | | Admin, Developer |
| Delete webhook | | Admin only |
| Regenerate secret | | Admin, Developer |
| View delivery history | | Admin, Developer |
API Token Scopes:
When creating API tokens for webhook management, request these scopes:
- Full webhook management (read + write + delete)webhooks
- Read-only access to webhook configurationwebhooks:read
- Create and update (no delete)webhooks:write
# Token with full webhook access curl -X POST https://core.interactor.com/api/v1/tokens \ -H "Authorization: Bearer <admin_token>" \ -d '{"name": "Webhook Manager", "scopes": ["webhooks"]}'
Security Note: Webhook secrets are only shown once at creation and regeneration. Store them securely in environment variables or a secrets manager.
Instructions
Step 1: Create a Webhook
curl -X POST https://core.interactor.com/api/v1/webhooks \ -H "Authorization: Bearer <token>" \ -H "Content-Type: application/json" \ -d '{ "url": "https://yourapp.com/webhooks/interactor", "events": [ "credential.expired", "credential.revoked", "workflow.instance.completed", "workflow.instance.halted" ], "enabled": true }'
Response:
{ "data": { "id": "wh_abc", "url": "https://yourapp.com/webhooks/interactor", "secret": "whsec_xyz_SAVE_THIS", "events": [ "credential.expired", "credential.revoked", "workflow.instance.completed", "workflow.instance.halted" ], "enabled": true, "created_at": "2026-01-20T12:00:00Z" } }
CRITICAL: Save the
- you'll need it to verify webhook signatures. It's only shown once!secret
Step 2: List Webhooks
curl https://core.interactor.com/api/v1/webhooks \ -H "Authorization: Bearer <token>"
Response:
{ "data": { "webhooks": [ { "id": "wh_abc", "url": "https://yourapp.com/webhooks/interactor", "events": ["credential.expired", "workflow.instance.completed"], "enabled": true, "created_at": "2026-01-20T12:00:00Z", "last_delivery_at": "2026-01-20T12:30:00Z", "last_delivery_status": "delivered" } ] } }
Step 3: Get Webhook Details
curl https://core.interactor.com/api/v1/webhooks/wh_abc \ -H "Authorization: Bearer <token>"
Step 4: Update Webhook
curl -X PUT https://core.interactor.com/api/v1/webhooks/wh_abc \ -H "Authorization: Bearer <token>" \ -H "Content-Type: application/json" \ -d '{ "events": ["credential.created", "credential.expired"], "url": "https://yourapp.com/webhooks/v2/interactor" }'
Step 5: Toggle Webhook (Enable/Disable)
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/toggle \ -H "Authorization: Bearer <token>"
Step 6: Delete Webhook
curl -X DELETE https://core.interactor.com/api/v1/webhooks/wh_abc \ -H "Authorization: Bearer <token>"
Step 7: Regenerate Secret
If your secret is compromised:
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/regenerate-secret \ -H "Authorization: Bearer <token>"
Response:
{ "data": { "id": "wh_abc", "secret": "whsec_NEW_SECRET_SAVE_THIS", "regenerated_at": "2026-01-20T12:00:00Z" } }
CRITICAL: The new secret is only shown once. Update your webhook handler immediately with the new secret.
Step 8: View Recent Events
See delivery history and debug issues:
curl https://core.interactor.com/api/v1/webhooks/wh_abc/events \ -H "Authorization: Bearer <token>"
Response:
{ "data": { "events": [ { "id": "evt_123", "type": "workflow.instance.completed", "delivered_at": "2026-01-20T12:00:00Z", "status": "delivered", "response_code": 200, "response_time_ms": 145 }, { "id": "evt_122", "type": "credential.expired", "delivered_at": "2026-01-20T11:55:00Z", "status": "failed", "response_code": 500, "retry_count": 2 } ] } }
Step 9: Test Webhook
Send a test event to verify your endpoint:
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/test \ -H "Authorization: Bearer <token>"
Webhook Payload Format
All webhook events follow this structure:
{ "id": "evt_abc123", "type": "workflow.instance.completed", "timestamp": "2026-01-20T12:00:00Z", "data": { "instance_id": "inst_xyz", "workflow_name": "approval_workflow", "namespace": "user_123", "status": "completed", "output": { "approved": true, "amount": 5000 } } }
Event-Specific Payloads
credential.created:
{ "id": "evt_001", "type": "credential.created", "timestamp": "2026-01-20T12:00:00Z", "data": { "credential_id": "cred_abc", "service_id": "google_calendar", "service_name": "Google Calendar", "namespace": "user_123", "scopes": ["calendar.readonly", "calendar.events"] } }
credential.refreshed:
{ "id": "evt_002", "type": "credential.refreshed", "timestamp": "2026-01-20T12:00:00Z", "data": { "credential_id": "cred_abc", "service_id": "google_calendar", "service_name": "Google Calendar", "namespace": "user_123", "expires_at": "2026-01-20T13:00:00Z" } }
credential.expired:
{ "id": "evt_003", "type": "credential.expired", "timestamp": "2026-01-20T12:00:00Z", "data": { "credential_id": "cred_abc", "service_id": "google_calendar", "service_name": "Google Calendar", "namespace": "user_123", "reason": "refresh_token_invalid" } }
credential.revoked:
{ "id": "evt_004", "type": "credential.revoked", "timestamp": "2026-01-20T12:00:00Z", "data": { "credential_id": "cred_abc", "service_id": "google_calendar", "service_name": "Google Calendar", "namespace": "user_123", "reason": "user_revoked_access" } }
workflow.instance.created:
{ "id": "evt_005", "type": "workflow.instance.created", "timestamp": "2026-01-20T12:00:00Z", "data": { "instance_id": "inst_xyz", "workflow_name": "approval_workflow", "workflow_id": "wf_abc", "namespace": "user_123", "initial_input": { "request_id": "req_456", "amount": 5000 } } }
workflow.instance.halted:
{ "id": "evt_006", "type": "workflow.instance.halted", "timestamp": "2026-01-20T12:00:00Z", "data": { "instance_id": "inst_xyz", "workflow_name": "approval_workflow", "namespace": "user_123", "current_state": "await_approval", "halting_presentation": { "type": "form", "title": "Approval Required", "fields": [...] } } }
workflow.instance.completed:
{ "id": "evt_007", "type": "workflow.instance.completed", "timestamp": "2026-01-20T12:00:00Z", "data": { "instance_id": "inst_xyz", "workflow_name": "approval_workflow", "namespace": "user_123", "final_state": "approved", "workflow_data": { "request_id": "req_456", "approved": true, "amount": 5000 } } }
workflow.instance.failed:
{ "id": "evt_008", "type": "workflow.instance.failed", "timestamp": "2026-01-20T12:00:00Z", "data": { "instance_id": "inst_xyz", "workflow_name": "approval_workflow", "namespace": "user_123", "failed_state": "process_payment", "error": { "code": "payment_declined", "message": "Card was declined by issuer" } } }
agent.room.message:
{ "id": "evt_009", "type": "agent.room.message", "timestamp": "2026-01-20T12:00:00Z", "data": { "room_id": "room_xyz", "assistant_id": "asst_abc", "namespace": "user_123", "message_id": "msg_123", "role": "assistant", "content": "Here's what I found about your billing question..." } }
agent.room.closed:
{ "id": "evt_010", "type": "agent.room.closed", "timestamp": "2026-01-20T12:00:00Z", "data": { "room_id": "room_xyz", "assistant_id": "asst_abc", "namespace": "user_123", "reason": "user_closed", "message_count": 15, "duration_seconds": 300 } }
Note: Not all events require explicit handlers. For example,
andcredential.createdare often only logged for audit purposes, whilecredential.refreshedmay only need tracking in analytics systems.workflow.instance.created
Verifying Webhook Signatures
CRITICAL: Always verify signatures to ensure webhooks came from Interactor.
Signature Header Format
Webhooks include two headers for verification:
X-Interactor-Signature: sha256=<64 hex characters> X-Interactor-Timestamp: 2026-01-20T12:00:00Z
Example:
X-Interactor-Signature: sha256=a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2 X-Interactor-Timestamp: 2026-01-20T12:00:00Z
Format validation: The signature header MUST match the format
followed by exactly 64 lowercase hexadecimal characters. Reject any other format.sha256=
Preventing Replay Attacks
CRITICAL: Always validate the timestamp to prevent replay attacks.
- Parse
as ISO8601X-Interactor-Timestamp - Reject requests where
(recommended: 5 minutes)|now - timestamp| > allowed_skew - Verify signature only after timestamp validation passes
const MAX_TIMESTAMP_SKEW_SECONDS = 300; // 5 minutes function validateTimestamp(timestampHeader: string | undefined): boolean { if (!timestampHeader) return false; const timestamp = new Date(timestampHeader); if (isNaN(timestamp.getTime())) return false; const now = Date.now(); const diff = Math.abs(now - timestamp.getTime()); return diff <= MAX_TIMESTAMP_SKEW_SECONDS * 1000; }
Key Rotation & Multiple Active Secrets
When rotating webhook secrets, you may have a period where both old and new secrets are valid:
function verifyWithMultipleSecrets( payload: string, signatureHeader: string, secrets: string[] ): boolean { for (const secret of secrets) { if (isValidSignature(signatureHeader, Buffer.from(payload), secret)) { return true; } } return false; } // During rotation, configure both secrets: const WEBHOOK_SECRETS = [ process.env.INTERACTOR_WEBHOOK_SECRET!, // Current secret process.env.INTERACTOR_WEBHOOK_SECRET_PREVIOUS!, // Previous secret (optional) ].filter(Boolean);
Rotation procedure:
- Generate new secret via
endpoint/regenerate-secret - Deploy new secret to
INTERACTOR_WEBHOOK_SECRET - Keep old secret in
for 24-48 hoursINTERACTOR_WEBHOOK_SECRET_PREVIOUS - Remove old secret after all in-flight events have been delivered
TypeScript/Node.js Verification
import crypto from 'crypto'; import express from 'express'; const app = express(); const MAX_TIMESTAMP_SKEW_MS = 5 * 60 * 1000; // 5 minutes /** * Validates the X-Interactor-Timestamp header to prevent replay attacks. * Returns true if the timestamp is within the allowed skew window. */ function validateTimestamp(timestampHeader: string | undefined): boolean { if (!timestampHeader) return false; const timestamp = new Date(timestampHeader); if (isNaN(timestamp.getTime())) return false; const diff = Math.abs(Date.now() - timestamp.getTime()); return diff <= MAX_TIMESTAMP_SKEW_MS; } /** * Validates and verifies the webhook signature using timing-safe comparison. * Properly handles the sha256= prefix and validates hex format. * * IMPORTANT: This function never throws - it returns false for any invalid input. */ function isValidSignature( signatureHeader: string | undefined, payload: Buffer, secret: string ): boolean { // Guard: header must exist if (!signatureHeader) return false; // Guard: header must match exact format sha256=<64 hex chars> const match = signatureHeader.match(/^sha256=([0-9a-f]{64})$/); if (!match) return false; const providedSignature = match[1]; // Compute expected signature const expectedSignature = crypto .createHmac('sha256', secret) .update(payload) .digest('hex'); // Convert to buffers for timing-safe comparison // Both are now guaranteed to be 64 hex chars = 32 bytes when decoded const providedBuffer = Buffer.from(providedSignature, 'hex'); const expectedBuffer = Buffer.from(expectedSignature, 'hex'); // Length check (should always pass given regex, but defense in depth) if (providedBuffer.length !== expectedBuffer.length) return false; return crypto.timingSafeEqual(providedBuffer, expectedBuffer); } // IMPORTANT: Use raw body for signature verification app.post( '/webhooks/interactor', express.raw({ type: 'application/json' }), (req, res) => { const signatureHeader = req.headers['x-interactor-signature'] as string; const timestampHeader = req.headers['x-interactor-timestamp'] as string; const payload = req.body; // Keep as Buffer // Step 1: Validate timestamp (prevent replay attacks) if (!validateTimestamp(timestampHeader)) { console.warn('Webhook rejected: invalid or stale timestamp'); return res.status(401).json({ error: 'invalid_timestamp' }); } // Step 2: Verify signature if (!isValidSignature(signatureHeader, payload, process.env.INTERACTOR_WEBHOOK_SECRET!)) { console.warn('Webhook rejected: invalid signature'); return res.status(401).json({ error: 'invalid_signature' }); } // Step 3: Parse and handle event let event: WebhookEvent; try { event = JSON.parse(payload.toString()); } catch { return res.status(400).json({ error: 'invalid_json' }); } console.log(`Received event: ${event.type} (${event.id})`); // Handle asynchronously - respond immediately handleWebhookEvent(event).catch((err) => { console.error(`Failed to process event ${event.id}:`, err); }); // Always respond quickly (< 5 seconds) res.status(200).json({ received: true }); } ); async function handleWebhookEvent(event: WebhookEvent) { switch (event.type) { case 'credential.expired': case 'credential.revoked': // Notify user to reconnect their account await notifyUserToReconnect( event.data.namespace, event.data.service_name ); break; case 'workflow.instance.halted': // Notify user they have a pending approval await notifyUserOfPendingApproval( event.data.namespace, event.data.instance_id, event.data.halting_presentation ); break; case 'workflow.instance.completed': // Process completed workflow await processCompletedWorkflow( event.data.instance_id, event.data.workflow_data ); break; case 'workflow.instance.failed': // Handle workflow failure await handleWorkflowFailure( event.data.namespace, event.data.instance_id, event.data.error, event.data.failed_state ); break; case 'agent.room.message': // Forward message to real-time channel (if not using SSE) await forwardMessageToClient( event.data.namespace, event.data.room_id, event.data.message_id, event.data.content ); break; } } // Webhook event types for type safety type WebhookEventType = | 'credential.created' | 'credential.refreshed' | 'credential.expired' | 'credential.revoked' | 'workflow.instance.created' | 'workflow.instance.completed' | 'workflow.instance.failed' | 'workflow.instance.halted' | 'agent.room.message' | 'agent.room.closed'; // SSE-only event types (not available via webhooks) type SSEEventType = | 'state_changed' | 'workflow_data_updated' | 'halted' | 'resumed' | 'completed' | 'message' | 'message_start' | 'message_delta' | 'message_end' | 'tool_use' | 'tool_result' | 'heartbeat'; interface WebhookEvent<T = Record<string, unknown>> { id: string; type: WebhookEventType; timestamp: string; data: T; } // Specific payload types for each event interface CredentialEventData { credential_id: string; service_id: string; service_name?: string; namespace: string; reason?: string; scopes?: string[]; expires_at?: string; // For credential.refreshed } interface WorkflowEventData { instance_id: string; workflow_name: string; workflow_id?: string; namespace: string; status?: 'created' | 'running' | 'halted' | 'completed' | 'failed'; current_state?: string; final_state?: string; failed_state?: string; error?: { code: string; message: string }; initial_input?: Record<string, unknown>; workflow_data?: Record<string, unknown>; output?: Record<string, unknown>; halting_presentation?: Record<string, unknown>; } interface AgentMessageEventData { room_id: string; assistant_id: string; namespace: string; message_id: string; role: 'user' | 'assistant'; content: string; } interface AgentRoomClosedEventData { room_id: string; assistant_id: string; namespace: string; reason: 'user_closed' | 'timeout' | 'error'; message_count: number; duration_seconds: number; }
Python/Flask Verification
import hmac import hashlib import os import re from datetime import datetime, timezone, timedelta from flask import Flask, request, jsonify app = Flask(__name__) MAX_TIMESTAMP_SKEW = timedelta(minutes=5) SIGNATURE_PATTERN = re.compile(r'^sha256=([0-9a-f]{64})$') def validate_timestamp(timestamp_header: str | None) -> bool: """Validate timestamp to prevent replay attacks.""" if not timestamp_header: return False try: timestamp = datetime.fromisoformat(timestamp_header.replace('Z', '+00:00')) now = datetime.now(timezone.utc) return abs(now - timestamp) <= MAX_TIMESTAMP_SKEW except (ValueError, TypeError): return False def is_valid_signature(signature_header: str | None, payload: bytes, secret: str) -> bool: """ Validate and verify webhook signature with timing-safe comparison. Returns False for any invalid input - never raises exceptions. """ if not signature_header: return False # Validate format: sha256=<64 hex chars> match = SIGNATURE_PATTERN.match(signature_header) if not match: return False provided_signature = match.group(1) expected_signature = hmac.new( secret.encode(), payload, hashlib.sha256 ).hexdigest() return hmac.compare_digest(provided_signature, expected_signature) @app.route('/webhooks/interactor', methods=['POST']) def handle_webhook(): signature_header = request.headers.get('X-Interactor-Signature') timestamp_header = request.headers.get('X-Interactor-Timestamp') payload = request.get_data() # Step 1: Validate timestamp (prevent replay attacks) if not validate_timestamp(timestamp_header): print('Webhook rejected: invalid or stale timestamp') return jsonify({'error': 'invalid_timestamp'}), 401 # Step 2: Verify signature if not is_valid_signature(signature_header, payload, os.environ['INTERACTOR_WEBHOOK_SECRET']): print('Webhook rejected: invalid signature') return jsonify({'error': 'invalid_signature'}), 401 # Step 3: Parse and handle event try: event = request.get_json() except Exception: return jsonify({'error': 'invalid_json'}), 400 print(f"Received event: {event['type']} ({event['id']})") # Handle asynchronously (use Celery, RQ, or similar in production) handle_webhook_event(event) # Always respond quickly (< 5 seconds) return jsonify({'received': True}), 200 def handle_webhook_event(event: dict): event_type = event['type'] data = event['data'] if event_type in ['credential.expired', 'credential.revoked']: notify_user_to_reconnect(data['namespace'], data.get('service_name')) elif event_type == 'workflow.instance.halted': notify_user_of_pending_approval( data['namespace'], data['instance_id'], data.get('halting_presentation') ) elif event_type == 'workflow.instance.completed': process_completed_workflow(data['instance_id'], data.get('workflow_data')) elif event_type == 'workflow.instance.failed': handle_workflow_failure( data['namespace'], data['instance_id'], data.get('error'), data.get('failed_state') ) elif event_type == 'agent.room.message': forward_message_to_client( data['namespace'], data['room_id'], data['message_id'], data['content'] )
Elixir/Phoenix Verification
defmodule MyAppWeb.WebhookController do use MyAppWeb, :controller import Plug.Conn, only: [get_req_header: 2] # Maximum body size for webhooks (1MB should be plenty) @max_body_length 1_048_576 # Maximum timestamp skew (5 minutes in seconds) @max_timestamp_skew 300 # Regex to validate signature format: sha256=<64 hex chars> @signature_pattern ~r/^sha256=([0-9a-f]{64})$/ def interactor(conn, _params) do signature_header = get_req_header(conn, "x-interactor-signature") |> List.first() timestamp_header = get_req_header(conn, "x-interactor-timestamp") |> List.first() with {:ok, payload, conn} <- read_body(conn, length: @max_body_length), :ok <- validate_timestamp(timestamp_header), secret <- Application.fetch_env!(:my_app, :interactor_webhook_secret), :ok <- verify_signature(payload, signature_header, secret), {:ok, event} <- Jason.decode(payload) do # Handle asynchronously to respond quickly Task.start(fn -> handle_event(event) end) conn |> put_status(200) |> json(%{received: true}) else {:more, _partial, conn} -> conn |> put_status(413) |> json(%{error: "payload_too_large"}) {:error, :invalid_timestamp} -> conn |> put_status(401) |> json(%{error: "invalid_timestamp"}) {:error, :invalid_signature} -> conn |> put_status(401) |> json(%{error: "invalid_signature"}) {:error, _reason} -> conn |> put_status(400) |> json(%{error: "invalid_json"}) end end defp validate_timestamp(nil), do: {:error, :invalid_timestamp} defp validate_timestamp(timestamp_header) do case DateTime.from_iso8601(timestamp_header) do {:ok, timestamp, _offset} -> now = DateTime.utc_now() diff = abs(DateTime.diff(now, timestamp, :second)) if diff <= @max_timestamp_skew do :ok else {:error, :invalid_timestamp} end {:error, _} -> {:error, :invalid_timestamp} end end defp verify_signature(_payload, nil, _secret), do: {:error, :invalid_signature} defp verify_signature(payload, signature_header, secret) do case Regex.run(@signature_pattern, signature_header) do [_, provided_hex] -> expected_hex = :crypto.mac(:hmac, :sha256, secret, payload) |> Base.encode16(case: :lower) if Plug.Crypto.secure_compare(provided_hex, expected_hex) do :ok else {:error, :invalid_signature} end _ -> {:error, :invalid_signature} end end defp handle_event(%{"type" => "credential.expired", "data" => data}) do MyApp.Notifications.notify_reconnect(data["namespace"], data["service_name"]) end defp handle_event(%{"type" => "workflow.instance.halted", "data" => data}) do MyApp.Notifications.notify_pending_approval( data["namespace"], data["instance_id"], data["halting_presentation"] ) end defp handle_event(%{"type" => "workflow.instance.completed", "data" => data}) do MyApp.Workflows.process_completed(data["instance_id"], data["workflow_data"]) end defp handle_event(%{"type" => "workflow.instance.failed", "data" => data}) do MyApp.Workflows.handle_failure( data["namespace"], data["instance_id"], data["error"], data["failed_state"] ) end defp handle_event(%{"type" => "agent.room.message", "data" => data}) do MyApp.Chat.forward_message( data["namespace"], data["room_id"], data["message_id"], data["content"] ) end defp handle_event(_event), do: :ok end
Retry Policy
Interactor retries failed webhook deliveries with exponential backoff:
| Attempt | Delay | Total Time |
|---|---|---|
| 1 | Immediate | 0 |
| 2 | 1 minute | 1 min |
| 3 | 5 minutes | 6 min |
| 4 | 30 minutes | 36 min |
| 5 | 2 hours | 2h 36min |
After 5 failed attempts, the webhook is disabled. Re-enable via the toggle endpoint.
HTTP Response Semantics
Your webhook handler's HTTP response determines Interactor's retry behavior:
| HTTP Status | Interactor Behavior | Your Action |
|---|---|---|
| ✅ Success - no retry | Event processed successfully |
| ❌ Permanent failure - no retry | Bad request, fix your handler |
| ❌ Permanent failure - no retry | Signature invalid, check secret |
| ❌ Permanent failure - no retry | Forbidden, check permissions |
| ❌ Permanent failure - no retry | Endpoint not found, check URL |
| 🔄 Retry with backoff | Request timeout, respond faster |
| 🔄 Retry with backoff | Rate limited, will retry later |
| 🔄 Retry with backoff | Server error, will retry |
| 🔄 Retry with backoff | Gateway/timeout, will retry |
| Timeout (>30s) | 🔄 Retry with backoff | No response received, will retry |
| Connection refused | 🔄 Retry with backoff | Server unreachable, will retry |
Important: Return
immediately, then process asynchronously. If you return200 OKerrors for transient issues, Interactor won't retry.4xx
Best Practices for Reliability
- Respond quickly - Return 200 within 5 seconds
- Process asynchronously - Queue events for background processing
- Be idempotent - Handle duplicate deliveries gracefully
- Log event IDs - Track which events you've processed
Idempotent Event Processing
// Example: Idempotent event processing with Redis const IDEMPOTENCY_TTL = 7 * 24 * 60 * 60; // 7 days in seconds async function handleWebhookEvent(event: WebhookEvent) { const idempotencyKey = `webhook:processed:${event.id}`; // Atomic check-and-set to prevent race conditions const wasSet = await redis.set(idempotencyKey, Date.now(), 'NX', 'EX', IDEMPOTENCY_TTL); if (!wasSet) { console.log(`Event ${event.id} already processed, skipping`); return; } try { await processEvent(event); console.log(`Successfully processed event ${event.id}`); } catch (error) { // Delete the key so retry can process it await redis.del(idempotencyKey); throw error; } }
Idempotency Storage Recommendations:
| Storage | TTL | Use Case |
|---|---|---|
| Redis | 7 days | High-throughput, distributed systems |
| PostgreSQL | 30 days | Audit trail needed, lower throughput |
| In-memory | Session | Development/testing only |
Dead-Letter Queue (DLQ) Strategy
For events that repeatedly fail processing, implement a DLQ:
const MAX_PROCESS_ATTEMPTS = 3; const DLQ_KEY = 'webhook:dlq'; async function handleWebhookEvent(event: WebhookEvent) { const attemptKey = `webhook:attempts:${event.id}`; const attempts = await redis.incr(attemptKey); await redis.expire(attemptKey, 24 * 60 * 60); // 24 hour window try { await processEvent(event); await redis.del(attemptKey); } catch (error) { if (attempts >= MAX_PROCESS_ATTEMPTS) { // Move to DLQ for manual review await redis.rpush(DLQ_KEY, JSON.stringify({ event, error: error.message, failedAt: new Date().toISOString(), attempts })); await redis.del(attemptKey); console.error(`Event ${event.id} moved to DLQ after ${attempts} attempts`); // Alert operations team await alertOps(`Webhook event ${event.id} failed ${attempts} times`); } else { console.warn(`Event ${event.id} failed (attempt ${attempts}/${MAX_PROCESS_ATTEMPTS})`); throw error; // Will be retried by Interactor } } } // Periodic DLQ processor (run via cron) async function processDLQ() { while (true) { const item = await redis.lpop(DLQ_KEY); if (!item) break; const { event, error, failedAt } = JSON.parse(item); console.log(`DLQ item: ${event.id} failed at ${failedAt}: ${error}`); // Manual review or automated recovery logic } }
Monitoring & Observability
Track webhook health with these metrics:
Prometheus Metrics Example:
import { Counter, Histogram, Gauge } from 'prom-client'; // Webhook metrics const webhookReceived = new Counter({ name: 'interactor_webhook_received_total', help: 'Total webhooks received', labelNames: ['event_type', 'status'] // status: success, invalid_signature, invalid_timestamp, processing_error }); const webhookProcessingDuration = new Histogram({ name: 'interactor_webhook_processing_duration_seconds', help: 'Webhook processing duration in seconds', labelNames: ['event_type'], buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5] }); const webhookDLQSize = new Gauge({ name: 'interactor_webhook_dlq_size', help: 'Current size of the dead-letter queue' }); // Usage in handler app.post('/webhooks/interactor', async (req, res) => { const timer = webhookProcessingDuration.startTimer(); try { // ... validation ... if (!isValidSignature(...)) { webhookReceived.inc({ event_type: 'unknown', status: 'invalid_signature' }); return res.status(401).json({ error: 'invalid_signature' }); } const event = JSON.parse(payload.toString()); webhookReceived.inc({ event_type: event.type, status: 'success' }); await handleWebhookEvent(event); timer({ event_type: event.type }); res.status(200).json({ received: true }); } catch (error) { webhookReceived.inc({ event_type: 'unknown', status: 'processing_error' }); timer({ event_type: 'error' }); throw error; } });
Key Metrics to Monitor:
| Metric | Alert Threshold | Description |
|---|---|---|
| >5 in 5min | Possible secret mismatch or attack |
| p99 >5s | Risk of timeout, scale handlers |
| >0 | Events need manual review |
| >10 in 5min | Handler bugs, investigate logs |
Server-Sent Events (SSE)
For real-time streaming in browsers and clients.
Workflow Instance Stream
Stream updates for a specific workflow instance:
curl -N https://core.interactor.com/api/v1/workflows/instances/inst_xyz/stream \ -H "Authorization: Bearer <token>" \ -H "Accept: text/event-stream"
Events:
event: state_changed data: {"state": "manager_approval", "previous_state": "submit", "thread_id": "thread_main"} event: workflow_data_updated data: {"key": "submitted_at", "value": "2026-01-20T12:00:00Z"} event: halted data: {"state": "manager_approval", "presentation": {...}} event: resumed data: {"state": "manager_approval", "input": {"approved": true}} event: completed data: {"status": "completed", "final_state": "approved", "output": {...}}
Chat Room Stream
Stream messages in a chat room:
curl -N https://core.interactor.com/api/v1/agents/rooms/room_xyz/stream \ -H "Authorization: Bearer <token>" \ -H "Accept: text/event-stream"
Events:
event: message data: {"id": "msg_1", "role": "user", "content": "Hello"} event: message_start data: {"id": "msg_2", "role": "assistant"} event: message_delta data: {"id": "msg_2", "delta": "Hi there! "} event: message_delta data: {"id": "msg_2", "delta": "How can I "} event: message_delta data: {"id": "msg_2", "delta": "help you today?"} event: message_end data: {"id": "msg_2", "role": "assistant", "content": "Hi there! How can I help you today?"} event: tool_use data: {"id": "call_1", "tool": "search_products", "parameters": {"query": "laptop"}} event: tool_result data: {"id": "call_1", "tool": "search_products", "result": {"products": [...]}} event: heartbeat data: {"timestamp": "2026-01-20T12:00:30Z"}
SSE Security Best Practices
Short-Lived Tokens
Since EventSource doesn't support custom headers, tokens must be passed in the URL. Use short-lived tokens to minimize exposure:
// Backend: Generate short-lived SSE token (5 minute expiry) app.post('/api/sse-token', authenticate, async (req, res) => { const sseToken = jwt.sign( { sub: req.user.id, purpose: 'sse', roomId: req.body.roomId // Scope token to specific resource }, process.env.SSE_TOKEN_SECRET, { expiresIn: '5m' } ); res.json({ token: sseToken, expiresIn: 300 }); }); // Frontend: Request token before connecting async function connectToSSE(roomId: string) { const { token } = await fetch('/api/sse-token', { method: 'POST', headers: { 'Authorization': `Bearer ${authToken}` }, body: JSON.stringify({ roomId }) }).then(r => r.json()); return new EventSource(`/api/sse/rooms/${roomId}?token=${token}`); }
Server-Side Proxy Pattern
Proxy SSE connections through your backend to avoid exposing Interactor tokens:
// Your backend proxies SSE from Interactor app.get('/api/sse/rooms/:roomId', authenticate, (req, res) => { res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); // Connect to Interactor with server-side token const upstream = new EventSource( `https://core.interactor.com/api/v1/agents/rooms/${req.params.roomId}/stream`, { headers: { 'Authorization': `Bearer ${process.env.INTERACTOR_ACCESS_TOKEN}` } } ); // Forward events to client upstream.onmessage = (event) => { res.write(`event: ${event.type}\ndata: ${event.data}\n\n`); }; req.on('close', () => upstream.close()); });
CORS Configuration
If connecting directly to Interactor from the browser:
// Ensure your domain is whitelisted in Interactor settings // Interactor will include these headers: // Access-Control-Allow-Origin: https://yourdomain.com // Access-Control-Allow-Credentials: true // Your CSP should allow connections: // connect-src 'self' https://core.interactor.com;
Heartbeat & Connection Health
Interactor sends heartbeat events every 30 seconds. Use them to detect stale connections:
const HEARTBEAT_TIMEOUT_MS = 45_000; // 30s interval + 15s grace period let lastHeartbeat = Date.now(); let healthCheckInterval: NodeJS.Timeout; function setupHealthCheck(eventSource: EventSource, onStale: () => void) { eventSource.addEventListener('heartbeat', () => { lastHeartbeat = Date.now(); }); healthCheckInterval = setInterval(() => { const timeSinceHeartbeat = Date.now() - lastHeartbeat; if (timeSinceHeartbeat > HEARTBEAT_TIMEOUT_MS) { console.warn(`SSE connection stale (${timeSinceHeartbeat}ms since last heartbeat)`); onStale(); } }, 10_000); // Check every 10 seconds } function cleanup() { clearInterval(healthCheckInterval); eventSource.close(); }
Health Thresholds:
| Condition | Threshold | Action |
|---|---|---|
| Normal | Heartbeat within 30s | Connection healthy |
| Warning | 30-45s since heartbeat | Log warning, prepare reconnect |
| Stale | >45s since heartbeat | Force reconnect |
| Failed | 3 consecutive reconnect failures | Alert user, escalate to support |
SSE Client Implementations
Browser (Native EventSource)
const token = 'your_access_token'; const roomId = 'room_xyz'; // Note: EventSource doesn't support custom headers // Pass token as query parameter const eventSource = new EventSource( `https://core.interactor.com/api/v1/agents/rooms/${roomId}/stream?token=${token}` ); // ⚠️ SECURITY NOTE: Token in URL may appear in server access logs. // For enhanced security, use short-lived tokens specifically for SSE connections. // Consider using a dedicated SSE token endpoint that issues time-limited tokens. // Handle different event types eventSource.addEventListener('message', (event) => { const data = JSON.parse(event.data); displayMessage(data); }); eventSource.addEventListener('message_start', (event) => { const data = JSON.parse(event.data); startStreamingMessage(data.id); }); eventSource.addEventListener('message_delta', (event) => { const data = JSON.parse(event.data); appendToStreamingMessage(data.id, data.delta); }); eventSource.addEventListener('message_end', (event) => { const data = JSON.parse(event.data); finalizeStreamingMessage(data.id, data.content); }); eventSource.addEventListener('tool_use', (event) => { const data = JSON.parse(event.data); showToolUsage(data.tool, data.parameters); }); eventSource.addEventListener('tool_result', (event) => { const data = JSON.parse(event.data); showToolResult(data.tool, data.result); }); eventSource.addEventListener('heartbeat', (event) => { // Connection is alive updateLastHeartbeat(); }); eventSource.onerror = (error) => { console.error('SSE error:', error); // Implement reconnection logic if (eventSource.readyState === EventSource.CLOSED) { setTimeout(() => reconnect(), 5000); } }; // Clean up when done function cleanup() { eventSource.close(); }
React Hook for Chat Streaming
import { useEffect, useState, useRef, useCallback } from 'react'; interface Message { id: string; role: 'user' | 'assistant'; content: string; isStreaming?: boolean; } interface UseChatStreamOptions { roomId: string; token: string; onError?: (error: Error) => void; } export function useChatStream({ roomId, token, onError }: UseChatStreamOptions) { const [messages, setMessages] = useState<Message[]>([]); const [isConnected, setIsConnected] = useState(false); const [isStreaming, setIsStreaming] = useState(false); const eventSourceRef = useRef<EventSource | null>(null); const streamingContentRef = useRef<Map<string, string>>(new Map()); // Use ref for onError to prevent infinite re-renders // when onError is an inline function const onErrorRef = useRef(onError); useEffect(() => { onErrorRef.current = onError; }, [onError]); const connect = useCallback(() => { if (eventSourceRef.current) { eventSourceRef.current.close(); } const url = `https://core.interactor.com/api/v1/agents/rooms/${roomId}/stream?token=${token}`; const eventSource = new EventSource(url); eventSourceRef.current = eventSource; eventSource.onopen = () => { setIsConnected(true); }; eventSource.addEventListener('message', (event) => { const data = JSON.parse(event.data); setMessages(prev => [...prev, data]); }); eventSource.addEventListener('message_start', (event) => { const data = JSON.parse(event.data); setIsStreaming(true); streamingContentRef.current.set(data.id, ''); setMessages(prev => [...prev, { id: data.id, role: 'assistant', content: '', isStreaming: true }]); }); eventSource.addEventListener('message_delta', (event) => { const data = JSON.parse(event.data); const currentContent = streamingContentRef.current.get(data.id) || ''; const newContent = currentContent + data.delta; streamingContentRef.current.set(data.id, newContent); setMessages(prev => prev.map(msg => msg.id === data.id ? { ...msg, content: newContent } : msg )); }); eventSource.addEventListener('message_end', (event) => { const data = JSON.parse(event.data); setIsStreaming(false); streamingContentRef.current.delete(data.id); setMessages(prev => prev.map(msg => msg.id === data.id ? { ...msg, content: data.content, isStreaming: false } : msg )); }); eventSource.onerror = () => { setIsConnected(false); onErrorRef.current?.(new Error('SSE connection error')); // Auto-reconnect after 5 seconds setTimeout(() => { if (eventSourceRef.current?.readyState === EventSource.CLOSED) { connect(); } }, 5000); }; }, [roomId, token]); // Note: onError removed, using ref instead const disconnect = useCallback(() => { if (eventSourceRef.current) { eventSourceRef.current.close(); eventSourceRef.current = null; } // Clean up streaming content map to prevent memory leaks streamingContentRef.current.clear(); setIsConnected(false); setIsStreaming(false); }, []); useEffect(() => { connect(); return () => disconnect(); }, [connect, disconnect]); return { messages, isConnected, isStreaming, reconnect: connect, disconnect }; } // Usage in component function ChatRoom({ roomId, token }: { roomId: string; token: string }) { const { messages, isConnected, isStreaming } = useChatStream({ roomId, token, onError: (error) => console.error('Chat error:', error) }); return ( <div className="chat-room"> <div className="status"> {isConnected ? '🟢 Connected' : '🔴 Disconnected'} {isStreaming && ' (typing...)'} </div> <div className="messages"> {messages.map((msg) => ( <div key={msg.id} className={`message ${msg.role}`}> <div className="content"> {msg.content} {msg.isStreaming && <span className="cursor">▊</span>} </div> </div> ))} </div> </div> ); }
Node.js SSE Client
import EventSource from 'eventsource'; class InteractorSSEClient { private eventSource: EventSource | null = null; private reconnectAttempts = 0; private maxReconnectAttempts = 5; constructor( private baseUrl: string, private token: string ) {} connectToRoom(roomId: string, handlers: { onMessage?: (message: any) => void; onMessageStart?: (data: any) => void; onMessageDelta?: (data: any) => void; onMessageEnd?: (data: any) => void; onToolUse?: (data: any) => void; onToolResult?: (data: any) => void; onHeartbeat?: (data: any) => void; onError?: (error: Error) => void; }) { const url = `${this.baseUrl}/agents/rooms/${roomId}/stream`; this.eventSource = new EventSource(url, { headers: { 'Authorization': `Bearer ${this.token}` } }); this.eventSource.onopen = () => { console.log('SSE connected to room:', roomId); this.reconnectAttempts = 0; }; if (handlers.onMessage) { this.eventSource.addEventListener('message', (event) => { handlers.onMessage!(JSON.parse(event.data)); }); } if (handlers.onMessageStart) { this.eventSource.addEventListener('message_start', (event) => { handlers.onMessageStart!(JSON.parse(event.data)); }); } if (handlers.onMessageDelta) { this.eventSource.addEventListener('message_delta', (event) => { handlers.onMessageDelta!(JSON.parse(event.data)); }); } if (handlers.onMessageEnd) { this.eventSource.addEventListener('message_end', (event) => { handlers.onMessageEnd!(JSON.parse(event.data)); }); } if (handlers.onToolUse) { this.eventSource.addEventListener('tool_use', (event) => { handlers.onToolUse!(JSON.parse(event.data)); }); } if (handlers.onToolResult) { this.eventSource.addEventListener('tool_result', (event) => { handlers.onToolResult!(JSON.parse(event.data)); }); } if (handlers.onHeartbeat) { this.eventSource.addEventListener('heartbeat', (event) => { handlers.onHeartbeat!(JSON.parse(event.data)); }); } this.eventSource.onerror = (error) => { console.error('SSE error:', error); handlers.onError?.(new Error('SSE connection error')); // Auto-reconnect with backoff if (this.reconnectAttempts < this.maxReconnectAttempts) { const delay = Math.pow(2, this.reconnectAttempts) * 1000; this.reconnectAttempts++; setTimeout(() => this.connectToRoom(roomId, handlers), delay); } }; return this; } connectToWorkflow(instanceId: string, handlers: { onStateChanged?: (data: any) => void; onWorkflowDataUpdated?: (data: any) => void; onHalted?: (data: any) => void; onResumed?: (data: any) => void; onCompleted?: (data: any) => void; onHeartbeat?: (data: any) => void; onError?: (error: Error) => void; }) { const url = `${this.baseUrl}/workflows/instances/${instanceId}/stream`; this.eventSource = new EventSource(url, { headers: { 'Authorization': `Bearer ${this.token}` } }); this.eventSource.onopen = () => { console.log('SSE connected to workflow:', instanceId); this.reconnectAttempts = 0; }; if (handlers.onStateChanged) { this.eventSource.addEventListener('state_changed', (event) => { handlers.onStateChanged!(JSON.parse(event.data)); }); } if (handlers.onWorkflowDataUpdated) { this.eventSource.addEventListener('workflow_data_updated', (event) => { handlers.onWorkflowDataUpdated!(JSON.parse(event.data)); }); } if (handlers.onHalted) { this.eventSource.addEventListener('halted', (event) => { handlers.onHalted!(JSON.parse(event.data)); }); } if (handlers.onResumed) { this.eventSource.addEventListener('resumed', (event) => { handlers.onResumed!(JSON.parse(event.data)); }); } if (handlers.onCompleted) { this.eventSource.addEventListener('completed', (event) => { handlers.onCompleted!(JSON.parse(event.data)); }); } if (handlers.onHeartbeat) { this.eventSource.addEventListener('heartbeat', (event) => { handlers.onHeartbeat!(JSON.parse(event.data)); }); } this.eventSource.onerror = (error) => { console.error('SSE error:', error); handlers.onError?.(new Error('SSE connection error')); // Auto-reconnect with backoff (same as room connection) if (this.reconnectAttempts < this.maxReconnectAttempts) { const delay = Math.pow(2, this.reconnectAttempts) * 1000; this.reconnectAttempts++; setTimeout(() => this.connectToWorkflow(instanceId, handlers), delay); } }; return this; } disconnect() { if (this.eventSource) { this.eventSource.close(); this.eventSource = null; } } } // Usage const sseClient = new InteractorSSEClient( 'https://core.interactor.com/api/v1', process.env.INTERACTOR_ACCESS_TOKEN! ); sseClient.connectToRoom('room_xyz', { onMessageDelta: (data) => { process.stdout.write(data.delta); }, onMessageEnd: (data) => { console.log('\n--- Message complete ---'); }, onError: (error) => { console.error('Error:', error); } });
Rate Limits
| Resource | Limit |
|---|---|
| Webhooks per account | 50 |
| SSE connections per account | 10 concurrent |
| Events per webhook | Unlimited |
Best Practices
Webhooks
- Always verify signatures - Reject requests with invalid signatures
- Respond quickly - Return 200 within 5 seconds, process asynchronously
- Handle duplicates - Events may be delivered more than once
- Use idempotent processing - Track event IDs to prevent double-processing
- Monitor delivery - Check webhook events list for failures
- Use HTTPS - Required for production webhooks
SSE
- Handle reconnection - SSE connections may drop; implement auto-reconnect
- Watch for heartbeats - Detect stale connections
- Close when done - Close connections when leaving pages/screens
- Limit connections - Max 10 concurrent SSE connections per account
- Use for frontend only - For backend, prefer webhooks
Local Development & Testing
Testing Webhooks Locally
Webhooks require a publicly accessible URL. For local development:
Option 1: ngrok (Recommended)
# Install ngrok: https://ngrok.com/download ngrok http 4000 # For Phoenix default port # Use the generated URL for your webhook # Example: https://abc123.ngrok.io/webhooks/interactor
Option 2: localtunnel
npm install -g localtunnel lt --port 4000 # Use the generated URL for your webhook
Option 3: Cloudflare Tunnel
cloudflared tunnel --url http://localhost:4000
Testing Webhook Signature Verification
Create a test script to verify your signature implementation:
// test-webhook-signature.ts import crypto from 'crypto'; const secret = 'whsec_your_test_secret'; const payload = JSON.stringify({ id: 'evt_test', type: 'workflow.instance.completed', timestamp: new Date().toISOString(), data: { instance_id: 'inst_test' } }); const signature = 'sha256=' + crypto .createHmac('sha256', secret) .update(payload) .digest('hex'); console.log('Test payload:', payload); console.log('Test signature:', signature); // Use curl to test: // curl -X POST http://localhost:4000/webhooks/interactor \ // -H "Content-Type: application/json" \ // -H "X-Interactor-Signature: ${signature}" \ // -d '${payload}'
Use the Test Endpoint
Interactor provides a test endpoint to send sample events:
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/test \ -H "Authorization: Bearer <token>"
This sends a test event to your webhook URL to verify it's working.
Postman Collection
Import this collection to test webhook handling:
{ "info": { "name": "Interactor Webhooks", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json" }, "variable": [ { "key": "webhook_secret", "value": "whsec_your_test_secret" }, { "key": "webhook_url", "value": "http://localhost:4000/webhooks/interactor" } ], "item": [ { "name": "Test Webhook - Credential Expired", "request": { "method": "POST", "url": "{{webhook_url}}", "header": [ { "key": "Content-Type", "value": "application/json" }, { "key": "X-Interactor-Signature", "value": "sha256={{signature}}" }, { "key": "X-Interactor-Timestamp", "value": "{{timestamp}}" } ], "body": { "mode": "raw", "raw": "{\"id\":\"evt_test_001\",\"type\":\"credential.expired\",\"timestamp\":\"{{timestamp}}\",\"data\":{\"credential_id\":\"cred_test\",\"service_id\":\"google_calendar\",\"namespace\":\"user_123\",\"reason\":\"refresh_token_invalid\"}}" } } } ] }
Pre-request script to generate signature:
const crypto = require('crypto-js'); const timestamp = new Date().toISOString(); const payload = pm.request.body.raw.replace(/\{\{timestamp\}\}/g, timestamp); const signature = crypto.HmacSHA256(payload, pm.variables.get('webhook_secret')).toString(); pm.variables.set('timestamp', timestamp); pm.variables.set('signature', signature); pm.request.body.raw = payload;
Unit Test Template (TypeScript/Jest)
import crypto from 'crypto'; import request from 'supertest'; import app from '../app'; // Your Express app describe('Webhook Handler', () => { const WEBHOOK_SECRET = 'whsec_test_secret_123'; beforeAll(() => { process.env.INTERACTOR_WEBHOOK_SECRET = WEBHOOK_SECRET; }); function generateSignature(payload: string): string { return 'sha256=' + crypto.createHmac('sha256', WEBHOOK_SECRET).update(payload).digest('hex'); } function generateTimestamp(offsetMs = 0): string { return new Date(Date.now() + offsetMs).toISOString(); } it('accepts valid webhook', async () => { const payload = JSON.stringify({ id: 'evt_test_001', type: 'workflow.instance.completed', timestamp: generateTimestamp(), data: { instance_id: 'inst_123' } }); const res = await request(app) .post('/webhooks/interactor') .set('Content-Type', 'application/json') .set('X-Interactor-Signature', generateSignature(payload)) .set('X-Interactor-Timestamp', generateTimestamp()) .send(payload); expect(res.status).toBe(200); expect(res.body.received).toBe(true); }); it('rejects invalid signature', async () => { const payload = JSON.stringify({ id: 'evt_test', type: 'test' }); const res = await request(app) .post('/webhooks/interactor') .set('Content-Type', 'application/json') .set('X-Interactor-Signature', 'sha256=invalid') .set('X-Interactor-Timestamp', generateTimestamp()) .send(payload); expect(res.status).toBe(401); expect(res.body.error).toBe('invalid_signature'); }); it('rejects malformed signature header', async () => { const payload = JSON.stringify({ id: 'evt_test', type: 'test' }); const res = await request(app) .post('/webhooks/interactor') .set('Content-Type', 'application/json') .set('X-Interactor-Signature', 'not_sha256_format') .set('X-Interactor-Timestamp', generateTimestamp()) .send(payload); expect(res.status).toBe(401); }); it('rejects stale timestamp (replay attack)', async () => { const payload = JSON.stringify({ id: 'evt_test', type: 'test' }); const staleTimestamp = generateTimestamp(-10 * 60 * 1000); // 10 minutes ago const res = await request(app) .post('/webhooks/interactor') .set('Content-Type', 'application/json') .set('X-Interactor-Signature', generateSignature(payload)) .set('X-Interactor-Timestamp', staleTimestamp) .send(payload); expect(res.status).toBe(401); expect(res.body.error).toBe('invalid_timestamp'); }); it('handles duplicate events idempotently', async () => { const eventId = 'evt_duplicate_test'; const payload = JSON.stringify({ id: eventId, type: 'workflow.instance.completed', timestamp: generateTimestamp(), data: { instance_id: 'inst_123' } }); // First request await request(app) .post('/webhooks/interactor') .set('Content-Type', 'application/json') .set('X-Interactor-Signature', generateSignature(payload)) .set('X-Interactor-Timestamp', generateTimestamp()) .send(payload); // Second request (duplicate) const res = await request(app) .post('/webhooks/interactor') .set('Content-Type', 'application/json') .set('X-Interactor-Signature', generateSignature(payload)) .set('X-Interactor-Timestamp', generateTimestamp()) .send(payload); expect(res.status).toBe(200); // Should still succeed, just skip processing }); });
Event Replay Script
Replay historical events for debugging or recovery:
#!/usr/bin/env npx ts-node // scripts/replay-webhook-events.ts import crypto from 'crypto'; import fetch from 'node-fetch'; interface ReplayOptions { webhookUrl: string; webhookSecret: string; events: Array<{ id: string; type: string; data: unknown }>; delayMs?: number; } async function replayEvents({ webhookUrl, webhookSecret, events, delayMs = 100 }: ReplayOptions) { for (const event of events) { const timestamp = new Date().toISOString(); const payload = JSON.stringify({ ...event, timestamp, _replayed: true, // Mark as replayed for debugging _originalTimestamp: event.timestamp }); const signature = 'sha256=' + crypto .createHmac('sha256', webhookSecret) .update(payload) .digest('hex'); try { const res = await fetch(webhookUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-Interactor-Signature': signature, 'X-Interactor-Timestamp': timestamp }, body: payload }); console.log(`[${event.id}] ${event.type}: ${res.status} ${res.statusText}`); } catch (error) { console.error(`[${event.id}] FAILED:`, error); } await new Promise(r => setTimeout(r, delayMs)); } } // Usage: Fetch events from Interactor and replay locally const events = [ { id: 'evt_001', type: 'credential.expired', data: { credential_id: 'cred_abc', namespace: 'user_123' } }, { id: 'evt_002', type: 'workflow.instance.completed', data: { instance_id: 'inst_xyz' } } ]; replayEvents({ webhookUrl: 'http://localhost:4000/webhooks/interactor', webhookSecret: process.env.INTERACTOR_WEBHOOK_SECRET!, events });
Error Handling
Webhook API Errors
| Error Code | HTTP Status | Description | Resolution |
|---|---|---|---|
| 404 | Webhook ID doesn't exist | Verify webhook ID, may have been deleted |
| 400 | URL not valid HTTPS | Use URL (HTTP only in dev) |
| 400 | Unknown event types in subscription | Check for valid events |
| 400 | Webhook disabled after failures | Fix endpoint issues, then toggle to re-enable |
| 400 | Account webhook limit reached | Delete unused webhooks or contact support |
| 400 | Cannot reach webhook URL | Ensure URL is publicly accessible |
| 500 | Internal error generating secret | Retry request, contact support if persists |
| 429 | Too many API requests | Wait and retry with exponential backoff |
| 401 | Invalid or expired token | Refresh authentication token |
| 403 | Insufficient permissions | Check API token scopes |
Webhook Delivery Errors
Your endpoint may receive these error scenarios:
| Scenario | Your Response | Interactor Behavior |
|---|---|---|
| Signature mismatch | Return | Logged as authentication failure |
| Timestamp too old | Return | Logged as authentication failure |
| Unknown event type | Return | Treated as success (forward compatible) |
| Processing error (recoverable) | Return | Retried with backoff |
| Processing error (permanent) | Return | Not retried, logged as permanent failure |
| Timeout (no response) | N/A | Retried with backoff |
SSE Errors
| Error | HTTP Status | Cause | Resolution |
|---|---|---|---|
| Connection refused | 401 | Invalid or expired token | Refresh token and reconnect |
| Connection refused | 403 | No access to resource | Check permissions for room/workflow |
| Resource not found | 404 | Invalid room_id or instance_id | Verify resource exists |
| Connection dropped | N/A | Network issues | Implement auto-reconnect with backoff |
| Rate limited | 429 | Too many connections | Close unused connections, respect limits |
| Server error | 500 | Interactor service issue | Retry with backoff, check status page |
Rate Limit Exceeded Behavior
When you exceed rate limits, Interactor returns:
HTTP/1.1 429 Too Many Requests Retry-After: 60 X-RateLimit-Limit: 100 X-RateLimit-Remaining: 0 X-RateLimit-Reset: 1706176800 { "error": "rate_limited", "message": "Too many requests. Please retry after 60 seconds.", "retry_after": 60 }
Rate limit headers:
: Maximum requests per windowX-RateLimit-Limit
: Requests remaining in current windowX-RateLimit-Remaining
: Unix timestamp when the window resetsX-RateLimit-Reset
: Seconds to wait before retryingRetry-After
Handling rate limits:
async function callInteractorAPI(endpoint: string, options: RequestInit) { const response = await fetch(`https://core.interactor.com/api/v1${endpoint}`, options); if (response.status === 429) { const retryAfter = parseInt(response.headers.get('Retry-After') || '60', 10); console.warn(`Rate limited. Retrying after ${retryAfter}s`); await new Promise(r => setTimeout(r, retryAfter * 1000)); return callInteractorAPI(endpoint, options); // Retry once } return response; }
Output Format
When implementing webhooks/streaming, provide this summary:
## Webhooks & Streaming Implementation Report **Date**: YYYY-MM-DD ### Webhooks Configured | Webhook ID | URL | Events | Status | |------------|-----|--------|--------| | wh_abc | https://app.com/webhooks | credential.*, workflow.* | ✓ Active | ### Event Handlers | Event | Handler | Status | |-------|---------|--------| | credential.expired | notifyUserToReconnect() | ✓ Implemented | | credential.revoked | notifyUserToReconnect() | ✓ Implemented | | workflow.instance.halted | notifyPendingApproval() | ✓ Implemented | | workflow.instance.completed | processWorkflowResult() | ✓ Implemented | | workflow.instance.failed | handleWorkflowFailure() | ✓ Implemented | | agent.room.message | forwardToRealtime() | ✓ Implemented | ### SSE Streams | Stream | Purpose | Status | |--------|---------|--------| | Room stream | Real-time chat UI | ✓ Implemented | | Workflow stream | Progress tracking | ✓ Implemented | ### Implementation Checklist **Security** - [ ] Webhook endpoint uses HTTPS (required for production) - [ ] Signature verification with timing-safe comparison - [ ] Signature header format validated (`sha256=` + 64 hex chars) - [ ] Timestamp validation to prevent replay attacks (5 min window) - [ ] Webhook secret stored in environment variable (not in code) - [ ] Key rotation procedure documented and tested - [ ] SSE tokens are short-lived (5 min) or proxied through backend **Reliability** - [ ] Respond to webhooks within 5 seconds - [ ] Async processing with background job queue - [ ] Idempotent processing (track event IDs in Redis/DB) - [ ] Dead-letter queue for failed events - [ ] Event handlers for all subscribed events - [ ] Unknown event types handled gracefully (ignore, don't fail) **Observability** - [ ] Webhook received counter (by event_type, status) - [ ] Processing duration histogram - [ ] DLQ size gauge - [ ] Error rate alerting configured - [ ] Signature validation failure alerting **SSE** - [ ] Auto-reconnect with exponential backoff - [ ] Heartbeat monitoring (45s timeout) - [ ] Connection cleanup on component unmount - [ ] Connection health indicator in UI - [ ] Max reconnect attempts with user notification **Testing** - [ ] Unit tests for signature verification - [ ] Unit tests for timestamp validation - [ ] Integration tests with test webhook endpoint - [ ] Replay script available for debugging
Related Skills
- interactor-auth: Setup authentication (prerequisite)
- interactor-credentials: Credential events to monitor
- interactor-agents: Chat streaming events
- interactor-workflows: Workflow status events