Vibeship-spawner-skills websockets-realtime

WebSockets & Real-time Skill

install
source · Clone the upstream repo
git clone https://github.com/vibeforge1111/vibeship-spawner-skills
manifest: backend/websockets-realtime/skill.yaml
source content

WebSockets & Real-time Skill

Building real-time features with WebSockets, SSE, and live updates

version: 1.0.0 skill_id: websockets-realtime name: WebSockets & Real-time category: backend layer: 2

description: | Expert at building real-time features using WebSockets, Server-Sent Events, and other live update mechanisms. Covers connection management, reconnection strategies, scaling with pub/sub, and common patterns like presence and typing indicators.

triggers:

  • "websocket"
  • "real-time"
  • "realtime"
  • "live updates"
  • "socket.io"
  • "SSE"
  • "server-sent events"
  • "presence"
  • "typing indicator"
  • "live notifications"
  • "chat"
  • "collaborative"

identity: role: Real-time Systems Engineer personality: | Pragmatic builder who knows when WebSockets are overkill and when they're essential. Understands the complexity of connection management at scale. Prefers SSE for unidirectional updates, WebSockets only when bidirectional is truly needed. principles: - "SSE for server-to-client, WebSockets for bidirectional" - "Always implement reconnection logic" - "Scale with pub/sub, not shared state" - "Graceful degradation to polling" - "Authentication happens before upgrade"

expertise: protocols: - "WebSocket (RFC 6455)" - "Server-Sent Events (SSE)" - "HTTP/2 Server Push" - "Long polling (fallback)"

patterns: - "Presence (online/offline status)" - "Typing indicators" - "Live notifications" - "Collaborative editing" - "Real-time dashboards" - "Chat systems"

scaling: - "Redis Pub/Sub" - "Sticky sessions" - "Horizontal scaling" - "Connection limits"

patterns: websocket_server: description: "Basic WebSocket server setup" example: | // Node.js with ws library import { WebSocketServer, WebSocket } from 'ws'; import { createServer } from 'http';

  // Plain HTTP server; the WebSocket server below attaches to it.
  const server = createServer();
  const wss = new WebSocketServer({ server });

  // Track connected clients, keyed by authenticated user ID
  // (one socket per user: a later connection overwrites the entry).
  const clients = new Map<string, WebSocket>();

  // Register each accepted socket under its authenticated user ID and wire
  // up its message / close / error handlers.
  wss.on('connection', (ws, req) => {
    // Authenticate from query params or headers
    const userId = authenticateFromRequest(req);
    if (!userId) {
      ws.close(4001, 'Unauthorized');
      return;
    }

    // A newer connection for the same user replaces the older map entry.
    clients.set(userId, ws);
    console.log(`User ${userId} connected`);

    ws.on('message', (data) => {
      // Parse and dispatch in separate steps so that a failure inside
      // handleMessage is not reported to the client as malformed JSON.
      let message;
      try {
        message = JSON.parse(data.toString());
      } catch (e) {
        ws.send(JSON.stringify({ error: 'Invalid JSON' }));
        return;
      }

      try {
        handleMessage(userId, message);
      } catch (e) {
        console.error(`Failed to handle message from ${userId}:`, e);
        ws.send(JSON.stringify({ error: 'Message handling failed' }));
      }
    });

    ws.on('close', () => {
      // Only drop the entry if it still points at THIS socket. If the user
      // has already reconnected, the map holds the newer socket and deleting
      // by user ID alone would make the live connection unreachable.
      if (clients.get(userId) === ws) {
        clients.delete(userId);
      }
      console.log(`User ${userId} disconnected`);
    });

    ws.on('error', (error) => {
      console.error(`WebSocket error for ${userId}:`, error);
    });

    // Send initial state
    ws.send(JSON.stringify({ type: 'connected', userId }));
  });

  /**
   * Deliver one message to every tracked client whose socket is open.
   * The payload is serialized once and the same string is sent to each.
   */
  function broadcast(message: object) {
    const payload = JSON.stringify(message);
    clients.forEach((socket) => {
      if (socket.readyState !== WebSocket.OPEN) return;
      socket.send(payload);
    });
  }

  /**
   * Deliver a message to a single user. Does nothing when the user has no
   * entry in `clients` or their socket is not open.
   */
  function sendToUser(userId: string, message: object) {
    const socket = clients.get(userId);
    if (!socket || socket.readyState !== WebSocket.OPEN) return;
    socket.send(JSON.stringify(message));
  }

  // Start the HTTP server (and with it the attached WebSocket server) on port 3001.
  server.listen(3001);

