Claude-code-plugins-plus-skills klingai-reference-architecture
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/klingai-pack/skills/klingai-reference-architecture" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-klingai-reference-architecture && rm -rf "$T"
manifest: plugins/saas-packs/klingai-pack/skills/klingai-reference-architecture/SKILL.md
Kling AI Reference Architecture
Overview
Production architecture for video generation platforms built on Kling AI. Covers API gateway, job queue, worker pool, storage, and monitoring layers.
Architecture Diagram
    User Request
         |
    [API Gateway / Load Balancer]
         |
    [Application Server]
         |--- validate prompt & estimate cost
         |--- enqueue job to Redis/SQS
         |
    [Job Queue (Redis / SQS / Pub/Sub)]
         |
    [Worker Pool (N workers)]
         |--- generate JWT token
         |--- POST https://api.klingai.com/v1/videos/text2video
         |--- receive task_id
         |--- register callback_url OR poll
         |
    [Webhook Receiver / Poller]
         |--- receive completion callback
         |--- download video from Kling CDN
         |--- upload to S3/GCS
         |--- update job status in DB
         |--- notify user
         |
    [Object Storage (S3 / GCS)]
         |
    [CDN (CloudFront / Cloud CDN)]
         |
    User views video
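The "generate JWT token" step in the worker pool is not spelled out here. As a minimal sketch, assuming the API accepts an HS256-signed token whose `iss` claim is the access key and which carries short-lived `exp`/`nbf` claims, the token can be built with the standard library alone. The function name `make_kling_jwt`, the claim set, and the 30-minute default lifetime are assumptions for illustration; confirm them against the Kling AI API documentation.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_kling_jwt(
    access_key: str,
    secret_key: str,
    ttl_seconds: int = 1800,
    now: Optional[float] = None,
) -> str:
    """Build an HS256 JWT. The claim names used here are assumptions."""
    issued = int(time.time() if now is None else now)
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        "iss": access_key,             # assumed: issuer is the access key
        "exp": issued + ttl_seconds,   # token expiry
        "nbf": issued - 5,             # small allowance for clock skew
    }
    # Signing input is base64url(header) + "." + base64url(payload).
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret_key.encode("utf-8"),
        signing_input.encode("ascii"),
        hashlib.sha256,
    ).digest()
    return f"{signing_input}.{_b64url(signature)}"
```

A worker would typically send the result as `Authorization: Bearer <token>` and regenerate it shortly before expiry rather than on every request.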
Component Details
API Layer
    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel

    app = FastAPI()

    # estimate_credits, budget_guard, and queue are application-level helpers
    # that are assumed to be defined elsewhere in the service.

    class VideoRequest(BaseModel):
        prompt: str
        model: str = "kling-v2-master"
        duration: int = 5
        mode: str = "standard"

    @app.post("/api/videos")
    async def create_video(req: VideoRequest):
        # 1. Validate
        if len(req.prompt) > 2500:
            raise HTTPException(400, "Prompt exceeds 2500 chars")

        # 2. Estimate cost
        credits = estimate_credits(req.duration, req.mode)
        if not budget_guard.check(credits):
            raise HTTPException(402, "Budget exceeded")

        # 3. Enqueue (duration is sent as a string; the worker casts it back to int)
        job_id = await queue.enqueue({
            "prompt": req.prompt,
            "model": req.model,
            "duration": str(req.duration),
            "mode": req.mode,
        })

        return {"job_id": job_id, "status": "queued", "estimated_credits": credits}
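The handler above calls `estimate_credits` and `budget_guard.check` without defining them. One possible shape for both is sketched below. The credit rates are placeholders, not real Kling pricing, and the `"pro"` mode key, the 5-second billing unit, and the `BudgetGuard` class with its `record` method are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass

# Placeholder rates: credits per 5-second clip, keyed by mode.
# These numbers are NOT real pricing; replace them with your plan's values.
CREDITS_PER_5S = {"standard": 10, "pro": 35}


def estimate_credits(duration: int, mode: str) -> int:
    """Estimate credits for a clip, assuming billing in 5-second units."""
    if mode not in CREDITS_PER_5S:
        raise ValueError(f"unknown mode: {mode!r}")
    if duration <= 0 or duration % 5 != 0:
        raise ValueError("duration must be a positive multiple of 5 seconds")
    return CREDITS_PER_5S[mode] * (duration // 5)


@dataclass
class BudgetGuard:
    """In-memory spend tracker; a real service would persist this state."""
    daily_limit: int
    spent: int = 0

    def check(self, credits: int) -> bool:
        """Return True if spending `credits` stays within the limit.

        This only checks; it does not reserve the credits.
        """
        return self.spent + credits <= self.daily_limit

    def record(self, credits: int) -> None:
        """Record credits once a job has actually been submitted."""
        self.spent += credits


budget_guard = BudgetGuard(daily_limit=500)
```

Because `check` does not reserve anything, two concurrent requests can both pass it. A shared atomic counter (for example in Redis) would close that gap if the budget has to be strict.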
Worker Service
    import json
    import os

    import redis

    class VideoWorker:
        def __init__(self, kling_client, storage_client, redis_url="redis://localhost"):
            self.kling = kling_client
            self.storage = storage_client
            self.redis = redis.Redis.from_url(redis_url)

        def process_loop(self):
            while True:
                # BRPOP returns a (key, value) pair, or None on timeout
                raw = self.redis.brpop("kling:jobs:pending", timeout=5)
                if not raw:
                    continue

                # The job payload is expected to carry the "id" assigned at enqueue time
                job = json.loads(raw[1])
                try:
                    # Submit to Kling API
                    result = self.kling.text_to_video(
                        job["prompt"],
                        model=job["model"],
                        duration=int(job["duration"]),
                        mode=job["mode"],
                        callback_url=os.environ.get("WEBHOOK_URL"),
                    )

                    # If using polling (no callback), the client returns the finished videos
                    if isinstance(result, dict) and "videos" in result:
                        video_url = result["videos"][0]["url"]
                        stored_url = self.storage.download_and_upload(video_url, job["id"])
                        self.redis.publish("kling:events", json.dumps({
                            "type": "completed",
                            "job_id": job["id"],
                            "video_url": stored_url,
                        }))
                except Exception as e:
                    # Park the failed job, with its error, for inspection or retry
                    self.redis.lpush("kling:jobs:failed", json.dumps({
                        **job, "error": str(e)
                    }))
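When no `callback_url` is registered, something has to poll the task until it finishes. The sketch below shows one way to do that with exponential backoff. The `fetch_status` callable, the `"status"` field, and the terminal values `"succeed"` and `"failed"` are assumptions about what a client wrapper might return, not a documented response format.

```python
import time
from typing import Callable

# Assumed terminal states; adjust to whatever your client actually returns.
DONE_STATE = "succeed"
FAILED_STATE = "failed"


def poll_until_done(
    fetch_status: Callable[[str], dict],
    task_id: str,
    initial_delay: float = 2.0,
    max_delay: float = 30.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll a task until it reaches a terminal state, doubling the wait each time."""
    delay = initial_delay
    waited = 0.0
    while True:
        status = fetch_status(task_id)
        state = status.get("status")
        if state == DONE_STATE:
            return status
        if state == FAILED_STATE:
            raise RuntimeError(f"task {task_id} failed: {status}")
        # Give up before sleeping past the overall deadline.
        if waited + delay > timeout:
            raise TimeoutError(f"task {task_id} not finished after {waited:.0f}s")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)
```

Injecting `sleep` keeps the helper testable without real waiting. Capping the delay at `max_delay` bounds how stale a completed task can be when it is finally noticed.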
Scaling Guidelines
| Component | Scaling Strategy |
|---|---|
| Workers | Scale by queue depth (1 worker per 3 concurrent API tasks) |
| API servers | Horizontal, behind load balancer |
| Redis | Single instance for <1K jobs/day, cluster for more |
| Storage | S3/GCS scales automatically |
| CDN | CloudFront/Cloud CDN for global delivery |
Concurrency Limits by Tier
| Tier | Max Concurrent Tasks | Workers Needed |
|---|---|---|
| Free | 1 | 1 |
| Standard | 3 | 1 |
| Pro | 5 | 2 |
| Enterprise | 10+ | 3-4 |
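The "Workers Needed" column follows from the scaling rule of one worker per 3 concurrent API tasks, rounded up. A small sketch of that arithmetic is shown below; the function name `workers_needed` is illustrative.

```python
import math

TASKS_PER_WORKER = 3  # from the scaling guideline: 1 worker per 3 concurrent API tasks


def workers_needed(max_concurrent_tasks: int, tasks_per_worker: int = TASKS_PER_WORKER) -> int:
    """Round up so that every concurrent task slot is covered by a worker."""
    if max_concurrent_tasks < 1 or tasks_per_worker < 1:
        raise ValueError("both arguments must be at least 1")
    return math.ceil(max_concurrent_tasks / tasks_per_worker)


for tier, limit in [("Free", 1), ("Standard", 3), ("Pro", 5), ("Enterprise", 10)]:
    print(f"{tier}: {workers_needed(limit)} worker(s)")
```

For the Enterprise floor of 10 concurrent tasks this gives 4 workers, the upper end of the table's 3-4 range. Higher negotiated limits scale the same way.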
Docker Compose Setup
    # docker-compose.yml
    services:
      api:
        build: ./api
        ports: ["8000:8000"]
        environment:
          - REDIS_URL=redis://redis:6379
          - KLING_ACCESS_KEY=${KLING_ACCESS_KEY}
          - KLING_SECRET_KEY=${KLING_SECRET_KEY}

      worker:
        build: ./worker
        deploy:
          replicas: 2
        environment:
          - REDIS_URL=redis://redis:6379
          - KLING_ACCESS_KEY=${KLING_ACCESS_KEY}
          - KLING_SECRET_KEY=${KLING_SECRET_KEY}
          - S3_BUCKET=${S3_BUCKET}

      webhook:
        build: ./webhook
        ports: ["8001:8001"]
        environment:
          - REDIS_URL=redis://redis:6379

      redis:
        image: redis:7-alpine
        volumes: ["redis-data:/data"]

    volumes:
      redis-data: