Marketplace postgres-performance
High-performance PostgreSQL patterns. Use when optimizing queries, designing for scale, or debugging performance issues.
install
source · Clone the upstream repo
git clone https://github.com/aiskillstore/marketplace
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/aiskillstore/marketplace "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/cjharmath/postgres-performance" ~/.claude/skills/aiskillstore-marketplace-postgres-performance && rm -rf "$T"
manifest:
skills/cjharmath/postgres-performance/SKILL.md
PostgreSQL Performance Engineering
Problem Statement
Performance problems compound. A query that takes 50ms at 1K rows takes 5s at 100K rows. This skill covers patterns for building performant database interactions from the start and fixing performance issues.
Pattern: Query Optimization Workflow
Step 1: Identify Slow Queries
```sql
-- Enable pg_stat_statements (if not already)
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

-- Find slowest queries
SELECT
    query,
    calls,
    round(mean_exec_time::numeric, 2) as avg_ms,
    round(total_exec_time::numeric, 2) as total_ms,
    rows
FROM pg_stat_statements
WHERE calls > 10
ORDER BY mean_exec_time DESC
LIMIT 20;
```
Step 2: Analyze Query Plan
```sql
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM assessments
WHERE user_id = 'abc-123'
ORDER BY created_at DESC
LIMIT 10;
```
What to look for:
| Warning Sign | Problem | Solution |
|---|---|---|
| Seq Scan on large table | Missing index | Add index |
| Nested Loop with high loop count | N+1 in join | Rewrite query, add index |
| Sort with high cost | No index for ORDER BY | Covering index |
| Hash/Merge Join with high rows | Large intermediate result | Filter earlier, better indexes |
| Buffers: shared read high | Data not cached | More RAM, or query less data |
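Before reading individual plans, the statistics views can point at likely offenders. A minimal sketch using pg_stat_user_tables to surface tables that are read mostly by sequential scans (a large seq_tup_read with few idx_scan is a hint that an index is missing):

```sql
-- Tables read mostly via sequential scans (candidates for new indexes)
SELECT
    relname,
    seq_scan,
    seq_tup_read,
    idx_scan,
    pg_size_pretty(pg_relation_size(relid)) AS table_size
FROM pg_stat_user_tables
WHERE seq_scan > 0
ORDER BY seq_tup_read DESC
LIMIT 20;
```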
Step 3: Fix and Verify
```sql
-- Add index
CREATE INDEX CONCURRENTLY ix_assessments_user_created
    ON assessments (user_id, created_at DESC);

-- Verify improvement
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM assessments
WHERE user_id = 'abc-123'
ORDER BY created_at DESC
LIMIT 10;
-- Should now show "Index Scan" instead of "Seq Scan"
```
Pattern: Covering Indexes (Index-Only Scans)
Problem: Query reads index, then fetches rows from table (heap fetch).
```sql
-- Query
SELECT id, title, status FROM assessments WHERE user_id = ?;

-- Regular index: requires heap fetch
CREATE INDEX ix_assessments_user ON assessments (user_id);
-- Plan: Index Scan + Heap Fetches

-- ✅ Covering index: all columns in index
CREATE INDEX ix_assessments_user_covering
    ON assessments (user_id) INCLUDE (id, title, status);
-- Plan: Index Only Scan (no heap fetch, much faster)
```
When to use:
- Frequently run queries
- Queries selecting few columns
- Tables with many columns (heap fetch is expensive)
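One caveat: PostgreSQL can skip the heap only for pages marked all-visible in the visibility map, so on heavily updated tables heap fetches creep back in until vacuum runs. A quick way to verify the index really is covering:

```sql
-- Keep the visibility map current so index-only scans can skip the heap
VACUUM (ANALYZE) assessments;

-- Verify: the plan should show "Index Only Scan" with "Heap Fetches: 0"
EXPLAIN (ANALYZE, BUFFERS)
SELECT id, title, status FROM assessments WHERE user_id = 'abc-123';
```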
Pattern: Pagination at Scale
```sql
-- ❌ SLOW: OFFSET-based pagination
SELECT * FROM events
ORDER BY created_at DESC
LIMIT 20 OFFSET 10000;
-- Must scan and discard 10,000 rows!

-- ✅ FAST: Cursor-based (keyset) pagination
SELECT * FROM events
WHERE created_at < '2024-01-15T10:30:00Z'  -- Last seen timestamp
ORDER BY created_at DESC
LIMIT 20;
-- Jumps directly to the right place via index

-- For compound cursor (when duplicates possible):
SELECT * FROM events
WHERE (created_at, id) < ('2024-01-15T10:30:00Z', 'last-id')
ORDER BY created_at DESC, id DESC
LIMIT 20;
```
In SQLAlchemy:
```python
from datetime import datetime
from uuid import UUID

from sqlalchemy import select, tuple_
from sqlalchemy.ext.asyncio import AsyncSession


# Cursor-based pagination (Event is the ORM model for the events table)
async def get_events_page(
    session: AsyncSession,
    cursor_time: datetime | None,
    cursor_id: UUID | None,
    limit: int = 20,
) -> list[Event]:
    query = select(Event).order_by(Event.created_at.desc(), Event.id.desc())
    if cursor_time and cursor_id:
        query = query.where(
            tuple_(Event.created_at, Event.id) < (cursor_time, cursor_id)
        )
    result = await session.execute(query.limit(limit))
    return list(result.scalars().all())
```
Pattern: Batch Processing
```sql
-- ❌ SLOW: One huge query/update
UPDATE events SET processed = true WHERE processed = false;
-- Locks millions of rows, times out

-- ✅ FAST: Batch processing
DO $$
DECLARE
    batch_size INT := 10000;
    rows_affected INT;
BEGIN
    LOOP
        UPDATE events SET processed = true
        WHERE id IN (
            SELECT id FROM events
            WHERE processed = false
            LIMIT batch_size
            FOR UPDATE SKIP LOCKED
        );
        GET DIAGNOSTICS rows_affected = ROW_COUNT;
        IF rows_affected = 0 THEN
            EXIT;
        END IF;
        COMMIT;
        PERFORM pg_sleep(0.1);  -- Brief pause to let other queries through
    END LOOP;
END $$;
```
In Python:
```python
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def process_in_batches(session: AsyncSession, batch_size: int = 10000):
    while True:
        result = await session.execute(
            text("""
                UPDATE events SET processed = true
                WHERE id IN (
                    SELECT id FROM events
                    WHERE processed = false
                    LIMIT :batch_size
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """),
            {"batch_size": batch_size},
        )
        updated = result.fetchall()
        await session.commit()
        if len(updated) == 0:
            break
        await asyncio.sleep(0.1)  # Brief pause between batches
```
Pattern: Efficient Aggregations
```sql
-- ❌ SLOW: Count with complex WHERE
SELECT COUNT(*) FROM events WHERE user_id = ? AND status = 'active';
-- Scans all matching rows

-- ✅ FAST: Approximate count (for large tables)
SELECT reltuples::bigint AS estimate
FROM pg_class WHERE relname = 'events';

-- ✅ FAST: Maintain counter cache
-- Add column: assessments.answer_count
-- Update on INSERT/DELETE to answers

-- ✅ FAST: Materialized view for complex aggregations
CREATE MATERIALIZED VIEW user_stats AS
SELECT
    user_id,
    COUNT(*) as total_assessments,
    AVG(rating) as avg_rating
FROM assessments
GROUP BY user_id;

-- Refresh periodically (CONCURRENTLY requires a unique index on the view)
CREATE UNIQUE INDEX ON user_stats (user_id);
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats;
```
Pattern: Connection Pool Tuning
```python
# Async SQLAlchemy with proper pool settings
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool

# For serverless/Lambda (no persistent connections)
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # New connection per request
)

# For long-running servers
engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=10,         # Base connections
    max_overflow=20,      # Extra connections under load
    pool_timeout=30,      # Wait for connection
    pool_recycle=1800,    # Recycle connections every 30 min
    pool_pre_ping=True,   # Test connection before use
)
```
PostgreSQL side:
```sql
-- Check max connections
SHOW max_connections;  -- Default 100

-- See current connections
SELECT count(*) FROM pg_stat_activity;

-- Connections per application
SELECT application_name, count(*)
FROM pg_stat_activity
GROUP BY application_name;
```
Pattern: Read Replicas
```python
# Route reads to the replica, writes to the primary
from sqlalchemy import Delete, Update
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session

primary_engine = create_async_engine(PRIMARY_URL)
replica_engine = create_async_engine(REPLICA_URL)


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # Flushes and explicit UPDATE/DELETE statements go to the primary;
        # everything else reads from the replica
        if self._flushing or isinstance(clause, (Update, Delete)):
            return primary_engine.sync_engine
        return replica_engine.sync_engine
```
Pattern: Denormalization for Read Performance
```sql
-- ❌ SLOW: Joining 4 tables for common query
SELECT
    a.id, a.title,
    u.name as user_name,
    COUNT(q.id) as question_count,
    AVG(ans.value) as avg_score
FROM assessments a
JOIN users u ON a.user_id = u.id
JOIN questions q ON q.assessment_id = a.id
LEFT JOIN answers ans ON ans.question_id = q.id
GROUP BY a.id, a.title, u.name;

-- ✅ FAST: Denormalized columns
ALTER TABLE assessments ADD COLUMN user_name VARCHAR(100);
ALTER TABLE assessments ADD COLUMN question_count INT DEFAULT 0;
ALTER TABLE assessments ADD COLUMN avg_score NUMERIC(3,2);

-- Update via triggers or application code
-- Query becomes simple:
SELECT id, title, user_name, question_count, avg_score
FROM assessments;
```
Tradeoffs:
- ✅ Much faster reads
- ❌ More complex writes (must update denormalized data)
- ❌ Potential for stale data
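For the "update via triggers" route mentioned above, a minimal sketch of keeping question_count in sync (assuming questions references assessments via assessment_id, as in the join example):

```sql
-- Trigger to maintain assessments.question_count
CREATE OR REPLACE FUNCTION sync_question_count() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        UPDATE assessments SET question_count = question_count + 1
        WHERE id = NEW.assessment_id;
    ELSIF TG_OP = 'DELETE' THEN
        UPDATE assessments SET question_count = question_count - 1
        WHERE id = OLD.assessment_id;
    END IF;
    RETURN NULL;  -- AFTER triggers ignore the return value
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_sync_question_count
AFTER INSERT OR DELETE ON questions
FOR EACH ROW EXECUTE FUNCTION sync_question_count();
```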
Pattern: Partitioning Large Tables
```sql
-- Partition events by month
CREATE TABLE events (
    id UUID NOT NULL,
    user_id UUID NOT NULL,
    event_type VARCHAR(50),
    created_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (id, created_at)  -- PK must include the partition key
) PARTITION BY RANGE (created_at);

-- Create partitions
CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE events_2024_02 PARTITION OF events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Query specific partition (fast)
SELECT * FROM events
WHERE created_at >= '2024-01-15' AND created_at < '2024-02-01';

-- Drop old data instantly
DROP TABLE events_2023_01;  -- Much faster than DELETE
```
Pattern: Caching Strategy
```python
# Cache frequently-read, rarely-changed data
from uuid import UUID

import redis.asyncio as redis

cache = redis.from_url(REDIS_URL)


async def get_user_stats(user_id: UUID) -> UserStats:
    cache_key = f"user_stats:{user_id}"

    # Try cache first
    cached = await cache.get(cache_key)
    if cached:
        return UserStats.model_validate_json(cached)

    # Query database
    async with get_session() as session:
        stats = await calculate_user_stats(session, user_id)

    # Cache for 5 minutes
    await cache.setex(cache_key, 300, stats.model_dump_json())
    return stats


# Invalidate on write
async def update_user_assessment(user_id: UUID, ...):
    # ... update database ...
    await cache.delete(f"user_stats:{user_id}")
```
Performance Monitoring Queries
```sql
-- Table bloat (needs VACUUM)
SELECT
    schemaname,
    relname,
    n_dead_tup as dead_tuples,
    n_live_tup as live_tuples,
    round(n_dead_tup::numeric / NULLIF(n_live_tup, 0) * 100, 2) as dead_pct
FROM pg_stat_user_tables
WHERE n_dead_tup > 10000
ORDER BY n_dead_tup DESC;

-- Unused indexes (never scanned; candidates for removal)
SELECT
    indexrelname as index,
    pg_size_pretty(pg_relation_size(indexrelid)) as size,
    idx_scan as scans
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;

-- Cache hit ratio (should be > 99%)
SELECT sum(blks_hit) * 100.0 / sum(blks_hit + blks_read) as cache_hit_ratio
FROM pg_stat_database;

-- Long-running queries
SELECT pid, now() - query_start as duration, query
FROM pg_stat_activity
WHERE state = 'active'
  AND query NOT LIKE '%pg_stat%'
  AND now() - query_start > interval '30 seconds';
```
Performance Checklist
Before deploying:
- Slow queries identified and optimized
- Indexes match query patterns
- Covering indexes for frequent queries
- Pagination uses cursor-based (not OFFSET)
- Large tables partitioned if > 10M rows
- Connection pool sized appropriately
- Cache layer for hot data
- Monitoring in place for slow queries
- VACUUM and ANALYZE scheduled
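For the last checklist item, one way to confirm that vacuum and analyze are actually running is to check the maintenance timestamps PostgreSQL tracks per table:

```sql
-- Last manual/automatic VACUUM and ANALYZE per table
SELECT
    relname,
    last_vacuum,
    last_autovacuum,
    last_analyze,
    last_autoanalyze
FROM pg_stat_user_tables
ORDER BY greatest(last_vacuum, last_autovacuum) NULLS FIRST
LIMIT 20;
```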