websocket_client: description: "Robust WebSocket client with reconnection" example: | // React hook for WebSocket connection import { useEffect, useRef, useCallback, useState } from 'react';

  // Options accepted by the useWebSocket hook.
  interface UseWebSocketOptions {
    // Endpoint passed to the WebSocket constructor.
    url: string;
    // Called with each incoming message after it has been JSON-parsed.
    onMessage: (data: unknown) => void;
    // Called when the socket opens.
    onConnect?: () => void;
    // Called whenever the socket closes, before any reconnect is scheduled.
    onDisconnect?: () => void;
    // Base reconnect delay in ms (hook default 3000); doubled per attempt, capped at 30000.
    reconnectInterval?: number;
    // Reconnect attempts allowed before giving up (hook default 10); the counter resets on a successful open.
    maxReconnectAttempts?: number;
  }

  /**
   * React hook that owns one WebSocket connection to `url` and reconnects
   * with capped exponential backoff after an unexpected close.
   *
   * - Incoming messages are JSON-parsed and handed to `onMessage`.
   * - `onConnect` / `onDisconnect` fire on open / close.
   * - The connection is re-established only when `url`, `reconnectInterval`
   *   or `maxReconnectAttempts` change; the callbacks may be inline functions
   *   without causing a reconnect on every render.
   *
   * Returns `{ isConnected, send, disconnect }`. `send` silently drops data
   * while the socket is not open; `disconnect` closes with code 1000 and
   * cancels any pending reconnect.
   */
  export function useWebSocket({
    url,
    onMessage,
    onConnect,
    onDisconnect,
    reconnectInterval = 3000,
    maxReconnectAttempts = 10,
  }: UseWebSocketOptions) {
    const wsRef = useRef<WebSocket | null>(null);
    const reconnectCount = useRef(0);
    // ReturnType<typeof setTimeout> is correct under both browser and Node typings.
    const reconnectTimer = useRef<ReturnType<typeof setTimeout> | undefined>(undefined);
    // True once the caller (disconnect() or effect cleanup) asked for the
    // socket to be closed. The close code alone is not enough: closing a
    // socket that is still CONNECTING yields a non-1000 code.
    const closedByCaller = useRef(false);
    const [isConnected, setIsConnected] = useState(false);

    // Always call the latest callbacks without making them dependencies of
    // `connect`; otherwise an inline callback would rebuild the connection
    // on every render.
    const callbacks = useRef({ onMessage, onConnect, onDisconnect });
    useEffect(() => {
      callbacks.current = { onMessage, onConnect, onDisconnect };
    });

    const connect = useCallback(() => {
      clearTimeout(reconnectTimer.current);
      closedByCaller.current = false;

      // Clean up existing connection. Its close event is recognized as
      // stale below (wsRef no longer points at it) and will not reconnect.
      wsRef.current?.close();
      setIsConnected(false);

      const ws = new WebSocket(url);
      wsRef.current = ws;

      ws.onopen = () => {
        if (wsRef.current !== ws) return; // superseded while connecting
        setIsConnected(true);
        reconnectCount.current = 0;
        callbacks.current.onConnect?.();
      };

      ws.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          callbacks.current.onMessage(data);
        } catch (e) {
          console.error('Failed to parse message:', e);
        }
      };

      ws.onclose = (event) => {
        const isCurrent = wsRef.current === ws;
        if (isCurrent) setIsConnected(false);
        callbacks.current.onDisconnect?.();

        // Don't reconnect on intentional close, or for a socket that has
        // already been replaced by a newer one.
        if (!isCurrent || closedByCaller.current || event.code === 1000) return;

        // Reconnect with backoff
        if (reconnectCount.current < maxReconnectAttempts) {
          const delay = reconnectInterval * Math.pow(2, reconnectCount.current);
          reconnectCount.current++;

          reconnectTimer.current = setTimeout(() => {
            connect();
          }, Math.min(delay, 30000));
        }
      };

      ws.onerror = (error) => {
        console.error('WebSocket error:', error);
      };
    }, [url, reconnectInterval, maxReconnectAttempts]);

    const send = useCallback((data: object) => {
      if (wsRef.current?.readyState === WebSocket.OPEN) {
        wsRef.current.send(JSON.stringify(data));
      }
    }, []);

    const disconnect = useCallback(() => {
      closedByCaller.current = true;
      clearTimeout(reconnectTimer.current);
      wsRef.current?.close(1000);
    }, []);

    useEffect(() => {
      connect();
      return () => {
        closedByCaller.current = true;
        clearTimeout(reconnectTimer.current);
        wsRef.current?.close(1000);
      };
    }, [connect]);

    return { isConnected, send, disconnect };
  }

