Skills audit-logging
install
source · Clone the upstream repo
git clone https://github.com/TerminalSkills/skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/TerminalSkills/skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/audit-logging" ~/.claude/skills/terminalskills-skills-audit-logging && rm -rf "$T"
manifest:
skills/audit-logging/SKILL.mdsource content
Audit Logging
Overview
Compliance audit logs must answer: who did what to which resource, when, from where, and with what result. They must also be tamper-evident — an auditor must be able to verify logs haven't been modified.
Regulatory retention requirements:
| Standard | Retention |
|---|---|
| HIPAA | 6 years |
| PCI DSS | 12 months online + 12 months archive |
| SOC 2 (typical) | 1 year |
| GDPR | Defined by purpose (often 1-3 years) |
Required Log Fields
interface AuditLogEntry { // Core required fields id: string; // UUID — unique event identifier timestamp: string; // ISO 8601 UTC — when the event occurred actor_id: string; // User/service that performed the action actor_type: 'user' | 'service' | 'admin' | 'system'; action: string; // What happened: "read" | "write" | "delete" | "login" | "export" resource_type: string; // "patient_record" | "invoice" | "user_account" resource_id: string; // ID of the affected resource result: 'success' | 'failure' | 'denied'; // Context fields ip_address: string; // Source IP user_agent?: string; // Browser/client identifier session_id?: string; // Session identifier // Optional fields changed_fields?: string[]; // For write operations: which fields changed old_values?: Record<string, unknown>; // Previous values (be careful with PII) reason?: string; // Clinical justification (HIPAA), business reason // Tamper-evidence prev_hash?: string; // Hash of previous log entry (hash chaining) hash: string; // SHA-256 of this entry }
Hash Chain Implementation
Hash chaining makes log tampering detectable: each entry includes a hash of the previous entry. Modifying any entry breaks the chain.
import json import hashlib import uuid from datetime import datetime, timezone from typing import Optional class AuditLogger: def __init__(self, db_connection): self.db = db_connection self._last_hash: Optional[str] = None def _compute_hash(self, entry: dict, prev_hash: Optional[str]) -> str: """Compute SHA-256 of the log entry for tamper-evidence.""" hashable = { "id": entry["id"], "timestamp": entry["timestamp"], "actor_id": entry["actor_id"], "action": entry["action"], "resource_type": entry["resource_type"], "resource_id": entry["resource_id"], "result": entry["result"], "prev_hash": prev_hash or "GENESIS" } content = json.dumps(hashable, sort_keys=True) return hashlib.sha256(content.encode()).hexdigest() async def log( self, actor_id: str, actor_type: str, action: str, resource_type: str, resource_id: str, result: str, ip_address: str, **kwargs ) -> dict: """Create and persist a tamper-evident audit log entry.""" # Get last hash for chaining prev_hash = self._last_hash or await self._get_last_hash() entry = { "id": str(uuid.uuid4()), "timestamp": datetime.now(timezone.utc).isoformat(), "actor_id": actor_id, "actor_type": actor_type, "action": action, "resource_type": resource_type, "resource_id": resource_id, "result": result, "ip_address": ip_address, "prev_hash": prev_hash or "GENESIS", **kwargs } entry["hash"] = self._compute_hash(entry, prev_hash) self._last_hash = entry["hash"] # Persist to append-only storage await self.db.audit_logs.insert_one(entry) return entry async def verify_chain(self, limit: int = 1000) -> bool: """Verify the hash chain integrity.""" entries = await self.db.audit_logs.find( sort=[("timestamp", 1)], limit=limit ) prev_hash = "GENESIS" for i, entry in enumerate(entries): expected_hash = self._compute_hash(entry, prev_hash) if entry["hash"] != expected_hash: print(f"❌ Chain broken at entry {i}: id={entry['id']}") return False if entry["prev_hash"] != prev_hash: print(f"❌ prev_hash mismatch at entry {i}") return False prev_hash = entry["hash"] print(f"✅ Chain verified: {len(entries)} entries intact") return True async def _get_last_hash(self) -> Optional[str]: last = await self.db.audit_logs.find_one(sort=[("timestamp", -1)]) return last["hash"] if last else None
Express.js Audit Middleware
const { v4: uuidv4 } = require('uuid'); const crypto = require('crypto'); class AuditLogger { constructor(db) { this.db = db; } async log({ actorId, actorType = 'user', action, resourceType, resourceId, result, ipAddress, sessionId, reason, changedFields }) { const prevHash = await this.getLastHash(); const entry = { id: uuidv4(), timestamp: new Date().toISOString(), actor_id: actorId, actor_type: actorType, action, resource_type: resourceType, resource_id: resourceId, result, ip_address: ipAddress, session_id: sessionId, reason, changed_fields: changedFields, prev_hash: prevHash || 'GENESIS', }; entry.hash = this.computeHash(entry); await this.db.auditLogs.create(entry); return entry; } computeHash(entry) { const hashable = JSON.stringify({ id: entry.id, timestamp: entry.timestamp, actor_id: entry.actor_id, action: entry.action, resource_type: entry.resource_type, resource_id: entry.resource_id, result: entry.result, prev_hash: entry.prev_hash, }); return crypto.createHash('sha256').update(hashable).digest('hex'); } async getLastHash() { const last = await this.db.auditLogs.findOne({ order: [['timestamp', 'DESC']] }); return last?.hash || null; } } // Express middleware — auto-log all requests const createAuditMiddleware = (auditLogger) => async (req, res, next) => { const start = Date.now(); // Capture response const originalSend = res.send; res.send = function(body) { res.send = originalSend; // Log after response sent setImmediate(async () => { try { await auditLogger.log({ actorId: req.user?.id || 'anonymous', actorType: req.user ? 'user' : 'anonymous', action: `${req.method.toLowerCase()}_${req.route?.path || req.path}`, resourceType: req.params.resourceType || 'api_request', resourceId: req.params.id || req.path, result: res.statusCode < 400 ? 'success' : res.statusCode === 403 ? 'denied' : 'failure', ipAddress: req.ip, sessionId: req.session?.id, duration_ms: Date.now() - start, }); } catch (err) { console.error('Audit log failed:', err); // Never let audit logging errors break the main flow } }); return originalSend.apply(this, arguments); }; next(); };
PostgreSQL Append-Only Table
-- Append-only audit log table -- The CHECK constraint and trigger prevent UPDATE/DELETE CREATE TABLE audit_logs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(), actor_id TEXT NOT NULL, actor_type TEXT NOT NULL CHECK (actor_type IN ('user', 'service', 'admin', 'system')), action TEXT NOT NULL, resource_type TEXT NOT NULL, resource_id TEXT NOT NULL, result TEXT NOT NULL CHECK (result IN ('success', 'failure', 'denied')), ip_address INET, session_id TEXT, reason TEXT, changed_fields TEXT[], prev_hash TEXT, hash TEXT NOT NULL UNIQUE ); -- Create append-only trigger — prevent modification CREATE OR REPLACE FUNCTION prevent_audit_modification() RETURNS TRIGGER AS $$ BEGIN RAISE EXCEPTION 'Audit logs are immutable — modification not allowed'; END; $$ LANGUAGE plpgsql; CREATE TRIGGER audit_logs_immutable BEFORE UPDATE OR DELETE ON audit_logs FOR EACH ROW EXECUTE FUNCTION prevent_audit_modification(); -- Indexes for common queries CREATE INDEX idx_audit_actor_id ON audit_logs(actor_id); CREATE INDEX idx_audit_resource ON audit_logs(resource_type, resource_id); CREATE INDEX idx_audit_timestamp ON audit_logs(timestamp DESC); CREATE INDEX idx_audit_action ON audit_logs(action); -- Query: All actions by a user in the last 30 days SELECT timestamp, action, resource_type, resource_id, result, ip_address FROM audit_logs WHERE actor_id = 'user-123' AND timestamp > NOW() - INTERVAL '30 days' ORDER BY timestamp DESC; -- Query: All access to a specific patient record SELECT timestamp, actor_id, action, result, ip_address, reason FROM audit_logs WHERE resource_type = 'patient_record' AND resource_id = 'patient-456' ORDER BY timestamp DESC; -- Query: Failed access attempts (security monitoring) SELECT actor_id, COUNT(*) as failed_attempts, MAX(timestamp) as last_attempt FROM audit_logs WHERE result IN ('failure', 'denied') AND timestamp > NOW() - INTERVAL '24 hours' GROUP BY actor_id HAVING COUNT(*) > 5 ORDER BY failed_attempts DESC;
AWS CloudTrail + Immutable S3
# Create CloudTrail that logs to immutable S3 bucket aws cloudtrail create-trail \ --name compliance-audit-trail \ --s3-bucket-name my-audit-logs-bucket \ --include-global-service-events \ --is-multi-region-trail \ --enable-log-file-validation # Cryptographic log integrity # Create S3 bucket with Object Lock (WORM - Write Once Read Many) aws s3api create-bucket \ --bucket my-audit-logs-bucket \ --object-lock-enabled-for-bucket # Set default retention: compliance mode (cannot delete even by admin) aws s3api put-object-lock-configuration \ --bucket my-audit-logs-bucket \ --object-lock-configuration '{ "ObjectLockEnabled": "Enabled", "Rule": { "DefaultRetention": { "Mode": "COMPLIANCE", "Years": 6 } } }' # Verify log file integrity (CloudTrail) aws cloudtrail validate-logs \ --trail-arn arn:aws:cloudtrail:us-east-1:123456789:trail/compliance-audit-trail \ --start-time 2024-01-01T00:00:00Z
FastAPI Audit Middleware (Python)
from fastapi import FastAPI, Request import time app = FastAPI() @app.middleware("http") async def audit_middleware(request: Request, call_next): start_time = time.time() # Extract context before processing actor_id = None if hasattr(request.state, 'user'): actor_id = request.state.user.id response = await call_next(request) duration_ms = int((time.time() - start_time) * 1000) # Determine result from status code if response.status_code < 400: result = "success" elif response.status_code == 403: result = "denied" else: result = "failure" # Log asynchronously to avoid blocking response import asyncio asyncio.create_task(audit_logger.log( actor_id=actor_id or "anonymous", actor_type="user" if actor_id else "anonymous", action=f"{request.method.lower()}:{request.url.path}", resource_type="api_endpoint", resource_id=str(request.url.path), result=result, ip_address=request.client.host, duration_ms=duration_ms, http_status=response.status_code, )) return response
Compliance Checklist
- All required fields logged (actor, action, resource, timestamp, IP, result)
- Tamper-evidence implemented (hash chaining or digital signatures)
- Append-only storage (trigger prevents UPDATE/DELETE, or S3 Object Lock)
- Log aggregation centralized (avoid local file logs that can be deleted)
- Retention policy matches regulation (HIPAA: 6yr, PCI: 1yr+1yr archive)
- Alerts for anomalies (failed logins, access spikes, unusual hours)
- Logs reviewed periodically (monthly for SOC 2, daily for high-security)
- Log access itself is audited (who viewed audit logs)
- Backup of audit logs separate from primary storage
- WORM storage for regulatory compliance archives