Use when ingesting continuous data streams from Kafka, RabbitMQ, or Kinesis into ClickHouse. Covers backpressure handling, exactly-once semantics, stream processing patterns, and performance optimization. NOT for database replication (see clickhouse-cdc) or batch ETL (see clickhouse-patterns).
ClickHouse Streaming Patterns
Overview
Stream processing continuously ingests data from message queues into ClickHouse. Core principle: Balance throughput with latency, handle failures gracefully, ensure exactly-once delivery.
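Key challenge: ClickHouse is optimized for large batch inserts. Every INSERT into a MergeTree table writes at least one new data part, and background merges combine parts later. When parts are created faster than merges can absorb them, ClickHouse first slows inserts down and then rejects them with a "Too many parts" error. Streaming therefore needs batching tuned to keep latency low without flooding the table with small parts.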
When to Use
Symptoms:
- Need to ingest Kafka/RabbitMQ messages into ClickHouse
- Real-time analytics dashboard (< 1 minute latency)
- IoT sensor data, application logs, clickstream
- Message rate varies (need backpressure handling)
When NOT to use:
- Database replication (PostgreSQL/MySQL) → See clickhouse-cdc
- Scheduled batch jobs (hourly/daily) → See clickhouse-patterns
- One-time data import → Use batch INSERT
Prerequisites
- Understanding of message queue semantics (Kafka offsets, RabbitMQ acks)
- Basic ClickHouse knowledge → See clickhouse-patterns
- Familiarity with stream processing concepts
Quick Reference
Method Selection
```dot
digraph streaming_methods {
    rankdir=TB;
    node [shape=box, style=rounded];
    start [label="Choose Method", shape=ellipse];
    source [label="Source queue?", shape=diamond];
    control [label="Need custom logic?", shape=diamond];
    kafka_engine [label="Kafka Engine\n(native, simple)"];
    kafka_connect [label="Kafka Connect\n(connector, no code)"];
    custom_service [label="Custom Service\n(full control)"];
    start -> source;
    source -> kafka_engine [label="Kafka"];
    source -> control [label="RabbitMQ/Kinesis"];
    control -> kafka_connect [label="no"];
    control -> custom_service [label="yes"];
}
```

ClickHouse also provides a RabbitMQ table engine that follows the same queue table + materialized view pattern as the Kafka engine.
| Method | Best For | Pros | Cons |
|---|---|---|---|
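| Kafka Engine | Simple Kafka → CH | Native, no code | Limited transformation |
| Kafka Connect | Connector-based pipelines | Configuration only, no service code | Connect cluster to operate, light transforms only |
| Custom Service | Complex logic, any queue | Full control | Must handle failures |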
Critical Settings
| Setting | Low Latency | High Throughput |
|---|---|---|
| Batch size | 1000 rows | 50000 rows |
| Flush interval | 500ms | 5000ms |
| Target latency | < 1 second | 5-10 seconds |
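To see how the two columns trade latency against part creation, here is a small TypeScript model. It assumes a buffer that flushes when it reaches the batch size or when the flush interval elapses, whichever comes first. `estimateFlush` is a made-up name for illustration. The model ignores INSERT duration and merge behaviour, so treat its numbers as rough estimates.

```typescript
// Simple model of a size-or-time flush policy (an assumption, not measured
// behaviour): flush when batchSize rows are buffered or flushIntervalMs has
// passed since the last flush, whichever comes first. Time spent inside the
// INSERT itself is ignored.
interface FlushEstimate {
  insertsPerSecond: number; // each INSERT creates at least one new part
  rowsPerInsert: number;
  maxBufferDelayMs: number; // longest time a row waits in the buffer
}

function estimateFlush(
  messagesPerSecond: number,
  batchSize: number,
  flushIntervalMs: number,
): FlushEstimate {
  if (messagesPerSecond <= 0) {
    // Nothing arrives, so nothing is flushed.
    return { insertsPerSecond: 0, rowsPerInsert: 0, maxBufferDelayMs: 0 };
  }
  // Milliseconds needed to fill one batch at this message rate.
  const fillTimeMs = (batchSize * 1000) / messagesPerSecond;
  // The trigger that fires first sets the flush period.
  const periodMs = Math.min(fillTimeMs, flushIntervalMs);
  return {
    insertsPerSecond: 1000 / periodMs,
    rowsPerInsert: (messagesPerSecond * periodMs) / 1000,
    maxBufferDelayMs: periodMs,
  };
}

// The two columns of the settings table, at 10,000 messages per second.
console.log('low latency:', estimateFlush(10_000, 1_000, 500));
console.log('high throughput:', estimateFlush(10_000, 50_000, 5_000));
```

At 10,000 messages per second, the low-latency settings give about 10 inserts per second of roughly 1,000 rows each. The high-throughput settings give one 50,000-row insert every 5 seconds. At low rates the timer dominates: 100 messages per second with the low-latency settings yields 2 inserts per second of only 50 rows, well under the 1,000-row minimum listed in Common Mistakes.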
Pattern 1: ClickHouse Kafka Engine
Setup
```sql
-- 1. Kafka staging table
CREATE TABLE events_queue (
    timestamp UInt64,
    user_id String,
    event_type String,
    properties String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse_consumers',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4,        -- Match Kafka partitions
    kafka_max_block_size = 65536,   -- Batch size
    kafka_poll_timeout_ms = 1000;   -- Wait time

-- 2. Target table
CREATE TABLE events (
    date Date,
    timestamp DateTime,
    user_id String,
    event_type LowCardinality(String),
    properties String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, user_id, timestamp);

-- 3. Materialized view
CREATE MATERIALIZED VIEW events_mv TO events AS
SELECT
    toDate(toDateTime(timestamp)) AS date,
    toDateTime(timestamp) AS timestamp,
    user_id,
    event_type,
    properties
FROM events_queue;
```
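The materialized view converts `timestamp` with `toDateTime()`, which treats an integer as Unix seconds. Producers therefore need to send seconds rather than the milliseconds that JavaScript's `Date.now()` returns. Because `properties` is a String column, nested data should arrive JSON-encoded. Below is a minimal producer-side sketch in TypeScript. `AppEvent` and `toQueueMessage` are hypothetical names, and the application event shape is an assumption.

```typescript
// One JSONEachRow row for the events_queue table.
interface QueueMessage {
  timestamp: number;  // Unix seconds: the MV applies toDateTime(), which expects seconds
  user_id: string;
  event_type: string;
  properties: string; // the column is String, so nested data is JSON-encoded
}

// Hypothetical application-side event; adjust to your own model.
interface AppEvent {
  occurredAtMs: number; // e.g. Date.now()
  userId: string;
  type: string;
  properties: Record<string, unknown>;
}

function toQueueMessage(e: AppEvent): string {
  const msg: QueueMessage = {
    timestamp: Math.floor(e.occurredAtMs / 1000), // milliseconds -> seconds
    user_id: e.userId,
    event_type: e.type,
    properties: JSON.stringify(e.properties),
  };
  // One JSON object per Kafka message keeps the JSONEachRow framing simple.
  return JSON.stringify(msg);
}
```

Sending the returned string as the Kafka message value gives one row per message. If milliseconds were sent instead, `toDateTime()` would produce wrong dates.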
Tuning Parameters
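```sql
-- SETTINGS clause of the Kafka engine table
SETTINGS
    kafka_max_block_size = 65536,      -- Max messages per poll batch (larger = higher latency)
    kafka_poll_timeout_ms = 1000,      -- Poll wait (longer = better batching)
    kafka_num_consumers = 4,           -- Parallel consumers
    kafka_skip_broken_messages = 100,  -- Skip up to 100 unparseable messages per block; they are dropped (use carefully)
    kafka_commit_every_batch = 1;      -- Commit after every consumed batch instead of once per written block
```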
Monitoring
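```sql
-- Consumer status: assigned partitions, current offsets, recent exceptions
SELECT
    database,
    table,
    consumer_id,
    assignments.topic,
    assignments.partition_id,
    assignments.current_offset,
    exceptions.time,
    exceptions.text,
    last_poll_time,
    num_messages_read,
    last_commit_time
FROM system.kafka_consumers;
```

`system.kafka_consumers` (available in recent ClickHouse releases) reports each consumer's current offsets, but not the topic's log-end offset. Consumption lag therefore has to be read from the broker side, for example with `kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group clickhouse_consumers`.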
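Per-partition lag is the topic's log-end offset (high watermark) minus the consumer group's committed offset. The sketch below assumes both have already been fetched, for example through a Kafka admin client. The input shapes are loosely modelled on the results of kafkajs's `admin.fetchTopicOffsets()` and `admin.fetchOffsets()`: offsets are strings, and `-1` is assumed to mean "no commit yet". Check these shapes against your client version. `computeLag` and `totalLag` are hypothetical helpers.

```typescript
// Log-end offset (high watermark) of one partition, as reported by the broker.
interface PartitionEnd { partition: number; high: string }
// Committed offset of the consumer group for one partition ("-1" = none yet).
interface PartitionCommit { partition: number; offset: string }

interface PartitionLag { partition: number; lag: number }

function computeLag(ends: PartitionEnd[], commits: PartitionCommit[]): PartitionLag[] {
  const committed = new Map<number, number>();
  // Number() is exact for offsets below 2^53, which covers realistic topics.
  for (const c of commits) committed.set(c.partition, Number(c.offset));

  return ends.map(({ partition, high }) => {
    const end = Number(high);
    const c = committed.get(partition) ?? -1;
    // Without a commit, report the whole log as lag (an upper bound).
    const lag = c < 0 ? end : Math.max(0, end - c);
    return { partition, lag };
  });
}

function totalLag(lags: PartitionLag[]): number {
  return lags.reduce((sum, p) => sum + p.lag, 0);
}
```

The committed offset is the next message the group will read, so `high - committed` counts messages that are written to Kafka but not yet consumed.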
Pattern 2: Custom Service (Node.js)
Use Case
- Complex transformations
- Multiple sources (RabbitMQ, Kinesis)
- Custom error handling
Implementation
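```typescript
import { Kafka, KafkaMessage } from 'kafkajs';
import { createClient } from '@clickhouse/client';

// Connection settings are placeholders; adjust for your environment.
const kafka = new Kafka({ clientId: 'events-processor', brokers: ['kafka:9092'] });
const clickhouse = createClient({ url: 'http://clickhouse:8123' });

interface EventRow {
  date: string;       // YYYY-MM-DD
  timestamp: number;  // Unix seconds, accepted by a DateTime column
  user_id: string;
  event_type: string;
  properties: string; // JSON-encoded
}

class StreamProcessor {
  private readonly BATCH_SIZE = 5000;
  private readonly MAX_ATTEMPTS = 3;

  async start(): Promise<void> {
    // The broker holds a fetch until minBytes are available, but never longer
    // than maxWaitTimeInMs; this plays the role of a flush interval.
    const consumer = kafka.consumer({
      groupId: 'events-processor',
      minBytes: 1024 * 1024,
      maxWaitTimeInMs: 1000,
    });
    await consumer.connect();
    await consumer.subscribe({ topics: ['events'] });

    await consumer.run({
      eachBatchAutoResolve: false,
      eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
        let rows: EventRow[] = [];
        let lastOffset: string | undefined;

        // Insert first, then resolve: offsets are committed only for rows
        // ClickHouse has accepted (at-least-once delivery).
        const flush = async () => {
          if (rows.length > 0) await this.insertWithRetry(rows);
          if (lastOffset !== undefined) resolveOffset(lastOffset);
          await heartbeat();
          rows = [];
        };

        for (const message of batch.messages) {
          lastOffset = message.offset;
          if (!message.value) continue; // skip tombstones
          rows.push(this.transform(message));
          if (rows.length >= this.BATCH_SIZE) await flush();
        }
        await flush();
      },
    });
  }

  private transform(message: KafkaMessage): EventRow {
    const event = JSON.parse(message.value!.toString());
    const ts = new Date(event.timestamp);
    return {
      date: ts.toISOString().slice(0, 10),
      timestamp: Math.floor(ts.getTime() / 1000),
      user_id: event.userId,
      event_type: event.type,
      properties: JSON.stringify(event.properties ?? {}),
    };
  }

  private async insertWithRetry(rows: EventRow[]): Promise<void> {
    for (let attempt = 1; ; attempt++) {
      try {
        await clickhouse.insert({ table: 'events', values: rows, format: 'JSONEachRow' });
        console.log(`Flushed ${rows.length} events`);
        return;
      } catch (err) {
        if (attempt >= this.MAX_ATTEMPTS) {
          // Rethrowing leaves the offsets unresolved, so Kafka redelivers the
          // messages. Publish `rows` to a dead letter topic here instead if
          // the pipeline must keep moving.
          throw err;
        }
        // Exponential backoff: 2s, 4s, ... capped at 10s
        const delay = Math.min(1000 * 2 ** attempt, 10000);
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }
}

new StreamProcessor().start().catch(console.error);
```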
Exactly-Once Semantics
Challenge
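A Kafka consumer that commits offsets only after writing gets at-least-once delivery: a retry or rebalance can redeliver a message that was already stored. Pairing at-least-once delivery with idempotent writes in ClickHouse (deduplication on a stable message ID) gives effectively exactly-once results.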
Solution 1: Deduplication Table
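```sql
-- IDs of messages already written to events, kept for 7 days
CREATE TABLE processed_messages (
    message_id String,
    processed_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY message_id
TTL processed_at + INTERVAL 7 DAY;

-- Insert only messages not seen before (events_staging and events are assumed
-- to share the same columns, including message_id); LIMIT 1 BY also drops
-- duplicates inside the staging batch itself
INSERT INTO events
SELECT * FROM events_staging
WHERE message_id NOT IN (SELECT message_id FROM processed_messages)
LIMIT 1 BY message_id;

-- Remember the IDs just handled
INSERT INTO processed_messages (message_id)
SELECT DISTINCT message_id FROM events_staging;
```

These steps are not atomic. Run them from a single writer and empty `events_staging` between runs; otherwise concurrent runs can still insert the same message twice.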
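A custom service can apply the same idea as an in-memory filter in front of ClickHouse. The hypothetical `SeenIdCache` below keeps IDs for a TTL window, mirroring the 7-day TTL of `processed_messages`. It records IDs only after a successful insert, so a failed insert can be retried without its rows being filtered out. It protects a single process and forgets everything on restart, so it complements a durable check rather than replacing it.

```typescript
// Hypothetical in-process analogue of processed_messages.
class SeenIdCache {
  private readonly seen = new Map<string, number>(); // id -> time recorded (ms)
  private readonly ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  // Drops items whose IDs were recorded within the TTL window, plus repeats
  // inside the same batch. Nothing is recorded here, so a failed insert can
  // simply be retried with the same items.
  filterNew<T>(items: T[], idOf: (item: T) => string, nowMs: number = Date.now()): T[] {
    this.evictExpired(nowMs);
    const inBatch = new Set<string>();
    const fresh: T[] = [];
    for (const item of items) {
      const id = idOf(item);
      if (this.seen.has(id) || inBatch.has(id)) continue;
      inBatch.add(id);
      fresh.push(item);
    }
    return fresh;
  }

  // Call only after ClickHouse has accepted the insert.
  markSeen(ids: string[], nowMs: number = Date.now()): void {
    for (const id of ids) {
      // Keep the first timestamp so the Map stays ordered oldest-first.
      if (!this.seen.has(id)) this.seen.set(id, nowMs);
    }
  }

  private evictExpired(nowMs: number): void {
    // A Map iterates in insertion order, so the oldest entries come first
    // (assumes nowMs never goes backwards).
    for (const [id, recordedAt] of this.seen) {
      if (nowMs - recordedAt < this.ttlMs) break;
      this.seen.delete(id);
    }
  }
}
```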
Solution 2: ReplacingMergeTree
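```sql
CREATE TABLE events (
    message_id String,  -- topic/partition/offset, or a producer-assigned UUID
    timestamp DateTime,
    user_id String
) ENGINE = ReplacingMergeTree()
ORDER BY message_id;

-- Duplicates are removed only when background merges run,
-- so use FINAL when exact results are required
SELECT * FROM events FINAL WHERE user_id = 'user-123';
```

FINAL deduplicates at query time and can be expensive on large tables.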
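A Kafka offset alone is unique only within one partition, so a stable ID built from Kafka coordinates should include the topic and partition. The second helper is a toy model (not ClickHouse code) of what a ReplacingMergeTree merge does without a version column: one row per sorting key survives, the most recently inserted one. Both function names are made up for illustration.

```typescript
// Stable message ID from Kafka coordinates: offsets repeat across partitions.
function kafkaMessageId(topic: string, partition: number, offset: string): string {
  return `${topic}/${partition}/${offset}`;
}

// Toy model of a ReplacingMergeTree merge without a version column:
// for each sorting key, the last inserted row wins. Until a merge runs,
// every copy is still visible to queries that do not use FINAL.
function mergeReplacing<T>(rowsInInsertOrder: T[], keyOf: (row: T) => string): T[] {
  const latest = new Map<string, T>();
  for (const row of rowsInInsertOrder) latest.set(keyOf(row), row);
  return Array.from(latest.values());
}
```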
Backpressure Handling
Bounded Buffer Pattern
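```typescript
import { createClient } from '@clickhouse/client';

const clickhouse = createClient({ url: 'http://clickhouse:8123' }); // placeholder URL
const BATCH_SIZE = 5000;

class BackpressureBuffer<T> {
  private buffer: T[] = [];
  private readonly MAX_BUFFER_SIZE = 100000;
  private processing = false;

  // Blocks the caller while the buffer is full. Something must call
  // tryFlush() (e.g. on a timer) for this to make progress.
  async add(item: T): Promise<void> {
    while (this.buffer.length >= this.MAX_BUFFER_SIZE) {
      await new Promise((r) => setTimeout(r, 100));
    }
    this.buffer.push(item);
  }

  // Writes one batch; only one flush runs at a time.
  async tryFlush(): Promise<void> {
    if (this.processing || this.buffer.length < BATCH_SIZE) return;
    this.processing = true;
    const batch = this.buffer.splice(0, BATCH_SIZE);
    try {
      await clickhouse.insert({ table: 'events', values: batch, format: 'JSONEachRow' });
    } catch (err) {
      // Put the batch back at the front so it is retried, not lost.
      this.buffer = batch.concat(this.buffer);
      throw err;
    } finally {
      this.processing = false;
    }
  }
}
```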
Pause Consumer Pattern
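```typescript
// `consumer`, `buffer`, `flush` and MAX_BUFFER_SIZE come from the surrounding
// service. With the default eachBatchAutoResolve: true, offsets are resolved
// when this handler returns, even for messages still in `buffer`; resolve
// them yourself after flushing if buffered messages must survive a crash.
await consumer.run({
  eachBatch: async ({ batch, pause }) => {
    for (const message of batch.messages) buffer.push(message);
    if (buffer.length > MAX_BUFFER_SIZE) {
      // pause() stops fetching this topic-partition and returns a resume function
      const resume = pause();
      try {
        await flush();
      } finally {
        resume();
      }
    }
  },
});
```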
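Pausing and resuming at the same threshold can make a consumer flap on almost every message. A common refinement, sketched here with hypothetical names, is hysteresis: pause at a high watermark and resume only once the buffer has drained to a lower one. The `onPause`/`onResume` callbacks are assumed to wrap whatever pause and resume mechanism your consumer client provides.

```typescript
// Hypothetical pause/resume controller with two thresholds.
class WatermarkController {
  private paused = false;
  private readonly high: number;
  private readonly low: number;
  private readonly onPause: () => void;
  private readonly onResume: () => void;

  constructor(high: number, low: number, onPause: () => void, onResume: () => void) {
    if (low >= high) throw new Error('low watermark must be below the high watermark');
    this.high = high;
    this.low = low;
    this.onPause = onPause;
    this.onResume = onResume;
  }

  // Call after every change to the buffered row count.
  update(bufferedRows: number): void {
    if (!this.paused && bufferedRows >= this.high) {
      this.paused = true;
      this.onPause();
    } else if (this.paused && bufferedRows <= this.low) {
      this.paused = false;
      this.onResume();
    }
  }

  isPaused(): boolean {
    return this.paused;
  }
}
```

The gap between the two watermarks sets how much the buffer must drain before fetching restarts, so each pause is followed by at least one sizeable flush.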
Performance Optimization
Async Inserts
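```sql
SET async_insert = 1;
SET wait_for_async_insert = 1;              -- Acknowledge only after the buffer is flushed
SET async_insert_max_data_size = 10485760;  -- Flush when the buffer reaches 10 MB...
SET async_insert_busy_timeout_ms = 1000;    -- ...or after 1 s, whichever comes first
```

With `wait_for_async_insert = 0` the server acknowledges before the data is written, so insert errors never reach the client. Use it only where losing some data is acceptable.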
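Async inserts can also be enabled per request instead of with SET. With the official Node.js client, `@clickhouse/client`, per-query settings go in the `clickhouse_settings` field of `client.insert()`. The helper below only builds that argument object, and its name `asyncInsertParams` is hypothetical. It defaults to `wait_for_async_insert: 1` so that insert errors reach the caller.

```typescript
// Builds the argument for client.insert() with async inserts enabled.
function asyncInsertParams<T>(table: string, rows: T[], waitForFlush = true) {
  return {
    table,
    values: rows,
    format: 'JSONEachRow' as const,
    clickhouse_settings: {
      async_insert: 1 as const,
      // 1 = acknowledge after the server-side buffer is flushed, so errors
      // surface here; 0 = fire-and-forget.
      wait_for_async_insert: (waitForFlush ? 1 : 0) as 0 | 1,
    },
  };
}
```

With a client from `createClient()`, a call would look like `await client.insert(asyncInsertParams('events', rows))`.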
Parallel Consumers
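```sql
-- Match the number of Kafka partitions: consumers beyond the partition count
-- stay idle, and the value should not exceed the server's physical CPU cores
SETTINGS kafka_num_consumers = 8;
```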
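Within a consumer group, each partition is assigned to exactly one consumer, which is why consumers beyond the partition count add nothing. Below is a simplified round-robin model in TypeScript. Real assignment strategies differ in detail, and the function names are hypothetical.

```typescript
// Simplified round-robin partition assignment within one consumer group.
function assignPartitions(partitionCount: number, consumerCount: number): number[][] {
  if (consumerCount <= 0) throw new Error('need at least one consumer');
  const assignment = Array.from({ length: consumerCount }, () => [] as number[]);
  for (let p = 0; p < partitionCount; p++) {
    assignment[p % consumerCount].push(p); // each partition goes to exactly one consumer
  }
  return assignment;
}

// Consumers that received no partitions and therefore do no work.
function idleConsumers(partitionCount: number, consumerCount: number): number {
  return assignPartitions(partitionCount, consumerCount).filter((ps) => ps.length === 0).length;
}
```

With 4 partitions and 8 consumers, half of the consumers receive no partitions. Adding partitions, not consumers, is what raises the parallelism ceiling.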
Common Mistakes
| Mistake | Why It Fails | Fix |
|---|---|---|
| Small batches | Too many parts | Minimum 1000 rows |
| No backpressure | Memory overflow | Bounded buffer + pause |
| Sync inserts | Low throughput | Use async_insert = 1 |
| No error handling | Lost messages | Retry + DLQ |
| Single consumer | Low throughput | Parallel = partitions |
Monitoring
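```sql
-- Throughput: events per minute over the last 5 minutes
SELECT
    toStartOfMinute(timestamp) AS minute,
    count() AS events_per_minute
FROM events
WHERE timestamp >= now() - INTERVAL 5 MINUTE
GROUP BY minute
ORDER BY minute;

-- Freshness: how far the newest stored event trails the wall clock
SELECT now() - max(timestamp) AS seconds_behind
FROM events;

-- Lag distribution (assumes an extra column not in the schema above:
-- inserted_at DateTime DEFAULT now())
SELECT
    quantile(0.50)(inserted_at - timestamp) AS median_lag,
    quantile(0.95)(inserted_at - timestamp) AS p95_lag
FROM events
WHERE inserted_at >= now() - INTERVAL 1 MINUTE;

-- Errors per minute (system.kafka_consumers keeps only the most
-- recent exceptions per consumer, so this undercounts bursts)
SELECT
    toStartOfMinute(error_time) AS minute,
    count() AS errors
FROM system.kafka_consumers
ARRAY JOIN exceptions.time AS error_time
WHERE error_time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;
```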
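A custom service already knows the event time of every row it writes, so it can compute ingestion lag (insert time minus event time) itself, without an extra column in ClickHouse. Below is a sketch with hypothetical helper names. It uses the nearest-rank percentile, which is not the same estimator as ClickHouse's sampling `quantile()`, so results can differ slightly.

```typescript
// Nearest-rank percentile: the smallest sample with at least p% of the
// samples at or below it.
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) return NaN;
  const sorted = samples.slice().sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length); // 1-based rank
  return sorted[Math.min(sorted.length, Math.max(1, rank)) - 1];
}

// Lag in seconds for the rows of one insert: insert time minus event time.
function lagSummary(eventTimesMs: number[], insertedAtMs: number) {
  const lagsSec = eventTimesMs.map((t) => (insertedAtMs - t) / 1000);
  return { median: percentile(lagsSec, 50), p95: percentile(lagsSec, 95) };
}
```

Exporting these values per flush to your metrics system gives a continuous lag signal that can be checked against the escalation threshold below.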
Best Practices
Design:
- Use Kafka Engine for simple pipelines
- Custom service for complex transformations
- Always implement deduplication
Performance:
- Batch size: 1,000-50,000 rows (smaller for lower latency)
- Flush interval: 500ms-5s
- Parallel consumers = Kafka partitions
Reliability:
- Bounded buffer for backpressure
- Exponential backoff retry
- Dead letter queue for failures
- Monitor lag continuously
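The custom service's retry loop waits `min(1000 * 2^attempt, 10000)` ms: 2 s, 4 s, 8 s, then capped at 10 s for later attempts. Pulled out into a hypothetical `backoffDelayMs` function, it can be given an optional "full jitter" variant (an addition not in the original). Full jitter picks a random delay between 0 and the computed value, so that many consumers recovering from the same outage do not retry in lockstep.

```typescript
interface BackoffOptions {
  baseMs?: number;
  capMs?: number;
  jitter?: boolean;         // "full jitter": random delay in [0, computed delay)
  random?: () => number;    // injectable for deterministic tests
}

// attempt is 1-based: attempt 1 -> 2s, 2 -> 4s, 3 -> 8s, then capped.
function backoffDelayMs(attempt: number, opts: BackoffOptions = {}): number {
  const { baseMs = 1000, capMs = 10_000, jitter = false, random = Math.random } = opts;
  const exp = Math.min(capMs, baseMs * 2 ** attempt);
  return jitter ? Math.floor(random() * exp) : exp;
}
```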
Red Flags
- ❌ "Insert per message" → Too many parts
- ❌ "Unbounded buffer" → Memory overflow
- ❌ "No lag monitoring" → Stale data
- ❌ "Ignore errors" → Data loss
- ❌ "Sync inserts" → Low throughput
When to Escalate
- Lag > 1 minute despite tuning
- Memory exhaustion with backpressure
- "Too many parts" errors
- Multi-datacenter streaming needed
Resources: ClickHouse Kafka engine docs, system.kafka_consumers table
Related Skills
- ClickHouse fundamentals: See clickhouse-patterns
- Database replication: See clickhouse-cdc
Remember: Balance latency, throughput, and reliability. Monitor lag, implement backpressure, tune batch sizes for your SLA.