Claude-skill-registry etl-sync-job-builder
Designs reliable ETL and data synchronization jobs with incremental updates, idempotency guarantees, watermark tracking, error handling, and retry logic. Use for "ETL jobs", "data sync", "incremental sync", or "data pipeline".
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/etl-sync-job-builder" ~/.claude/skills/majiayu000-claude-skill-registry-etl-sync-job-builder && rm -rf "$T"
manifest:
skills/data/etl-sync-job-builder/SKILL.mdsource content
ETL/Sync Job Builder
Build reliable, incremental data synchronization pipelines.
ETL Job Pattern
// jobs/sync-users.ts interface SyncJob { name: string; source: "database" | "api" | "file"; destination: "database" | "warehouse" | "s3"; schedule: string; } export class ETLJob { constructor(private name: string, private watermarkKey: string) {} async run() { console.log(`🔄 Starting ${this.name}...`); try { // 1. Get last watermark const lastSync = await this.getWatermark(); console.log(` Last sync: ${lastSync}`); // 2. Extract data const data = await this.extract(lastSync); console.log(` Extracted ${data.length} records`); // 3. Transform data const transformed = await this.transform(data); // 4. Load data await this.load(transformed); // 5. Update watermark await this.updateWatermark(new Date()); console.log(`✅ ${this.name} complete`); } catch (error) { console.error(`❌ ${this.name} failed:`, error); throw error; } } private async extract(since: Date) { // Extract logic return []; } private async transform(data: any[]) { // Transform logic return data; } private async load(data: any[]) { // Load logic } private async getWatermark(): Promise<Date> { const watermark = await prisma.syncWatermark.findUnique({ where: { key: this.watermarkKey }, }); return watermark?.lastSync || new Date(0); } private async updateWatermark(timestamp: Date) { await prisma.syncWatermark.upsert({ where: { key: this.watermarkKey }, create: { key: this.watermarkKey, lastSync: timestamp }, update: { lastSync: timestamp }, }); } }
Watermark Strategy
// Track sync progress model SyncWatermark { key String @id lastSync DateTime metadata Json? @@index([lastSync]) }
// Incremental sync using watermark async function syncOrdersIncremental() { // Get last sync time const watermark = await prisma.syncWatermark.findUnique({ where: { key: "orders_sync" }, }); const lastSync = watermark?.lastSync || new Date(0); // Fetch only new/updated records const newOrders = await sourceDb.order.findMany({ where: { updated_at: { gt: lastSync }, }, orderBy: { updated_at: "asc" }, }); console.log(`📦 Syncing ${newOrders.length} orders...`); // Process in batches for (let i = 0; i < newOrders.length; i += 100) { const batch = newOrders.slice(i, i + 100); await destinationDb.order.createMany({ data: batch, skipDuplicates: true, // Idempotency }); } // Update watermark to latest record if (newOrders.length > 0) { const latestTimestamp = newOrders[newOrders.length - 1].updated_at; await prisma.syncWatermark.upsert({ where: { key: "orders_sync" }, create: { key: "orders_sync", lastSync: latestTimestamp }, update: { lastSync: latestTimestamp }, }); } console.log(`✅ Sync complete`); }
Idempotent Upsert Pattern
// Idempotent sync - safe to run multiple times async function syncUsersIdempotent(users: User[]) { for (const user of users) { await prisma.user.upsert({ where: { id: user.id }, create: user, update: { email: user.email, name: user.name, updated_at: user.updated_at, }, }); } } // Batch upsert for better performance async function syncUsersBatch(users: User[]) { // PostgreSQL: Use ON CONFLICT await prisma.$executeRaw` INSERT INTO users (id, email, name, updated_at) SELECT * FROM UNNEST( ${users.map((u) => u.id)}::bigint[], ${users.map((u) => u.email)}::text[], ${users.map((u) => u.name)}::text[], ${users.map((u) => u.updated_at)}::timestamp[] ) ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email, name = EXCLUDED.name, updated_at = EXCLUDED.updated_at WHERE users.updated_at < EXCLUDED.updated_at `; }
Retry Logic with Exponential Backoff
async function syncWithRetry<T>( operation: () => Promise<T>, maxRetries: number = 3, baseDelay: number = 1000 ): Promise<T> { for (let attempt = 0; attempt <= maxRetries; attempt++) { try { return await operation(); } catch (error) { if (attempt === maxRetries) throw error; const delay = baseDelay * Math.pow(2, attempt); console.log(` Retry ${attempt + 1}/${maxRetries} after ${delay}ms`); await sleep(delay); } } throw new Error("Max retries exceeded"); } // Usage await syncWithRetry( async () => { return await syncOrders(); }, 3, 1000 );
Change Data Capture (CDC)
// Listen to database changes import { PrismaClient } from "@prisma/client"; const prisma = new PrismaClient(); // PostgreSQL: Listen to logical replication async function setupCDC() { await prisma.$executeRaw` CREATE PUBLICATION orders_publication FOR TABLE orders; `; // Subscribe to changes (using pg library) const client = await pg.connect(); client.query("LISTEN orders_changed;"); client.on("notification", async (msg) => { const change = JSON.parse(msg.payload); if (change.operation === "INSERT" || change.operation === "UPDATE") { await syncOrder(change.data); } }); }
Conflict Resolution
interface ConflictResolution { strategy: "source-wins" | "dest-wins" | "latest-wins" | "merge"; } async function syncWithConflictResolution( sourceRecord: any, destRecord: any, strategy: ConflictResolution["strategy"] ) { if (strategy === "source-wins") { return sourceRecord; } if (strategy === "dest-wins") { return destRecord; } if (strategy === "latest-wins") { return sourceRecord.updated_at > destRecord.updated_at ? sourceRecord : destRecord; } if (strategy === "merge") { // Merge non-null fields return { ...destRecord, ...Object.fromEntries( Object.entries(sourceRecord).filter(([_, v]) => v != null) ), }; } }
Monitoring & Observability
// Track sync job metrics interface SyncMetrics { jobName: string; startTime: Date; endTime: Date; recordsProcessed: number; recordsInserted: number; recordsUpdated: number; recordsSkipped: number; errors: number; durationMs: number; } async function logSyncMetrics(metrics: SyncMetrics) { await prisma.syncMetric.create({ data: metrics, }); console.log(` 📊 Sync Metrics Job: ${metrics.jobName} Records: ${metrics.recordsProcessed} Inserted: ${metrics.recordsInserted} Updated: ${metrics.recordsUpdated} Errors: ${metrics.errors} Duration: ${metrics.durationMs}ms `); }
Full ETL Job Example
// jobs/sync-orders-to-warehouse.ts export class OrdersETLJob extends ETLJob { constructor() { super("orders-etl", "orders_warehouse_sync"); } async extract(since: Date): Promise<Order[]> { return prisma.order.findMany({ where: { updated_at: { gt: since }, }, include: { items: true, user: true, }, orderBy: { updated_at: "asc" }, }); } async transform(orders: Order[]): Promise<WarehouseOrder[]> { return orders.map((order) => ({ order_id: order.id, user_email: order.user.email, total_amount: order.total, item_count: order.items.length, status: order.status, order_date: order.created_at, synced_at: new Date(), })); } async load(data: WarehouseOrder[]): Promise<void> { const batchSize = 100; for (let i = 0; i < data.length; i += batchSize) { const batch = data.slice(i, i + batchSize); await warehouseDb.$executeRaw` INSERT INTO orders_fact ( order_id, user_email, total_amount, item_count, status, order_date, synced_at ) VALUES ${batch .map( (o) => `( ${o.order_id}, '${o.user_email}', ${o.total_amount}, ${o.item_count}, '${o.status}', '${o.order_date}', '${o.synced_at}' )` ) .join(",")} ON CONFLICT (order_id) DO UPDATE SET total_amount = EXCLUDED.total_amount, status = EXCLUDED.status, synced_at = EXCLUDED.synced_at `; } } } // Run job new OrdersETLJob().run();
Scheduling
// Schedule ETL jobs import cron from "node-cron"; // Run every hour cron.schedule("0 * * * *", async () => { await new OrdersETLJob().run(); }); // Run every 15 minutes cron.schedule("*/15 * * * *", async () => { await syncUsersIncremental(); }); // Run nightly at 2 AM cron.schedule("0 2 * * *", async () => { await fullDataSync(); });
Error Handling & Recovery
async function syncWithErrorHandling() { const checkpoint = await getCheckpoint(); let processedRecords = 0; try { const records = await fetchRecords(checkpoint); for (const record of records) { try { await processRecord(record); processedRecords++; // Save checkpoint every 100 records if (processedRecords % 100 === 0) { await saveCheckpoint(record.id); } } catch (error) { // Log error but continue console.error(`Failed to process record ${record.id}:`, error); await logFailedRecord(record.id, error); } } await saveCheckpoint("completed"); } catch (error) { // Critical failure - job will retry from checkpoint console.error("Job failed:", error); throw error; } }
Best Practices
- Incremental sync: Use watermarks, don't full-scan
- Idempotent operations: Safe to retry
- Batch processing: Process 100-1000 records at a time
- Checkpointing: Resume from failure point
- Retry with backoff: Handle transient failures
- Monitor metrics: Track job health
- Test thoroughly: Including failure scenarios
Output Checklist
- ETL job class created
- Watermark tracking implemented
- Incremental sync logic
- Idempotent upsert operations
- Retry logic with backoff
- Conflict resolution strategy
- Monitoring and metrics
- Error handling and recovery
- Job scheduling configured
- Testing including failure cases