# clickhouse-cdc

Use when syncing data FROM relational databases (PostgreSQL, MySQL, MongoDB) TO ClickHouse. Covers change data capture using Debezium, Airbyte, or custom triggers. Includes handling schema evolution, DELETE operations, and maintaining consistency. NOT for message queues (see clickhouse-streaming) or query optimization (see clickhouse-patterns).

Install:

```bash
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/clickhouse-cdc" ~/.claude/skills/majiayu000-claude-skill-registry-clickhouse-cdc && rm -rf "$T"
```

Source: `skills/data/clickhouse-cdc/SKILL.md`

# ClickHouse CDC Patterns
## Overview
Change Data Capture (CDC) replicates database changes to ClickHouse for analytics. **Core principle:** capture changes at the source, transform for columnar storage, and handle deletes gracefully.

**Key challenge:** ClickHouse is not designed for in-place updates and deletes. Use ReplacingMergeTree or append-only patterns instead.
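To make the append-only model concrete, here is a minimal sketch (the table and column names are illustrative, not from any pattern below) of how ReplacingMergeTree turns an UPDATE into an append: every change is inserted as a new row, and the engine keeps the row with the highest version per key when parts merge.

```sql
CREATE TABLE example
(
    id UInt64,
    value String,
    updated_at DateTime  -- acts as the version column
)
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY id;

-- An "update" is just another insert with a newer version
INSERT INTO example VALUES (1, 'old', '2024-01-01 00:00:00');
INSERT INTO example VALUES (1, 'new', '2024-01-02 00:00:00');

-- FINAL deduplicates at read time; only (1, 'new', ...) survives
SELECT * FROM example FINAL;
```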
## When to Use
**Symptoms:**
- Need to sync PostgreSQL/MySQL tables to ClickHouse
- Want real-time analytics on transactional data
- Schema changes breaking replication pipeline
- Unsure how to handle DELETE operations
**When NOT to use:**
- Streaming from Kafka/message queues → See `clickhouse-streaming`
- One-time data migration → Use batch ETL
- Query optimization → See `clickhouse-patterns`
## Prerequisites
- Understanding of the source database (PostgreSQL triggers, MySQL binlog)
- Basic ClickHouse knowledge → See `clickhouse-patterns`
- Familiarity with the ReplacingMergeTree engine
## Quick Reference
### CDC Method Selection
```dot
digraph cdc_methods {
    rankdir=TB;
    node [shape=box, style=rounded];

    start [label="Choose CDC Method", shape=ellipse];
    volume [label="High volume?\n(>10k rows/sec)", shape=diamond];
    complexity [label="Complex schema?\n(many tables)", shape=diamond];
    debezium [label="Debezium\n(production-grade)"];
    airbyte [label="Airbyte\n(managed service)"];
    custom [label="Custom Triggers\n(simple setup)"];

    start -> volume;
    volume -> debezium [label="yes"];
    volume -> complexity [label="no"];
    complexity -> airbyte [label="yes"];
    complexity -> custom [label="no"];
}
```
| Method | Best For | Pros | Cons |
|---|---|---|---|
| Debezium | High volume, production | Log-based, no DB impact | Complex setup |
| Airbyte | Multi-source, managed | UI, pre-built connectors | Cost, less control |
| Custom Triggers | Simple, low volume | Easy to understand | DB overhead |
### Critical Patterns
| Pattern | Use Case | Key Technique |
|---|---|---|
| Soft Delete | Handle DELETEs | Add `is_deleted` flag |
| ReplacingMergeTree | Handle UPDATEs | Use version/timestamp column |
| Schema Evolution | Add columns | Nullable with defaults |
| Backfill | Initial sync | Snapshot then CDC |
## Pattern 1: PostgreSQL with Debezium
### Architecture
PostgreSQL (WAL) → Debezium → Kafka → ClickHouse Kafka Engine
### ClickHouse Setup
```sql
-- 1. Kafka staging table
CREATE TABLE users_kafka (
    id UInt64,
    name String,
    email String,
    updated_at DateTime,
    _operation String  -- INSERT, UPDATE, DELETE
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'dbserver1.public.users',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow';

-- 2. Target table with dedup
CREATE TABLE users (
    id UInt64,
    name String,
    email String,
    updated_at DateTime,
    is_deleted UInt8 DEFAULT 0
) ENGINE = ReplacingMergeTree(updated_at)
ORDER BY id;

-- 3. Transform with materialized view
CREATE MATERIALIZED VIEW users_mv TO users AS
SELECT
    id, name, email, updated_at,
    if(_operation = 'DELETE', 1, 0) AS is_deleted
FROM users_kafka;

-- 4. Query active records
SELECT * FROM users FINAL WHERE is_deleted = 0;
```
### Debezium Connector
{ "name": "postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "replicator", "database.dbname": "mydb", "database.server.name": "dbserver1", "table.include.list": "public.users,public.orders", "plugin.name": "pgoutput" } }
## Pattern 2: PostgreSQL with Custom Triggers
### Use Case
- Low volume (< 1k rows/sec)
- Simple schema (1-5 tables)
- No Kafka infrastructure
### PostgreSQL Trigger
```sql
CREATE OR REPLACE FUNCTION notify_changes() RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
        PERFORM pg_notify('table_changes', json_build_object(
            'table', TG_TABLE_NAME,
            'operation', TG_OP,
            'data', row_to_json(NEW)
        )::text);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        PERFORM pg_notify('table_changes', json_build_object(
            'table', TG_TABLE_NAME,
            'operation', 'DELETE',
            'id', OLD.id
        )::text);
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_notify_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION notify_changes();
```
### Node.js CDC Service
```typescript
import { Client } from 'pg';
import { ClickHouse } from 'clickhouse';

const pgClient = new Client({ connectionString: process.env.PG_URL });
const clickhouse = new ClickHouse({ url: process.env.CH_URL });

async function startCDC() {
  await pgClient.connect();
  await pgClient.query('LISTEN table_changes');

  pgClient.on('notification', async (msg) => {
    if (!msg.payload) return;
    const { table, operation, data, id } = JSON.parse(msg.payload);

    if (operation === 'INSERT' || operation === 'UPDATE') {
      await clickhouse
        .insert(`INSERT INTO ${table}`, [{ ...data, is_deleted: 0 }])
        .toPromise();
    } else if (operation === 'DELETE') {
      // Mutation-based soft delete: acceptable at this low volume, but
      // ALTER TABLE ... UPDATE is a heavyweight mutation; at higher rates
      // prefer inserting a tombstone row with is_deleted = 1 instead.
      await clickhouse
        .query(`ALTER TABLE ${table} UPDATE is_deleted = 1 WHERE id = ${id}`)
        .toPromise();
    }
  });
}
```
## Pattern 3: MySQL Binlog
{ "name": "mysql-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql", "database.user": "debezium", "database.server.id": "184054", "database.server.name": "mysql-server", "table.include.list": "mydb.orders,mydb.customers" } }
Key differences from PostgreSQL: Debezium reads the binlog (not the WAL), requires a unique `database.server.id`, and tracks schema changes in a separate history topic.
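On the MySQL side, the binlog must be enabled in row format and the connector user needs replication privileges. A minimal sketch, assuming the `debezium` user from the config above:

```sql
-- my.cnf / mysqld.cnf (restart required):
--   server-id     = 1
--   log_bin       = mysql-bin
--   binlog_format = ROW

CREATE USER 'debezium'@'%' IDENTIFIED BY '...';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'debezium'@'%';
```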
## Handling DELETE Operations
### Option 1: Soft Delete (Recommended)
```sql
CREATE TABLE orders (
    id UInt64,
    user_id UInt64,
    amount Decimal(10, 2),
    updated_at DateTime,       -- version column referenced by the engine
    is_deleted UInt8 DEFAULT 0
) ENGINE = ReplacingMergeTree(updated_at)
ORDER BY id;

SELECT * FROM orders FINAL WHERE is_deleted = 0;
```
### Option 2: CollapsingMergeTree
```sql
CREATE TABLE orders (
    id UInt64,
    amount Decimal(10, 2),
    sign Int8  -- 1 = active, -1 = deleted
) ENGINE = CollapsingMergeTree(sign)
ORDER BY id;

-- Insert
INSERT INTO orders VALUES (1, 50.00, 1);

-- Delete (insert the cancelling row)
INSERT INTO orders VALUES (1, 50.00, -1);

-- Query
SELECT * FROM orders FINAL WHERE sign = 1;
```
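FINAL forces deduplication work at read time and gets expensive on large tables; the standard CollapsingMergeTree alternative is to aggregate over `sign`, so rows whose inserts and deletes cancel out drop from the result arithmetically:

```sql
SELECT
    id,
    sum(amount * sign) AS amount
FROM orders
GROUP BY id
HAVING sum(sign) > 0;
```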
## Schema Evolution
### Add Columns with Safe Defaults
```sql
ALTER TABLE users ADD COLUMN phone String DEFAULT '';
ALTER TABLE users ADD COLUMN country LowCardinality(String) DEFAULT 'UNKNOWN';
```
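One caveat worth spelling out: a materialized view does not start forwarding a new column on its own, since its SELECT is fixed at creation. Assuming the Pattern 1 objects, a sketch of the follow-up (the Kafka staging table is typically recreated rather than altered to pick up the new field):

```sql
-- After adding phone to users and recreating users_kafka with the
-- new field, rebuild the view so it forwards the column.
DROP TABLE users_mv;

CREATE MATERIALIZED VIEW users_mv TO users AS
SELECT
    id, name, email, phone, updated_at,
    if(_operation = 'DELETE', 1, 0) AS is_deleted
FROM users_kafka;
```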
### Schema Registry Pattern
```typescript
// Track the expected shape of each table so older events can be upgraded
const schemaRegistry = {
  users: { version: 2, columns: ['id', 'name', 'email', 'phone'] },
};

// Fill in fields added after an event was produced
function transformRow(table: string, row: any) {
  return {
    ...row,
    phone: row.phone || '',
    country: row.country || 'UNKNOWN',
  };
}
```
## Backfill Strategy
```typescript
import { pipeline } from 'node:stream/promises';
import { to as copyTo } from 'pg-copy-streams';
import csvParser from 'csv-parser';

// 1. Snapshot existing data: COPY streams rows out of Postgres
//    without buffering the whole table in memory
async function backfillTable(tableName: string) {
  const stream = pgClient.query(
    copyTo(`COPY (SELECT * FROM ${tableName}) TO STDOUT WITH (FORMAT csv)`)
  );
  await pipeline(stream, csvParser(), clickhouse.insert(`INSERT INTO ${tableName}`).stream());
}

// 2. Record the WAL position so CDC can start from the snapshot point
//    (capture it when the snapshot begins, or changes in between are lost)
const lsn = await pgClient.query('SELECT pg_current_wal_lsn()');
```
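A cleaner way to pin the snapshot position is to create the logical replication slot first: slot creation returns the LSN from which the slot retains changes, so anything committed after the slot exists is delivered through the slot, and ReplacingMergeTree absorbs any overlap with the snapshot. A sketch, with an illustrative slot name:

```sql
-- Returns (slot_name, lsn); snapshot the tables after this call,
-- then start the CDC consumer from the returned LSN.
SELECT * FROM pg_create_logical_replication_slot('clickhouse_cdc', 'pgoutput');
```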
## Common Mistakes
| Mistake | Why It Fails | Fix |
|---|---|---|
| No deduplication | Duplicate events | Use ReplacingMergeTree |
| Hard deletes | Not supported well | Soft delete with flag |
| Ignoring order | Out-of-order updates | Use version/timestamp |
| No backfill | Missing historical data | Snapshot before CDC |
## Performance Tips
```sql
-- Batch Kafka consumption (settings on the Kafka engine table)
SETTINGS kafka_max_block_size = 65536,
         kafka_poll_timeout_ms = 1000;

-- Async inserts (session/user settings)
SET async_insert = 1, wait_for_async_insert = 0;
```
## Monitoring
```sql
-- Replication lag: as a generic proxy, system.parts exposes the last
-- write time per table (or track a synced_at column in your own tables)
SELECT
    table,
    max(modification_time) AS last_write,
    now() - max(modification_time) AS lag_seconds
FROM system.parts
WHERE active
GROUP BY table;

-- Kafka consumer errors
SELECT database, table, exceptions.time, exceptions.text
FROM system.kafka_consumers;
```
## Best Practices
**Design:**
- Use ReplacingMergeTree for tables with updates
- Always include updated_at/version column
- Implement soft deletes, not hard deletes
**Operations:**
- Monitor replication lag (< 1 minute target)
- Alert on Kafka consumer errors
- Test schema changes in staging
**Performance:**
- Batch Kafka consumption (kafka_max_block_size)
- Use async inserts
- Partition by time (see the sketch below)
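A minimal sketch of the partitioning tip, reusing the orders table from the soft-delete example (the `created_at` column is illustrative):

```sql
-- Partition on a stable time column, not the version column:
-- ReplacingMergeTree only deduplicates within a partition, so a row
-- whose partition key changed would never collapse with its old version.
CREATE TABLE orders (
    id UInt64,
    user_id UInt64,
    amount Decimal(10, 2),
    created_at DateTime,
    updated_at DateTime,
    is_deleted UInt8 DEFAULT 0
) ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY toYYYYMM(created_at)
ORDER BY id;
```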
## Red Flags
- ❌ "ALTER TABLE DELETE" → Extremely slow
- ❌ "Updates without version" → Race conditions
- ❌ "No lag monitoring" → Stale analytics
- ❌ "Hard deletes" → Data inconsistency
## When to Escalate
- Replication lag > 5 minutes consistently
- Kafka consumer crashes
- Schema changes breaking pipeline
- Multi-datacenter replication needed
**Resources:** Debezium docs (https://debezium.io), ClickHouse Kafka engine docs
## Related Skills
- ClickHouse fundamentals: See `clickhouse-patterns`
- Streaming from queues: See `clickhouse-streaming`
**Remember:** CDC is about consistency. Always handle deletes, monitor lag, and test schema changes thoroughly.