Claude-code-plugins-plus-skills klingai-async-workflows
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/klingai-pack/skills/klingai-async-workflows" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-klingai-async-workflows && rm -rf "$T"
manifest:
plugins/saas-packs/klingai-pack/skills/klingai-async-workflows/SKILL.md · source content
Kling AI Async Workflows
Overview
Kling AI video generation is inherently async: you submit a task, then poll or receive a callback when done. This skill covers production patterns for integrating this into larger systems using queues, state machines, and event-driven architectures.
Core Pattern: Submit + Callback
import os
import time

import jwt
import requests

BASE = "https://api.klingai.com/v1"


def get_headers():
    """Build auth headers containing a short-lived (30 min) HS256 JWT.

    Reads KLING_ACCESS_KEY / KLING_SECRET_KEY from the environment and
    raises KeyError if either is missing. The `nbf` is backdated 5s to
    tolerate small clock skew between client and API.
    """
    ak, sk = os.environ["KLING_ACCESS_KEY"], os.environ["KLING_SECRET_KEY"]
    token = jwt.encode(
        {"iss": ak, "exp": int(time.time()) + 1800, "nbf": int(time.time()) - 5},
        sk,
        algorithm="HS256",
        headers={"alg": "HS256", "typ": "JWT"},
    )
    return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}


def submit_async(prompt, callback_url=None, **kwargs):
    """Submit a text2video task and return its task_id immediately.

    Args:
        prompt: Text prompt for the video.
        callback_url: Optional webhook URL; if given, Kling notifies it on
            completion so no polling is needed.
        **kwargs: model (default "kling-v2-master"), duration (default 5),
            mode (default "standard").

    Returns:
        The task_id string assigned by the API.

    Raises:
        requests.HTTPError: on a non-2xx API response.
        requests.Timeout: if the API does not answer within 30s.
    """
    body = {
        "model_name": kwargs.get("model", "kling-v2-master"),
        "prompt": prompt,
        # API expects duration as a string.
        "duration": str(kwargs.get("duration", 5)),
        "mode": kwargs.get("mode", "standard"),
    }
    if callback_url:
        body["callback_url"] = callback_url
    # timeout= prevents an indefinite hang (requests has no default timeout);
    # raise_for_status() fails loudly instead of a confusing KeyError below.
    r = requests.post(
        f"{BASE}/videos/text2video",
        headers=get_headers(),
        json=body,
        timeout=30,
    )
    r.raise_for_status()
    return r.json()["data"]["task_id"]
Redis Queue Workflow
import json
import time
import uuid

import redis

r = redis.Redis()


# Producer: enqueue video generation requests
def enqueue_video_job(prompt, metadata=None):
    """Push a video-generation job onto the pending queue.

    Args:
        prompt: Text prompt for the video.
        metadata: Optional dict of caller-supplied metadata.

    Returns:
        The generated job id string.
    """
    job = {
        # uuid4 instead of millisecond timestamp: two jobs enqueued within
        # the same millisecond previously got identical ids.
        "id": f"job_{uuid.uuid4().hex}",
        "prompt": prompt,
        "metadata": metadata or {},
        "status": "queued",
        "created_at": time.time(),
    }
    r.lpush("kling:jobs:pending", json.dumps(job))
    return job["id"]


# Worker: process jobs from queue
def process_jobs(max_concurrent=3):
    """Long-running worker loop.

    Pops pending jobs (FIFO via lpush/rpop), submits them to Kling up to
    `max_concurrent` in flight, polls active tasks every 10s, and moves
    finished jobs to kling:jobs:completed / kling:jobs:failed.
    Relies on submit_async/get_headers/BASE from the Core Pattern snippet.
    """
    active_tasks = {}
    while True:
        # Submit new jobs if under concurrency limit.
        while len(active_tasks) < max_concurrent:
            raw = r.rpop("kling:jobs:pending")
            if not raw:
                break
            job = json.loads(raw)
            task_id = submit_async(job["prompt"])
            active_tasks[task_id] = job
            r.hset("kling:jobs:active", task_id, json.dumps(job))

        # Check active tasks.
        completed = []
        for task_id, job in active_tasks.items():
            try:
                result = requests.get(
                    f"{BASE}/videos/text2video/{task_id}",
                    headers=get_headers(),
                    timeout=30,
                ).json()
            except requests.RequestException:
                # Transient poll failure must not kill the worker (which
                # would orphan every entry in kling:jobs:active); keep the
                # task active and retry on the next cycle.
                continue
            status = result["data"]["task_status"]
            if status == "succeed":
                job["status"] = "completed"
                job["video_url"] = result["data"]["task_result"]["videos"][0]["url"]
                r.lpush("kling:jobs:completed", json.dumps(job))
                completed.append(task_id)
            elif status == "failed":
                job["status"] = "failed"
                job["error"] = result["data"].get("task_status_msg")
                r.lpush("kling:jobs:failed", json.dumps(job))
                completed.append(task_id)

        # Remove finished tasks after iteration (never mutate a dict while
        # iterating it).
        for tid in completed:
            active_tasks.pop(tid)
            r.hdel("kling:jobs:active", tid)
        time.sleep(10)