server_sent_events: description: "SSE for server-to-client updates" example: |
  // Server: Next.js API route
  // app/api/events/route.ts
  //
  // Streams Server-Sent Events to the client: one initial "connected" event,
  // then every update delivered by subscribeToUpdates, plus a comment-line
  // heartbeat every 30 seconds. The subscription and heartbeat are released
  // exactly once, whether the request is aborted or the stream is cancelled.
  export async function GET(request: Request) {
    const encoder = new TextEncoder();

    // Assigned inside start(); shared by the abort listener and cancel().
    // `closeStream` is false when the stream has already been cancelled,
    // because closing a cancelled stream's controller would throw.
    let shutdown: (closeStream: boolean) => void = () => {};

    const stream = new ReadableStream({
      async start(controller) {
        let closed = false;

        // Encode and queue one SSE frame; frames arriving after shutdown
        // are dropped instead of throwing on a closed controller.
        const push = (frame: string) => {
          if (closed) return;
          controller.enqueue(encoder.encode(frame));
        };

        // Send initial connection message
        push(`data: ${JSON.stringify({ type: 'connected' })}\n\n`);

        // Subscribe to updates (e.g., from Redis)
        const subscription = subscribeToUpdates((event) => {
          push(`data: ${JSON.stringify(event)}\n\n`);
        });

        // Heartbeat to keep connection alive
        const heartbeat = setInterval(() => {
          push(`: heartbeat\n\n`);
        }, 30000);

        shutdown = (closeStream) => {
          if (closed) return;
          closed = true;
          clearInterval(heartbeat);
          subscription.unsubscribe();
          if (closeStream) controller.close();
        };

        // Handle client disconnect (including a request that was already
        // aborted before the stream started).
        if (request.signal.aborted) {
          shutdown(true);
        } else {
          request.signal.addEventListener('abort', () => shutdown(true), { once: true });
        }
      },
      cancel() {
        // The consumer cancelled the stream; release resources only.
        shutdown(false);
      },
    });

    return new Response(stream, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
      },
    });
  }


  // Client: React hook
  /**
   * Subscribe to a Server-Sent Events endpoint for the lifetime of the
   * component and pass each JSON-parsed message to `onMessage`.
   *
   * The EventSource is reopened only when `url` changes. `onMessage` may be
   * an inline function: the latest one is always called, without tearing
   * down the connection on every render.
   * (Uses useRef in addition to useEffect; both come from 'react'.)
   */
  export function useSSE(url: string, onMessage: (data: unknown) => void) {
    const onMessageRef = useRef(onMessage);
    useEffect(() => {
      onMessageRef.current = onMessage;
    });

    useEffect(() => {
      const eventSource = new EventSource(url);

      eventSource.onmessage = (event) => {
        let data: unknown;
        try {
          data = JSON.parse(event.data);
        } catch (e) {
          // A malformed frame must not throw out of the event handler.
          console.error('Failed to parse SSE message:', e);
          return;
        }
        onMessageRef.current(data);
      };

      eventSource.onerror = () => {
        // EventSource automatically reconnects unless it has moved to CLOSED
        // (for example after a fatal HTTP error response).
        if (eventSource.readyState === EventSource.CLOSED) {
          console.error('SSE connection closed, not reconnecting');
        } else {
          console.log('SSE connection error, reconnecting...');
        }
      };

      return () => eventSource.close();
    }, [url]);
  }

