Claude-code-plugins-plus klingai-batch-processing
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/klingai-pack/skills/klingai-batch-processing" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-klingai-batch-processing && rm -rf "$T"
manifest:
plugins/saas-packs/klingai-pack/skills/klingai-batch-processing/SKILL.md
source content
Kling AI Batch Processing
Overview
Generate multiple videos efficiently using controlled parallelism, rate-limit-aware submission, progress tracking, and result collection. All requests go through
https://api.klingai.com/v1.
Batch Submission with Rate Limiting
import os
import time

import jwt
import requests

BASE = "https://api.klingai.com/v1"


def get_headers():
    """Build request headers carrying a freshly signed HS256 JWT.

    The access key and secret key are read from the ``KLING_ACCESS_KEY`` and
    ``KLING_SECRET_KEY`` environment variables. The token expires 30 minutes
    after creation and its ``nbf`` claim is set 5 seconds in the past
    (presumably to tolerate small clock differences).

    Raises:
        KeyError: If either environment variable is not set.
    """
    ak, sk = os.environ["KLING_ACCESS_KEY"], os.environ["KLING_SECRET_KEY"]
    now = int(time.time())
    token = jwt.encode(
        {"iss": ak, "exp": now + 1800, "nbf": now - 5},
        sk,
        algorithm="HS256",
        headers={"alg": "HS256", "typ": "JWT"},
    )
    return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}


def submit_batch(prompts, model="kling-v2-master", duration="5", mode="standard",
                 max_concurrent=3, delay=2.0, request_timeout=30):
    """Submit batch with controlled concurrency and pacing.

    Args:
        prompts: Sequence of text prompts, one video task per prompt.
        model: Value sent as ``model_name``.
        duration: Value sent as ``duration``.
        mode: Value sent as ``mode``.
        max_concurrent: Maximum number of submitted-but-unfinished tasks
            allowed before the next prompt is submitted. Must be >= 1.
        delay: Seconds to pause between consecutive submissions.
        request_timeout: Per-request timeout in seconds passed to ``requests``.

    Returns:
        A list of ``{"task_id", "prompt", "index"}`` dicts in submission order.

    Raises:
        ValueError: If ``max_concurrent`` is less than 1.
        requests.RequestException: On network failure, timeout, or a non-2xx
            HTTP response.
    """
    # With max_concurrent <= 0 the wait loop below could never exit.
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    total = len(prompts)
    tasks = []
    active = []

    for i, prompt in enumerate(prompts):
        # Wait if at concurrency limit
        while len(active) >= max_concurrent:
            active = [
                t for t in active
                if not check_complete(t["task_id"], request_timeout)
            ]
            if len(active) >= max_concurrent:
                time.sleep(5)

        response = requests.post(
            f"{BASE}/videos/text2video",
            headers=get_headers(),
            json={
                "model_name": model,
                "prompt": prompt,
                "duration": duration,
                "mode": mode,
            },
            timeout=request_timeout,
        )
        # Surface HTTP errors explicitly instead of failing later on a
        # missing "data" key.
        response.raise_for_status()
        data = response.json()["data"]

        task = {"task_id": data["task_id"], "prompt": prompt, "index": i}
        tasks.append(task)
        active.append(task)
        print(f"[{i+1}/{total}] Submitted: {data['task_id']}")

        # Pace requests; nothing follows the last submission, so skip the wait.
        if i + 1 < total:
            time.sleep(delay)

    return tasks


def check_complete(task_id, request_timeout=30):
    """Return True once the task status is ``"succeed"`` or ``"failed"``.

    Args:
        task_id: Identifier returned when the task was submitted.
        request_timeout: Per-request timeout in seconds passed to ``requests``.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            HTTP response.
    """
    response = requests.get(
        f"{BASE}/videos/text2video/{task_id}",
        headers=get_headers(),
        timeout=request_timeout,
    )
    response.raise_for_status()
    return response.json()["data"]["task_status"] in ("succeed", "failed")
Collect Results
def collect_results(tasks, timeout=600, poll_interval=15, request_timeout=30):
    """Wait for all tasks and collect results.

    Polls every unfinished task once per round until all of them have reached
    ``"succeed"`` or ``"failed"``, or until ``timeout`` seconds have elapsed.

    Args:
        tasks: Iterable of dicts with at least ``"task_id"`` and ``"prompt"``.
        timeout: Overall time budget in seconds.
        poll_interval: Seconds to wait between polling rounds.
        request_timeout: Per-request timeout in seconds passed to ``requests``,
            so that one stalled connection cannot outlive the overall budget.

    Returns:
        A dict keyed by task id. Successful entries hold ``status``, ``url``
        and ``prompt``; failed entries hold ``status``, ``error`` and
        ``prompt``. Tasks that did not finish before the deadline are absent.
    """
    results = {}
    deadline = time.monotonic() + timeout

    while len(results) < len(tasks) and time.monotonic() < deadline:
        for task in tasks:
            task_id = task["task_id"]
            if task_id in results:
                continue

            try:
                response = requests.get(
                    f"{BASE}/videos/text2video/{task_id}",
                    headers=get_headers(),
                    timeout=request_timeout,
                )
                response.raise_for_status()
                payload = response.json()
            except requests.RequestException as exc:
                # A transient network/HTTP error must not discard results that
                # were already collected; retry this task on the next round.
                print(f"Poll failed for {task_id}: {exc}")
                continue

            data = payload["data"]
            status = data["task_status"]
            if status == "succeed":
                results[task_id] = {
                    "status": "succeed",
                    "url": data["task_result"]["videos"][0]["url"],
                    "prompt": task["prompt"],
                }
            elif status == "failed":
                results[task_id] = {
                    "status": "failed",
                    "error": data.get("task_status_msg", "Unknown"),
                    "prompt": task["prompt"],
                }

        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if len(results) < len(tasks) and remaining > 0:
            time.sleep(min(poll_interval, remaining))

    return results
