Claude-code-plugins-plus klingai-job-monitoring
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/klingai-pack/skills/klingai-job-monitoring" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-klingai-job-monitoring && rm -rf "$T"
manifest:
plugins/saas-packs/klingai-pack/skills/klingai-job-monitoring/SKILL.mdsource content
Kling AI Job Monitoring
Overview
Every Kling AI generation returns a
task_id. This skill covers polling strategies, batch tracking, timeout handling, and callback-based monitoring for the /v1/videos/text2video, /v1/videos/image2video, and /v1/videos/video-extend endpoints.
Task Lifecycle
| Status | Meaning | Typical Duration |
|---|---|---|
| Queued for processing | 0-30s |
| Generation in progress | 30-120s (standard), 60-300s (professional) |
| Complete, video URL available | Terminal |
| Generation failed | Terminal |
Polling a Single Task
import jwt, time, os, requests BASE = "https://api.klingai.com/v1" def get_headers(): ak, sk = os.environ["KLING_ACCESS_KEY"], os.environ["KLING_SECRET_KEY"] token = jwt.encode( {"iss": ak, "exp": int(time.time()) + 1800, "nbf": int(time.time()) - 5}, sk, algorithm="HS256", headers={"alg": "HS256", "typ": "JWT"} ) return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} def poll_task(endpoint: str, task_id: str, interval: int = 10, timeout: int = 600): """Poll with adaptive interval and timeout.""" start = time.monotonic() attempts = 0 while time.monotonic() - start < timeout: time.sleep(interval) attempts += 1 r = requests.get(f"{BASE}{endpoint}/{task_id}", headers=get_headers(), timeout=30) data = r.json()["data"] status = data["task_status"] elapsed = int(time.monotonic() - start) print(f"[{elapsed}s] Poll #{attempts}: {status}") if status == "succeed": return data["task_result"] elif status == "failed": raise RuntimeError(f"Task failed: {data.get('task_status_msg', 'unknown')}") if attempts > 5: interval = min(interval * 1.2, 30) raise TimeoutError(f"Task {task_id} timed out after {timeout}s")
Batch Job Tracker
from dataclasses import dataclass, field from datetime import datetime from typing import Optional @dataclass class TrackedTask: task_id: str endpoint: str prompt: str status: str = "submitted" created_at: float = field(default_factory=time.time) result_url: Optional[str] = None error_msg: Optional[str] = None class BatchTracker: def __init__(self): self.tasks: dict[str, TrackedTask] = {} def add(self, task_id, endpoint, prompt): self.tasks[task_id] = TrackedTask(task_id=task_id, endpoint=endpoint, prompt=prompt) def update_all(self): active = [t for t in self.tasks.values() if t.status in ("submitted", "processing")] for task in active: try: r = requests.get( f"{BASE}{task.endpoint}/{task.task_id}", headers=get_headers(), timeout=30 ).json() data = r["data"] task.status = data["task_status"] if task.status == "succeed": task.result_url = data["task_result"]["videos"][0]["url"] elif task.status == "failed": task.error_msg = data.get("task_status_msg") except Exception as e: print(f"Error polling {task.task_id}: {e}") def print_report(self): by_status = {} for t in self.tasks.values(): by_status.setdefault(t.status, 0) by_status[t.status] += 1 active = sum(v for k, v in by_status.items() if k in ("submitted", "processing")) print(f"\n=== Batch: {len(self.tasks)} tasks, {active} active ===") for status, count in sorted(by_status.items()): print(f" {status}: {count}")
Stuck Task Detection
def detect_stuck(tracker: BatchTracker, threshold_sec: int = 600): """Flag tasks processing longer than threshold.""" now = time.time() stuck = [] for t in tracker.tasks.values(): if t.status in ("submitted", "processing"): elapsed = now - t.created_at if elapsed > threshold_sec: stuck.append((t.task_id, int(elapsed))) if stuck: print(f"WARNING: {len(stuck)} stuck tasks:") for tid, secs in stuck: print(f" {tid}: {secs}s") return stuck
Batch Monitor Loop
tracker = BatchTracker() # Submit batch for prompt in prompts: r = requests.post(f"{BASE}/videos/text2video", headers=get_headers(), json={ "model_name": "kling-v2-master", "prompt": prompt, "duration": "5" }).json() tracker.add(r["data"]["task_id"], "/videos/text2video", prompt) # Monitor until all complete while any(t.status in ("submitted", "processing") for t in tracker.tasks.values()): time.sleep(15) tracker.update_all() tracker.print_report() detect_stuck(tracker)