presence_system: description: "Track online/offline status" example: | // Server: Presence tracking with Redis import Redis from 'ioredis';

  // Redis client used for both the presence sorted set and presence publishes.
  const redis = new Redis();
  // Sorted set of user IDs, scored by last-seen time in ms since epoch.
  const PRESENCE_KEY = 'presence:online';
  const PRESENCE_TTL = 60; // seconds since last-seen after which a user counts as offline

  /**
   * Mark a user as online: record the current time as their last-seen score
   * and announce the change on the 'presence' channel.
   */
  async function userOnline(userId: string) {
    await redis.zadd(PRESENCE_KEY, Date.now(), userId);

    const announcement = {
      userId,
      status: 'online',
      timestamp: Date.now(),
    };
    await redis.publish('presence', JSON.stringify(announcement));
  }

  /**
   * Refresh a user's last-seen score. Intended to be called every 30s so
   * the entry never becomes older than PRESENCE_TTL while connected.
   */
  async function heartbeat(userId: string) {
    const seenAt = Date.now();
    await redis.zadd(PRESENCE_KEY, seenAt, userId);
  }

  /**
   * Mark a user as offline: remove them from the presence set and announce
   * the change on the 'presence' channel.
   */
  async function userOffline(userId: string) {
    await redis.zrem(PRESENCE_KEY, userId);

    const announcement = {
      userId,
      status: 'offline',
      timestamp: Date.now(),
    };
    await redis.publish('presence', JSON.stringify(announcement));
  }

  /**
   * List the IDs of users currently online. Entries whose last-seen score
   * is at or below the TTL cutoff are removed from the set first.
   */
  async function getOnlineUsers(): Promise<string[]> {
    const staleBefore = Date.now() - (PRESENCE_TTL * 1000);
    await redis.zremrangebyscore(PRESENCE_KEY, 0, staleBefore);

    const members = await redis.zrange(PRESENCE_KEY, 0, -1);
    return members;
  }

  /**
   * Report whether a user's last-seen score is newer than the TTL cutoff.
   * A user with no entry in the presence set is offline.
   */
  async function isUserOnline(userId: string): Promise<boolean> {
    const lastSeen = await redis.zscore(PRESENCE_KEY, userId);
    if (lastSeen) {
      const cutoff = Date.now() - (PRESENCE_TTL * 1000);
      return parseInt(lastSeen) > cutoff;
    }
    return false;
  }


  // Client: Show presence
  /**
   * Render the given users with an online/offline marker that follows
   * presence events received from '/api/presence'.
   * (Uses useCallback in addition to useState; both come from 'react'.)
   */
  function UserList({ users }: { users: Array<{ id: string; name: string }> }) {
    const [onlineUsers, setOnlineUsers] = useState<Set<string>>(new Set());

    // Stable handler identity: an inline function would be a new value on
    // every render, and a subscription hook that depends on it would
    // reconnect each time.
    const handlePresence = useCallback((event: unknown) => {
      const { userId, status } = (event ?? {}) as { userId?: string; status?: string };

      // Ignore anything that is not a presence change (for example an
      // initial "connected" event), so it neither marks a user offline nor
      // triggers a re-render.
      if (typeof userId !== 'string' || (status !== 'online' && status !== 'offline')) {
        return;
      }

      setOnlineUsers(prev => {
        const isOnline = status === 'online';
        // Returning the same Set lets React skip the re-render.
        if (prev.has(userId) === isOnline) return prev;

        const next = new Set(prev);
        if (isOnline) {
          next.add(userId);
        } else {
          next.delete(userId);
        }
        return next;
      });
    }, []);

    useSSE('/api/presence', handlePresence);

    return (
      <ul>
        {users.map(user => (
          <li key={user.id}>
            <span className={onlineUsers.has(user.id) ? 'online' : 'offline'} />
            {user.name}
          </li>
        ))}
      </ul>
    );
  }

