Claude-skill-registry high-availability-patterns

High Availability Patterns

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/high-availability-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-high-availability-patterns && rm -rf "$T"
manifest: skills/data/high-availability-patterns/SKILL.md
source content

High Availability Patterns

Production-ready HA patterns for cryptocurrency trading systems with 99.99% uptime.

Active-Passive Architecture with Redis Leader Election

import redis
import asyncio
import time
import uuid
from typing import Optional

class LeaderElection:
    """Redis-based leader election for active-passive HA"""

    def __init__(
        self,
        redis_client: redis.Redis,
        service_name: str,
        ttl_seconds: int = 10,
        instance_id: Optional[str] = None
    ):
        self.redis = redis_client
        self.service_name = service_name
        self.ttl = ttl_seconds
        self.instance_id = instance_id or str(uuid.uuid4())
        self.is_leader = False
        self.lock_key = f"leader:{service_name}"

    async def run_leader_election(self):
        """Continuous leader election loop"""
        while True:
            try:
                # Try to acquire leadership
                acquired = self.redis.set(
                    self.lock_key,
                    self.instance_id,
                    nx=True,  # Only set if not exists
                    ex=self.ttl  # Expire after TTL
                )

                if acquired:
                    if not self.is_leader:
                        logger.info(f"Instance {self.instance_id} became LEADER")
                        self.is_leader = True
                        await self._on_become_leader()

                    # Renew leadership
                    await self._renew_leadership()

                else:
                    # Check if we were leader before
                    if self.is_leader:
                        logger.warning(f"Instance {self.instance_id} lost leadership")
                        self.is_leader = False
                        await self._on_lose_leadership()

                # Sleep for half the TTL before renewing
                await asyncio.sleep(self.ttl / 2)

            except redis.RedisError as e:
                logger.error(f"Leader election error: {e}")
                self.is_leader = False
                await asyncio.sleep(1)

    async def _renew_leadership(self):
        """Renew leadership lock"""
        try:
            # Only renew if we still hold the lock
            current_leader = self.redis.get(self.lock_key)
            if current_leader and current_leader.decode() == self.instance_id:
                self.redis.expire(self.lock_key, self.ttl)
            else:
                self.is_leader = False
                logger.warning("Lost leadership during renewal")

        except redis.RedisError as e:
            logger.error(f"Leadership renewal failed: {e}")
            self.is_leader = False

    async def _on_become_leader(self):
        """Hook called when instance becomes leader"""
        # Perform state reconciliation
        await self._reconcile_state()

        # Start active trading
        await self._start_trading_engine()

        # Send notification
        logger.info("Transitioned to ACTIVE state")

    async def _on_lose_leadership(self):
        """Hook called when instance loses leadership"""
        # Stop trading immediately
        await self._stop_trading_engine()

        # Flush pending orders
        await self._flush_pending_orders()

        logger.info("Transitioned to PASSIVE state")

State Reconciliation on Failover

from dataclasses import dataclass
from typing import Dict, List
import asyncio

@dataclass
class ReconciliationReport:
    open_orders_found: int
    orders_cancelled: int
    positions_synced: int
    balance_drift: Decimal
    discrepancies: List[str]

class StateReconciliation:
    """Reconcile state on failover to prevent duplicate orders"""

    def __init__(self, exchange_connector, database, cache):
        self.exchange = exchange_connector
        self.db = database
        self.cache = cache

    async def reconcile_on_startup(self) -> ReconciliationReport:
        """
        Full state reconciliation when becoming active leader
        CRITICAL: Run this BEFORE starting trading engine
        """
        logger.info("Starting state reconciliation...")

        # Step 1: Fetch ground truth from exchanges
        exchange_orders = await self._fetch_all_open_orders()
        exchange_positions = await self._fetch_all_positions()
        exchange_balances = await self._fetch_all_balances()

        # Step 2: Compare with local state
        db_orders = await self.db.get_open_orders()
        db_positions = await self.db.get_positions()
        db_balances = await self.db.get_balances()

        # Step 3: Reconcile orders
        orders_cancelled = await self._reconcile_orders(exchange_orders, db_orders)

        # Step 4: Reconcile positions
        positions_synced = await self._reconcile_positions(exchange_positions, db_positions)

        # Step 5: Reconcile balances
        balance_drift = await self._reconcile_balances(exchange_balances, db_balances)

        # Step 6: Check for discrepancies
        discrepancies = await self._detect_discrepancies()

        report = ReconciliationReport(
            open_orders_found=len(exchange_orders),
            orders_cancelled=orders_cancelled,
            positions_synced=positions_synced,
            balance_drift=balance_drift,
            discrepancies=discrepancies
        )

        logger.info(f"State reconciliation complete: {report}")

        return report

    async def _reconcile_orders(
        self,
        exchange_orders: List[Dict],
        db_orders: List[Dict]
    ) -> int:
        """
        Reconcile orders between exchange and database
        Cancel any orphaned orders
        """
        cancelled_count = 0

        # Build lookup of DB orders
        db_order_ids = {o['order_id'] for o in db_orders}

        # Find orders on exchange not in DB (orphans)
        for exchange_order in exchange_orders:
            order_id = exchange_order['id']

            if order_id not in db_order_ids:
                # Orphaned order - cancel it
                logger.warning(f"Found orphaned order {order_id} - cancelling")
                try:
                    await self.exchange.cancel_order(order_id)
                    cancelled_count += 1
                except Exception as e:
                    logger.error(f"Failed to cancel orphaned order {order_id}: {e}")

        # Update DB orders with exchange status
        for db_order in db_orders:
            exchange_status = next(
                (o for o in exchange_orders if o['id'] == db_order['order_id']),
                None
            )

            if exchange_status:
                # Update status if different
                if exchange_status['status'] != db_order['status']:
                    await self.db.update_order_status(
                        db_order['order_id'],
                        exchange_status['status']
                    )
            else:
                # Order not on exchange - mark as cancelled
                await self.db.update_order_status(
                    db_order['order_id'],
                    'cancelled'
                )

        return cancelled_count

