Claude-code-plugins klingai-rate-limits
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/klingai-pack/skills/klingai-rate-limits" ~/.claude/skills/jeremylongshore-claude-code-plugins-klingai-rate-limits && rm -rf "$T"
manifest:
plugins/saas-packs/klingai-pack/skills/klingai-rate-limits/SKILL.mdsource content
Kling AI Rate Limits
Overview
Kling AI enforces rate limits per API key. When exceeded, the API returns
429 Too Many Requests. This skill covers detection, backoff strategies, request queuing, and concurrent job management.
Rate Limit Tiers
| Tier | Concurrent Tasks | Requests/Min | Notes |
|---|---|---|---|
| Free | 1 | 10 | 66 daily credits cap |
| Standard | 3 | 30 | Per API key |
| Pro | 5 | 60 | Per API key |
| Enterprise | 10+ | Custom | Contact sales |
Exponential Backoff with Jitter
import time, random, requests def exponential_backoff(attempt: int, base: float = 1.0, max_wait: float = 60.0) -> float: """Calculate wait time with jitter to avoid thundering herd.""" wait = min(base * (2 ** attempt), max_wait) jitter = random.uniform(0, wait * 0.5) return wait + jitter def request_with_retry(method, url, headers, json=None, max_retries=5): for attempt in range(max_retries + 1): response = method(url, headers=headers, json=json, timeout=30) if response.status_code == 429: if attempt == max_retries: raise RuntimeError("Rate limit: max retries exceeded") wait = exponential_backoff(attempt) print(f"429 rate limited. Waiting {wait:.1f}s (attempt {attempt + 1})") time.sleep(wait) continue if response.status_code >= 500: if attempt == max_retries: response.raise_for_status() time.sleep(exponential_backoff(attempt, base=2.0)) continue response.raise_for_status() return response raise RuntimeError("Unreachable")
Concurrent Task Limiter (asyncio)
import asyncio class TaskLimiter: """Limit concurrent Kling AI tasks to stay within API tier.""" def __init__(self, max_concurrent: int = 3): self._semaphore = asyncio.Semaphore(max_concurrent) self._active = 0 async def submit(self, coro): async with self._semaphore: self._active += 1 try: return await coro finally: self._active -= 1 @property def active_count(self) -> int: return self._active # Usage limiter = TaskLimiter(max_concurrent=3) tasks = [limiter.submit(generate_video(p)) for p in prompts] results = await asyncio.gather(*tasks, return_exceptions=True)
Rate Limit Monitor
class RateLimitMonitor: """Track API call frequency and warn before hitting limits.""" def __init__(self, max_per_minute: int = 30): self.max_per_minute = max_per_minute self._calls = [] def record_call(self): now = time.time() self._calls = [t for t in self._calls if now - t < 60] self._calls.append(now) @property def usage_pct(self) -> float: now = time.time() recent = sum(1 for t in self._calls if now - t < 60) return (recent / self.max_per_minute) * 100 def wait_if_needed(self): if self.usage_pct > 80 and self._calls: wait = 60 - (time.time() - self._calls[0]) if wait > 0: print(f"Throttling: waiting {wait:.1f}s ({self.usage_pct:.0f}% of limit)") time.sleep(wait)
Request Queue Pattern
from collections import deque import threading class RequestQueue: """FIFO queue with rate-limit-aware dispatch.""" def __init__(self, client, max_per_minute: int = 30): self.client = client self.interval = 60.0 / max_per_minute self._queue = deque() def enqueue(self, endpoint: str, body: dict, callback=None): self._queue.append((endpoint, body, callback)) def process_all(self): while self._queue: endpoint, body, callback = self._queue.popleft() try: result = self.client._post(endpoint, body) if callback: callback(result, error=None) except Exception as e: if callback: callback(None, error=e) time.sleep(self.interval)
Error Reference
| Scenario | HTTP Code | Action |
|---|---|---|
| Soft rate limit | + | Wait specified seconds |
| Hard rate limit | no header | Backoff from 1s, double each attempt |
| Concurrent limit hit | or task rejection | Wait for active tasks to complete |
| Burst detection | Multiple s | Aggressive backoff (30-60s) |