Async Batch with asyncio
import asyncio

import aiohttp


async def async_batch(prompts, max_concurrent=3, poll_interval=10, timeout=600):
    """Async batch processing with semaphore-controlled concurrency.

    Each prompt is submitted and then polled until it finishes; the semaphore
    is held for the whole submit-and-poll cycle, so at most ``max_concurrent``
    tasks are in flight at once. All requests share one ``ClientSession``.

    Args:
        prompts: Iterable of text prompts.
        max_concurrent: Maximum number of prompts processed at the same time.
        poll_interval: Seconds to wait before each status poll.
        timeout: Maximum seconds to poll a single task after submission.
            ``None`` disables the limit.

    Returns:
        A dict mapping each prompt's index to the video URL on success, or to
        a string starting with ``"FAILED: "`` on failure or timeout.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    results = {}

    async def generate_one(session, prompt, index):
        async with semaphore:
            # Submit
            async with session.post(
                f"{BASE}/videos/text2video",
                headers=get_headers(),
                json={"model_name": "kling-v2-master", "prompt": prompt,
                      "duration": "5", "mode": "standard"},
            ) as resp:
                data = (await resp.json())["data"]
            task_id = data["task_id"]

            # Poll, bounded by a deadline so a stuck task cannot hold its
            # semaphore slot forever.
            loop = asyncio.get_running_loop()
            deadline = None if timeout is None else loop.time() + timeout
            while deadline is None or loop.time() < deadline:
                await asyncio.sleep(poll_interval)
                async with session.get(
                    f"{BASE}/videos/text2video/{task_id}",
                    headers=get_headers(),
                ) as resp:
                    data = (await resp.json())["data"]

                if data["task_status"] == "succeed":
                    results[index] = data["task_result"]["videos"][0]["url"]
                    return
                if data["task_status"] == "failed":
                    results[index] = f"FAILED: {data.get('task_status_msg')}"
                    return

            results[index] = f"FAILED: timed out after {timeout}s (task_id={task_id})"

    # One session for the whole batch so connections are pooled and reused.
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            *(generate_one(session, p, i) for i, p in enumerate(prompts))
        )
    return results
Batch with Callbacks (No Polling)
def submit_batch_with_callbacks(prompts, callback_url, model="kling-v2-master",
                                duration="5", mode="standard", delay=2.0,
                                request_timeout=30):
    """Submit batch with webhook callbacks -- no polling needed.

    Args:
        prompts: Iterable of text prompts, one video task per prompt.
        callback_url: URL sent as ``callback_url`` with every task.
        model: Value sent as ``model_name``.
        duration: Value sent as ``duration``.
        mode: Value sent as ``mode``.
        delay: Seconds to pause between consecutive submissions.
        request_timeout: Per-request timeout in seconds passed to ``requests``.

    Returns:
        A list of task ids in submission order.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            HTTP response.
    """
    tasks = []
    for i, prompt in enumerate(prompts):
        # Rate limit pacing: wait between submissions, not after the last one.
        if i:
            time.sleep(delay)

        response = requests.post(
            f"{BASE}/videos/text2video",
            headers=get_headers(),
            json={
                "model_name": model,
                "prompt": prompt,
                "duration": duration,
                "mode": mode,
                "callback_url": callback_url,
            },
            timeout=request_timeout,
        )
        # Surface HTTP errors explicitly instead of failing on a missing key.
        response.raise_for_status()
        tasks.append(response.json()["data"]["task_id"])
    return tasks
Cost Estimation Before Batch
def estimate_batch_cost(count, duration=5, mode="standard", audio=False,
                        usd_per_credit=0.14):
    """Estimate and print the credit and dollar cost of a batch.

    Args:
        count: Number of videos in the batch.
        duration: Clip length, as an int (``5``) or a numeric string
            (``"5"``), matching the string form the submit helpers send.
        mode: ``"standard"`` or ``"professional"``.
        audio: When true, the per-video credit figure is multiplied by 5.
        usd_per_credit: Dollar value of one credit used for the printed
            estimate.

    Returns:
        Total credits for the batch. Duration/mode combinations missing from
        the built-in table fall back to 10 credits per video.
    """
    credits_map = {
        (5, "standard"): 10,
        (5, "professional"): 35,
        (10, "standard"): 20,
        (10, "professional"): 70,
    }

    # The table is keyed by int; accept numeric strings too so that
    # duration="10" is not silently priced at the fallback rate.
    duration_key = duration
    if isinstance(duration, str):
        try:
            duration_key = int(duration.strip())
        except ValueError:
            pass  # unknown duration: use the fallback below

    per_video = credits_map.get((duration_key, mode), 10)
    if audio:
        per_video *= 5
    total = count * per_video
    print(f"Batch: {count} videos x {per_video} credits = {total} credits")
    print(f"Estimated cost: ${total * usd_per_credit:.2f}")
    return total


# Check before submitting
needed = estimate_batch_cost(50, duration=5, mode="standard")