typing_indicator: description: "Show when users are typing" example: | // Server: Typing indicator with debounce const typingUsers = new Map<string, NodeJS.Timeout>();

  /**
   * Record that a user is typing in a room: cancel any pending auto-stop,
   * broadcast 'typing_start', and arm a new 3-second auto-stop timer.
   */
  function handleTypingStart(roomId: string, userId: string) {
    const key = `${roomId}:${userId}`;

    // New activity supersedes the previous auto-stop timer, if any.
    const pending = typingUsers.get(key);
    if (pending) {
      clearTimeout(pending);
    }

    broadcastToRoom(roomId, { type: 'typing_start', userId });

    // Auto-stop after 3 seconds of no activity
    const autoStop = setTimeout(() => handleTypingStop(roomId, userId), 3000);
    typingUsers.set(key, autoStop);
  }

  /**
   * Record that a user stopped typing in a room: drop their auto-stop timer
   * (when one is registered) and broadcast 'typing_stop'.
   */
  function handleTypingStop(roomId: string, userId: string) {
    const key = `${roomId}:${userId}`;
    const pending = typingUsers.get(key);

    if (pending) {
      clearTimeout(pending);
      typingUsers.delete(key);
    }

    broadcastToRoom(roomId, { type: 'typing_stop', userId });
  }


  // Client: Typing indicator component
  //
  // Renders "<user> is typing..." text for the users in `typingUsers`, and
  // defines handlers that notify the server of local typing activity.
  //
  // NOTE(review): this is an incomplete sketch as written:
  //  - useWebSocket() is called without the options object that the
  //    useWebSocket hook in the websocket_client example destructures.
  //  - setTypingUsers is never called, so typingUsers stays empty and the
  //    component always returns null.
  //  - inputRef, handleInput and handleBlur are not attached to any element
  //    in the returned JSX.
  function TypingIndicator({ roomId }) {
    const [typingUsers, setTypingUsers] = useState<string[]>([]);
    const { send } = useWebSocket();

    // Handle incoming typing events
    // (placeholder: the effect body is empty; presumably it should update
    // typingUsers from 'typing_start' / 'typing_stop' messages - TODO confirm)
    useEffect(() => {
      // Subscribed via WebSocket...
    }, []);

    // Throttled typing notification: at most one 'typing_start' per 1000 ms
    const inputRef = useRef<HTMLInputElement>(null);
    // Timestamp (ms) of the last 'typing_start' that was sent
    const lastTypingRef = useRef(0);

    const handleInput = () => {
      const now = Date.now();
      if (now - lastTypingRef.current > 1000) {
        send({ type: 'typing_start', roomId });
        lastTypingRef.current = now;
      }
    };

    // Leaving the input sends an explicit stop
    const handleBlur = () => {
      send({ type: 'typing_stop', roomId });
    };

    // Nothing to show when nobody is typing
    if (typingUsers.length === 0) return null;

    return (
      <div className="typing-indicator">
        {typingUsers.length === 1
          ? `${typingUsers[0]} is typing...`
          : typingUsers.length === 2
          ? `${typingUsers[0]} and ${typingUsers[1]} are typing...`
          : `${typingUsers.length} people are typing...`}
      </div>
    );
  }

