Claude-code-plugins-plus-skills openrouter-load-balancing
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/openrouter-pack/skills/openrouter-load-balancing" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-openrouter-load-balancing && rm -rf "$T"
manifest: plugins/saas-packs/openrouter-pack/skills/openrouter-load-balancing/SKILL.md
OpenRouter Load Balancing
Overview
A single OpenRouter API key has rate limits (requests/minute and tokens/minute). To scale beyond those limits, distribute requests across multiple keys. OpenRouter also provides server-side load balancing via provider routing and the `:nitro` variant for low-latency inference. This skill covers multi-key rotation, health-based routing, circuit breakers, and concurrent request patterns.
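For the server-side option, a minimal sketch: appending `:nitro` to a model slug asks OpenRouter to route to the fastest available providers, with no other client changes (the model slug and prompt here are illustrative):

```python
import os
from openai import OpenAI

client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
)

# The ":nitro" variant tells OpenRouter to route this request to the
# fastest available providers for the model (server-side load balancing).
response = client.chat.completions.create(
    model="anthropic/claude-3.5-sonnet:nitro",
    messages=[{"role": "user", "content": "Hello"}],
)
```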
Multi-Key Round Robin
```python
import os, itertools, time, logging
from openai import OpenAI, RateLimitError
from dataclasses import dataclass, field

log = logging.getLogger("openrouter.lb")

@dataclass
class KeyPool:
    """Round-robin API key pool with health tracking."""
    keys: list[str]
    _cycle: itertools.cycle = field(init=False, repr=False)
    _health: dict[str, dict] = field(init=False, default_factory=dict)

    def __post_init__(self):
        self._cycle = itertools.cycle(self.keys)
        self._health = {k: {"errors": 0, "last_error": 0, "healthy": True} for k in self.keys}

    def next_key(self) -> str:
        """Get next healthy key."""
        attempts = 0
        while attempts < len(self.keys):
            key = next(self._cycle)
            h = self._health[key]
            # Recover after 60s cooldown
            if not h["healthy"] and time.time() - h["last_error"] > 60:
                h["healthy"] = True
                h["errors"] = 0
            if h["healthy"]:
                return key
            attempts += 1
        # All keys unhealthy -- return any and hope for the best
        return next(self._cycle)

    def mark_error(self, key: str):
        h = self._health[key]
        h["errors"] += 1
        h["last_error"] = time.time()
        if h["errors"] >= 3:  # Circuit breaker: 3 errors → unhealthy
            h["healthy"] = False
            log.warning(f"Key {key[:12]}... marked unhealthy after {h['errors']} errors")

    def mark_success(self, key: str):
        self._health[key]["errors"] = 0
        self._health[key]["healthy"] = True

pool = KeyPool(keys=[
    os.environ.get("OPENROUTER_KEY_1", ""),
    os.environ.get("OPENROUTER_KEY_2", ""),
    os.environ.get("OPENROUTER_KEY_3", ""),
])

def balanced_completion(messages, model="anthropic/claude-3.5-sonnet", **kwargs):
    """Send request using next healthy key from the pool."""
    key = pool.next_key()
    client = OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=key,
        default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
    )
    try:
        response = client.chat.completions.create(
            model=model, messages=messages, **kwargs
        )
        pool.mark_success(key)
        return response
    except RateLimitError:
        pool.mark_error(key)
        # Retry with next key
        return balanced_completion(messages, model, **kwargs)
```
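Usage is a single call; the pool handles rotation and retries behind the scenes (prompt and parameters are illustrative):

```python
resp = balanced_completion(
    [{"role": "user", "content": "Summarize this changelog in three bullets."}],
    max_tokens=300,
)
print(resp.choices[0].message.content)
```

Note that the recursive retry has no depth cap: if every key stays rate-limited, calls recurse until a key recovers. The backoff sketch under Error Handling bounds this.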
Concurrent Request Processing
```python
import asyncio
import os
from openai import AsyncOpenAI

async def parallel_completions(prompts: list[str], model="openai/gpt-4o-mini",
                               max_concurrent=5, **kwargs):
    """Process multiple prompts concurrently with rate limiting."""
    semaphore = asyncio.Semaphore(max_concurrent)
    client = AsyncOpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=os.environ["OPENROUTER_API_KEY"],
        default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
    )

    async def process_one(prompt: str):
        async with semaphore:
            response = await client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                **kwargs,
            )
            return response.choices[0].message.content

    return await asyncio.gather(*[process_one(p) for p in prompts])

# Usage
results = asyncio.run(parallel_completions(
    ["Summarize X", "Translate Y", "Analyze Z"],
    max_concurrent=3,
    max_tokens=500,
))
```
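One failed request cancels the whole `asyncio.gather` and discards completed siblings. A hedged variant, assuming you want partial results instead (`parallel_completions_safe` is a hypothetical name):

```python
import asyncio
import os
from openai import AsyncOpenAI

async def parallel_completions_safe(prompts: list[str], model="openai/gpt-4o-mini",
                                    max_concurrent=5, **kwargs):
    """Variant of parallel_completions: one failure doesn't sink the batch."""
    semaphore = asyncio.Semaphore(max_concurrent)
    client = AsyncOpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=os.environ["OPENROUTER_API_KEY"],
    )

    async def process_one(prompt: str):
        async with semaphore:
            response = await client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                **kwargs,
            )
            return response.choices[0].message.content

    # return_exceptions=True yields the exception object as a result instead
    # of raising, so the caller can retry just the failed prompts.
    results = await asyncio.gather(
        *[process_one(p) for p in prompts], return_exceptions=True
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [(p, r) for p, r in zip(prompts, results) if isinstance(r, BaseException)]
    return ok, failed
```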
Provider-Level Load Balancing
```python
# OpenRouter can distribute across providers for the same model
response = client.chat.completions.create(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Hello"}],
    max_tokens=200,
    extra_body={
        "provider": {
            # Try providers in this order, falling back to others if needed
            "order": ["Anthropic", "AWS Bedrock", "GCP Vertex"],
            "allow_fallbacks": True,
        },
    },
)
```
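Provider preferences also accept a `sort` field ("price", "throughput", or "latency" per OpenRouter's provider-routing docs; verify against the current API reference). A sketch that biases toward throughput instead of a fixed order:

```python
response = client.chat.completions.create(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Hello"}],
    max_tokens=200,
    extra_body={
        "provider": {
            "sort": "throughput",     # prefer the fastest provider first
            "allow_fallbacks": True,  # fall back if it is unavailable
        },
    },
)
```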
Rate Limit Awareness
```python
import requests

def check_rate_limits(api_key: str) -> dict:
    """Check current rate limit status for a key."""
    resp = requests.get(
        "https://openrouter.ai/api/v1/auth/key",
        headers={"Authorization": f"Bearer {api_key}"},
    )
    data = resp.json()["data"]
    return {
        "requests_limit": data["rate_limit"]["requests"],
        "interval": data["rate_limit"]["interval"],
        "credits_used": data["usage"],
        "credits_limit": data.get("limit"),
    }

# Check all keys in pool
for key in pool.keys:
    limits = check_rate_limits(key)
    print(f"Key {key[:12]}...: {limits}")
```
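This data can feed back into the pool. A minimal sketch, assuming the `KeyPool` and `check_rate_limits` definitions above, that proactively sidelines keys near their credit limit (the 95% threshold is arbitrary):

```python
import time

def prune_exhausted_keys(pool: KeyPool, threshold: float = 0.95):
    """Mark keys whose credit usage is near their limit as unhealthy."""
    for key in pool.keys:
        limits = check_rate_limits(key)
        credit_limit = limits["credits_limit"]
        if credit_limit and limits["credits_used"] / credit_limit >= threshold:
            # Reuse the circuit-breaker state so next_key() skips this key
            # and the 60s recovery cooldown applies.
            pool._health[key]["healthy"] = False
            pool._health[key]["last_error"] = time.time()
```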
Error Handling
| Error | Cause | Fix |
|---|---|---|
| 429 on all keys | All keys rate-limited simultaneously | Add more keys; implement request queuing |
| Uneven load distribution | Round-robin not accounting for in-flight requests | Use weighted distribution based on current load |
| Key health false positive | Transient error marked key unhealthy | Use sliding window (3 errors in 60s) before marking unhealthy |
| Concurrent request failures | Too many parallel requests | Reduce semaphore limit; add backoff (see the sketch below) |
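A hedged sketch of the backoff fix from the table, reusing `pool` from the round-robin block above (retry counts and delays are illustrative):

```python
import random
import time
from openai import OpenAI, RateLimitError

def completion_with_backoff(messages, model="anthropic/claude-3.5-sonnet",
                            max_retries=5, base_delay=1.0, **kwargs):
    """Rotate keys, backing off exponentially when every attempt is rate-limited."""
    for attempt in range(max_retries):
        key = pool.next_key()
        client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=key)
        try:
            response = client.chat.completions.create(
                model=model, messages=messages, **kwargs
            )
            pool.mark_success(key)
            return response
        except RateLimitError:
            pool.mark_error(key)
            # 1s, 2s, 4s, ... plus jitter so queued callers don't retry in lockstep
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
    raise RuntimeError(f"Rate-limited on all keys after {max_retries} attempts")
```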
Enterprise Considerations
- Create separate API keys per service/team with individual credit limits for cost isolation
- Use 3+ keys to multiply effective rate limits (each key gets its own quota)
- Implement circuit breakers: mark keys unhealthy after N consecutive errors, recover after cooldown
- Use `asyncio.Semaphore` to control concurrency and prevent overwhelming the API
- Monitor per-key error rates and latency to detect degraded keys early
- Combine multi-key rotation with provider routing for maximum resilience (see the sketch below)
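A minimal sketch of that last point, layering client-side key rotation over OpenRouter's server-side provider fallbacks (`resilient_completion` is a hypothetical name; it assumes `pool` from above):

```python
from openai import OpenAI, RateLimitError

def resilient_completion(messages, model="anthropic/claude-3.5-sonnet", **kwargs):
    """Rotate keys client-side while OpenRouter balances providers server-side."""
    key = pool.next_key()  # client-side: multi-key rotation
    client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=key)
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            extra_body={
                # server-side: OpenRouter falls back across providers
                "provider": {"allow_fallbacks": True},
            },
            **kwargs,
        )
        pool.mark_success(key)
        return response
    except RateLimitError:
        pool.mark_error(key)
        raise  # or retry with the next key, as in balanced_completion above
```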