Claude-code-plugins-plus-skills klingai-sdk-patterns
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/klingai-pack/skills/klingai-sdk-patterns" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-klingai-sdk-patterns && rm -rf "$T"
manifest:
plugins/saas-packs/klingai-pack/skills/klingai-sdk-patterns/SKILL.mdsource content
Kling AI SDK Patterns
Overview
Production-ready client patterns for the Kling AI API. Covers auto-refreshing JWT, typed request/response models, exponential backoff polling, async batch submission, and structured error handling.
Python Client Wrapper
import jwt import time import os import requests from dataclasses import dataclass, field from typing import Optional @dataclass class KlingConfig: access_key: str = field(default_factory=lambda: os.environ["KLING_ACCESS_KEY"]) secret_key: str = field(default_factory=lambda: os.environ["KLING_SECRET_KEY"]) base_url: str = "https://api.klingai.com/v1" token_buffer_sec: int = 300 poll_interval_sec: int = 10 max_poll_attempts: int = 120 # 20 minutes max timeout_sec: int = 30 class KlingClient: """Production Kling AI client with auto-refreshing JWT.""" def __init__(self, config: Optional[KlingConfig] = None): self.config = config or KlingConfig() self._token = None self._token_expires = 0 @property def _headers(self) -> dict: now = int(time.time()) if now >= (self._token_expires - self.config.token_buffer_sec): payload = {"iss": self.config.access_key, "exp": now + 1800, "nbf": now - 5} self._token = jwt.encode(payload, self.config.secret_key, algorithm="HS256", headers={"alg": "HS256", "typ": "JWT"}) self._token_expires = now + 1800 return {"Authorization": f"Bearer {self._token}", "Content-Type": "application/json"} def _post(self, path: str, body: dict) -> dict: r = requests.post(f"{self.config.base_url}{path}", headers=self._headers, json=body, timeout=self.config.timeout_sec) r.raise_for_status() return r.json() def _get(self, path: str) -> dict: r = requests.get(f"{self.config.base_url}{path}", headers=self._headers, timeout=self.config.timeout_sec) r.raise_for_status() return r.json() def _poll_task(self, endpoint: str, task_id: str) -> dict: """Poll with exponential backoff until task completes.""" interval = self.config.poll_interval_sec for attempt in range(self.config.max_poll_attempts): time.sleep(interval) result = self._get(f"{endpoint}/{task_id}") status = result["data"]["task_status"] if status == "succeed": return result["data"]["task_result"] elif status == "failed": raise KlingGenerationError(result["data"].get("task_status_msg", "Unknown")) # Increase interval up to 30s max interval = min(interval * 1.2, 30) raise KlingTimeoutError(f"Task {task_id} did not complete in time") # --- Public API --- def text_to_video(self, prompt: str, **kwargs) -> dict: body = {"model_name": kwargs.get("model", "kling-v2-master"), "prompt": prompt, "duration": str(kwargs.get("duration", 5)), "aspect_ratio": kwargs.get("aspect_ratio", "16:9"), "mode": kwargs.get("mode", "standard")} if kwargs.get("negative_prompt"): body["negative_prompt"] = kwargs["negative_prompt"] if kwargs.get("cfg_scale") is not None: body["cfg_scale"] = kwargs["cfg_scale"] if kwargs.get("callback_url"): body["callback_url"] = kwargs["callback_url"] task = self._post("/videos/text2video", body) task_id = task["data"]["task_id"] if kwargs.get("wait", True): return self._poll_task("/videos/text2video", task_id) return {"task_id": task_id} def image_to_video(self, image_url: str, **kwargs) -> dict: body = {"model_name": kwargs.get("model", "kling-v2-1"), "image": image_url, "duration": str(kwargs.get("duration", 5)), "mode": kwargs.get("mode", "standard")} if kwargs.get("prompt"): body["prompt"] = kwargs["prompt"] task = self._post("/videos/image2video", body) task_id = task["data"]["task_id"] if kwargs.get("wait", True): return self._poll_task("/videos/image2video", task_id) return {"task_id": task_id} def extend_video(self, task_id: str, **kwargs) -> dict: body = {"task_id": task_id, "prompt": kwargs.get("prompt", ""), "duration": str(kwargs.get("duration", 5)), "mode": kwargs.get("mode", "standard")} result = self._post("/videos/video-extend", body) new_task_id = result["data"]["task_id"] if kwargs.get("wait", True): return self._poll_task("/videos/video-extend", new_task_id) return {"task_id": new_task_id} class KlingError(Exception): pass class KlingGenerationError(KlingError): pass class KlingTimeoutError(KlingError): pass
Usage
client = KlingClient() # Synchronous (waits for result) result = client.text_to_video( "A cat playing piano in a jazz club", model="kling-v2-6", mode="professional", duration=5, ) print(result["videos"][0]["url"]) # Fire-and-forget (returns task_id) task = client.text_to_video("Ocean waves at sunset", wait=False) print(f"Submitted: {task['task_id']}")
Node.js Client
import jwt from "jsonwebtoken"; class KlingClient { #token = null; #tokenExp = 0; constructor(ak = process.env.KLING_ACCESS_KEY, sk = process.env.KLING_SECRET_KEY) { this.ak = ak; this.sk = sk; this.base = "https://api.klingai.com/v1"; } #getHeaders() { const now = Math.floor(Date.now() / 1000); if (now >= this.#tokenExp - 300) { this.#token = jwt.sign( { iss: this.ak, exp: now + 1800, nbf: now - 5 }, this.sk, { algorithm: "HS256", header: { typ: "JWT" } } ); this.#tokenExp = now + 1800; } return { Authorization: `Bearer ${this.#token}`, "Content-Type": "application/json" }; } async textToVideo(prompt, opts = {}) { const res = await fetch(`${this.base}/videos/text2video`, { method: "POST", headers: this.#getHeaders(), body: JSON.stringify({ model_name: opts.model ?? "kling-v2-master", prompt, duration: String(opts.duration ?? 5), aspect_ratio: opts.aspectRatio ?? "16:9", mode: opts.mode ?? "standard", }), }); const { data } = await res.json(); return opts.wait === false ? data : this.#poll("/videos/text2video", data.task_id); } async #poll(endpoint, taskId, interval = 10000) { for (let i = 0; i < 120; i++) { await new Promise((r) => setTimeout(r, interval)); const res = await fetch(`${this.base}${endpoint}/${taskId}`, { headers: this.#getHeaders(), }); const { data } = await res.json(); if (data.task_status === "succeed") return data.task_result; if (data.task_status === "failed") throw new Error(data.task_status_msg); interval = Math.min(interval * 1.2, 30000); } throw new Error(`Timeout: task ${taskId}`); } }
Retry Decorator
import functools def retry_on_transient(max_retries=3, backoff_base=2): """Retry on 429 (rate limit) and 5xx (server) errors.""" def decorator(fn): @functools.wraps(fn) def wrapper(*args, **kwargs): for attempt in range(max_retries + 1): try: return fn(*args, **kwargs) except requests.HTTPError as e: if e.response.status_code in (429, 500, 502, 503) and attempt < max_retries: wait = backoff_base ** attempt time.sleep(wait) continue raise return wrapper return decorator # Apply to client methods KlingClient._post = retry_on_transient()(KlingClient._post)