Claude-code-plugins-plus-skills klingai-audit-logging
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/klingai-pack/skills/klingai-audit-logging" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-klingai-audit-logging && rm -rf "$T"
manifest:
plugins/saas-packs/klingai-pack/skills/klingai-audit-logging/SKILL.md
source content
Kling AI Audit Logging
Overview
Compliance-grade audit logging for Kling AI API operations. Every task submission, status change, and credential usage is captured in tamper-evident structured logs.
Audit Event Schema
import json
import hashlib
import time
from datetime import datetime, timezone
from pathlib import Path


class AuditLogger:
    """Append-only, hash-chained audit log stored as one JSONL file per UTC day.

    Every entry records the hash of the entry before it (``prev_hash``) and its
    own ``hash``, so editing or removing a line invalidates everything after it.
    Each daily file is a self-contained chain whose first entry has
    ``prev_hash == "genesis"``; this is the layout ``verify_audit_chain``
    expects when it checks a single file.

    The chain position is cached in memory, so the class assumes it is the only
    writer of a given log file while it is alive.
    """

    def __init__(self, log_dir: str = "audit"):
        """Create the log directory (including parents) if it is missing.

        Args:
            log_dir: Directory that holds the ``audit-YYYY-MM-DD.jsonl`` files.
        """
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self._prev_hash = "genesis"
        # File the cached ``_prev_hash`` belongs to; ``None`` until first write.
        self._chain_file = None

    def _compute_hash(self, entry: dict) -> str:
        """Return the 16-hex-char SHA-256 digest of ``entry`` + previous hash."""
        raw = json.dumps(entry, sort_keys=True) + self._prev_hash
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    @staticmethod
    def _read_last_hash(filepath: Path) -> str:
        """Return the ``hash`` of the last readable entry, or ``"genesis"``.

        Used to resume the chain of a file written by an earlier logger
        instance instead of restarting it in the middle of the file.
        """
        last_hash = "genesis"
        try:
            with open(filepath, encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue
                    try:
                        last_hash = json.loads(line)["hash"]
                    except (ValueError, KeyError, TypeError):
                        # Damaged line: keep the last good hash. Verification
                        # will still flag the damaged line itself.
                        continue
        except FileNotFoundError:
            pass
        return last_hash

    def log(self, event_type: str, actor: str, details: dict) -> str:
        """Append one tamper-evident entry and return its hash.

        Args:
            event_type: Category of the event, e.g. ``"task_submitted"``.
            actor: Identity responsible for the event.
            details: JSON-serialisable payload describing the event.

        Returns:
            The truncated SHA-256 hash stored in the new entry.

        Raises:
            TypeError: If ``details`` cannot be serialised to JSON.
        """
        # Take the clock once so the timestamp and the file name cannot
        # disagree when a call straddles midnight UTC.
        now = datetime.now(timezone.utc)
        date = now.strftime("%Y-%m-%d")
        filepath = self.log_dir / f"audit-{date}.jsonl"

        # First write, a restart, or a day rollover: continue from whatever is
        # already in the target file (or start a fresh chain for a new file).
        if filepath != self._chain_file:
            self._prev_hash = self._read_last_hash(filepath)
            self._chain_file = filepath

        entry = {
            # Same wire format as before: naive ISO-8601 with a "Z" suffix.
            "timestamp": now.replace(tzinfo=None).isoformat() + "Z",
            "event_type": event_type,
            "actor": actor,
            "details": details,
            "prev_hash": self._prev_hash,
        }
        entry["hash"] = self._compute_hash(entry)
        self._prev_hash = entry["hash"]

        with open(filepath, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")
        return entry["hash"]
Audit Events for Kling AI
class KlingAuditClient:
    """Wrap a Kling client so each call is recorded in an audit log.

    The wrapped client is expected to expose ``text_to_video(prompt, **kwargs)``
    and, for ``log_auth_event``, a ``config.access_key`` string; nothing else
    about it is assumed here.
    """

    def __init__(self, base_client, audit: AuditLogger, actor: str = "system"):
        """Store the wrapped client, the audit logger and the acting identity.

        Args:
            base_client: Client object the calls are delegated to.
            audit: Logger that receives the audit events.
            actor: Identity recorded on every event emitted by this wrapper.
        """
        self.client = base_client
        self.audit = audit
        self.actor = actor

    def text_to_video(self, prompt: str, **kwargs):
        """Submit a text-to-video request and audit its submission and outcome.

        The prompt itself is never written to the log; only a truncated
        SHA-256 digest and its length are recorded.

        Args:
            prompt: Text prompt forwarded to the wrapped client.
            **kwargs: Forwarded unchanged to the wrapped client. ``model``,
                ``duration`` and ``mode`` are also copied into the audit
                entry, using the defaults below when absent.

        Returns:
            Whatever the wrapped client's ``text_to_video`` returns.

        Raises:
            Exception: Any error raised by the wrapped client is re-raised
                unchanged after a ``task_failed`` event has been logged.
        """
        self.audit.log("task_submitted", self.actor, {
            "action": "text_to_video",
            "model": kwargs.get("model", "kling-v2-master"),
            "duration": kwargs.get("duration", 5),
            "mode": kwargs.get("mode", "standard"),
            "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest()[:16],
            "prompt_length": len(prompt),
        })

        try:
            result = self.client.text_to_video(prompt, **kwargs)
        except Exception as exc:
            # Without this, a failed call leaves a dangling "task_submitted"
            # with no recorded outcome. Only the exception type is logged,
            # because the message could echo the prompt or credentials.
            self.audit.log("task_failed", self.actor, {
                "action": "text_to_video",
                "status": "failed",
                "error_type": type(exc).__name__,
            })
            raise

        self.audit.log("task_completed", self.actor, {
            "action": "text_to_video",
            "status": "succeed",
            "video_count": len(result.get("videos", [])),
        })
        return result

    def log_auth_event(self, event: str, success: bool):
        """Record an authentication event with a truncated access-key prefix.

        Args:
            event: Name of the authentication event.
            success: Whether the authentication step succeeded.
        """
        self.audit.log("auth_event", self.actor, {
            "event": event,
            "success": success,
            # Only the first 8 characters are logged, never the full key.
            "access_key_prefix": self.client.config.access_key[:8] + "...",
        })
Audit Log Verification
def verify_audit_chain(log_file: str) -> bool:
    """Check that a JSONL audit file forms an unbroken hash chain.

    Starting from the sentinel ``"genesis"``, each entry must name the previous
    entry's hash in ``prev_hash`` and carry a ``hash`` equal to the first 16 hex
    characters of ``sha256(json.dumps(entry_without_hash, sort_keys=True) +
    prev_hash)``.

    Whitespace-only lines are ignored. A line that is not valid JSON, or an
    entry without ``prev_hash``/``hash``, counts as a verification failure
    rather than raising, since a damaged line is evidence of tampering.

    Args:
        log_file: Path of the JSONL audit file to verify.

    Returns:
        ``True`` when every entry verifies, ``False`` at the first problem
        (a short diagnostic is printed either way).

    Raises:
        OSError: If ``log_file`` cannot be opened.
    """
    prev_hash = "genesis"
    verified = 0  # a running count is enough; entries need not be retained

    with open(log_file, encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            if not line.strip():
                continue

            try:
                entry = json.loads(line)
                recorded_prev = entry["prev_hash"]
                recorded_hash = entry["hash"]
            except (ValueError, KeyError, TypeError):
                # ValueError covers json.JSONDecodeError; TypeError covers a
                # line that parses to something other than a JSON object.
                print(f"Malformed entry at line {line_num}")
                return False

            if recorded_prev != prev_hash:
                print(f"Chain broken at line {line_num}: "
                      f"expected prev_hash={prev_hash}, got {recorded_prev}")
                return False

            # Recompute the hash over everything except the stored hash itself.
            check_entry = {k: v for k, v in entry.items() if k != "hash"}
            raw = json.dumps(check_entry, sort_keys=True) + prev_hash
            expected_hash = hashlib.sha256(raw.encode()).hexdigest()[:16]
            if recorded_hash != expected_hash:
                print(f"Hash mismatch at line {line_num}")
                return False

            prev_hash = recorded_hash
            verified += 1

    print(f"Verified {verified} entries -- chain intact")
    return True
Audit Report Generator
def _parse_audit_timestamp(stamp: str) -> datetime:
    """Parse an ISO-8601 audit timestamp into a naive UTC ``datetime``.

    Accepts the ``...Z`` form written by the audit logger as well as values
    carrying an explicit UTC offset.
    """
    from datetime import timezone

    # ``fromisoformat`` only understands a trailing "Z" on Python 3.11+.
    text = stamp[:-1] if stamp.endswith("Z") else stamp
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def generate_audit_report(log_dir: str = "audit", days: int = 30) -> dict:
    """Summarise the audit events of the last ``days`` days.

    Reads every ``audit-*.jsonl`` file in ``log_dir`` in name order, keeps the
    entries whose timestamp is at or after ``now - days`` (UTC), prints a short
    summary and returns the same figures as a dict.

    Timestamps are compared as datetimes rather than as strings, because the
    stored ``...Z`` form and a bare ``isoformat()`` cutoff do not sort
    consistently at sub-second resolution. Whitespace-only lines are skipped.

    Args:
        log_dir: Directory containing the ``audit-*.jsonl`` files.
        days: Size of the reporting window in days.

    Returns:
        Dict with ``period_days``, ``total_events``, ``event_types``,
        ``unique_actors``, ``actors``, ``first_event`` and ``last_event``
        (the last two are ``None`` when no event falls in the window).

    Raises:
        ValueError: If a line is not valid JSON or a timestamp is not ISO-8601.
        KeyError: If an entry lacks ``timestamp``, ``event_type`` or ``actor``.
    """
    from collections import Counter
    from datetime import timedelta, timezone

    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = now_utc - timedelta(days=days)

    event_types = Counter()
    actors = Counter()
    total = 0
    first_event = None
    last_event = None

    # Aggregate while streaming so the whole log never has to sit in memory.
    for filepath in sorted(Path(log_dir).glob("audit-*.jsonl")):
        with open(filepath, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                entry = json.loads(line)
                if _parse_audit_timestamp(entry["timestamp"]) < cutoff:
                    continue
                event_types[entry["event_type"]] += 1
                actors[entry["actor"]] += 1
                total += 1
                if first_event is None:
                    first_event = entry["timestamp"]
                last_event = entry["timestamp"]

    report = {
        "period_days": days,
        "total_events": total,
        "event_types": dict(event_types),
        "unique_actors": len(actors),
        "actors": dict(actors),
        "first_event": first_event,
        "last_event": last_event,
    }

    print(f"\n=== Audit Report ({days} days) ===")
    print(f"Total events: {report['total_events']}")
    for event_type, count in event_types.most_common():
        print(f" {event_type}: {count}")
    print(f"Actors: {', '.join(actors.keys())}")
    return report
Compliance Checklist
- All API calls logged with timestamp, actor, action
- Prompts stored as hashes (not plaintext) for privacy
- Audit chain integrity verifiable
- Logs retained for required period (typically 1-7 years)
- Log access restricted to authorized personnel
- Regular verification of chain integrity