scaling_with_redis: description: "Scale WebSockets with Redis pub/sub" example: |
  // Each server subscribes to Redis channels
  import { WebSocketServer, WebSocket } from 'ws';
  import Redis from 'ioredis';

  // Two Redis connections: `pub` only publishes, `sub` only subscribes.
  // NOTE(review): presumably separate because a connection in subscriber
  // mode cannot issue regular commands - confirm against the ioredis docs.
  const pub = new Redis();
  const sub = new Redis();

  // Local clients on this server, keyed by user ID
  const localClients = new Map<string, WebSocket>();

  // Subscribe to Redis for cross-server messages
  sub.subscribe('broadcast', 'user-messages');

  // Fan messages arriving from Redis out to the sockets held by this server.
  sub.on('message', (channel, message) => {
    // A malformed payload must not throw out of the event handler; drop it
    // and keep serving the remaining traffic.
    let data;
    try {
      data = JSON.parse(message);
    } catch (e) {
      console.error(`Dropping malformed message on ${channel}:`, e);
      return;
    }

    if (channel === 'broadcast') {
      // Send to all local clients (the original string is forwarded as-is)
      for (const [, ws] of localClients) {
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(message);
        }
      }
    } else if (channel === 'user-messages') {
      // Send to specific user if they're on this server.
      // `data` may legitimately parse to null or a non-object, hence `?.`.
      const client = localClients.get(data?.userId);
      if (client?.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify(data.payload));
      }
    }
  });

  // When a message needs to go to all servers
  /**
   * Publish a message on the 'broadcast' channel so every server forwards it
   * to its local clients. Fire-and-forget: a failed publish is logged rather
   * than left as an unhandled promise rejection.
   */
  function broadcast(message: object) {
    pub.publish('broadcast', JSON.stringify(message)).catch((error) => {
      console.error('Failed to publish broadcast:', error);
    });
  }

  // When a message needs to go to a specific user
  /**
   * Publish a payload on the 'user-messages' channel; whichever server holds
   * that user's socket delivers it. Fire-and-forget: a failed publish is
   * logged rather than left as an unhandled promise rejection.
   */
  function sendToUser(userId: string, payload: object) {
    pub.publish('user-messages', JSON.stringify({ userId, payload })).catch((error) => {
      console.error(`Failed to publish message for ${userId}:`, error);
    });
  }


  // Connection handling
  // Registers each authenticated socket in localClients and removes it again
  // when that same socket closes.
  wss.on('connection', (ws, req) => {
    const userId = authenticate(req);
    // NOTE(review): assumes authenticate() returns a falsy value on failure,
    // like authenticateFromRequest in the basic server example - confirm.
    if (!userId) {
      ws.close(4001, 'Unauthorized');
      return;
    }

    localClients.set(userId, ws);

    ws.on('close', () => {
      // Only drop the entry if it still points at THIS socket; the user may
      // already have reconnected to this server with a newer one.
      if (localClients.get(userId) === ws) {
        localClients.delete(userId);
      }
    });
  });

anti_patterns: no_reconnection: description: "Not handling connection drops" wrong: "new WebSocket(url) with no reconnection logic" right: "Implement exponential backoff reconnection"

shared_state: description: "Storing connections in memory with multiple servers" wrong: "const clients = new Map() with load balancer" right: "Use Redis pub/sub for cross-server communication"

sync_in_handler: description: "Blocking operations in message handler" wrong: "await heavyDatabaseQuery() in onmessage" right: "Queue work, respond immediately if possible"

auth_after_connect: description: "Authenticating after WebSocket is established" wrong: "ws.onopen = () => ws.send({ token })" right: "Pass token in connection URL or headers"

handoffs:

  • trigger: "message queue|job queue|background" to: backend context: "Background job processing"

  • trigger: "redis|caching|pub/sub infrastructure" to: redis-specialist context: "Redis setup and patterns"

  • trigger: "database|persistence" to: postgres-wizard context: "Storing messages, events"

  • trigger: "kubernetes|scaling|deployment" to: kubernetes context: "Scaling WebSocket servers"

  • trigger: "React hooks|components" to: react-patterns context: "Client-side implementation"

tags:

  • websockets
  • real-time
  • sse
  • socket.io
  • presence
  • live-updates
  • chat
  • collaborative