Health Check and Monitoring

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional

@dataclass
class HealthStatus:
    healthy: bool
    timestamp: datetime
    checks: Dict[str, bool]
    latency_ms: Optional[float]
    message: str

class HealthChecker:
    """Comprehensive health checking for active/passive instances"""

    def __init__(self, exchange, database, redis_client):
        self.exchange = exchange
        self.db = database
        self.redis = redis_client
        self.last_trade_time = None
        self.health_history = []

    async def check_health(self) -> HealthStatus:
        """Run all health checks"""
        start_time = time.time()

        checks = {
            'exchange_connection': await self._check_exchange_connection(),
            'database_connection': await self._check_database_connection(),
            'redis_connection': await self._check_redis_connection(),
            'data_freshness': await self._check_data_freshness(),
            'order_execution': await self._check_order_execution(),
            'websocket_alive': await self._check_websocket_connection(),
        }

        latency_ms = (time.time() - start_time) * 1000
        all_healthy = all(checks.values())

        status = HealthStatus(
            healthy=all_healthy,
            timestamp=datetime.utcnow(),
            checks=checks,
            latency_ms=latency_ms,
            message="All systems operational" if all_healthy else "System degraded"
        )

        self.health_history.append(status)

        # Keep only last 100 health checks
        if len(self.health_history) > 100:
            self.health_history = self.health_history[-100:]

        return status

    async def _check_exchange_connection(self) -> bool:
        """Check if exchange API is responsive"""
        try:
            # Simple ping to exchange
            await asyncio.wait_for(
                self.exchange.fetch_time(),
                timeout=5.0
            )
            return True
        except asyncio.TimeoutError:
            logger.error("Exchange connection check timed out")
            return False
        except Exception as e:
            logger.error(f"Exchange connection check failed: {e}")
            return False

    async def _check_data_freshness(self) -> bool:
        """Check if market data is fresh (< 5 seconds old)"""
        try:
            last_update = await self.redis.get('last_ticker_update')
            if not last_update:
                return False

            last_update_time = datetime.fromisoformat(last_update.decode())
            age = datetime.utcnow() - last_update_time

            return age < timedelta(seconds=5)

        except Exception as e:
            logger.error(f"Data freshness check failed: {e}")
            return False

Graceful Shutdown Pattern

import signal
import asyncio

class GracefulShutdown:
    """Handle graceful shutdown on SIGTERM/SIGINT"""

    def __init__(self, trading_engine):
        self.trading_engine = trading_engine
        self.shutdown_event = asyncio.Event()
        self.shutdown_complete = asyncio.Event()

        # Register signal handlers
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals"""
        logger.info(f"Received signal {signum} - initiating graceful shutdown")
        self.shutdown_event.set()

    async def shutdown_sequence(self):
        """Execute graceful shutdown sequence"""
        logger.info("Starting graceful shutdown sequence...")

        # Step 1: Stop accepting new orders (30 seconds timeout)
        logger.info("Step 1/5: Stopping new order acceptance")
        await asyncio.wait_for(
            self.trading_engine.stop_new_orders(),
            timeout=30
        )

        # Step 2: Cancel all open orders (60 seconds timeout)
        logger.info("Step 2/5: Cancelling all open orders")
        await asyncio.wait_for(
            self.trading_engine.cancel_all_orders(),
            timeout=60
        )

        # Step 3: Close all positions (optional, 120 seconds timeout)
        logger.info("Step 3/5: Closing positions (if configured)")
        if self.trading_engine.config.get('close_positions_on_shutdown'):
            await asyncio.wait_for(
                self.trading_engine.close_all_positions(),
                timeout=120
            )

        # Step 4: Flush all pending database writes (30 seconds timeout)
        logger.info("Step 4/5: Flushing pending writes")
        await asyncio.wait_for(
            self.trading_engine.flush_pending_writes(),
            timeout=30
        )

        # Step 5: Close connections
        logger.info("Step 5/5: Closing connections")
        await self.trading_engine.close_connections()

        logger.info("Graceful shutdown complete")
        self.shutdown_complete.set()

Failover Time Monitoring

class FailoverMonitor:
    """Monitor and alert on failover times"""

    def __init__(self, target_failover_seconds: int = 30):
        self.target_failover_time = target_failover_seconds
        self.failover_history = []

    def record_failover(
        self,
        old_leader: str,
        new_leader: str,
        failover_time_seconds: float
    ):
        """Record failover event"""
        event = {
            'timestamp': datetime.utcnow(),
            'old_leader': old_leader,
            'new_leader': new_leader,
            'failover_time': failover_time_seconds,
            'met_target': failover_time_seconds <= self.target_failover_time
        }

        self.failover_history.append(event)

        # Alert if target not met
        if not event['met_target']:
            logger.critical(
                f"Failover took {failover_time_seconds:.1f}s - "
                f"exceeds target of {self.target_failover_time}s"
            )

        # Calculate statistics
        if len(self.failover_history) >= 5:
            recent_times = [e['failover_time'] for e in self.failover_history[-5:]]
            avg_time = sum(recent_times) / len(recent_times)

            logger.info(
                f"Failover stats - Current: {failover_time_seconds:.1f}s, "
                f"Avg (last 5): {avg_time:.1f}s, "
                f"Target: {self.target_failover_time}s"
            )

Critical for production: Always implement leader election, state reconciliation, and graceful shutdown for 99.99% uptime.