Claude-skill-registry Concurrency and Throughput
Comprehensive guide to concurrency models, throughput optimization, worker pools, task queues, and scaling strategies for high-performance applications
```bash
git clone https://github.com/majiayu000/claude-skill-registry
```

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/concurrency-and-throughput" ~/.claude/skills/majiayu000-claude-skill-registry-concurrency-and-throughput && rm -rf "$T"
```

skills/data/concurrency-and-throughput/SKILL.md

Concurrency and Throughput
Concurrency vs Parallelism
Definitions
Concurrency:
- Structure: Dealing with many things at once
- Example: Single chef switching between multiple dishes
- Implementation: Event loop, async/await, coroutines
Parallelism:
- Execution: Doing many things at once
- Example: Multiple chefs each cooking a dish
- Implementation: Multiple threads, processes, CPU cores
Visual Representation
```
Concurrency (Single Core):

Time →
CPU: [Task A][Task B][Task A][Task C][Task B][Task A]
     (Context switching between tasks)

Parallelism (Multi-Core):

Time →
CPU 1: [Task A][Task A][Task A][Task A]
CPU 2: [Task B][Task B][Task B][Task B]
CPU 3: [Task C][Task C][Task C][Task C]
       (Tasks running simultaneously)
```
Throughput vs Latency
Definitions
Throughput:
- Measure: Requests per second (RPS)
- Goal: Maximize total work done
- Example: 1000 requests/second
Latency:
- Measure: Time per request (milliseconds)
- Goal: Minimize time for single request
- Example: 50ms per request
Trade-off
The two are often inversely related:
- High Throughput: May increase latency (batching, queuing)
- Low Latency: May reduce throughput (immediate processing)
Example:
Scenario 1: Low Latency
- Process each request immediately
- Latency: 10ms
- Throughput: 100 req/s

Scenario 2: High Throughput
- Batch 10 requests together
- Latency: 50ms (includes wait time)
- Throughput: 500 req/s
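The trade-off can be made concrete with a micro-batching sketch: collect requests for a short window, then process them together. The `processBatch()` function and the 50ms window below are illustrative assumptions, not part of any library.

```javascript
// Sketch: trading latency for throughput by batching. processBatch() is a
// hypothetical function that handles an array of items in one bulk operation.
class MicroBatcher {
  constructor(maxWaitMs = 50) {
    this.maxWaitMs = maxWaitMs;
    this.pending = [];
    this.timer = null;
  }

  submit(item) {
    return new Promise((resolve) => {
      this.pending.push({ item, resolve });
      // First item opens the batch window; later items ride along
      if (!this.timer) {
        this.timer = setTimeout(() => this.flush(), this.maxWaitMs);
      }
    });
  }

  async flush() {
    const batch = this.pending;
    this.pending = [];
    this.timer = null;
    // One bulk operation instead of batch.length individual ones
    const results = await processBatch(batch.map(b => b.item));
    batch.forEach((b, i) => b.resolve(results[i]));
  }
}
```

Each caller waits up to 50ms longer than it would alone, but the backend sees one bulk operation per window instead of one per request.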
Node.js Concurrency Model
1. Single-Threaded Event Loop
How it Works:
```
┌───────────────────────────┐
│        Event Loop         │
│                           │
│  ┌─────────────────────┐  │
│  │     Call Stack      │  │
│  └─────────────────────┘  │
│  ┌─────────────────────┐  │
│  │   Callback Queue    │  │
│  └─────────────────────┘  │
│  ┌─────────────────────┐  │
│  │   Microtask Queue   │  │
│  └─────────────────────┘  │
└───────────────────────────┘
```
Characteristics:
- Single JavaScript thread
- Non-blocking I/O
- Event-driven architecture
Example:
```javascript
console.log('1');
setTimeout(() => console.log('2'), 0);
Promise.resolve().then(() => console.log('3'));
console.log('4');

// Output: 1, 4, 3, 2
// (Synchronous → Microtasks → Macrotasks)
```
2. Non-Blocking I/O
Bad (Blocking):
```javascript
const fs = require('fs');

// Blocks event loop!
const data = fs.readFileSync('large-file.txt');
console.log(data);
```
Good (Non-Blocking):
```javascript
const fs = require('fs').promises;

// Doesn't block event loop
const data = await fs.readFile('large-file.txt');
console.log(data);
```
3. Worker Threads (CPU-Intensive Tasks)
Use Case: CPU-intensive operations (image processing, encryption)
```javascript
const { Worker } = require('worker_threads');

function runWorker(data) {
  return new Promise((resolve, reject) => {
    const worker = new Worker('./worker.js', { workerData: data });
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) {
        reject(new Error(`Worker stopped with exit code ${code}`));
      }
    });
  });
}

// Usage
const result = await runWorker({ input: 'data' });
```
worker.js:
```javascript
const { parentPort, workerData } = require('worker_threads');

// CPU-intensive work
function processData(data) {
  // Heavy computation
  let result = 0;
  for (let i = 0; i < 1000000000; i++) {
    result += i;
  }
  return result;
}

const result = processData(workerData);
parentPort.postMessage(result);
```
4. Cluster Mode (Multiple Processes)
Use Case: Utilize all CPU cores
```javascript
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  // Fork workers
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died`);
    cluster.fork(); // Restart worker
  });
} else {
  // Workers share TCP connection
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('Hello World\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}
```
Python Concurrency
1. asyncio (Async/Await)
Use Case: I/O-bound tasks (network requests, database queries)
```python
import asyncio
import aiohttp

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ['http://example.com' for _ in range(10)]

    # Run concurrently
    tasks = [fetch_url(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

# Run
asyncio.run(main())
```
2. Threading (I/O-Bound Tasks)
Use Case: I/O-bound tasks with libraries that don't support asyncio
```python
import threading
import requests

def fetch_url(url):
    response = requests.get(url)
    return response.text

# Create threads
threads = []
results = []

for i in range(10):
    thread = threading.Thread(target=lambda: results.append(fetch_url('http://example.com')))
    threads.append(thread)
    thread.start()

# Wait for all threads
for thread in threads:
    thread.join()

print(f"Fetched {len(results)} URLs")
```
3. Multiprocessing (CPU-Bound Tasks)
Use Case: CPU-intensive operations (data processing, ML training)
```python
from multiprocessing import Pool

def process_data(data):
    # CPU-intensive work
    result = sum(i ** 2 for i in range(1000000))
    return result

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        results = pool.map(process_data, range(10))
    print(results)
```
4. GIL (Global Interpreter Lock) Limitations
Problem:
- Only one thread executes Python bytecode at a time
- Threading doesn't help for CPU-bound tasks
Solution:
- Use multiprocessing for CPU-bound tasks
- Use asyncio or threading for I/O-bound tasks
Example:
```python
# BAD: Threading for CPU-bound (GIL limits performance)
import threading

def cpu_intensive():
    return sum(i ** 2 for i in range(10000000))

threads = [threading.Thread(target=cpu_intensive) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Slower than single-threaded due to GIL!

# GOOD: Multiprocessing for CPU-bound
from multiprocessing import Pool

def cpu_intensive_mp(_):
    # Must be a top-level function: Pool cannot pickle lambdas
    return sum(i ** 2 for i in range(10000000))

if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.map(cpu_intensive_mp, range(4))
    # Actually uses 4 cores!
```
Concurrency Patterns
1. Worker Pools (Fixed Number of Workers)
Use Case: Limit concurrency to prevent resource exhaustion
Node.js:
```javascript
class WorkerPool {
  constructor(maxWorkers = 10) {
    this.maxWorkers = maxWorkers;
    this.activeWorkers = 0;
    this.queue = [];
  }

  async execute(task) {
    // Wait if pool is full
    while (this.activeWorkers >= this.maxWorkers) {
      await new Promise(resolve => this.queue.push(resolve));
    }

    this.activeWorkers++;
    try {
      return await task();
    } finally {
      this.activeWorkers--;
      // Process next task in queue
      const next = this.queue.shift();
      if (next) next();
    }
  }
}

// Usage
const pool = new WorkerPool(5);

const tasks = Array.from({ length: 100 }, (_, i) =>
  () => fetch(`https://api.example.com/item/${i}`)
);

const results = await Promise.all(
  tasks.map(task => pool.execute(task))
);
```
Python:
```python
import asyncio
from asyncio import Semaphore

async def worker_pool(tasks, max_workers=10):
    semaphore = Semaphore(max_workers)

    async def execute(task):
        async with semaphore:
            return await task()

    return await asyncio.gather(*[execute(task) for task in tasks])

# Usage (bind i as a default argument so each lambda captures its own value)
tasks = [lambda i=i: fetch_url(f'https://api.example.com/item/{i}') for i in range(100)]
results = await worker_pool(tasks, max_workers=5)
```
2. Task Queues (Redis Queue, Celery, Bull)
Use Case: Decouple request from processing, handle background jobs
Architecture:
```
Client → API → Queue → Worker → Database
                 ↓
              (Redis)
```
Benefits:
- Asynchronous processing
- Retry failed jobs
- Priority queues
- Rate limiting
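As a rough sketch of the retry benefit above, here is what retry with exponential backoff looks like when hand-rolled; task queues like Bull and Celery provide this out of the box, and `handleJob()` is a hypothetical stand-in for your job logic.

```javascript
// Sketch: retry with exponential backoff, the behavior a task queue gives
// you for free. handleJob() is hypothetical.
async function runWithRetry(job, maxAttempts = 3, baseDelayMs = 2000) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await handleJob(job);
    } catch (err) {
      if (attempt === maxAttempts) throw err; // Out of retries: surface the error
      const delayMs = baseDelayMs * 2 ** (attempt - 1); // 2s, 4s, 8s
      await new Promise(resolve => setTimeout(resolve, delayMs));
    }
  }
}
```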
3. Rate Limiting (Control Concurrency)
Use Case: Prevent overwhelming external APIs or databases
Node.js (Bottleneck):
```javascript
const Bottleneck = require('bottleneck');

const limiter = new Bottleneck({
  maxConcurrent: 5, // Max 5 concurrent requests
  minTime: 200      // Min 200ms between requests
});

// Wrap API calls
const fetchWithLimit = limiter.wrap(async (url) => {
  const response = await fetch(url);
  return response.json();
});

// Usage
const results = await Promise.all(
  urls.map(url => fetchWithLimit(url))
);
```
Python (aiolimiter):
```python
import asyncio
import aiohttp
from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 req/s

async def fetch_with_limit(url):
    async with limiter:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()

# Usage
results = await asyncio.gather(*[fetch_with_limit(url) for url in urls])
```
4. Backpressure (Slow Down When Overwhelmed)
Use Case: Prevent memory exhaustion from fast producers
Node.js (Streams):
```javascript
const { Transform } = require('stream');

const processStream = new Transform({
  highWaterMark: 10, // Buffer size
  async transform(chunk, encoding, callback) {
    // Process data
    const result = await processData(chunk);

    // Backpressure: Pause if buffer full
    if (!this.push(result)) {
      console.log('Backpressure: Pausing...');
    }

    callback();
  }
});

inputStream
  .pipe(processStream)
  .pipe(outputStream);
```
Queue-Based Architecture
Benefits
- Decouple Request from Processing
  - API responds immediately
  - Processing happens asynchronously
- Handle Traffic Spikes
  - Queue absorbs burst traffic
  - Workers process at steady rate
- Retry Failed Jobs
  - Automatic retry with exponential backoff
  - Dead letter queue for permanent failures (sketched after the diagram below)
- Priority Queues
  - Process important jobs first
  - Multiple queues with different priorities
Architecture
```
┌─────────┐     ┌─────────┐     ┌─────────┐     ┌──────────┐
│ Client  │ --> │   API   │ --> │  Queue  │ --> │  Worker  │
└─────────┘     └─────────┘     └─────────┘     └──────────┘
                     │               │                │
                     │               │                ↓
                     │               │          ┌──────────┐
                     │               │          │ Database │
                     │               │          └──────────┘
                     ↓               ↓
                 Response     (Redis/RabbitMQ)
```
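The dead letter queue mentioned in the benefits list can be sketched with Bull's failed event: once a job has exhausted its retries, copy it to a separate queue for inspection. The queue names below are illustrative.

```javascript
// Sketch: a dead letter queue with Bull. Jobs that still fail after all
// retry attempts are copied to a separate queue instead of being lost.
const Queue = require('bull');

const emailQueue = new Queue('email');
const deadLetterQueue = new Queue('email-dead-letter');

emailQueue.on('failed', async (job, err) => {
  // Only move the job once every configured attempt has been used
  if (job.attemptsMade >= (job.opts.attempts || 1)) {
    await deadLetterQueue.add('failed-job', {
      original: job.data,
      error: err.message
    });
  }
});
```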
Task Queue Systems
1. Node.js: Bull (Redis-Based)
Installation:
```bash
npm install bull
```
Producer (Add Jobs):
```javascript
const Queue = require('bull');

const emailQueue = new Queue('email', {
  redis: { host: 'localhost', port: 6379 }
});

// Add job to queue
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up'
}, {
  attempts: 3, // Retry 3 times
  backoff: {
    type: 'exponential',
    delay: 2000 // 2s, 4s, 8s
  },
  priority: 1 // In Bull, 1 is the highest priority (lower number = more important)
});

console.log('Email job queued');
```
Consumer (Process Jobs):
```javascript
const Queue = require('bull');

const emailQueue = new Queue('email', {
  redis: { host: 'localhost', port: 6379 }
});

// Process jobs
emailQueue.process('send-welcome', async (job) => {
  const { to, subject, body } = job.data;

  console.log(`Sending email to ${to}...`);
  await sendEmail(to, subject, body);

  return { sent: true };
});

// Handle completed jobs
emailQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

// Handle failed jobs
emailQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err);
});
```
2. Python: Celery (RabbitMQ/Redis)
Installation:
```bash
pip install celery redis
```
celery_app.py:
```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def send_email(self, to, subject, body):
    try:
        print(f'Sending email to {to}...')
        # Send email logic
        return {'sent': True}
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
```
Producer:
```python
from celery_app import send_email

# Add job to queue
result = send_email.delay('user@example.com', 'Welcome!', 'Thanks for signing up')
print(f'Job queued: {result.id}')
```
Worker:
```bash
celery -A celery_app worker --loglevel=info
```
3. BullMQ (Modern Bull Alternative)
Features:
- Better performance
- Better TypeScript support
- More features (rate limiting, job scheduling)
Usage:
```javascript
const { Queue, Worker } = require('bullmq');

const queue = new Queue('email', {
  connection: { host: 'localhost', port: 6379 }
});

// Add job
await queue.add('send-welcome', { to: 'user@example.com' });

// Process jobs
const worker = new Worker('email', async (job) => {
  console.log(`Processing job ${job.id}`);
  await sendEmail(job.data.to);
}, {
  connection: { host: 'localhost', port: 6379 },
  concurrency: 5 // Process 5 jobs concurrently
});
```
Worker Pool Sizing
I/O-Bound Tasks
Rule: More workers than CPU cores
Reasoning:
- Workers spend most time waiting for I/O
- Can handle many concurrent requests
Example:
```javascript
// 4 CPU cores, but 20 workers (I/O-bound)
const pool = new WorkerPool(20);
```
CPU-Bound Tasks
Rule: Workers = CPU cores
Reasoning:
- More workers = more context switching overhead
- No benefit from more workers than cores
Example:
```javascript
const numCPUs = require('os').cpus().length;
const pool = new WorkerPool(numCPUs); // 4 workers for 4 cores
```
Mixed Workload
Rule: Separate pools for each type
Example:
```javascript
const ioPool = new WorkerPool(20); // I/O-bound tasks
const cpuPool = new WorkerPool(4); // CPU-bound tasks

// Route tasks to appropriate pool
if (task.type === 'io') {
  await ioPool.execute(task);
} else {
  await cpuPool.execute(task);
}
```
Connection Pooling
1. Database Connection Pooling
Why:
- Creating connections is expensive (100-500ms)
- Reuse connections for better performance
PostgreSQL (Prisma):
```javascript
const { PrismaClient } = require('@prisma/client');

const prisma = new PrismaClient({
  datasources: {
    db: {
      url: 'postgresql://user:pass@localhost:5432/db?connection_limit=20'
    }
  }
});
```
PostgreSQL (pg):
```javascript
const { Pool } = require('pg');

const pool = new Pool({
  host: 'localhost',
  port: 5432,
  database: 'mydb',
  user: 'user',
  password: 'password',
  max: 20,                       // Max connections
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000
});

// Usage
const result = await pool.query('SELECT * FROM users WHERE id = $1', [123]);
```
Python (SQLAlchemy):
```python
from sqlalchemy import create_engine

engine = create_engine(
    'postgresql://user:pass@localhost:5432/db',
    pool_size=20,       # Max connections
    max_overflow=10,    # Extra connections if pool full
    pool_timeout=30,    # Wait 30s for connection
    pool_recycle=3600   # Recycle connections after 1h
)
```
2. HTTP Connection Pooling (Keep-Alive)
Node.js (axios):
```javascript
const axios = require('axios');
const http = require('http');
const https = require('https');

const httpAgent = new http.Agent({ keepAlive: true, maxSockets: 50 });
const httpsAgent = new https.Agent({ keepAlive: true, maxSockets: 50 });

const client = axios.create({ httpAgent, httpsAgent });

// Reuses connections
await client.get('https://api.example.com/data');
```
Python (requests):
```python
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

session = requests.Session()

# Connection pooling
adapter = HTTPAdapter(
    pool_connections=10,
    pool_maxsize=50,
    max_retries=Retry(total=3, backoff_factor=0.3)
)
session.mount('http://', adapter)
session.mount('https://', adapter)

# Reuses connections
response = session.get('https://api.example.com/data')
```
3. Redis Connection Pooling
Node.js (ioredis):
```javascript
const Redis = require('ioredis');

const redis = new Redis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: 3,
  lazyConnect: true
  // Connection pool managed automatically
});
```
Async/Await Patterns
1. Promise.all() for Parallel Execution
Sequential (Slow):
```javascript
const user = await getUser(1);         // 100ms
const posts = await getPosts(1);       // 100ms
const comments = await getComments(1); // 100ms
// Total: 300ms
```
Parallel (Fast):
```javascript
const [user, posts, comments] = await Promise.all([
  getUser(1),
  getPosts(1),
  getComments(1)
]);
// Total: 100ms (all run concurrently)
```
2. Promise.allSettled() for Error Handling
Problem with Promise.all():
```javascript
// If one fails, all fail
const results = await Promise.all([
  fetch('https://api1.com'),
  fetch('https://api2.com'), // Fails!
  fetch('https://api3.com')
]);
// Throws error, loses results from api1 and api3
```
Solution with Promise.allSettled():
```javascript
const results = await Promise.allSettled([
  fetch('https://api1.com'),
  fetch('https://api2.com'), // Fails
  fetch('https://api3.com')
]);

// All results available
results.forEach((result, i) => {
  if (result.status === 'fulfilled') {
    console.log(`API ${i + 1} succeeded:`, result.value);
  } else {
    console.error(`API ${i + 1} failed:`, result.reason);
  }
});
```
3. Avoid Async Overhead for Sync Operations
Bad:
```javascript
// Unnecessary async overhead
async function add(a, b) {
  return a + b;
}

const result = await add(1, 2);
```
Good:
```javascript
// Synchronous (no overhead)
function add(a, b) {
  return a + b;
}

const result = add(1, 2);
```
Batching Strategies
1. Batch API Requests
Sequential (Slow):
```javascript
for (const id of userIds) {
  const user = await fetch(`https://api.example.com/users/${id}`);
  users.push(user);
}
// 100 users = 100 requests
```
Batched (Fast):
```javascript
// Single request with all IDs
const users = await fetch('https://api.example.com/users/batch', {
  method: 'POST',
  body: JSON.stringify({ ids: userIds })
});
// 100 users = 1 request
```
2. Batch Database Writes
Sequential (Slow):
```javascript
for (const user of users) {
  await db.users.create({ data: user });
}
// 100 users = 100 queries
```
Batched (Fast):
```javascript
await db.users.createMany({ data: users });
// 100 users = 1 query
```
3. Debouncing/Throttling
Debouncing (Wait for Pause):
```javascript
function debounce(func, delay) {
  let timeout; // Each debounced function gets its own timer
  return (...args) => {
    clearTimeout(timeout);
    timeout = setTimeout(() => func(...args), delay);
  };
}

// Usage: Only search after user stops typing for 300ms
const debouncedSearch = debounce(search, 300);
input.addEventListener('input', (e) => debouncedSearch(e.target.value));
```
Throttling (Limit Rate):
```javascript
function throttle(func, limit) {
  let lastRun = 0; // Each throttled function tracks its own last run
  return (...args) => {
    const now = Date.now();
    if (now - lastRun >= limit) {
      func(...args);
      lastRun = now;
    }
  };
}

// Usage: Only update scroll position every 100ms
const throttledScroll = throttle(updateScrollPosition, 100);
window.addEventListener('scroll', throttledScroll);
```
4. DataLoader Pattern (GraphQL)
Problem: N+1 Queries
```javascript
// Resolver called for each user
const posts = users.map(user => getPosts(user.id));
// 100 users = 100 queries
```
Solution: DataLoader
```javascript
const DataLoader = require('dataloader');

const postLoader = new DataLoader(async (userIds) => {
  // Batch query
  const posts = await db.posts.findMany({
    where: { userId: { in: userIds } }
  });

  // Group by userId
  return userIds.map(id => posts.filter(post => post.userId === id));
});

// Usage (automatically batched)
const posts = await Promise.all(users.map(user => postLoader.load(user.id)));
// 100 users = 1 query
```
Streaming for Large Datasets
1. Stream Processing (Not Loading All in Memory)
Bad (Loads All in Memory):
```javascript
const users = await db.users.findMany(); // 1 million users!

for (const user of users) {
  await processUser(user);
}
// Memory: 1GB+
```
Good (Streams):
```javascript
const { Readable } = require('stream');

// findManyStream() is illustrative; use your DB driver's streaming API
const userStream = Readable.from(db.users.findManyStream());

userStream.on('data', async (user) => {
  await processUser(user);
});

userStream.on('end', () => {
  console.log('Done processing users');
});
// Memory: ~10MB
```
2. Backpressure Handling
Problem:
```javascript
// Producer faster than consumer
for (let i = 0; i < 1000000; i++) {
  stream.write(data); // Fills memory!
}
```
Solution:
```javascript
for (let i = 0; i < 1000000; i++) {
  const canContinue = stream.write(data);

  if (!canContinue) {
    // Wait for drain event
    await new Promise(resolve => stream.once('drain', resolve));
  }
}
```
3. Node.js Streams
Example: Process Large CSV
```javascript
const fs = require('fs');
const csv = require('csv-parser');

fs.createReadStream('large-file.csv')
  .pipe(csv())
  .on('data', (row) => {
    // Process each row
    processRow(row);
  })
  .on('end', () => {
    console.log('CSV processed');
  });
```
4. Python Generators/Iterators
Example: Process Large File
```python
def read_large_file(file_path):
    with open(file_path, 'r') as f:
        for line in f:
            yield line.strip()

# Process line by line (low memory)
for line in read_large_file('large-file.txt'):
    process_line(line)
```
Load Balancing
1. Round-Robin
How it Works:
- Distribute requests evenly across servers
- Server 1 → Server 2 → Server 3 → Server 1 → ...
Pros:
- Simple
- Even distribution
Cons:
- Doesn't consider server load
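A minimal sketch of round-robin selection (server addresses are illustrative):

```javascript
// Sketch: round-robin server selection with a wrapping index.
const servers = ['server1:3000', 'server2:3000', 'server3:3000'];
let nextIndex = 0;

function pickServer() {
  const server = servers[nextIndex];
  nextIndex = (nextIndex + 1) % servers.length; // wrap back to the first server
  return server;
}
```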
2. Least Connections
How it Works:
- Send request to server with fewest active connections
Pros:
- Better for long-running requests
- Adapts to server load
Cons:
- More complex
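A minimal sketch of least-connections selection, tracking active counts in memory; `forward()` is a hypothetical proxy function:

```javascript
// Sketch: route each request to the server with the fewest active connections.
const servers = [
  { host: 'server1:3000', active: 0 },
  { host: 'server2:3000', active: 0 },
  { host: 'server3:3000', active: 0 }
];

function pickServer() {
  return servers.reduce((least, s) => (s.active < least.active ? s : least));
}

async function proxy(request) {
  const server = pickServer();
  server.active++; // count the connection for the duration of the request
  try {
    return await forward(request, server.host); // forward() is hypothetical
  } finally {
    server.active--;
  }
}
```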
3. Consistent Hashing
How it Works:
- Hash request (e.g., user ID) to determine server
- Same user always goes to same server
Pros:
- Sticky sessions (useful for caching)
- Minimal redistribution when adding/removing servers
Cons:
- Uneven distribution possible
Implementation:
```javascript
const HashRing = require('hashring');

const ring = new HashRing([
  'server1:3000',
  'server2:3000',
  'server3:3000'
]);

// Get server for user
const server = ring.get(`user:${userId}`);
console.log(`Route user ${userId} to ${server}`);
```
4. Sticky Sessions (When Needed)
Use Case: Session data stored in server memory
Implementation (Nginx):
```nginx
upstream backend {
    ip_hash;  # Sticky sessions based on IP
    server server1:3000;
    server server2:3000;
    server server3:3000;
}
```
Horizontal Scaling
1. Stateless Services (Can Add More Instances)
Stateless (Good):
```javascript
// No shared state
app.get('/api/users/:id', async (req, res) => {
  const user = await db.users.findUnique({
    where: { id: req.params.id }
  });
  res.json(user);
});
// Can scale to 100 instances
```
Stateful (Bad):
```javascript
// Shared state in memory
const cache = new Map();

app.get('/api/users/:id', async (req, res) => {
  if (cache.has(req.params.id)) {
    return res.json(cache.get(req.params.id));
  }

  const user = await db.users.findUnique({
    where: { id: req.params.id }
  });
  cache.set(req.params.id, user);
  res.json(user);
});
// Can't scale (cache not shared across instances)
```
Solution: Use Redis for Shared State
```javascript
app.get('/api/users/:id', async (req, res) => {
  const cached = await redis.get(`user:${req.params.id}`);
  if (cached) return res.json(JSON.parse(cached));

  const user = await db.users.findUnique({
    where: { id: req.params.id }
  });
  await redis.setex(`user:${req.params.id}`, 3600, JSON.stringify(user));
  res.json(user);
});
// Can scale (Redis shared across instances)
```
2. Distributed Task Queues
See "Task Queue Systems" section above
3. Shared Nothing Architecture
Principles:
- No shared state between instances
- Each instance independent
- Communicate via message queues or APIs
Benefits:
- Easy to scale horizontally
- No single point of failure
- Better fault tolerance
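One way to sketch the "communicate via message queues" principle: instances coordinate through Redis pub/sub instead of reading each other's memory. The channel name and message shape here are illustrative.

```javascript
// Sketch: shared-nothing coordination via Redis pub/sub (ioredis).
const Redis = require('ioredis');

const pub = new Redis();
const sub = new Redis(); // A subscriber connection can't issue other commands

sub.subscribe('events');
sub.on('message', (channel, message) => {
  // Each instance reacts independently; no shared in-process state
  console.log(`Instance ${process.pid} received:`, message);
});

// Any instance can publish without knowing which instances are listening
pub.publish('events', JSON.stringify({ type: 'cache-invalidate', key: 'user:123' }));
```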
Measuring Concurrency
1. Concurrent Requests (Active at One Time)
Metric: Number of requests being processed simultaneously
Monitoring:
```javascript
let activeRequests = 0;

app.use((req, res, next) => {
  activeRequests++;
  res.on('finish', () => {
    activeRequests--;
  });
  next();
});

// Expose metric
app.get('/metrics', (req, res) => {
  res.json({ activeRequests });
});
```
2. Queue Depth
Metric: Number of jobs waiting in queue
Monitoring (Bull):
```javascript
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const failed = await queue.getFailedCount();

console.log(`Queue depth: ${waiting}, Active: ${active}, Failed: ${failed}`);
```
3. Worker Utilization
Metric: Percentage of time workers are busy
Formula:
Utilization = (Active Workers / Total Workers) × 100%
Goal: 70-80% (not 100%, leave headroom for spikes)
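A small sketch of tracking this against the WorkerPool class from earlier; the `pool` instance, sampling interval, and 80% alert threshold are assumptions.

```javascript
// Sketch: periodically sample utilization from a WorkerPool instance.
function getUtilization(pool) {
  return (pool.activeWorkers / pool.maxWorkers) * 100;
}

setInterval(() => {
  const utilization = getUtilization(pool);
  if (utilization > 80) {
    console.warn(`Worker utilization high: ${utilization.toFixed(1)}%`);
  }
}, 5000);
```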
4. P95/P99 Latency Under Load
Metric: 95th/99th percentile latency
Monitoring:
```javascript
const latencies = [];

app.use((req, res, next) => {
  const start = Date.now();
  res.on('finish', () => {
    latencies.push(Date.now() - start);
  });
  next();
});

// Calculate a percentile (0.95 for P95, 0.99 for P99)
function getPercentile(p) {
  const sorted = [...latencies].sort((a, b) => a - b);
  const index = Math.floor(sorted.length * p);
  return sorted[index];
}
```
Benchmarking Tools
1. Apache Bench (ab)
```bash
# 1000 requests, 10 concurrent
ab -n 1000 -c 10 http://localhost:3000/api/users
```
2. wrk
```bash
# 10 threads, 100 connections, 30 seconds
wrk -t10 -c100 -d30s http://localhost:3000/api/users
```
3. wrk2
```bash
# Constant throughput: 1000 req/s
wrk2 -t10 -c100 -d30s -R1000 http://localhost:3000/api/users
```
4. k6
```javascript
// script.js
import http from 'k6/http';

export let options = {
  vus: 100,        // 100 virtual users
  duration: '30s'
};

export default function () {
  http.get('http://localhost:3000/api/users');
}
```
```bash
k6 run script.js
```
5. autocannon (Node.js)
```bash
# 50 connections, 60 seconds
autocannon -c 50 -d 60 http://localhost:3000/api/users
```
Concurrency Anti-Patterns
1. Blocking the Event Loop (Node.js)
Bad:
```javascript
app.get('/api/compute', (req, res) => {
  // Blocks event loop for 5 seconds!
  let result = 0;
  for (let i = 0; i < 5000000000; i++) {
    result += i;
  }
  res.json({ result });
});
```
Good:
```javascript
app.get('/api/compute', async (req, res) => {
  // Offload to worker thread
  const result = await runWorker({ iterations: 5000000000 });
  res.json({ result });
});
```
2. Unbounded Concurrency (Resource Exhaustion)
Bad:
```javascript
// No limit on concurrent requests
const results = await Promise.all(
  urls.map(url => fetch(url))
);
// 10,000 URLs = 10,000 concurrent requests!
```
Good:
```javascript
// Limit to 10 concurrent requests
const pool = new WorkerPool(10);

const results = await Promise.all(
  urls.map(url => pool.execute(() => fetch(url)))
);
```
3. Race Conditions
Bad:
```javascript
const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));

let counter = 0;

async function increment() {
  const current = counter;
  await delay(10);
  counter = current + 1;
}

// Race condition!
await Promise.all([increment(), increment()]);
console.log(counter); // Expected: 2, Actual: 1
```
Good:
```javascript
// Use atomic operations (Redis INCR)
await redis.incr('counter');
await redis.incr('counter');

const counter = await redis.get('counter');
console.log(counter); // 2 (correct)
```
4. Deadlocks
Bad:
```javascript
// Lock A then Lock B
async function task1() {
  await lockA.acquire();
  await lockB.acquire();
  // Work
  lockB.release();
  lockA.release();
}

// Lock B then Lock A (deadlock!)
async function task2() {
  await lockB.acquire();
  await lockA.acquire();
  // Work
  lockA.release();
  lockB.release();
}
```
Good:
```javascript
// Always acquire locks in same order
async function task1() {
  await lockA.acquire();
  await lockB.acquire();
  // Work
  lockB.release();
  lockA.release();
}

async function task2() {
  await lockA.acquire(); // Same order!
  await lockB.acquire();
  // Work
  lockB.release();
  lockA.release();
}
```
Real-World Scenarios
Scenario 1: High-Throughput API
Requirements:
- Handle 10,000 req/s
- P95 latency < 100ms
Solution:
```javascript
const cluster = require('cluster');
const express = require('express');
const Redis = require('ioredis');
const { PrismaClient } = require('@prisma/client');
const numCPUs = require('os').cpus().length;

// 1. Cluster mode (use all CPU cores)
if (cluster.isMaster) {
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
} else {
  const app = express();

  // 2. Connection pooling
  const prisma = new PrismaClient({
    datasources: {
      db: { url: 'postgresql://...?connection_limit=20' }
    }
  });

  // 3. Caching
  const redis = new Redis();

  app.get('/api/users/:id', async (req, res) => {
    // Check cache
    const cached = await redis.get(`user:${req.params.id}`);
    if (cached) return res.json(JSON.parse(cached));

    // Query database
    const user = await prisma.user.findUnique({
      where: { id: req.params.id }
    });

    // Cache result
    await redis.setex(`user:${req.params.id}`, 3600, JSON.stringify(user));
    res.json(user);
  });

  app.listen(3000);
}
```
Scenario 2: Batch Job Processing
Requirements:
- Process 1 million records
- Complete within 1 hour
Solution:
```javascript
const { Queue, Worker } = require('bullmq');

// Producer: Add jobs to queue
const queue = new Queue('process-records');

const records = await db.records.findMany({ take: 1000000 });
for (const record of records) {
  await queue.add('process', { recordId: record.id });
}

// Consumer: Process jobs in parallel
const worker = new Worker('process-records', async (job) => {
  const record = await db.records.findUnique({
    where: { id: job.data.recordId }
  });
  await processRecord(record);
}, {
  concurrency: 100 // 100 concurrent workers
});

// 1,000,000 records / 100 workers / 3600s = ~3 records/s per worker
```
Scenario 3: WebSocket Connections
Requirements:
- Support 100,000 concurrent WebSocket connections
Solution:
```javascript
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
const Redis = require('ioredis');
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');

if (cluster.isMaster) {
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
} else {
  const io = new Server(3000);

  // Redis adapter for multi-instance support
  const pubClient = new Redis();
  const subClient = pubClient.duplicate();
  io.adapter(createAdapter(pubClient, subClient));

  io.on('connection', (socket) => {
    console.log(`Client connected: ${socket.id}`);

    socket.on('message', (data) => {
      // Broadcast to all clients (across all instances)
      io.emit('message', data);
    });
  });
}

// 100,000 connections / 8 cores = 12,500 per instance
```
Scenario 4: File Processing Pipeline
Requirements:
- Process uploaded files asynchronously
- Support multiple file types (images, videos, PDFs)
Solution:
```javascript
const { Queue, Worker } = require('bullmq');

// Upload endpoint
app.post('/api/upload', upload.single('file'), async (req, res) => {
  const fileId = await saveFile(req.file);

  // Add to queue based on file type
  const queueName = getQueueForFileType(req.file.mimetype);
  await queues[queueName].add('process', { fileId });

  res.json({ fileId, status: 'processing' });
});

// Separate workers for each file type
const imageWorker = new Worker('images', processImage, { concurrency: 10 });
const videoWorker = new Worker('videos', processVideo, { concurrency: 5 });
const pdfWorker = new Worker('pdfs', processPDF, { concurrency: 20 });
```
Implementation Examples
Worker Pool in Node.js
```javascript
class WorkerPool {
  constructor(maxWorkers = 10) {
    this.maxWorkers = maxWorkers;
    this.activeWorkers = 0;
    this.queue = [];
  }

  async execute(task) {
    while (this.activeWorkers >= this.maxWorkers) {
      await new Promise(resolve => this.queue.push(resolve));
    }

    this.activeWorkers++;
    try {
      return await task();
    } finally {
      this.activeWorkers--;
      const next = this.queue.shift();
      if (next) next();
    }
  }
}

module.exports = WorkerPool;
```
Celery Task Queue in Python
```python
# celery_app.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_data(data_id):
    # Process data
    data = fetch_data(data_id)
    result = expensive_computation(data)
    save_result(result)
    return result
```

```python
# producer.py
from celery_app import process_data

# Add job to queue
result = process_data.delay(123)
print(f'Job queued: {result.id}')
```

```bash
# worker.sh
celery -A celery_app worker --loglevel=info --concurrency=10
```
Connection Pooling Configuration
PostgreSQL (Prisma):
```javascript
const { PrismaClient } = require('@prisma/client');

const prisma = new PrismaClient({
  datasources: {
    db: {
      url: 'postgresql://user:pass@localhost:5432/db?connection_limit=20&pool_timeout=20'
    }
  }
});
```
Redis (ioredis):
```javascript
const Redis = require('ioredis');

const redis = new Redis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
  lazyConnect: true
});
```
Summary
Quick Reference
Concurrency vs Parallelism:
- Concurrency: Structure (dealing with many things)
- Parallelism: Execution (doing many things)
Node.js:
- Event loop (single-threaded)
- Worker threads (CPU-intensive)
- Cluster mode (multi-process)
Python:
- asyncio (I/O-bound)
- Threading (I/O-bound, GIL limited)
- Multiprocessing (CPU-bound)
Patterns:
- Worker pools (limit concurrency)
- Task queues (async processing)
- Rate limiting (control rate)
- Backpressure (slow down when overwhelmed)
Worker Pool Sizing:
- I/O-bound: More workers than cores
- CPU-bound: Workers = cores
- Mixed: Separate pools
Tools:
- Bull/BullMQ (Node.js task queue)
- Celery (Python task queue)
- ab, wrk, k6 (benchmarking)
Anti-Patterns:
- Blocking event loop
- Unbounded concurrency
- Race conditions
- Deadlocks
Key Metrics:
- Concurrent requests
- Queue depth
- Worker utilization (70-80%)
- P95/P99 latency