Qaskills Streaming API Testing
Streaming API testing skill covering Server-Sent Events testing, chunked transfer encoding, gRPC streaming, real-time data validation, backpressure testing, connection resilience, and AI/LLM streaming response testing.
Install

Source · Clone the upstream repo:

```bash
git clone https://github.com/PramodDutta/qaskills
```

Claude Code · Install into ~/.claude/skills/:

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/PramodDutta/qaskills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/seed-skills/streaming-api-testing" ~/.claude/skills/pramoddutta-qaskills-streaming-api-testing && rm -rf "$T"
```

Manifest: seed-skills/streaming-api-testing/SKILL.md

Source Content
Streaming API Testing Skill
You are an expert software engineer specializing in testing streaming APIs, real-time data protocols, and event-driven architectures. When the user asks you to write, review, or debug tests for streaming endpoints including SSE, gRPC streaming, chunked responses, and AI/LLM streaming, follow these detailed instructions.
Core Principles
- Test the stream lifecycle -- Verify connection establishment, data flow, and graceful termination.
- Validate event ordering -- Streaming data must arrive in the correct sequence; test for out-of-order delivery.
- Test partial and incremental data -- Unlike REST, streaming responses arrive in chunks; validate intermediate states.
- Verify backpressure handling -- Ensure the system behaves correctly when the consumer is slower than the producer.
- Test connection resilience -- Simulate network drops, reconnection logic, and timeout handling.
- Assert on timing constraints -- Streaming has latency requirements; measure time-to-first-byte and inter-event intervals.
- Clean up resources -- Always close streams, abort controllers, and event sources in test teardown (see the sketch below).
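For the last principle, a minimal teardown sketch; the `openSources`/`openControllers` registries are an illustrative convention, not part of this skill:

```ts
// tests/helpers/teardown.ts -- hypothetical cleanup pattern for stream tests
import { afterEach } from 'vitest';
import EventSource from 'eventsource';

// Tests push every stream they open; teardown closes whatever is left over.
export const openSources: EventSource[] = [];
export const openControllers: AbortController[] = [];

afterEach(() => {
  for (const es of openSources.splice(0)) es.close();
  for (const controller of openControllers.splice(0)) controller.abort();
});
```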
Project Structure
```
project/
  src/
    api/
      sse-endpoint.ts
      grpc-service.ts
      chunked-endpoint.ts
      llm-stream.ts
  tests/
    sse/
      sse-basic.test.ts
      sse-reconnection.test.ts
      sse-backpressure.test.ts
    grpc/
      server-streaming.test.ts
      client-streaming.test.ts
      bidirectional.test.ts
    chunked/
      chunked-transfer.test.ts
      chunked-json.test.ts
    llm/
      llm-stream.test.ts
      token-validation.test.ts
    helpers/
      stream-collector.ts
      mock-sse-server.ts
      mock-grpc-server.ts
      timing-utils.ts
  vitest.config.ts
```
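The tree includes a `vitest.config.ts`; a minimal sketch that suits these tests (the timeout values are suggestions, not requirements):

```ts
// vitest.config.ts -- a minimal sketch for the layout above
import { defineConfig } from 'vitest/config';

export default defineConfig({
  test: {
    environment: 'node',
    // Streaming tests wait on real timers and live sockets,
    // so give them budgets above the Vitest defaults
    testTimeout: 15_000,
    hookTimeout: 15_000,
  },
});
```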
SSE Endpoint Testing
Basic SSE Connection and Event Validation
```ts
// tests/sse/sse-basic.test.ts
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { createServer, Server } from 'http';
import EventSource from 'eventsource';

let server: Server;
let baseUrl: string;

beforeAll(async () => {
  server = createServer((req, res) => {
    if (req.url === '/events') {
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      });
      let count = 0;
      const interval = setInterval(() => {
        count++;
        res.write(`id: ${count}\n`);
        res.write(`event: message\n`);
        res.write(`data: ${JSON.stringify({ count, timestamp: Date.now() })}\n\n`);
        if (count >= 5) {
          clearInterval(interval);
          res.end();
        }
      }, 100);
      req.on('close', () => clearInterval(interval));
    }
  });
  await new Promise<void>((resolve) => {
    server.listen(0, () => {
      const addr = server.address();
      if (typeof addr === 'object' && addr) {
        baseUrl = `http://localhost:${addr.port}`;
      }
      resolve();
    });
  });
});

afterAll(() => {
  server.close();
});

describe('SSE Basic', () => {
  it('should receive all events in order', async () => {
    const events: any[] = [];
    await new Promise<void>((resolve, reject) => {
      const es = new EventSource(`${baseUrl}/events`);
      const timeout = setTimeout(() => {
        es.close();
        reject(new Error('Timeout waiting for events'));
      }, 5000);
      es.onmessage = (event) => {
        events.push(JSON.parse(event.data));
      };
      es.onerror = () => {
        clearTimeout(timeout);
        es.close();
        resolve(); // SSE error fires on stream end
      };
    });
    expect(events).toHaveLength(5);
    expect(events.map((e) => e.count)).toEqual([1, 2, 3, 4, 5]);
  });

  it('should set correct SSE headers', async () => {
    const response = await fetch(`${baseUrl}/events`);
    expect(response.headers.get('content-type')).toBe('text/event-stream');
    expect(response.headers.get('cache-control')).toBe('no-cache');
    expect(response.headers.get('connection')).toBe('keep-alive');
    // Clean up the stream
    await response.body?.cancel();
  });

  it('should include event IDs for resumption', async () => {
    const eventIds: string[] = [];
    await new Promise<void>((resolve) => {
      const es = new EventSource(`${baseUrl}/events`);
      es.onmessage = (event) => {
        eventIds.push(event.lastEventId);
        if (eventIds.length >= 5) {
          es.close();
          resolve();
        }
      };
      es.onerror = () => {
        es.close();
        resolve();
      };
    });
    expect(eventIds).toEqual(['1', '2', '3', '4', '5']);
  });
});
```
SSE Reconnection Testing
```ts
// tests/sse/sse-reconnection.test.ts
import { describe, it, expect } from 'vitest';
import { createServer, IncomingMessage, ServerResponse } from 'http';
import EventSource from 'eventsource';

describe('SSE Reconnection', () => {
  it('should reconnect and resume from last event ID', async () => {
    let connectionCount = 0;
    const server = createServer((req: IncomingMessage, res: ServerResponse) => {
      connectionCount++;
      const lastEventId = req.headers['last-event-id'];
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      });
      const startFrom = lastEventId ? parseInt(lastEventId as string, 10) + 1 : 1;
      if (connectionCount === 1) {
        // First connection: send events 1-3, then drop
        for (let i = startFrom; i <= 3; i++) {
          res.write(`id: ${i}\ndata: ${JSON.stringify({ n: i })}\n\n`);
        }
        res.destroy(); // Simulate connection drop
      } else {
        // Reconnection: send events 4-6
        for (let i = startFrom; i <= 6; i++) {
          res.write(`id: ${i}\ndata: ${JSON.stringify({ n: i })}\n\n`);
        }
        res.end();
      }
    });

    const port = await new Promise<number>((resolve) => {
      server.listen(0, () => {
        const addr = server.address();
        resolve(typeof addr === 'object' ? addr!.port : 0);
      });
    });

    const allEvents: number[] = [];
    await new Promise<void>((resolve) => {
      const es = new EventSource(`http://localhost:${port}/events`);
      const timeout = setTimeout(() => {
        es.close();
        resolve();
      }, 5000);
      es.onmessage = (event) => {
        const data = JSON.parse(event.data);
        allEvents.push(data.n);
        if (data.n >= 6) {
          clearTimeout(timeout);
          es.close();
          resolve();
        }
      };
    });
    server.close();

    // Verify all events received across reconnection
    expect(allEvents).toEqual([1, 2, 3, 4, 5, 6]);
    expect(connectionCount).toBe(2);
  });

  it('should handle server-sent retry interval', async () => {
    const server = createServer((req, res) => {
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
      });
      // Set custom retry interval (500ms)
      res.write('retry: 500\n\n');
      res.write('data: hello\n\n');
      res.end();
    });

    const port = await new Promise<number>((resolve) => {
      server.listen(0, () => {
        const addr = server.address();
        resolve(typeof addr === 'object' ? addr!.port : 0);
      });
    });

    const reconnectTimes: number[] = [];
    let lastDisconnect = 0;
    await new Promise<void>((resolve) => {
      let messageCount = 0;
      const es = new EventSource(`http://localhost:${port}/events`);
      es.onmessage = () => {
        messageCount++;
      };
      es.onerror = () => {
        if (lastDisconnect > 0) {
          reconnectTimes.push(Date.now() - lastDisconnect);
        }
        lastDisconnect = Date.now();
        if (messageCount >= 2) {
          es.close();
          resolve();
        }
      };
      setTimeout(() => {
        es.close();
        resolve();
      }, 3000);
    });
    server.close();

    // Reconnect time should be approximately 500ms (within tolerance)
    if (reconnectTimes.length > 0) {
      expect(reconnectTimes[0]).toBeGreaterThan(400);
      expect(reconnectTimes[0]).toBeLessThan(1000);
    }
  });
});
```
Chunked Transfer Encoding Testing
```ts
// tests/chunked/chunked-transfer.test.ts
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { createServer, Server } from 'http';

describe('Chunked Transfer Encoding', () => {
  let server: Server;
  let baseUrl: string;

  beforeAll(async () => {
    server = createServer((req, res) => {
      if (req.url === '/chunked-json') {
        res.writeHead(200, {
          'Content-Type': 'application/json',
          'Transfer-Encoding': 'chunked',
        });
        const items = [
          { id: 1, name: 'first' },
          { id: 2, name: 'second' },
          { id: 3, name: 'third' },
        ];
        let index = 0;
        const sendNext = () => {
          if (index < items.length) {
            const prefix = index === 0 ? '[' : ',';
            const suffix = index === items.length - 1 ? ']' : '';
            res.write(`${prefix}${JSON.stringify(items[index])}${suffix}`);
            index++;
            setTimeout(sendNext, 50);
          } else {
            res.end();
          }
        };
        sendNext();
      }
      if (req.url === '/ndjson') {
        res.writeHead(200, {
          'Content-Type': 'application/x-ndjson',
        });
        const lines = [
          { event: 'start', ts: 1 },
          { event: 'data', value: 42, ts: 2 },
          { event: 'data', value: 84, ts: 3 },
          { event: 'end', ts: 4 },
        ];
        let index = 0;
        const sendNext = () => {
          if (index < lines.length) {
            res.write(JSON.stringify(lines[index]) + '\n');
            index++;
            setTimeout(sendNext, 50);
          } else {
            res.end();
          }
        };
        sendNext();
      }
    });
    await new Promise<void>((resolve) => {
      server.listen(0, () => {
        const addr = server.address();
        baseUrl = `http://localhost:${typeof addr === 'object' ? addr!.port : 0}`;
        resolve();
      });
    });
  });

  afterAll(() => server.close());

  it('should collect chunked JSON array', async () => {
    const response = await fetch(`${baseUrl}/chunked-json`);
    const data = await response.json();
    expect(data).toEqual([
      { id: 1, name: 'first' },
      { id: 2, name: 'second' },
      { id: 3, name: 'third' },
    ]);
  });

  it('should process NDJSON stream line by line', async () => {
    const response = await fetch(`${baseUrl}/ndjson`);
    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    const events: any[] = [];
    let buffer = '';
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || '';
      for (const line of lines) {
        if (line.trim()) {
          events.push(JSON.parse(line));
        }
      }
    }
    expect(events).toHaveLength(4);
    expect(events[0].event).toBe('start');
    expect(events[3].event).toBe('end');
  });

  it('should measure time-to-first-byte for chunked response', async () => {
    const startTime = performance.now();
    const response = await fetch(`${baseUrl}/ndjson`);
    const reader = response.body!.getReader();
    const { value } = await reader.read();
    const ttfb = performance.now() - startTime;
    expect(value).toBeTruthy();
    expect(ttfb).toBeLessThan(1000); // First byte within 1 second
    // Clean up
    await reader.cancel();
  });
});
```
gRPC Streaming Tests
```ts
// tests/grpc/server-streaming.test.ts
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

// Proto definition for reference:
// service StockService {
//   rpc StreamPrices (PriceRequest) returns (stream PriceUpdate);
//   rpc SendOrders (stream Order) returns (OrderSummary);
//   rpc TradeChat (stream ChatMessage) returns (stream ChatMessage);
// }

// Shared by both describe blocks below so the bidirectional tests can reuse
// the same client. (Split into tests/grpc/bidirectional.test.ts with its own
// setup if you prefer the layout in the project tree.)
let client: any;
let server: grpc.Server;

beforeAll(async () => {
  const packageDef = protoLoader.loadSync('protos/stock.proto');
  const proto = grpc.loadPackageDefinition(packageDef) as any;

  // Set up mock gRPC server
  server = new grpc.Server();
  server.addService(proto.stock.StockService.service, {
    streamPrices: (call: any) => {
      const symbols = call.request.symbols;
      let tick = 0;
      const interval = setInterval(() => {
        tick++;
        for (const symbol of symbols) {
          call.write({
            symbol,
            price: 100 + Math.random() * 10,
            tick,
            timestamp: Date.now(),
          });
        }
        if (tick >= 5) {
          clearInterval(interval);
          call.end();
        }
      }, 100);
      call.on('cancelled', () => clearInterval(interval));
    },
    // Minimal bidirectional handler: echo each client message back,
    // then end the server side when the client finishes writing
    tradeChat: (call: any) => {
      call.on('data', (msg: any) => call.write(msg));
      call.on('end', () => call.end());
    },
  });

  const port = await new Promise<number>((resolve, reject) => {
    server.bindAsync(
      '0.0.0.0:0',
      grpc.ServerCredentials.createInsecure(),
      (err, port) => {
        if (err) reject(err);
        else resolve(port);
      }
    );
  });
  server.start();

  client = new proto.stock.StockService(
    `localhost:${port}`,
    grpc.credentials.createInsecure()
  );
});

afterAll(() => {
  server.forceShutdown();
});

describe('gRPC Server Streaming', () => {
  it('should receive all price updates from server stream', async () => {
    const updates: any[] = [];
    await new Promise<void>((resolve, reject) => {
      const call = client.streamPrices({ symbols: ['AAPL', 'GOOG'] });
      call.on('data', (update: any) => {
        updates.push(update);
      });
      call.on('end', () => resolve());
      call.on('error', (err: Error) => reject(err));
    });

    // 5 ticks * 2 symbols = 10 updates
    expect(updates).toHaveLength(10);

    // Verify ordering: ticks should be sequential
    const aaplUpdates = updates.filter((u) => u.symbol === 'AAPL');
    const ticks = aaplUpdates.map((u) => u.tick);
    expect(ticks).toEqual([1, 2, 3, 4, 5]);

    // Verify price is within expected range
    for (const update of updates) {
      expect(update.price).toBeGreaterThan(90);
      expect(update.price).toBeLessThan(120);
    }
  });

  it('should handle client cancellation of server stream', async () => {
    const updates: any[] = [];
    await new Promise<void>((resolve) => {
      const call = client.streamPrices({ symbols: ['AAPL'] });
      call.on('data', (update: any) => {
        updates.push(update);
        if (updates.length >= 2) {
          call.cancel(); // Cancel after 2 updates
        }
      });
      call.on('error', (err: any) => {
        if (err.code === grpc.status.CANCELLED) {
          resolve(); // Expected cancellation
        }
      });
    });
    expect(updates.length).toBeGreaterThanOrEqual(2);
    expect(updates.length).toBeLessThan(10);
  });
});

describe('gRPC Bidirectional Streaming', () => {
  it('should support bidirectional message exchange', async () => {
    const received: any[] = [];
    await new Promise<void>((resolve, reject) => {
      const call = client.tradeChat();
      call.on('data', (msg: any) => {
        received.push(msg);
      });
      call.on('end', () => resolve());
      call.on('error', (err: Error) => reject(err));

      // Send messages from client
      call.write({ user: 'trader1', message: 'Buy AAPL' });
      call.write({ user: 'trader1', message: 'Sell GOOG' });
      call.end();
    });

    expect(received.length).toBeGreaterThan(0);
    for (const msg of received) {
      expect(msg).toHaveProperty('user');
      expect(msg).toHaveProperty('message');
    }
  });
});
```
AI/LLM Streaming Response Testing
```ts
// tests/llm/llm-stream.test.ts
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { createServer, Server } from 'http';

describe('LLM Streaming Response', () => {
  let server: Server;
  let baseUrl: string;

  beforeAll(async () => {
    server = createServer((req, res) => {
      if (req.url === '/v1/chat/completions' && req.method === 'POST') {
        res.writeHead(200, {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
        });
        const tokens = ['Hello', ',', ' how', ' can', ' I', ' help', ' you', '?'];
        let index = 0;
        const sendToken = () => {
          if (index < tokens.length) {
            const chunk = {
              id: 'chatcmpl-abc123',
              object: 'chat.completion.chunk',
              created: Math.floor(Date.now() / 1000),
              model: 'gpt-4',
              choices: [
                {
                  index: 0,
                  delta: { content: tokens[index] },
                  finish_reason: null,
                },
              ],
            };
            res.write(`data: ${JSON.stringify(chunk)}\n\n`);
            index++;
            setTimeout(sendToken, 30);
          } else {
            // Send final chunk with finish_reason
            const finalChunk = {
              id: 'chatcmpl-abc123',
              object: 'chat.completion.chunk',
              created: Math.floor(Date.now() / 1000),
              model: 'gpt-4',
              choices: [
                {
                  index: 0,
                  delta: {},
                  finish_reason: 'stop',
                },
              ],
            };
            res.write(`data: ${JSON.stringify(finalChunk)}\n\n`);
            res.write('data: [DONE]\n\n');
            res.end();
          }
        };
        sendToken();
      }
    });
    await new Promise<void>((resolve) => {
      server.listen(0, () => {
        const addr = server.address();
        baseUrl = `http://localhost:${typeof addr === 'object' ? addr!.port : 0}`;
        resolve();
      });
    });
  });

  afterAll(() => server.close());

  it('should collect all tokens from streaming response', async () => {
    const response = await fetch(`${baseUrl}/v1/chat/completions`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        model: 'gpt-4',
        messages: [{ role: 'user', content: 'Hello' }],
        stream: true,
      }),
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    const tokens: string[] = [];
    let finishReason: string | null = null;
    let receivedDone = false;
    let buffer = '';

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || '';
      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = line.slice(6).trim();
          if (data === '[DONE]') {
            receivedDone = true;
            continue;
          }
          const parsed = JSON.parse(data);
          const delta = parsed.choices[0].delta;
          if (delta.content) {
            tokens.push(delta.content);
          }
          if (parsed.choices[0].finish_reason) {
            finishReason = parsed.choices[0].finish_reason;
          }
        }
      }
    }

    const fullText = tokens.join('');
    expect(fullText).toBe('Hello, how can I help you?');
    expect(finishReason).toBe('stop');
    expect(receivedDone).toBe(true);
    expect(tokens).toHaveLength(8);
  });

  it('should measure token latency', async () => {
    const response = await fetch(`${baseUrl}/v1/chat/completions`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        model: 'gpt-4',
        messages: [{ role: 'user', content: 'Hello' }],
        stream: true,
      }),
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    const tokenTimestamps: number[] = [];
    let buffer = '';

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || '';
      for (const line of lines) {
        if (line.startsWith('data: ') && line.slice(6).trim() !== '[DONE]') {
          const parsed = JSON.parse(line.slice(6));
          if (parsed.choices[0].delta.content) {
            tokenTimestamps.push(performance.now());
          }
        }
      }
    }

    // Calculate inter-token latencies
    const latencies: number[] = [];
    for (let i = 1; i < tokenTimestamps.length; i++) {
      latencies.push(tokenTimestamps[i] - tokenTimestamps[i - 1]);
    }
    const avgLatency = latencies.reduce((a, b) => a + b, 0) / latencies.length;
    const maxLatency = Math.max(...latencies);

    // Assert reasonable latency bounds
    expect(avgLatency).toBeLessThan(200); // Average under 200ms
    expect(maxLatency).toBeLessThan(500); // No single gap over 500ms
  });

  it('should handle abort during streaming', async () => {
    const controller = new AbortController();
    const tokensReceived: string[] = [];

    const response = await fetch(`${baseUrl}/v1/chat/completions`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        model: 'gpt-4',
        messages: [{ role: 'user', content: 'Hello' }],
        stream: true,
      }),
      signal: controller.signal,
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();

    try {
      let buffer = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';
        for (const line of lines) {
          if (line.startsWith('data: ') && line.slice(6).trim() !== '[DONE]') {
            const parsed = JSON.parse(line.slice(6));
            if (parsed.choices[0].delta.content) {
              tokensReceived.push(parsed.choices[0].delta.content);
              if (tokensReceived.length >= 3) {
                controller.abort();
              }
            }
          }
        }
      }
    } catch (err: any) {
      expect(err.name).toBe('AbortError');
    }

    expect(tokensReceived.length).toBeGreaterThanOrEqual(3);
    expect(tokensReceived.length).toBeLessThan(8);
  });
});
```
Backpressure and Flow Control Testing
```ts
// tests/sse/sse-backpressure.test.ts
import { describe, it, expect } from 'vitest';
import { createServer } from 'http';

describe('Backpressure Testing', () => {
  it('should handle slow consumer without losing data', async () => {
    const totalEvents = 100;
    const consumerDelayMs = 10;

    const server = createServer(async (req, res) => {
      res.writeHead(200, { 'Content-Type': 'text/event-stream' });
      for (let i = 0; i < totalEvents; i++) {
        const canWrite = res.write(`data: ${JSON.stringify({ seq: i })}\n\n`);
        if (!canWrite) {
          // Buffer is full: pause until the socket drains before writing more
          await new Promise<void>((resolve) => res.once('drain', resolve));
        }
      }
      res.end();
    });

    const port = await new Promise<number>((resolve) => {
      server.listen(0, () => {
        const addr = server.address();
        resolve(typeof addr === 'object' ? addr!.port : 0);
      });
    });

    const response = await fetch(`http://localhost:${port}/events`);
    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    const received: number[] = [];
    let buffer = '';

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // Simulate slow consumer
      await new Promise((r) => setTimeout(r, consumerDelayMs));
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || '';
      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = JSON.parse(line.slice(6));
          received.push(data.seq);
        }
      }
    }
    server.close();

    // All events should be received despite slow consumer
    expect(received).toHaveLength(totalEvents);
    expect(received).toEqual(Array.from({ length: totalEvents }, (_, i) => i));
  });

  it('should detect memory leaks in long-running streams', async () => {
    const server = createServer((req, res) => {
      res.writeHead(200, { 'Content-Type': 'text/event-stream' });
      let count = 0;
      const interval = setInterval(() => {
        count++;
        const largePayload = 'x'.repeat(1024); // 1KB per event
        res.write(`data: ${JSON.stringify({ count, payload: largePayload })}\n\n`);
        if (count >= 1000) {
          clearInterval(interval);
          res.end();
        }
      }, 1);
      req.on('close', () => clearInterval(interval));
    });

    const port = await new Promise<number>((resolve) => {
      server.listen(0, () => {
        const addr = server.address();
        resolve(typeof addr === 'object' ? addr!.port : 0);
      });
    });

    const memBefore = process.memoryUsage().heapUsed;
    const response = await fetch(`http://localhost:${port}/events`);
    const reader = response.body!.getReader();

    // Consume chunks without retaining them so memory stays bounded
    let chunksRead = 0;
    while (true) {
      const { done } = await reader.read();
      if (done) break;
      chunksRead++;
    }

    const memAfter = process.memoryUsage().heapUsed;
    const memGrowthMB = (memAfter - memBefore) / (1024 * 1024);
    server.close();

    expect(chunksRead).toBeGreaterThan(0);
    // Memory growth should be bounded (not proportional to total data received)
    expect(memGrowthMB).toBeLessThan(50);
  });
});
```
Connection Resilience Testing
```ts
// tests/helpers/stream-collector.ts
import EventSource from 'eventsource';

export interface StreamCollectorOptions {
  timeoutMs?: number;
  maxEvents?: number;
  onEvent?: (event: any) => void;
}

export async function collectSSEEvents(
  url: string,
  options: StreamCollectorOptions = {}
): Promise<{ events: any[]; errors: Error[]; reconnections: number }> {
  const { timeoutMs = 10000, maxEvents = Infinity } = options;
  const events: any[] = [];
  const errors: Error[] = [];
  let reconnections = 0;

  return new Promise((resolve) => {
    const es = new EventSource(url);
    const timeout = setTimeout(() => {
      es.close();
      resolve({ events, errors, reconnections });
    }, timeoutMs);

    es.onopen = () => {
      // A re-open after events have already arrived means we reconnected
      if (events.length > 0) reconnections++;
    };

    es.onmessage = (event) => {
      const data = JSON.parse(event.data);
      events.push(data);
      options.onEvent?.(data);
      if (events.length >= maxEvents) {
        clearTimeout(timeout);
        es.close();
        resolve({ events, errors, reconnections });
      }
    };

    es.onerror = () => {
      errors.push(new Error('SSE connection error'));
    };
  });
}
```
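A resilience test can then lean on this helper. A minimal sketch, assuming a mock server that drops the connection after event 3 (as in the reconnection test above); `startFlakyServer` is a hypothetical fixture, not part of this skill:

```ts
// tests/sse/sse-resilience.test.ts -- hypothetical usage of collectSSEEvents
import { describe, it, expect } from 'vitest';
import { collectSSEEvents } from '../helpers/stream-collector';
import { startFlakyServer } from '../helpers/mock-sse-server'; // assumed fixture

describe('SSE Resilience', () => {
  it('should collect all events across a mid-stream disconnect', async () => {
    // Assumed: the fixture drops the connection after event 3, then resumes
    const { baseUrl, close } = await startFlakyServer();
    const { events, reconnections } = await collectSSEEvents(`${baseUrl}/events`, {
      timeoutMs: 5000,
      maxEvents: 6,
    });
    await close();
    expect(events.map((e) => e.n)).toEqual([1, 2, 3, 4, 5, 6]);
    expect(reconnections).toBeGreaterThanOrEqual(1);
  });
});
```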
Streaming Performance Testing
```python
# tests/performance/stream_load_test.py
"""
Load test for streaming endpoints using asyncio.
Simulates multiple concurrent SSE consumers.
"""
import asyncio
import time
from dataclasses import dataclass, field
from typing import List

import aiohttp


@dataclass
class StreamMetrics:
    connection_time_ms: float = 0
    time_to_first_event_ms: float = 0
    total_events: int = 0
    total_duration_ms: float = 0
    inter_event_latencies: List[float] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)

    @property
    def avg_inter_event_latency(self) -> float:
        if not self.inter_event_latencies:
            return 0
        return sum(self.inter_event_latencies) / len(self.inter_event_latencies)

    @property
    def p99_inter_event_latency(self) -> float:
        if not self.inter_event_latencies:
            return 0
        sorted_latencies = sorted(self.inter_event_latencies)
        idx = int(len(sorted_latencies) * 0.99)
        return sorted_latencies[idx]


async def consume_sse_stream(url: str, max_events: int = 100) -> StreamMetrics:
    """Consume an SSE stream and collect performance metrics."""
    metrics = StreamMetrics()
    start = time.monotonic()
    try:
        async with aiohttp.ClientSession() as session:
            connect_start = time.monotonic()
            async with session.get(url) as response:
                metrics.connection_time_ms = (time.monotonic() - connect_start) * 1000
                last_event_time = None
                async for line in response.content:
                    decoded = line.decode('utf-8').strip()
                    if decoded.startswith('data: '):
                        now = time.monotonic()
                        if metrics.total_events == 0:
                            metrics.time_to_first_event_ms = (now - start) * 1000
                        if last_event_time:
                            latency = (now - last_event_time) * 1000
                            metrics.inter_event_latencies.append(latency)
                        last_event_time = now
                        metrics.total_events += 1
                        if metrics.total_events >= max_events:
                            break
    except Exception as e:
        metrics.errors.append(str(e))
    metrics.total_duration_ms = (time.monotonic() - start) * 1000
    return metrics


async def load_test_streams(
    url: str,
    concurrent_consumers: int = 50,
    events_per_consumer: int = 100,
) -> List[StreamMetrics]:
    """Run concurrent SSE consumers and collect aggregate metrics."""
    tasks = [
        consume_sse_stream(url, events_per_consumer)
        for _ in range(concurrent_consumers)
    ]
    return await asyncio.gather(*tasks)


def print_report(results: List[StreamMetrics]):
    """Print a summary report of the load test."""
    successful = [r for r in results if not r.errors]
    failed = [r for r in results if r.errors]

    print(f"\n{'=' * 60}")
    print("Streaming Load Test Report")
    print(f"{'=' * 60}")
    print(f"Total consumers: {len(results)}")
    print(f"Successful: {len(successful)}")
    print(f"Failed: {len(failed)}")

    if successful:
        avg_ttfe = sum(r.time_to_first_event_ms for r in successful) / len(successful)
        avg_conn = sum(r.connection_time_ms for r in successful) / len(successful)
        all_latencies = [l for r in successful for l in r.inter_event_latencies]
        all_latencies.sort()
        print(f"\nAvg connection time: {avg_conn:.1f}ms")
        print(f"Avg time to first event: {avg_ttfe:.1f}ms")
        if all_latencies:
            p50 = all_latencies[len(all_latencies) // 2]
            p95 = all_latencies[int(len(all_latencies) * 0.95)]
            p99 = all_latencies[int(len(all_latencies) * 0.99)]
            print(f"Inter-event latency p50: {p50:.1f}ms")
            print(f"Inter-event latency p95: {p95:.1f}ms")
            print(f"Inter-event latency p99: {p99:.1f}ms")


if __name__ == '__main__':
    import sys

    url = sys.argv[1] if len(sys.argv) > 1 else 'http://localhost:3000/events'
    results = asyncio.run(load_test_streams(url, concurrent_consumers=50))
    print_report(results)
```
Best Practices
- Always set timeouts on stream consumers -- A test that waits forever for a stream event blocks the entire suite.
- Use AbortController for fetch-based streams -- Clean cancellation prevents resource leaks in tests (see the sketch after this list).
- Validate intermediate state, not just final state -- Streaming is about the journey; assert on each chunk.
- Buffer partial data correctly -- Chunks can split across read boundaries; always use a line buffer.
- Test empty streams -- A stream that opens and immediately closes should not crash the consumer.
- Measure time-to-first-byte separately -- TTFB is the most critical streaming performance metric.
- Test with realistic payload sizes -- Small test payloads may miss backpressure and buffering issues.
- Close streams in afterEach/afterAll -- Leaked connections cause flaky tests and port exhaustion.
- Test the [DONE] signal -- For LLM streams, verify the termination protocol is handled correctly.
- Use mock servers, not production endpoints -- Tests must be deterministic; real streaming services are not.
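A minimal sketch tying the first two practices together: a hard deadline wired to an AbortController for fetch-based streams. The `fetchStreamWithTimeout` helper is illustrative, not part of this skill:

```ts
// Hypothetical helper: every fetch-based stream test gets a hard deadline
async function fetchStreamWithTimeout(url: string, timeoutMs = 5000) {
  const controller = new AbortController();
  // If the stream outlives the deadline, abort rejects the pending read()
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  const response = await fetch(url, { signal: controller.signal });
  const reader = response.body!.getReader();
  return {
    reader,
    cancel: () => {
      clearTimeout(timer);
      controller.abort();
    },
  };
}
```

Tests call `cancel()` in teardown whether or not they drained the stream, so neither a hung server nor a forgotten reader can block the suite.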
Anti-Patterns to Avoid
- Collecting entire stream before asserting -- This defeats the purpose of testing streaming; validate incrementally.
- Using setTimeout as synchronization -- Use event-driven assertions (on data, on end) instead of arbitrary delays.
- Ignoring partial reads -- A single `reader.read()` call may not return a complete event; always buffer.
- Not testing connection drops -- Real networks fail; simulate disconnections and verify recovery.
- Hardcoding port numbers -- Use port 0 and let the OS assign a free port to avoid conflicts.
- Skipping error event testing -- The SSE `onerror` and gRPC `on('error')` handlers need test coverage.
- Testing only happy path timing -- Measure latency under load, not just with a single consumer.
- Forgetting to drain the stream -- If a test does not consume the full stream, it may leave the server hanging.
- Not validating Content-Type headers -- `text/event-stream` is required for SSE; wrong headers cause silent failures.
- Sharing server instances across parallel tests -- Each test should have its own server to avoid interference (see the helper sketch below).
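The last two anti-patterns pair naturally: give every test its own server on an OS-assigned port. A minimal helper sketch; the `startTestServer` name and shape are illustrative, not part of this skill:

```ts
// tests/helpers/test-server.ts -- hypothetical per-test server factory
import { createServer, RequestListener, Server } from 'http';

export async function startTestServer(
  handler: RequestListener
): Promise<{ baseUrl: string; close: () => Promise<void> }> {
  const server: Server = createServer(handler);
  // Port 0 asks the OS for any free port, so parallel tests never collide
  const baseUrl = await new Promise<string>((resolve) => {
    server.listen(0, () => {
      const addr = server.address();
      resolve(`http://localhost:${typeof addr === 'object' && addr ? addr.port : 0}`);
    });
  });
  return {
    baseUrl,
    close: () => new Promise<void>((resolve) => server.close(() => resolve())),
  };
}
```

Each test creates its server in setup and awaits `close()` in teardown, so no port numbers are hardcoded and no server instance is shared.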
Running Tests
```bash
# Run all streaming tests
npx vitest run tests/sse/ tests/grpc/ tests/chunked/ tests/llm/

# Run SSE tests only
npx vitest run tests/sse/

# Run gRPC streaming tests
npx vitest run tests/grpc/

# Run LLM streaming tests
npx vitest run tests/llm/

# Run with verbose timing output
npx vitest run tests/ --reporter=verbose

# Run performance load test (Python)
python3 tests/performance/stream_load_test.py http://localhost:3000/events

# Run with coverage
npx vitest run tests/ --coverage

# Watch mode for development
npx vitest watch tests/sse/

# Debug a specific test
npx vitest run tests/llm/llm-stream.test.ts --reporter=verbose
```