PgQue – Zero-Bloat Postgres Queue
skills/pgque-postgres-queue/SKILL.md — skill by ara.so, Daily 2026 Skills collection (Trending-skills / pgque-postgres-queue).
Expert skill for PgQue – a zero-bloat, snapshot-based Postgres queue using pure PL/pgSQL with no C extensions required.
Install the skill:

```shell
T=$(mktemp -d) \
  && git clone --depth=1 https://github.com/Aradotso/trending-skills "$T" \
  && mkdir -p ~/.claude/skills \
  && cp -r "$T/skills/pgque-postgres-queue" ~/.claude/skills/aradotso-trending-skills-pgque-postgres-queue \
  && rm -rf "$T"
```
PgQue is a pure PL/pgSQL reimplementation of the battle-tested PgQ (Skype/Postgres) architecture. It uses snapshot-based batching and TRUNCATE-based table rotation instead of row-level locking, delivering zero dead-tuple bloat, predictable performance under sustained load, and native fan-out — all from a single SQL file on any Postgres 14+ instance including managed providers (RDS, Aurora, Cloud SQL, Supabase, Neon).
Key Concepts
- Tick: A periodic snapshot that closes a batch of events. Nothing is delivered until a tick fires.
- Batch: A group of events captured in one tick, consumed atomically by a subscriber.
- Subscriber/Consumer: A named cursor on the event log. Multiple consumers get independent copies of every batch (fan-out).
- Zero bloat: Events are stored in rotating tables and cleared via TRUNCATE, never DELETE. No dead tuples.
- Latency trade-off: End-to-end delivery is ~1–2 s (one tick interval + poll). Per-call function latency is microseconds.
Installation
Requirements
- Postgres 14+
- pg_cron (recommended), or an external scheduler calling pgque.ticker() every second
Install from SQL file
```shell
# Clone the repo
git clone https://github.com/NikolayS/pgque.git
cd pgque

# Install in a single transaction
PAGER=cat psql --no-psqlrc --single-transaction -d mydb -f sql/pgque.sql
```
Or inside a psql session:

```sql
begin;
\i sql/pgque.sql
commit;
```
Start the ticker (pg_cron)
```sql
-- Creates pg_cron jobs for ticker (every 1s) and maintenance (every 30s)
select pgque.start();
```
Start the ticker (without pg_cron)
Run these externally on a schedule:
```shell
# Every 1 second
psql -d mydb -c "select pgque.ticker()"

# Every 30 seconds
psql -d mydb -c "select pgque.maint()"
```
Warning: Without a running ticker, consumers see nothing. Enqueue works, but no batches are created.
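When pg_cron is unavailable, the external scheduler can be a small loop. A minimal sketch, with the command runner injected so the loop can be shown (and tested) without a live database — in production `run_cmd` would shell out to `psql` as above, e.g. via `subprocess.run`:

```python
import time

# Sketch of an external ticker loop. `run_cmd` is injected; in production it
# would be something like:
#   lambda sql: subprocess.run(["psql", "-d", "mydb", "-c", sql], check=True)
def run_scheduler(run_cmd, iterations, tick_every=1, maint_every=30):
    for n in range(iterations):
        run_cmd("select pgque.ticker()")       # fire a tick every iteration
        if n % maint_every == 0:
            run_cmd("select pgque.maint()")    # maintenance every 30th tick
        time.sleep(0)  # would be time.sleep(tick_every) in production

# Demonstration with a recording stub instead of psql:
calls = []
run_scheduler(calls.append, iterations=31)
assert calls.count("select pgque.ticker()") == 31
assert calls.count("select pgque.maint()") == 2  # at n=0 and n=30
```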
Uninstall
```sql
\i sql/pgque_uninstall.sql
```
Roles & Grants
| Role | Use |
|---|---|
| `pgque_reader` | Dashboards, metrics, read-only |
| `pgque_writer` | Producers and consumers (most apps) |
| `pgque_admin` | Operators, migrations |
```sql
-- Grant producer/consumer access to app user
CREATE USER app_worker WITH PASSWORD '...';
GRANT pgque_writer TO app_worker;

-- Grant read-only metrics access
CREATE USER metrics_reader WITH PASSWORD '...';
GRANT pgque_reader TO metrics_reader;
```
Core API (Modern Style)
Create a Queue
```sql
SELECT pgque.create_queue('orders');
```
Subscribe a Consumer
```sql
-- Register a named consumer on the queue
SELECT pgque.subscribe('orders', 'order-processor');
```
Send Events (Enqueue)
```sql
-- Send a single event (type, data)
SELECT pgque.send('orders', 'new_order', '{"order_id": 42, "amount": 99.99}');

-- Send a batch of events
SELECT pgque.send_batch('orders', ARRAY[
  ROW('new_order', '{"order_id": 43}')::pgque.event_data,
  ROW('new_order', '{"order_id": 44}')::pgque.event_data
]);
```
Receive and Acknowledge Events
```sql
-- Receive next batch for a consumer (returns batch_id + events)
SELECT * FROM pgque.receive('orders', 'order-processor');

-- Acknowledge successful processing (batch_id from receive)
SELECT pgque.ack('orders', 'order-processor', :batch_id);

-- Negative-acknowledge (retry / dead-letter)
SELECT pgque.nack('orders', 'order-processor', :batch_id);
```
Unsubscribe
```sql
SELECT pgque.unsubscribe('orders', 'order-processor');
```
Low-Level PgQ API
These map directly to the original PgQ primitives and are also available to the `pgque_writer` role:
```sql
-- Enqueue a single event
SELECT pgque.insert_event('orders', 'new_order', '{"order_id": 42}');

-- Register a consumer
SELECT pgque.register_consumer('orders', 'order-processor');

-- Get the next available batch ID
SELECT pgque.next_batch('orders', 'order-processor');
-- Returns: batch_id (bigint), or NULL if nothing ready

-- Fetch all events in a batch
SELECT * FROM pgque.get_batch_events(:batch_id);
-- Returns: ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data, ev_extra1..4

-- Mark batch as successfully processed
SELECT pgque.finish_batch(:batch_id);

-- Schedule an event for retry (with delay in seconds)
SELECT pgque.event_retry(:batch_id, :ev_id, 60);  -- retry in 60s

-- Unregister consumer
SELECT pgque.unregister_consumer('orders', 'order-processor');
```
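A consumer built on these primitives typically loops next_batch → get_batch_events → finish_batch. A minimal Python sketch assuming any DB-API cursor (e.g. psycopg2); the `StubCursor` at the bottom is a hypothetical stand-in so the flow can be demonstrated without a database:

```python
# Sketch of one pass of a low-level consumer loop over the PgQ-style
# primitives. `cur` is an injected DB-API cursor; `handler` is your
# per-event processing callback.
def drain_one_batch(cur, queue, consumer, handler):
    cur.execute("SELECT pgque.next_batch(%s, %s)", (queue, consumer))
    batch_id = cur.fetchone()[0]
    if batch_id is None:
        return False                      # no tick has closed a batch yet
    cur.execute("SELECT * FROM pgque.get_batch_events(%s)", (batch_id,))
    for event in cur.fetchall():
        handler(event)                    # process each event row
    cur.execute("SELECT pgque.finish_batch(%s)", (batch_id,))
    return True

# Stub cursor for demonstration only (no database involved).
class StubCursor:
    def __init__(self, batch_id, events):
        self.result, self.batch_id, self.events = [], batch_id, events
    def execute(self, sql, params=None):
        if "next_batch" in sql:
            self.result = [(self.batch_id,)]
        elif "get_batch_events" in sql:
            self.result = self.events
        else:
            self.result = []
    def fetchone(self):
        return self.result[0]
    def fetchall(self):
        return self.result

seen = []
drain_one_batch(StubCursor(7, [("ev1",), ("ev2",)]),
                "orders", "order-processor", seen.append)
assert seen == [("ev1",), ("ev2",)]
```

Run the pass in a loop and sleep for one tick interval whenever it returns False, exactly as the high-level consumer below does.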
Complete Working Example
Producer (Python with psycopg2)
```python
import psycopg2
import json
import os

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

def enqueue_order(order_id: int, amount: float):
    with conn.cursor() as cur:
        cur.execute(
            "SELECT pgque.send(%s, %s, %s)",
            ("orders", "new_order",
             json.dumps({"order_id": order_id, "amount": amount}))
        )
    conn.commit()

enqueue_order(42, 99.99)
```
Consumer (Python with psycopg2)
```python
import psycopg2
import psycopg2.extras
import json
import os
import time

conn = psycopg2.connect(os.environ["DATABASE_URL"])
conn.autocommit = False

QUEUE = "orders"
CONSUMER = "order-processor"

def setup():
    with conn.cursor() as cur:
        cur.execute("SELECT pgque.subscribe(%s, %s)", (QUEUE, CONSUMER))
    conn.commit()

def process_batch():
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM pgque.receive(%s, %s)", (QUEUE, CONSUMER))
        rows = cur.fetchall()
    if not rows:
        conn.rollback()
        return False
    batch_id = rows[0]["batch_id"]
    for row in rows:
        event = json.loads(row["ev_data"])
        print(f"Processing order {event['order_id']}")
        # ... your processing logic ...
    with conn.cursor() as cur:
        cur.execute("SELECT pgque.ack(%s, %s, %s)", (QUEUE, CONSUMER, batch_id))
    conn.commit()
    return True

setup()
while True:
    if not process_batch():
        time.sleep(1)  # wait for next tick
```
Fan-Out Example (Multiple Independent Consumers)
```sql
-- One queue, multiple consumers: each gets ALL events independently
SELECT pgque.create_queue('user-events');
SELECT pgque.subscribe('user-events', 'analytics-service');
SELECT pgque.subscribe('user-events', 'notification-service');
SELECT pgque.subscribe('user-events', 'audit-log');

-- Producer sends once
SELECT pgque.send('user-events', 'user_signup', '{"user_id": 1}');

-- Each consumer independently receives the same event
SELECT * FROM pgque.receive('user-events', 'analytics-service');
SELECT * FROM pgque.receive('user-events', 'notification-service');
SELECT * FROM pgque.receive('user-events', 'audit-log');
```
Retry / Dead Letter Pattern
```sql
-- Using low-level API with retry logic
DO $$
DECLARE
  v_batch_id bigint;
  v_ev record;
BEGIN
  -- Get next batch
  SELECT pgque.next_batch('orders', 'order-processor') INTO v_batch_id;
  IF v_batch_id IS NULL THEN
    RAISE NOTICE 'No batch available';
    RETURN;
  END IF;

  -- Process each event
  FOR v_ev IN SELECT * FROM pgque.get_batch_events(v_batch_id) LOOP
    BEGIN
      -- Attempt processing
      RAISE NOTICE 'Processing event % type %', v_ev.ev_id, v_ev.ev_type;
      -- On transient failure, retry after 30 seconds:
      -- SELECT pgque.event_retry(v_batch_id, v_ev.ev_id, 30);
    EXCEPTION WHEN OTHERS THEN
      -- Schedule retry
      PERFORM pgque.event_retry(v_batch_id, v_ev.ev_id, 60);
      RAISE NOTICE 'Event % queued for retry', v_ev.ev_id;
    END;
  END LOOP;

  -- Finish batch (events not retried are acked)
  PERFORM pgque.finish_batch(v_batch_id);
END;
$$;
```
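A common refinement of this pattern is to grow the retry delay with the event's `ev_retry` count and dead-letter after a fixed number of attempts. A small helper sketch — the base delay, cap, and attempt limit below are arbitrary illustrative choices, not PgQue defaults:

```python
# Sketch: compute the delay to pass to pgque.event_retry from the event's
# retry count (ev_retry), dead-lettering after max_attempts.
# base/cap/max_attempts are illustrative, not PgQue defaults.
def retry_delay(ev_retry, base=30, cap=3600, max_attempts=5):
    """Return seconds until retry, or None to route to a dead-letter queue."""
    attempt = ev_retry or 0               # ev_retry is NULL on first failure
    if attempt >= max_attempts:
        return None                       # give up: dead-letter instead
    return min(base * 2 ** attempt, cap)  # 30s, 60s, 120s, ... capped at 1h

assert retry_delay(None) == 30   # first failure
assert retry_delay(2) == 120     # third failure: 30 * 2**2
assert retry_delay(10) is None   # exhausted: dead-letter
```

The consumer would call `pgque.event_retry(batch_id, ev_id, delay)` when the helper returns a number, and move the event to a separate dead-letter queue otherwise.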
Monitoring & Introspection
```sql
-- Queue info (depth, consumer count, last tick)
SELECT * FROM pgque.get_queue_info();
SELECT * FROM pgque.get_queue_info('orders');

-- Consumer lag and position
SELECT * FROM pgque.get_consumer_info();
SELECT * FROM pgque.get_consumer_info('orders');
SELECT * FROM pgque.get_consumer_info('orders', 'order-processor');

-- Batch details
SELECT * FROM pgque.get_batch_info(:batch_id);

-- Version
SELECT pgque.version();
```
Configuration & Tuning
Tick Frequency
```sql
-- Default: ticker called every 1 second via pg_cron
-- To change the tick interval, update the pg_cron job:
SELECT cron.alter_job(
  job_id   := (SELECT jobid FROM cron.job WHERE command LIKE '%pgque.ticker%'),
  schedule := '* * * * *'  -- every minute (coarser)
);
```
Force Immediate Tick (Testing/Demos)
```sql
-- Force a tick right now without waiting for pg_cron
SELECT pgque.force_tick('orders');
-- or
SELECT pgque.ticker();
```
pg_cron Log Hygiene
```sql
-- Disable run logging (ticker runs every second = 3600 rows/hour)
ALTER SYSTEM SET cron.log_run = off;
SELECT pg_reload_conf();

-- Or periodically purge:
SELECT cron.schedule('pgque-cron-purge', '0 * * * *',
  $$DELETE FROM cron.job_run_details WHERE end_time < now() - interval '1 hour'$$
);
```
pg_cron in Different Database
If pg_cron is installed in the postgres database but PgQue lives in mydb:
```sql
-- Run from the pg_cron database (postgres)
SELECT cron.schedule_in_database(
  'pgque-ticker', '* * * * *', 'SELECT pgque.ticker()', 'mydb'
);
SELECT cron.schedule_in_database(
  'pgque-maint', '* * * * *', 'SELECT pgque.maint()', 'mydb'
);
```
Common Patterns
Transactional Enqueue (Send with Business Logic)
```sql
-- Event is only enqueued if the whole transaction commits
BEGIN;
INSERT INTO orders (id, amount) VALUES (42, 99.99);
SELECT pgque.send('orders', 'new_order', '{"order_id": 42}');
COMMIT;
```
Queue Depth Check Before Scaling
```sql
SELECT queue_name, ev_per_sec, consumer_count, pending_events
FROM pgque.get_queue_info()
WHERE pending_events > 1000;
```
List All Consumers with Lag
```sql
SELECT queue_name, consumer_name, pending_events AS lag, last_seen
FROM pgque.get_consumer_info()
ORDER BY lag DESC;
```
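On the application side, those rows can feed an alerting check. A sketch assuming the rows are fetched as dicts (e.g. with psycopg2's `RealDictCursor`); the 1000-event threshold is an arbitrary example value:

```python
# Sketch: flag lagging consumers from pgque.get_consumer_info() rows,
# fetched as dicts. The threshold is arbitrary; tune it per queue.
def lagging_consumers(rows, max_lag=1000):
    """Return consumers whose pending_events exceed max_lag, worst first."""
    return sorted(
        (r for r in rows if r["pending_events"] > max_lag),
        key=lambda r: r["pending_events"],
        reverse=True,
    )

rows = [
    {"consumer_name": "audit-log", "pending_events": 12},
    {"consumer_name": "analytics-service", "pending_events": 5200},
]
hot = lagging_consumers(rows)
assert [r["consumer_name"] for r in hot] == ["analytics-service"]
```

A monitoring job can run the query above on a schedule, pass the rows through this filter, and page or scale workers when anything comes back.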
Troubleshooting
Consumers receive nothing
Cause: Ticker is not running.
```sql
-- Check if ticker has fired recently
SELECT * FROM pgque.get_queue_info('orders');
-- Look at the last_tick timestamp

-- Manually fire a tick
SELECT pgque.ticker();

-- Check pg_cron jobs exist
SELECT * FROM cron.job WHERE command LIKE '%pgque%';
```
Events not appearing after send
```sql
-- Confirm ticker is running; force one
SELECT pgque.ticker();

-- Check queue exists
SELECT * FROM pgque.get_queue_info('orders');

-- Check consumer is registered
SELECT * FROM pgque.get_consumer_info('orders', 'my-consumer');
```
Performance / VACUUM pressure
PgQue is immune to dead-tuple bloat in the event path by design. If you see VACUUM activity, it's from your own application tables, not from PgQue's queue tables.
Retry events not reappearing
```sql
-- The maintenance job handles retry scheduling.
-- Make sure pgque.maint() is running every ~30s:
SELECT pgque.maint();

-- Check for events in retry state
SELECT * FROM pgque.get_queue_info('orders');
```
Upgrade / Reinstall
Upgrade paths are still being stabilized. To safely reinstall:
```shell
psql -d mydb -c "\i sql/pgque_uninstall.sql"
psql -d mydb --single-transaction -f sql/pgque.sql
psql -d mydb -c "select pgque.start()"
```
Architecture Summary
```
Producer → pgque.send() → event tables (rotating)
                              ↓
        pgque.ticker() ←── pg_cron (every 1s)
                              ↓
                   batch snapshot created
                              ↓
Consumer A → pgque.receive() → batch events → pgque.ack()
Consumer B → pgque.receive() → same batch  → pgque.ack()
Consumer C → pgque.receive() → same batch  → pgque.ack()
                              ↓
        pgque.maint() → TRUNCATE old tables (zero dead tuples)
```