Vibecosystem concurrency-security
TOCTOU prevention, distributed locking, idempotency keys, race condition detection for Node.js and serverless environments.
install
source · Clone the upstream repo
git clone https://github.com/vibeeval/vibecosystem
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/vibeeval/vibecosystem "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/concurrency-security" ~/.claude/skills/vibeeval-vibecosystem-concurrency-security && rm -rf "$T"
manifest:
skills/concurrency-security/SKILL.mdsource content
Concurrency Security
Patterns for preventing race conditions, double-execution, and state corruption in concurrent systems.
TOCTOU Prevention
Time-of-Check to Time-of-Use: the gap between reading state and acting on it.
// WRONG: check then act - another process can change state between lines const balance = await db.accounts.findUnique({ where: { id } }) if (balance.amount >= amount) { await db.accounts.update({ where: { id }, data: { amount: balance.amount - amount } }) } // CORRECT: atomic check-and-update in a single statement const updated = await db.$executeRaw` UPDATE accounts SET amount = amount - ${amount} WHERE id = ${id} AND amount >= ${amount} RETURNING * ` if (updated.count === 0) throw new Error('Insufficient funds or concurrent update')
// File system TOCTOU (Node.js) // WRONG if (fs.existsSync(filePath)) { fs.writeFileSync(filePath, data) // another process may have deleted it } // CORRECT: use O_EXCL flag for exclusive creation import { open } from 'fs/promises' try { const fh = await open(filePath, 'wx') // fail if file exists await fh.writeFile(data) await fh.close() } catch (err: any) { if (err.code === 'EEXIST') { /* already exists, handle */ } throw err }
Distributed Locking with Redis
import Redis from 'ioredis' const redis = new Redis(process.env.REDIS_URL!) // Simple SETNX + TTL lock async function acquireLock(key: string, ttlMs: number): Promise<string | null> { const token = crypto.randomUUID() // SET key token NX PX ttlMs — atomic, returns OK or null const result = await redis.set(`lock:${key}`, token, 'NX', 'PX', ttlMs) return result === 'OK' ? token : null } async function releaseLock(key: string, token: string): Promise<void> { // Lua script: only delete if we own the lock (atomic compare-and-delete) const script = ` if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end ` await redis.eval(script, 1, `lock:${key}`, token) } // Usage async function processPayment(paymentId: string) { const token = await acquireLock(paymentId, 30_000) // 30s TTL if (!token) throw new Error('Payment already being processed') try { await doPaymentWork(paymentId) } finally { await releaseLock(paymentId, token) } }
Redlock Algorithm (multi-node)
import Redlock from 'redlock' import Redis from 'ioredis' // Connect to 3+ independent Redis nodes for Redlock quorum const clients = [ new Redis('redis://redis1:6379'), new Redis('redis://redis2:6379'), new Redis('redis://redis3:6379'), ] const redlock = new Redlock(clients, { retryCount: 3, retryDelay: 200, retryJitter: 100, }) async function criticalSection(resourceId: string) { await redlock.using([`resource:${resourceId}`], 10_000, async (signal) => { if (signal.aborted) throw signal.error await performAtomicOperation(resourceId) if (signal.aborted) throw signal.error // check after long operations }) }
PostgreSQL Advisory Locks
import { Pool } from 'pg' const pool = new Pool() // Session-level advisory lock (held until released or connection closes) async function withAdvisoryLock<T>(lockId: number, fn: () => Promise<T>): Promise<T> { const client = await pool.connect() try { await client.query('SELECT pg_advisory_lock($1)', [lockId]) return await fn() } finally { await client.query('SELECT pg_advisory_unlock($1)', [lockId]) client.release() } } // Non-blocking try variant — returns false if lock is already held async function tryAdvisoryLock(lockId: number): Promise<boolean> { const client = await pool.connect() try { const { rows } = await client.query('SELECT pg_try_advisory_lock($1) AS acquired', [lockId]) return rows[0].acquired } finally { client.release() } } // Usage const PAYMENT_PROCESSOR_LOCK = 12345 // stable integer per operation type await withAdvisoryLock(PAYMENT_PROCESSOR_LOCK, async () => { await processQueue() })
Idempotency Key Implementation
// Middleware: extract idempotency key from header and dedup in DB import { Request, Response, NextFunction } from 'express' import { db } from './db' export async function idempotencyMiddleware(req: Request, res: Response, next: NextFunction) { const idempotencyKey = req.headers['idempotency-key'] as string | undefined if (!idempotencyKey || req.method === 'GET') return next() // Look up existing result const existing = await db.idempotencyKeys.findUnique({ where: { key: idempotencyKey }, }) if (existing) { // Return cached response — same status and body return res.status(existing.statusCode).json(existing.responseBody) } // Capture response to store it const originalJson = res.json.bind(res) res.json = (body: unknown) => { // Store before sending db.idempotencyKeys.create({ data: { key: idempotencyKey, statusCode: res.statusCode, responseBody: body, expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24h TTL }, }).catch(console.error) return originalJson(body) } next() }
-- DB schema for idempotency keys CREATE TABLE idempotency_keys ( key TEXT PRIMARY KEY, status_code INT NOT NULL, response_body JSONB NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW(), expires_at TIMESTAMPTZ NOT NULL ); CREATE INDEX ON idempotency_keys (expires_at); -- Clean up expired keys via pg_cron or a scheduled job
Atomic Database Operations
// SELECT FOR UPDATE: pessimistic lock on row async function debitAccount(accountId: string, amount: number) { await db.$transaction(async (tx) => { const account = await tx.$queryRaw<Account[]>` SELECT * FROM accounts WHERE id = ${accountId} FOR UPDATE ` if (!account[0] || account[0].balance < amount) { throw new Error('Insufficient funds') } await tx.$executeRaw` UPDATE accounts SET balance = balance - ${amount} WHERE id = ${accountId} ` }) } // UPSERT: atomic insert-or-update (no read-then-write gap) await db.$executeRaw` INSERT INTO user_stats (user_id, login_count, last_login) VALUES (${userId}, 1, NOW()) ON CONFLICT (user_id) DO UPDATE SET login_count = user_stats.login_count + 1, last_login = NOW() `
Double-Submit Prevention
// Form submit: disable button + server-side idempotency key // Client side async function submitForm(data: FormData) { const key = crypto.randomUUID() // generate once per form render const response = await fetch('/api/checkout', { method: 'POST', headers: { 'Idempotency-Key': key }, body: JSON.stringify(data), }) return response.json() } // Payment webhook: verify signature + mark event processed async function handleStripeWebhook(req: Request, res: Response) { const event = stripe.webhooks.constructEvent( req.body, req.headers['stripe-signature']!, process.env.STRIPE_WEBHOOK_SECRET! ) // Atomic insert — unique constraint prevents double-processing try { await db.$executeRaw` INSERT INTO processed_webhook_events (event_id, processed_at) VALUES (${event.id}, NOW()) ` } catch (err: any) { if (err.code === '23505') { // unique_violation return res.status(200).json({ received: true }) // already handled } throw err } await fulfillOrder(event) res.status(200).json({ received: true }) }
Optimistic vs Pessimistic Concurrency Control
// Optimistic: version field — no lock held, conflict detected on save interface Document { id: string content: string version: number } async function updateDocument(id: string, content: string, expectedVersion: number) { const result = await db.$executeRaw` UPDATE documents SET content = ${content}, version = version + 1 WHERE id = ${id} AND version = ${expectedVersion} ` if (result === 0) throw new Error('Conflict: document was modified by another process') } // Pessimistic: FOR UPDATE — lock row for duration of transaction // Use when conflicts are frequent or the cost of retry is high async function updateDocumentPessimistic(id: string, content: string) { await db.$transaction(async (tx) => { await tx.$queryRaw`SELECT 1 FROM documents WHERE id = ${id} FOR UPDATE` await tx.$executeRaw`UPDATE documents SET content = ${content} WHERE id = ${id}` }) }
Serverless Cold Start Race Conditions
// Problem: two cold starts may both try to initialize shared state // Solution: use atomic cloud primitives, not in-process flags // DynamoDB conditional write for distributed init lock import { DynamoDB } from '@aws-sdk/client-dynamodb' const ddb = new DynamoDB({}) async function initializeOnce(jobId: string): Promise<boolean> { try { await ddb.putItem({ TableName: 'distributed-locks', Item: { pk: { S: `init:${jobId}` } }, ConditionExpression: 'attribute_not_exists(pk)', }) return true // this instance won the race } catch (err: any) { if (err.name === 'ConditionalCheckFailedException') return false // another won throw err } }
Testing for Race Conditions
// Run the same operation N times in parallel and assert idempotency async function testConcurrentDebit() { const accountId = await createTestAccount({ balance: 100 }) const debitAmount = 100 // Fire 10 concurrent debit requests const results = await Promise.allSettled( Array.from({ length: 10 }, () => debitAccount(accountId, debitAmount)) ) const successes = results.filter(r => r.status === 'fulfilled') const failures = results.filter(r => r.status === 'rejected') // Exactly one should succeed console.assert(successes.length === 1, `Expected 1 success, got ${successes.length}`) console.assert(failures.length === 9, `Expected 9 failures, got ${failures.length}`) const account = await getAccount(accountId) console.assert(account.balance === 0, `Balance should be 0, got ${account.balance}`) }
Core rule: Every shared mutable resource needs either an atomic operation, a lock, or an idempotency guard. "It works in testing" is not enough — test with parallel load.