State Machine Pattern
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class JobState(Enum):
    """Lifecycle states for an async video-generation job."""
    QUEUED = "queued"
    SUBMITTING = "submitting"
    PROCESSING = "processing"
    DOWNLOADING = "downloading"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"


# Legal state transitions; any move not listed here is rejected.
# COMPLETED is terminal (no outgoing edges).
_VALID_TRANSITIONS = {
    JobState.QUEUED: {JobState.SUBMITTING},
    JobState.SUBMITTING: {JobState.PROCESSING, JobState.FAILED},
    JobState.PROCESSING: {JobState.DOWNLOADING, JobState.FAILED},
    JobState.DOWNLOADING: {JobState.COMPLETED, JobState.FAILED},
    JobState.FAILED: {JobState.RETRYING},
    JobState.RETRYING: {JobState.SUBMITTING},
}


@dataclass
class VideoJob:
    """Tracks one video job as it moves through the state machine."""
    prompt: str
    state: JobState = JobState.QUEUED
    task_id: Optional[str] = None
    video_url: Optional[str] = None
    error: Optional[str] = None
    attempts: int = 0
    max_attempts: int = 3

    def can_retry(self) -> bool:
        """A job may retry only after failing, and only while attempts remain."""
        return self.state == JobState.FAILED and self.attempts < self.max_attempts

    def transition(self, new_state: JobState):
        """Move to `new_state`, raising ValueError on any illegal transition."""
        allowed = _VALID_TRANSITIONS.get(self.state, set())
        if new_state not in allowed:
            raise ValueError(f"Invalid transition: {self.state} -> {new_state}")
        self.state = new_state
Multi-Step Pipeline
import os


async def video_pipeline(prompt, steps=None):
    """Chain: generate -> extend -> download -> upload.

    NOTE(review): despite `async def`, every call in here blocks
    (requests + poll_task are synchronous) — run this in an executor or
    switch to an async HTTP client before using it inside an event loop.

    Args:
        prompt: Text prompt for the base clip.
        steps: Which stages to run; defaults to
            ["generate", "extend", "download"].

    Returns:
        The local filepath when "download" is in steps, else the remote
        video URL.
    """
    steps = steps or ["generate", "extend", "download"]

    # Step 1: Generate the base clip.
    task_id = submit_async(prompt, duration=5)
    result = poll_task("/videos/text2video", task_id)  # from job-monitoring skill
    video_url = result["videos"][0]["url"]

    # Step 2: Extend (optional) — continues the clip from the same task.
    if "extend" in steps:
        ext_r = requests.post(
            f"{BASE}/videos/video-extend",
            headers=get_headers(),
            json={
                "task_id": task_id,
                "prompt": f"Continue: {prompt}",
                "duration": "5",
            },
            timeout=30,  # requests has no default timeout
        ).json()
        ext_result = poll_task("/videos/video-extend", ext_r["data"]["task_id"])
        video_url = ext_result["videos"][0]["url"]

    # Step 3: Download.
    if "download" in steps:
        resp = requests.get(video_url, timeout=120)
        resp.raise_for_status()  # don't write an HTML error page to disk
        # Fix: open() raised FileNotFoundError when output/ did not exist.
        os.makedirs("output", exist_ok=True)
        filepath = f"output/{task_id}.mp4"
        with open(filepath, "wb") as f:
            f.write(resp.content)
        return filepath
    return video_url
Event-Driven with Webhook
# Use callback_url to avoid polling entirely task_id = submit_async( "Sunset over ocean with sailboats", callback_url="https://your-app.com/webhooks/kling" ) # Your webhook handler triggers next pipeline step # See klingai-webhook-config skill for receiver implementation