Awesome-omni-skill river-ts-streaming
Type-safe Server-Sent Events (SSE) and WebSocket communication using river.ts library. Use when working with this codebase to: (1) Define typed event schemas with RiverEvents builder, (2) Implement SSE streaming on server with RiverEmitter, (3) Consume SSE streams on client with RiverClient, (4) Handle WebSocket communication with RiverSocketAdapter, (5) Implement request/response RPC patterns over WebSocket, (6) Work with chunked/streamed data events.
install
source · Clone the upstream repo
git clone https://github.com/diegosouzapw/awesome-omni-skill
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skill "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/development/river-ts-streaming" ~/.claude/skills/diegosouzapw-awesome-omni-skill-river-ts-streaming && rm -rf "$T"
manifest: skills/development/river-ts-streaming/SKILL.md
Quick Reference
river.ts provides four main components:
- RiverEvents: type-safe event schema builder
- RiverEmitter: server-side SSE streaming
- RiverClient: client-side SSE consumption
- RiverSocketAdapter: WebSocket message handling with request/response support
Event Definition
Define events using the builder pattern:
    import { RiverEvents } from 'river.ts';

    const events = new RiverEvents()
      .defineEvent('message', { message: 'Hello' })
      .defineEvent('data', { data: {} as { id: number; name: string } })
      .defineEvent('stream', { data: [] as string[], stream: true, chunkSize: 100 })
      // Request/response pattern with explicit response type
      .defineEvent('rpc.call', {
        data: {} as { method: string; params: unknown },
        response: {} as { result: unknown; error?: string }
      })
      .build();
Reserved event types: close and error. Do not define these.
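The stream event above pairs a string[] payload with chunkSize: 100. This document does not say how river.ts splits such a payload, so the following is only a minimal sketch of fixed-size chunking; chunkArray is a hypothetical helper, not a river.ts export.

```typescript
// Hypothetical helper (not a river.ts API): split an array into
// consecutive chunks of at most `chunkSize` items.
function chunkArray<T>(items: readonly T[], chunkSize: number): T[][] {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError('chunkSize must be a positive integer');
  }
  const result: T[][] = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    result.push(items.slice(start, start + chunkSize));
  }
  return result;
}

// 250 items with chunkSize 100 yield chunks of 100, 100 and 50 items.
const sampleLines = Array.from({ length: 250 }, (_, i) => `line ${i}`);
const sampleChunks = chunkArray(sampleLines, 100);
console.log(sampleChunks.map((c) => c.length));
```

Under this assumption the last chunk is simply shorter than the rest, so a consumer should not rely on every chunk having exactly chunkSize items.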
Server-Side SSE (RiverEmitter)
    import { RiverEmitter } from 'river.ts/server';

    const emitter = RiverEmitter.init(events);

    // Create SSE stream for HTTP response
    const stream = emitter.stream({
      callback: async (emit, clientId) => {
        await emit('message', { message: 'Connected' });
        await emit('data', { data: { id: 1, name: 'test' } });
      },
      clientId: 'optional-custom-id',
      ondisconnect: (clientId) => console.log(`${clientId} disconnected`)
    });

    return new Response(stream, { headers: emitter.headers() });

    // Broadcast to all clients
    await emitter.broadcast('message', { message: 'Update' });

    // Send to specific client
    await emitter.sendToClient('client-id', 'data', { data: { id: 2, name: 'specific' } });
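For orientation, this is roughly what an SSE stream carries on the wire. The sketch follows the standard text/event-stream framing (an optional id line, an event line, a data line, then a blank line). Whether RiverEmitter uses the event field or embeds the type in the JSON body is not stated here, so formatSseFrame is a hypothetical helper and not a river.ts API.

```typescript
// Hypothetical helper (not a river.ts API): encode one event as a
// text/event-stream frame. Every frame is terminated by a blank line.
function formatSseFrame(type: string, payload: unknown, id?: string): string {
  const frameLines: string[] = [];
  if (id !== undefined) frameLines.push(`id: ${id}`);
  frameLines.push(`event: ${type}`);
  // JSON.stringify without an indent argument emits no raw newlines,
  // so a single data line is sufficient.
  frameLines.push(`data: ${JSON.stringify(payload)}`);
  return frameLines.join('\n') + '\n\n';
}

const sseFrame = formatSseFrame('data', { data: { id: 1, name: 'test' } });
console.log(JSON.stringify(sseFrame));
```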
Client-Side SSE (RiverClient)
    import { RiverClient } from 'river.ts/client';

    const client = RiverClient.init(events, { reconnect: true });

    client
      .prepare('http://localhost:3000/events', { method: 'GET' })
      .on('message', (data) => console.log(data.message))
      .on('data', (data) => console.log(data.id, data.name))
      .stream();

    // Close connection
    client.close();
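To illustrate what a client has to do before handlers like the ones above can fire, here is a minimal sketch of parsing text/event-stream frames. It assumes the input contains only complete frames and LF or CRLF line endings; a real client would buffer partial network chunks. parseSseFrames and ParsedSseEvent are hypothetical names, not part of river.ts.

```typescript
// Hypothetical parser (not a river.ts API) for complete SSE frames.
interface ParsedSseEvent {
  type: string;
  data: string;
}

function parseSseFrames(text: string): ParsedSseEvent[] {
  const parsed: ParsedSseEvent[] = [];
  // Frames are separated by a blank line.
  for (const block of text.split(/\r?\n\r?\n/)) {
    let type = 'message'; // default type when no "event:" field is present
    const dataLines: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line === '' || line.startsWith(':')) continue; // skip blanks and comments
      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      if (value.startsWith(' ')) value = value.slice(1); // strip one leading space
      if (field === 'event') type = value;
      else if (field === 'data') dataLines.push(value);
    }
    // A frame without any data line is not dispatched.
    if (dataLines.length > 0) parsed.push({ type, data: dataLines.join('\n') });
  }
  return parsed;
}

const sampleStream =
  'event: data\ndata: {"id":1,"name":"test"}\n\n: keep-alive\n\ndata: hello\n\n';
for (const evt of parseSseFrames(sampleStream)) {
  console.log(evt.type, evt.data);
}
```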
WebSocket Adapter (RiverSocketAdapter)
    import { RiverSocketAdapter } from 'river.ts/websocket';

    const adapter = new RiverSocketAdapter(events, { debug: false });

    // Register event handlers
    adapter.on('message', (data) => console.log(data));
    adapter.off('message', handler); // Unregister

    // Handle incoming messages (call from ws.onmessage)
    adapter.handleMessage(messageData);

    // Send messages
    adapter.send('data', { data: { id: 1, name: 'test' } }, (msg) => ws.send(msg));
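The off call above takes a handler variable. In most emitter-style APIs, unregistering only works with the same function reference that was registered, and the sketch below assumes river.ts follows that convention. TinyDispatcher is a hypothetical stand-in that routes { type, data } messages to handlers; it is not the RiverSocketAdapter implementation.

```typescript
// Hypothetical stand-in (not river.ts): route { type, data } messages
// to the handlers registered for that type.
type MessageHandler = (data: unknown) => void;

class TinyDispatcher {
  private handlers = new Map<string, Set<MessageHandler>>();

  on(type: string, handler: MessageHandler): void {
    if (!this.handlers.has(type)) this.handlers.set(type, new Set());
    this.handlers.get(type)!.add(handler);
  }

  // Removes only the exact function reference that was passed to on().
  off(type: string, handler: MessageHandler): void {
    this.handlers.get(type)?.delete(handler);
  }

  handleMessage(raw: string): void {
    const msg = JSON.parse(raw) as { type: string; data?: unknown };
    this.handlers.get(msg.type)?.forEach((handler) => handler(msg.data));
  }
}

const dispatcher = new TinyDispatcher();
const received: unknown[] = [];
const onMessage: MessageHandler = (data) => { received.push(data); };

dispatcher.on('message', onMessage);
dispatcher.handleMessage('{"type":"message","data":"first"}');
dispatcher.off('message', onMessage); // same reference, so it is removed
dispatcher.handleMessage('{"type":"message","data":"second"}');
console.log(received);
```

Keeping the handler in a named constant, as onMessage is here, is what makes the later off call possible; an inline arrow function cannot be unregistered this way.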
WebSocket Request/Response Pattern
For RPC-style communication with automatic type inference:
    import { RiverEvents } from 'river.ts';
    import { RiverSocketAdapter, RequestTimeoutError, WebSocketClosedError } from 'river.ts/websocket';

    // Events with explicit response types
    const events = new RiverEvents()
      .defineEvent('instance.spawn', {
        data: {} as { cwd: string },
        response: {} as { instanceId: string; status: 'created' | 'error' }
      })
      .build();

    const adapter = new RiverSocketAdapter(events);

    // Route messages through adapter
    ws.onmessage = (e) => adapter.handleMessage(e.data);
    ws.onclose = () => adapter.clearPendingRequests();

    // Make request - response type is inferred from event definition
    const response = await adapter.request(
      'instance.spawn',
      { cwd: '/app' },
      (msg) => ws.send(msg),
      10000 // timeout in ms (default: 30000)
    );
    // response is typed as { instanceId: string; status: 'created' | 'error' }
Wire format for request/response:
    // Request (outgoing)
    { "type": "instance.spawn", "data": { "cwd": "/app" }, "id": "uuid" }

    // Response (incoming) - server echoes back the id
    { "type": "instance.spawn", "data": { "instanceId": "123", "status": "created" }, "id": "uuid" }
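The echoed id is what lets a client match a response to the request that caused it. The following is a minimal sketch of that correlation, assuming one pending entry per id plus a timeout. RequestCorrelator, Envelope and the counter-based ids are hypothetical illustrations, not river.ts internals.

```typescript
// Hypothetical sketch (not river.ts internals): match responses to
// pending requests by the echoed `id` field.
interface Envelope { type: string; data: unknown; id?: string; }

interface PendingEntry {
  resolve: (data: unknown) => void;
  reject: (err: Error) => void;
  timer: ReturnType<typeof setTimeout>;
}

class RequestCorrelator {
  private pending = new Map<string, PendingEntry>();
  private nextId = 0;

  get pendingCount(): number { return this.pending.size; }

  request(type: string, data: unknown, send: (msg: string) => void, timeoutMs = 30000): Promise<unknown> {
    const id = `req-${++this.nextId}`; // the wire format above shows a UUID instead
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(id);
        reject(new Error(`Request ${id} timed out after ${timeoutMs} ms`));
      }, timeoutMs);
      this.pending.set(id, { resolve, reject, timer });
      try {
        send(JSON.stringify({ type, data, id }));
      } catch (err) {
        clearTimeout(timer);
        this.pending.delete(id);
        reject(err instanceof Error ? err : new Error(String(err)));
      }
    });
  }

  // Returns true when the message settled a pending request.
  handleMessage(raw: string): boolean {
    const msg = JSON.parse(raw) as Envelope;
    if (msg.id === undefined) return false;
    const entry = this.pending.get(msg.id);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(msg.id);
    entry.resolve(msg.data);
    return true;
  }

  // Reject everything still in flight, for example from ws.onclose.
  clearPendingRequests(reason = 'WebSocket closed'): void {
    this.pending.forEach((entry) => {
      clearTimeout(entry.timer);
      entry.reject(new Error(reason));
    });
    this.pending.clear();
  }
}

const correlator = new RequestCorrelator();
// Loopback "server" for illustration: echo the id back with a canned result.
const loopback = (msg: string): void => {
  const { type, id } = JSON.parse(msg) as Envelope;
  correlator.handleMessage(
    JSON.stringify({ type, id, data: { instanceId: '123', status: 'created' } })
  );
};
correlator
  .request('instance.spawn', { cwd: '/app' }, loopback, 10000)
  .then((result) => console.log(result));
```

Clearing the timer when a response arrives, and rejecting all entries on close, keeps promises from hanging after the socket is gone. That is presumably why the snippet above wires ws.onclose to clearPendingRequests.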
Key Types
    import { EventData, ResponseData, EmitPayload } from 'river.ts';

    // EventData<T, K>    - Extract data type for receiving/handling
    // ResponseData<T, K> - Extract response type for request() return value
    // EmitPayload<T, K>  - Extract payload type for emitting (excludes type/stream/chunkSize)
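As a rough illustration of how helper types like these can pull a field's type out of an event map, the sketch below uses TypeScript conditional types with infer. DataOf, ResponseOf, EventMapSketch and SpawnEvents are hypothetical names; the real EventData and ResponseData exported by river.ts may be defined differently.

```typescript
// Hypothetical definitions for illustration only; river.ts exports its
// own EventData / ResponseData types, which may differ.
type EventMapSketch = Record<string, { data?: unknown; response?: unknown }>;

// Type of the `data` field, when the event declares one.
type DataOf<T extends EventMapSketch, K extends keyof T> =
  T[K] extends { data: infer D } ? D : never;

// Type of the `response` field, when the event declares one.
type ResponseOf<T extends EventMapSketch, K extends keyof T> =
  T[K] extends { response: infer R } ? R : never;

type SpawnEvents = {
  'instance.spawn': {
    data: { cwd: string };
    response: { instanceId: string; status: 'created' | 'error' };
  };
};

// Both annotations are checked at compile time against the event map.
const spawnRequest: DataOf<SpawnEvents, 'instance.spawn'> = { cwd: '/app' };
const spawnResponse: ResponseOf<SpawnEvents, 'instance.spawn'> = {
  instanceId: '123',
  status: 'created'
};
console.log(spawnRequest.cwd, spawnResponse.status);
```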
Project Structure
    src/
    ├── index.ts      # Main exports (RiverEvents, types)
    ├── builder.ts    # RiverEvents builder class
    ├── client/       # RiverClient for SSE consumption
    ├── server/       # RiverEmitter for SSE streaming
    ├── websocket/    # RiverSocketAdapter for WebSocket
    └── types/
        ├── core.ts   # BaseEvent, EventMap, EventData, ResponseData
        └── http.ts   # HTTPMethods type
Testing
Run tests with:

    bun test

Test files are in the tests/ directory. WebSocket request tests are in tests/websocket/request.test.ts.
Build
Build with:

    npm run build

The build uses unbuild. Output goes to dist/ with separate entry points for /client, /server, /websocket.