Pgmicro cdc
Change Data Capture - architecture, entrypoints, bytecode emission, sync engine integration, tests
git clone https://github.com/glommer/pgmicro
T=$(mktemp -d) && git clone --depth=1 https://github.com/glommer/pgmicro "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/cdc" ~/.claude/skills/glommer-pgmicro-cdc && rm -rf "$T"
.claude/skills/cdc/SKILL.md
CDC (Change Data Capture) - Internal Feature Map
Overview
CDC tracks INSERT/UPDATE/DELETE changes on database tables by writing change records into a dedicated CDC table (turso_cdc by default). It is per-connection, enabled via PRAGMA, and operates at the bytecode generation (translate) layer. The sync engine consumes CDC records to push local changes to the remote.
Architecture Diagram
    User SQL (INSERT/UPDATE/DELETE/DDL)
            |
            v
    ┌─────────────────────────────────────────────────┐
    │  Translate layer (core/translate/)              │
    │  ┌───────────────────────────────────────────┐  │
    │  │ prepare_cdc_if_necessary()                │  │
    │  │  - checks CaptureDataChangesInfo          │  │
    │  │  - opens CDC table cursor (OpenWrite)     │  │
    │  │  - skips if target == CDC table itself    │  │
    │  └───────────────────────────────────────────┘  │
    │  ┌───────────────────────────────────────────┐  │
    │  │ emit_cdc_insns()                          │  │
    │  │  - writes (change_id, change_time,        │  │
    │  │    change_type, table_name, id,           │  │
    │  │    before, after, updates) into CDC tbl   │  │
    │  └───────────────────────────────────────────┘  │
    │  + emit_cdc_full_record() / emit_cdc_patch_record() │
    └─────────────────────────────────────────────────┘
            |
            v
    CDC table (turso_cdc or custom name)
            |
            v
    ┌─────────────────────────────────────────────────┐
    │  Sync engine (sync/engine/)                     │
    │  DatabaseTape reads CDC table → DatabaseChange  │
    │  → apply/revert → push to remote                │
    └─────────────────────────────────────────────────┘
Core Data Types
CaptureDataChangesMode + CaptureDataChangesInfo (core/lib.rs)
CDC behavior is controlled by two types:
    #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
    #[repr(u8)]
    enum CdcVersion {
        V1 = 1,
        V2 = 2,
    }

    const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

    enum CaptureDataChangesMode {
        Id,     // capture only rowid
        Before, // capture before-image
        After,  // capture after-image
        Full,   // before + after + updates
    }

    struct CaptureDataChangesInfo {
        mode: CaptureDataChangesMode,
        table: String,               // CDC table name
        version: Option<CdcVersion>, // schema version (V1 or V2)
    }
The connection stores Option<CaptureDataChangesInfo>; None means CDC is off.
Key methods on CdcVersion:
- has_commit_record(): self >= V2, gates COMMIT record emission
- Display / FromStr: round-trips "v1" ↔ V1, "v2" ↔ V2
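The CdcVersion methods listed above can be sketched as follows. This is a minimal illustration built only from the enum definition and the method descriptions in this section; the String error type used for FromStr is an assumption, and the real implementation in core/lib.rs may differ.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

impl CdcVersion {
    /// COMMIT records (change_type = 2) exist from V2 onward.
    /// The derived ordering follows declaration order, so V1 < V2.
    fn has_commit_record(self) -> bool {
        self >= CdcVersion::V2
    }
}

impl fmt::Display for CdcVersion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            CdcVersion::V1 => "v1",
            CdcVersion::V2 => "v2",
        })
    }
}

impl FromStr for CdcVersion {
    // Assumption: a plain String error; the real error type is not shown here.
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "v1" => Ok(CdcVersion::V1),
            "v2" => Ok(CdcVersion::V2),
            other => Err(format!("unknown CDC version: {other}")),
        }
    }
}
```

Because the ordering is derived, a later version added after V2 would pass the same has_commit_record() gate without further changes.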
Key methods on CaptureDataChangesInfo:
- parse(value: &str, version: Option<CdcVersion>): parses PRAGMA argument "<mode>[,<table_name>]", returns None for "off"
- cdc_version(): returns CdcVersion (panics if version is None). Single accessor replacing old is_v1() / is_v2() / version() methods.
- has_before() / has_after() / has_updates(): mode capability checks
- mode_name(): returns mode as string
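As a rough sketch of how parse() and the capability checks could fit together, the block below parses the "<mode>[,<table_name>]" argument. The mode strings "id", "before" and "after", the whitespace trimming, the Result wrapper and the String error type are assumptions for illustration; only "off", "full" and the default table name turso_cdc appear in this document.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CdcVersion {
    V1,
    V2,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CaptureDataChangesMode {
    Id,
    Before,
    After,
    Full,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct CaptureDataChangesInfo {
    mode: CaptureDataChangesMode,
    table: String,
    version: Option<CdcVersion>,
}

const TURSO_CDC_DEFAULT_TABLE_NAME: &str = "turso_cdc";

impl CaptureDataChangesInfo {
    /// Parses "<mode>[,<table_name>]"; Ok(None) means CDC is off.
    fn parse(value: &str, version: Option<CdcVersion>) -> Result<Option<Self>, String> {
        // Split off the optional table name; fall back to the default table.
        let (mode, table) = match value.split_once(',') {
            Some((m, t)) => (m.trim(), t.trim()),
            None => (value.trim(), TURSO_CDC_DEFAULT_TABLE_NAME),
        };
        let mode = match mode {
            "off" => return Ok(None),
            "id" => CaptureDataChangesMode::Id, // assumed spelling
            "before" => CaptureDataChangesMode::Before, // assumed spelling
            "after" => CaptureDataChangesMode::After, // assumed spelling
            "full" => CaptureDataChangesMode::Full,
            other => return Err(format!("unknown CDC mode: {other}")),
        };
        Ok(Some(Self { mode, table: table.to_string(), version }))
    }

    fn has_before(&self) -> bool {
        matches!(self.mode, CaptureDataChangesMode::Before | CaptureDataChangesMode::Full)
    }

    fn has_after(&self) -> bool {
        matches!(self.mode, CaptureDataChangesMode::After | CaptureDataChangesMode::Full)
    }

    fn has_updates(&self) -> bool {
        matches!(self.mode, CaptureDataChangesMode::Full)
    }
}
```

The capability checks follow the enum comments: Full implies before, after and updates, while Id implies none of them.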
Convenience trait CaptureDataChangesExt on Option<CaptureDataChangesInfo> provides:
- has_before() / has_after() / has_updates(): delegates to inner, returns false for None
- table(): returns Option<&str>, None when CDC is off
CDC Table Schema v1
Default table name: turso_cdc (constant TURSO_CDC_DEFAULT_TABLE_NAME)

    CREATE TABLE turso_cdc (
        change_id   INTEGER PRIMARY KEY AUTOINCREMENT,
        change_time INTEGER,   -- unixepoch()
        change_type INTEGER,   -- 1=INSERT, 0=UPDATE, -1=DELETE
        table_name  TEXT,
        id          <untyped>, -- rowid of changed row
        before      BLOB,      -- binary record (before-image)
        after       BLOB,      -- binary record (after-image)
        updates     BLOB       -- binary record of per-column changes
    );
CDC Table Schema v2 (current)
    CREATE TABLE turso_cdc (
        change_id     INTEGER PRIMARY KEY AUTOINCREMENT,
        change_time   INTEGER,   -- unixepoch()
        change_type   INTEGER,   -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
        table_name    TEXT,
        id            <untyped>, -- rowid of changed row
        before        BLOB,      -- binary record (before-image)
        after         BLOB,      -- binary record (after-image)
        updates       BLOB,      -- binary record of per-column changes
        change_txn_id INTEGER    -- transaction ID (groups rows into transactions)
    );
v2 adds:
- change_txn_id column: groups CDC rows by transaction. Assigned via the conn_txn_id(candidate) opcode, which get-or-sets a per-connection transaction ID.
- change_type=2 (COMMIT) records: mark transaction boundaries. Emitted once per statement in autocommit mode, or on explicit COMMIT.
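The get-or-set behaviour described for conn_txn_id(candidate) can be illustrated with a small sketch. ConnTxnState and end_txn() are hypothetical names, and clearing the ID when a transaction ends is an assumption; the document only states that the opcode get-or-sets a per-connection transaction ID.

```rust
/// Hypothetical stand-in for the per-connection transaction ID slot.
#[derive(Default)]
struct ConnTxnState {
    txn_id: Option<i64>,
}

impl ConnTxnState {
    /// Returns the ID already in use, or adopts `candidate` if none is set.
    fn conn_txn_id(&mut self, candidate: i64) -> i64 {
        *self.txn_id.get_or_insert(candidate)
    }

    /// Assumption: the slot is cleared when the transaction ends,
    /// so the next transaction adopts a fresh candidate.
    fn end_txn(&mut self) {
        self.txn_id = None;
    }
}
```

With this shape, every CDC row written inside one transaction receives the same change_txn_id, whichever statement proposed the candidate first.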
The CDC table is created at runtime by the
InitCdcVersion opcode via CREATE TABLE IF NOT EXISTS.
CDC Version Table
When CDC is first enabled, a version tracking table is created:
    CREATE TABLE turso_cdc_version (
        table_name TEXT PRIMARY KEY,
        version    TEXT NOT NULL
    );
Current version:
CDC_VERSION_CURRENT = CdcVersion::V2 (defined in core/lib.rs, re-exported from core/translate/pragma.rs)
Version Detection in InitCdcVersion
The
InitCdcVersion opcode detects v1 vs v2 by checking whether the CDC table already exists before creating it:
- If CDC table already exists but has no version row → v1 (pre-existing table from before version tracking)
- If CDC table doesn't exist → create with current version (v2)
- If version row already exists → use that version as-is
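The three detection rules above reduce to a small decision function. The sketch below is illustrative only: detect_cdc_version is a hypothetical name, and the real opcode reaches the same outcome through CREATE TABLE IF NOT EXISTS and INSERT OR IGNORE rather than a pure function.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CdcVersion {
    V1,
    V2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

/// `table_existed`: whether the CDC table was present before creation.
/// `version_row`: the version already recorded for that table, if any.
fn detect_cdc_version(table_existed: bool, version_row: Option<CdcVersion>) -> CdcVersion {
    match (version_row, table_existed) {
        // An existing version row always wins.
        (Some(version), _) => version,
        // Table predates version tracking: treat it as v1.
        (None, true) => CdcVersion::V1,
        // Fresh table: created with the current schema.
        (None, false) => CDC_VERSION_CURRENT,
    }
}
```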
DatabaseChange (sync/engine/src/types.rs:229-249)
Sync engine's Rust representation of a CDC row. Has into_apply() and into_revert() methods for forward/backward replay.
OperationMode (core/translate/emitter.rs)
Used by emit_cdc_insns() to determine change_type value:
- INSERT → 1
- UPDATE / SELECT → 0
- DELETE → -1
- COMMIT → 2 (v2 only, emitted by emit_cdc_commit_insns)
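For a consumer reading the CDC table, the change_type values can be modelled as a small enum with encode and decode helpers. ChangeKind is a hypothetical type for illustration and is not the OperationMode enum from core/translate/emitter.rs; only the numeric values come from this document.

```rust
/// Hypothetical consumer-side view of the change_type column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChangeKind {
    Insert,
    Update,
    Delete,
    Commit, // v2 only
}

impl ChangeKind {
    /// Value stored in the change_type column.
    fn to_change_type(self) -> i64 {
        match self {
            ChangeKind::Insert => 1,
            ChangeKind::Update => 0,
            ChangeKind::Delete => -1,
            ChangeKind::Commit => 2,
        }
    }

    /// Decodes a stored change_type; None for values this sketch does not know.
    fn from_change_type(value: i64) -> Option<Self> {
        match value {
            1 => Some(ChangeKind::Insert),
            0 => Some(ChangeKind::Update),
            -1 => Some(ChangeKind::Delete),
            2 => Some(ChangeKind::Commit),
            _ => None,
        }
    }
}
```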
Entry Points
1. PRAGMA — Enable/Disable CDC
Set: core/translate/pragma.rs
- Checks MVCC is not enabled (CDC and MVCC are mutually exclusive)
- Parses mode string via CaptureDataChangesInfo::parse() with CDC_VERSION_CURRENT
- Emits a single InitCdcVersion opcode: all CDC setup (table creation, version tracking, state change) happens at execution time
Get (read current mode): core/translate/pragma.rs
- Returns 3 columns: mode, table, version
- When off: returns ("off", NULL, NULL)
- When active: returns (mode_name, table, version)
Pragma registration:
core/pragma.rs — CaptureDataChangesConn (and deprecated alias UnstableCaptureDataChangesConn) with columns ["mode", "table", "version"]
2. Connection State
Field:
core/connection.rs — capture_data_changes: RwLock<Option<CaptureDataChangesInfo>>
Getter: get_capture_data_changes_info() — returns read guard
Setter: set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)
Default: initialized as None (CDC off)
3. ProgramBuilder Integration
Field:
core/vdbe/builder.rs — capture_data_changes_info: Option<CaptureDataChangesInfo>
Accessor: capture_data_changes_info() — returns &Option<CaptureDataChangesInfo>
Passed from: core/translate/mod.rs — read from connection when creating builder
4. PrepareContext
Field:
core/vdbe/mod.rs — capture_data_changes: Option<CaptureDataChangesInfo>
Set from: PrepareContext::from_connection() — clones from connection.get_capture_data_changes_info()
5. InitCdcVersion Opcode (core/vdbe/execute.rs)
Always emitted by PRAGMA SET. Handles all CDC setup at execution time:
- For "off": stores None in state.pending_cdc_info, returns early
- Checks if CDC table already exists (for v1 backward compatibility)
- Creates CDC table (CREATE TABLE IF NOT EXISTS <cdc_table_name> ...) using the v2 schema with the change_txn_id column
- Creates version table (CREATE TABLE IF NOT EXISTS turso_cdc_version ...)
- Inserts version row: if CDC table pre-existed → "v1", otherwise → current version ("v2"). Uses INSERT OR IGNORE to preserve existing version rows.
- Reads back actual version from the table
- Stores computed CaptureDataChangesInfo in state.pending_cdc_info
The connection's CDC state is not applied in the opcode. Instead,
pending_cdc_info is applied in halt() only after the transaction commits successfully. This ensures atomicity: if any step fails and the transaction rolls back, the connection's CDC state remains unchanged.
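The deferred-apply pattern can be sketched as below. The struct shapes are simplified stand-ins. Modelling pending_cdc_info as Option<Option<...>> is an assumption made here to distinguish "no PRAGMA ran" from "PRAGMA set CDC off", and the committed flag stands in for whatever halt() uses to know the transaction succeeded.

```rust
#[derive(Debug, Clone, PartialEq)]
struct CaptureDataChangesInfo {
    mode: String,
    table: String,
}

/// Simplified connection: only the CDC state field.
#[derive(Default)]
struct Connection {
    capture_data_changes: Option<CaptureDataChangesInfo>,
}

/// Simplified program state.
#[derive(Default)]
struct ProgramState {
    // Assumption: outer None = nothing pending, Some(None) = turn CDC off.
    pending_cdc_info: Option<Option<CaptureDataChangesInfo>>,
}

impl ProgramState {
    /// Applies the pending CDC state only if the transaction committed.
    fn halt(&mut self, conn: &mut Connection, committed: bool) {
        // Always clear the pending slot so it cannot leak into a later run.
        let pending = self.pending_cdc_info.take();
        if committed {
            if let Some(info) = pending {
                conn.capture_data_changes = info;
            }
        }
    }
}
```

On rollback the pending value is simply dropped, so the connection keeps whatever CDC state it had before the PRAGMA ran.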
All table creation is done via nested
conn.prepare()/run_ignore_rows() calls rather than bytecode emission, because the PRAGMA plan can't contain DML against tables that don't exist yet in the schema.
Bytecode Emission (core/translate/emitter.rs)
These are the core CDC code generation functions:
| Function | Purpose |
|---|---|
| prepare_cdc_if_necessary() | Opens CDC table cursor if CDC is active and target != CDC table |
| emit_cdc_full_record() | Reads all columns from cursor into a MakeRecord (for before/after image) |
| emit_cdc_patch_record() | Builds record from in-flight register values (for after-image of INSERT/UPDATE) |
| emit_cdc_insns() | Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops. |
| emit_cdc_commit_insns() | Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check. |
| emit_cdc_autocommit_commit() | End-of-statement COMMIT emission. Checks is_autocommit() at runtime and only emits COMMIT if in autocommit mode. v2 only. |
COMMIT Emission Strategy (v2)
Per-row call sites use
emit_cdc_insns() (no COMMIT). End-of-statement sites call emit_cdc_autocommit_commit() which checks is_autocommit() at runtime:
- Autocommit mode: emits a COMMIT record after the statement completes
- Explicit transaction (BEGIN...COMMIT): skips per-statement COMMIT; the explicit COMMIT statement emits the COMMIT record via emit_cdc_commit_insns()
This ensures multi-row statements like
INSERT INTO t VALUES (1),(2),(3) produce one COMMIT at the end, not one per row.
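To make the resulting record stream concrete, the sketch below simulates the change_type values a v2 multi-row INSERT would append. Both function names are hypothetical, and this models only the outcome described above, not the bytecode that produces it.

```rust
const CHANGE_INSERT: i64 = 1;
const CHANGE_COMMIT: i64 = 2;

/// change_type values appended by one INSERT statement touching `rows` rows.
fn insert_statement_records(rows: usize, autocommit: bool) -> Vec<i64> {
    // One CDC row per inserted row (the emit_cdc_insns() call sites).
    let mut records = vec![CHANGE_INSERT; rows];
    // End of statement: a COMMIT record only in autocommit mode
    // (the emit_cdc_autocommit_commit() call site).
    if autocommit {
        records.push(CHANGE_COMMIT);
    }
    records
}

/// change_type values appended by an explicit COMMIT statement.
fn explicit_commit_records() -> Vec<i64> {
    vec![CHANGE_COMMIT]
}
```

Under these assumptions, two single-row INSERTs inside BEGIN...COMMIT yield 1, 1, 2, while the same statements in autocommit mode yield 1, 2, 1, 2.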
Integration Points — Where CDC Records Are Emitted
INSERT (core/translate/insert.rs)
- Per-row: emit_cdc_insns() after insert, and before delete for REPLACE/conflict
- End-of-statement: emit_cdc_autocommit_commit() in emit_epilogue() after the insert loop
UPDATE (core/translate/emitter.rs)
- Per-row: captures before-image, after-image via patch record, emits emit_cdc_insns()
- End-of-statement: emit_cdc_autocommit_commit() after the update loop
DELETE (core/translate/emitter.rs)
- Per-row: captures before-image and emits emit_cdc_insns()
- End-of-statement: emit_cdc_autocommit_commit() after the delete loop
UPSERT (ON CONFLICT DO UPDATE) (core/translate/upsert.rs)
- Per-row: emit_cdc_insns() for all three cases: pure insert, update after conflict, replace
- No end-of-statement COMMIT of its own: upsert shares INSERT's epilogue
Schema Changes (DDL) (core/translate/schema.rs)
- CREATE TABLE: emit_cdc_insns() (insert into sqlite_schema) + emit_cdc_autocommit_commit()
- DROP TABLE: emit_cdc_insns() per-row in metadata loop + emit_cdc_autocommit_commit() after loop
- CREATE INDEX: emit_cdc_insns() + emit_cdc_autocommit_commit() (core/translate/schema.rs)
- DROP INDEX: emit_cdc_insns() per-row + emit_cdc_autocommit_commit() after loop (core/translate/index.rs)
DDL in explicit transactions (
BEGIN; CREATE TABLE t(x); COMMIT) does NOT emit per-statement COMMIT — the autocommit check prevents it.
ALTER TABLE (core/translate/update.rs)
- Sets cdc_update_alter_statement on the update plan when CDC has updates mode
Views/Triggers (explicitly excluded)
- core/translate/view.rs: passes None for CDC cursor
- core/translate/trigger.rs: passes None for CDC cursor
Subqueries (no CDC)
- core/translate/subquery.rs: cdc_cursor_id: None
Helper Functions (for reading CDC data)
table_columns_json_array(table_name) (core/function.rs, core/vdbe/execute.rs)
Returns JSON array of column names for a table. Used to interpret binary records.
bin_record_json_object(columns_json, blob) (core/function.rs, core/vdbe/execute.rs)
Decodes a binary record (from before/after/updates columns) into a JSON object using column names.
Sync Engine Integration
The sync engine is the primary consumer of CDC data.
DatabaseTape (sync/engine/src/database_tape.rs)
- CDC config: DEFAULT_CDC_TABLE_NAME = "turso_cdc", DEFAULT_CDC_MODE = "full"
- PRAGMA name: CDC_PRAGMA_NAME = "capture_data_changes_conn"
- Initialization: connect() sets CDC pragma and caches cdc_version from turso_cdc_version table. Must be called before iterate_changes().
- Version caching: cdc_version: RwLock<Option<CdcVersion>>, set by connect(), read by iterate_changes(). Panics if not set.
- Iterator: DatabaseChangesIterator reads CDC table in batches, emits DatabaseTapeOperation. For v2, real COMMIT records from the table are emitted. For v1, a synthetic Commit is appended at end of batch. ignore_schema_changes: true (default) filters out sqlite_schema row changes but not COMMIT records.
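The iterator behaviour described above can be approximated by a function that turns one batch of CDC rows into tape operations. CdcRow, TapeOp and batch_to_ops are hypothetical simplifications of the real DatabaseChangesIterator and DatabaseTapeOperation types, and skipping the synthetic Commit for an empty v1 batch is an assumption.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CdcVersion {
    V1,
    V2,
}

/// Hypothetical, trimmed-down CDC row.
struct CdcRow {
    change_type: i64,
    table_name: String,
}

/// Hypothetical, trimmed-down tape operation.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TapeOp {
    RowChange { table: String, change_type: i64 },
    Commit,
}

fn batch_to_ops(rows: &[CdcRow], version: CdcVersion, ignore_schema_changes: bool) -> Vec<TapeOp> {
    let mut ops = Vec::new();
    for row in rows {
        // change_type 2 is a real COMMIT record (v2 tables only);
        // it is never filtered out.
        if row.change_type == 2 {
            ops.push(TapeOp::Commit);
            continue;
        }
        // Schema rows are dropped when the filter is on.
        if ignore_schema_changes && row.table_name == "sqlite_schema" {
            continue;
        }
        ops.push(TapeOp::RowChange {
            table: row.table_name.clone(),
            change_type: row.change_type,
        });
    }
    // v1 tables carry no COMMIT rows, so a synthetic one closes the batch.
    // Assumption: nothing is appended for an empty batch.
    if version == CdcVersion::V1 && !rows.is_empty() {
        ops.push(TapeOp::Commit);
    }
    ops
}
```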
Sync Operations (sync/engine/src/database_sync_operations.rs)
- Change counting: SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?
Sync Engine (sync/engine/src/database_sync_engine.rs)
- Initialization: open_db() calls main_tape.connect(coro) to ensure CDC is set up and version is cached before any iterate_changes() calls.
- During apply_changes, checks if CDC table existed, re-creates it after sync
Replay Generator (sync/engine/src/database_replay_generator.rs)
- Requires updates column to be populated (full mode)
Bindings CDC Surface
All bindings expose
cdc_operations as part of sync stats:
| Binding | File |
|---|---|
| Python | |
| JavaScript | |
| JS (generator) | |
| Go | |
| React Native | |
| SDK Kit (C header) | |
| SDK Kit (Rust) | |
Tests
- Integration tests: tests/integration/functions/test_cdc.rs covers all modes, CRUD, transactions, schema changes, version table, backward compatibility. Registered in tests/integration/functions/mod.rs.
- Sync engine tests: sync/engine/src/database_tape.rs covers CDC table reads, tape iteration, replay of schema changes.
- JS binding tests: bindings/javascript/sync/packages/{wasm,native}/promise.test.ts
Run: cargo test -- test_cdc (integration) or cargo test -p turso_sync_engine -- database_tape (sync engine).
User-facing Documentation
- CLI manual page: cli/manuals/cdc.md, accessible via manual cdc in the REPL.
- Database manual: docs/manual.md, CDC section linked in TOC
Key Design Decisions
- Per-connection, not per-database. Each connection has its own CDC mode and can target different tables.
- Bytecode-level implementation. CDC instructions are emitted alongside the actual DML bytecode during translation, with no runtime hooks or triggers.
- Self-exclusion. Changes to the CDC table and turso_cdc_version table are never captured (checked in prepare_cdc_if_necessary).
- Schema changes tracked. DDL operations are recorded as changes to the sqlite_schema table.
- Binary record format. Before/after/updates columns use SQLite's MakeRecord format (same as B-tree payload).
- Transaction-aware. CDC writes happen within the same transaction as the DML, so rollback naturally discards CDC entries.
- Version tracking. CDC schema version is recorded in the turso_cdc_version table and carried in CaptureDataChangesInfo.version for future schema evolution.
- Atomic PRAGMA. Connection CDC state is deferred via pending_cdc_info in ProgramState and applied only at Halt. If the PRAGMA's disk writes fail and the transaction rolls back, the connection state stays unchanged.
- Per-statement COMMIT (v2). COMMIT records are emitted once per statement (not per row), using emit_cdc_autocommit_commit() which checks is_autocommit() at runtime. In explicit transactions, only the final COMMIT emits a COMMIT CDC record.
- Backward-compatible version detection. Pre-existing v1 CDC tables (without turso_cdc_version) are detected by checking table existence before creation. Existing tables get CdcVersion::V1 inserted into the version table.
- Typed version enum. CdcVersion enum with #[repr(u8)] and Ord / PartialOrd enables feature gating via integer comparison (has_commit_record() = self >= V2). Display / FromStr handles database round-trip.
- CDC and MVCC mutual exclusion. Enabling CDC when MVCC is active (or vice versa) returns an error. Checked at PRAGMA set time and journal mode switch time.