Claude-code-plugins navan-data-sync
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/navan-pack/skills/navan-data-sync" ~/.claude/skills/jeremylongshore-claude-code-plugins-navan-data-sync && rm -rf "$T"
plugins/saas-packs/navan-pack/skills/navan-data-sync/SKILL.mdNavan — Data Sync
Overview
This skill provides production-grade sync strategies for Navan data. The two primary tables have fundamentally different sync models: BOOKING requires weekly full-refresh with merge-upsert logic (every record is re-imported, keyed by UUID), while TRANSACTION is incremental and append-only. Real-time use cases require webhook callbacks for event-driven processing. This skill covers all three tiers — scheduled full-refresh, incremental watermark-based sync, and real-time webhooks — along with Airbyte connector configuration and idempotent SQL upsert patterns.
Prerequisites
- Navan account with OAuth 2.0 API credentials (see
)navan-install-auth - Destination warehouse (Snowflake, BigQuery, PostgreSQL, or Redshift)
- For managed sync: Airbyte instance (Cloud or OSS) with source-navan v0.0.42+
- For webhooks: publicly accessible HTTPS endpoint for callbacks
- Node.js 18+ or Python 3.8+
- Environment variables:
,NAVAN_CLIENT_ID
,NAVAN_CLIENT_SECRETNAVAN_BASE_URL
Instructions
Step 1: Full-Refresh Sync for BOOKING Table
The BOOKING table is re-imported weekly by Navan. Every record is refreshed, so your sync must use merge-upsert logic to avoid duplicates while capturing updates.
const tokenRes = await fetch(`${process.env.NAVAN_BASE_URL}/ta-auth/oauth/token`, { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.NAVAN_CLIENT_ID!, client_secret: process.env.NAVAN_CLIENT_SECRET!, }), }); const { access_token } = await tokenRes.json(); const headers = { Authorization: `Bearer ${access_token}` }; // Full extraction — paginate through all bookings for weekly refresh let allBookings: any[] = []; let page = 0; const size = 50; while (true) { const res = await fetch( `${process.env.NAVAN_BASE_URL}/v1/bookings?page=${page}&size=${size}`, { headers } ); const { data } = await res.json(); if (!data || !data.length) break; allBookings.push(...data); if (data.length < size) break; page++; } console.log(`Extracted ${allBookings.length} bookings for full refresh`);
SQL merge-upsert pattern (PostgreSQL):
-- Staging table receives raw API data CREATE TABLE IF NOT EXISTS navan_booking_staging ( uuid TEXT PRIMARY KEY, traveler_email TEXT, origin TEXT, destination TEXT, start_date DATE, end_date DATE, total_cost NUMERIC(12,2), currency TEXT DEFAULT 'USD', department TEXT, cost_center TEXT, status TEXT, in_policy BOOLEAN, created_at TIMESTAMPTZ, updated_at TIMESTAMPTZ, synced_at TIMESTAMPTZ DEFAULT NOW() ); -- Merge-upsert: insert new records, update changed records INSERT INTO navan_booking AS b SELECT * FROM navan_booking_staging s ON CONFLICT (uuid) DO UPDATE SET traveler_email = EXCLUDED.traveler_email, origin = EXCLUDED.origin, destination = EXCLUDED.destination, start_date = EXCLUDED.start_date, end_date = EXCLUDED.end_date, total_cost = EXCLUDED.total_cost, status = EXCLUDED.status, in_policy = EXCLUDED.in_policy, updated_at = EXCLUDED.updated_at, synced_at = NOW() WHERE b.updated_at < EXCLUDED.updated_at;
Step 2: Incremental Sync for TRANSACTION Table
TRANSACTION data is append-only. Use watermark-based sync to pull only new records.
// Track high-watermark for incremental pulls interface SyncState { lastSyncDate: string; // ISO date of last successful sync lastTransactionId: string; } async function loadSyncState(): Promise<SyncState> { const fs = await import('fs'); try { return JSON.parse(fs.readFileSync('.navan-sync-state.json', 'utf-8')); } catch { return { lastSyncDate: '2025-01-01', lastTransactionId: '' }; } } async function saveSyncState(state: SyncState) { const fs = await import('fs'); fs.writeFileSync('.navan-sync-state.json', JSON.stringify(state, null, 2)); } // Pull bookings since last watermark const state = await loadSyncState(); const today = new Date().toISOString().split('T')[0]; const txnRes = await fetch( `${process.env.NAVAN_BASE_URL}/v1/bookings` + `?createdFrom=${state.lastSyncDate}&createdTo=${today}&page=0&size=50`, { headers } ); const { data: transactions } = await txnRes.json(); // Filter out already-seen transactions const newTxns = transactions.filter( (t: any) => t.transaction_id > state.lastTransactionId ); console.log(`New transactions since ${state.lastSyncDate}: ${newTxns.length}`); // Update watermark after successful load if (newTxns.length > 0) { await saveSyncState({ lastSyncDate: today, lastTransactionId: newTxns[newTxns.length - 1].transaction_id, }); }
Step 3: Webhook Endpoint for Real-Time Events
import { createServer } from 'http'; import { createHmac } from 'crypto'; // Webhook handler for real-time Navan events const server = createServer(async (req, res) => { if (req.method !== 'POST' || req.url !== '/navan/webhook') { res.writeHead(404); res.end(); return; } const chunks: Buffer[] = []; for await (const chunk of req) chunks.push(chunk as Buffer); const body = Buffer.concat(chunks).toString(); // Verify webhook signature const signature = req.headers['x-navan-signature'] as string; const expected = createHmac('sha256', process.env.NAVAN_WEBHOOK_SECRET!) .update(body) .digest('hex'); if (signature !== expected) { console.error('Invalid webhook signature'); res.writeHead(401); res.end('Unauthorized'); return; } const event = JSON.parse(body); console.log(`Webhook event: ${event.type}`); switch (event.type) { case 'booking.created': console.log(`New booking: ${event.data.uuid}`); break; case 'booking.updated': console.log(`Booking updated: ${event.data.uuid}`); break; case 'booking.cancelled': console.log(`Booking cancelled: ${event.data.uuid}`); break; case 'expense.submitted': console.log(`Expense submitted: ${event.data.transaction_id}`); break; case 'expense.approved': console.log(`Expense approved: ${event.data.transaction_id}`); break; default: console.log(`Unknown event type: ${event.type}`); } res.writeHead(200); res.end('OK'); }); server.listen(3000, () => console.log('Webhook listener on :3000'));
Step 4: Airbyte Connector Sync Configuration
# Airbyte source-navan connector (v0.0.42) # Production sync mode configuration source: sourceDefinitionId: source-navan connectionConfiguration: client_id: "${NAVAN_CLIENT_ID}" client_secret: "${NAVAN_CLIENT_SECRET}" # Sync catalog — bookings stream syncCatalog: streams: - stream: name: bookings jsonSchema: {} config: # Full Refresh for BOOKING (weekly re-import model) syncMode: full_refresh destinationSyncMode: overwrite # Alternative: append_dedup with uuid as primary key # syncMode: full_refresh # destinationSyncMode: append_dedup # primaryKey: [["uuid"]] # Schedule: run weekly to match Navan's BOOKING refresh cycle schedule: scheduleType: cron cronExpression: "0 2 * * 0" # Sunday 2am UTC
Step 5: Sync Monitoring and Alerting
// Monitor sync health and detect drift interface SyncMetrics { tableName: string; lastSyncTime: Date; recordCount: number; expectedFrequency: string; isStale: boolean; } async function checkSyncHealth(): Promise<SyncMetrics[]> { const state = await loadSyncState(); const lastSync = new Date(state.lastSyncDate); const hoursSinceSync = (Date.now() - lastSync.getTime()) / (1000 * 60 * 60); return [ { tableName: 'BOOKING', lastSyncTime: lastSync, recordCount: 0, // populated from warehouse query expectedFrequency: 'weekly', isStale: hoursSinceSync > 7 * 24 + 6, // alert if > 7.25 days }, { tableName: 'TRANSACTION', lastSyncTime: lastSync, recordCount: 0, expectedFrequency: 'daily', isStale: hoursSinceSync > 25, // alert if > 25 hours }, ]; } const health = await checkSyncHealth(); health.forEach(m => { const status = m.isStale ? 'STALE' : 'OK'; console.log(`${m.tableName}: ${status} (last sync: ${m.lastSyncTime.toISOString()})`); });
Step 6: Idempotent Load Pattern
// Ensure loads are idempotent — safe to re-run without side effects async function idempotentLoad(records: any[], tableName: string) { const batchId = `${tableName}-${new Date().toISOString()}`; // 1. Write to staging with batch ID console.log(`Loading ${records.length} records to ${tableName}_staging (batch: ${batchId})`); // 2. Merge-upsert from staging to target // Uses UUID as natural key — same record always resolves to same row console.log(`Merging ${tableName}_staging -> ${tableName}`); // 3. Record batch metadata for audit console.log(`Batch ${batchId} complete: ${records.length} records processed`); return { batchId, recordCount: records.length, status: 'complete' }; }
Output
Successful execution produces:
- Full-refresh BOOKING sync with merge-upsert deduplication
- Incremental TRANSACTION sync with watermark state tracking
- Webhook endpoint for real-time event processing
- Configured Airbyte connector with production-ready sync schedule
- Sync health monitoring with staleness alerting
Error Handling
| Error | HTTP Code | Cause | Solution |
|---|---|---|---|
| Unauthorized | 401 | Expired or invalid bearer token | Re-authenticate via POST /ta-auth/oauth/token |
| Rate Limited | 429 | Too many API requests | Use exponential backoff; increase sync interval |
| Timeout | 504 | Full refresh too large | Chunk by date range (30-day windows) |
| Webhook Sig Invalid | 401 | Tampered or replayed event | Verify NAVAN_WEBHOOK_SECRET; check clock skew |
| Duplicate Records | N/A | Missing UUID dedup in BOOKING sync | Apply merge-upsert with ON CONFLICT (uuid) |
| Sync Drift | N/A | Missed incremental window | Fall back to full refresh; reset watermark |
Examples
Python — Incremental TRANSACTION sync with watermark:
import requests import json import os from datetime import datetime base_url = os.environ.get('NAVAN_BASE_URL', 'https://api.navan.com') auth = requests.post(f'{base_url}/ta-auth/oauth/token', data={ 'grant_type': 'client_credentials', 'client_id': os.environ['NAVAN_CLIENT_ID'], 'client_secret': os.environ['NAVAN_CLIENT_SECRET'], }) headers = {'Authorization': f'Bearer {auth.json()["access_token"]}'} # Load watermark try: with open('.navan-sync-state.json') as f: state = json.load(f) except FileNotFoundError: state = {'last_sync_date': '2025-01-01'} today = datetime.now().strftime('%Y-%m-%d') resp = requests.get( f'{base_url}/v1/bookings', params={'createdFrom': state['last_sync_date'], 'createdTo': today, 'page': 0, 'size': 50}, headers=headers, ).json() txns = resp['data'] print(f'Fetched {len(txns)} records since {state["last_sync_date"]}') # Save updated watermark with open('.navan-sync-state.json', 'w') as f: json.dump({'last_sync_date': today}, f)
Resources
- Navan Help Center — Official documentation and support
- Booking Data Integration — BOOKING table refresh schedule and schema
- Navan Integrations — Fivetran, Airbyte, and Estuary connector details
Next Steps
After configuring data sync, proceed to
navan-observability for pipeline monitoring or navan-performance-tuning for optimizing large-volume syncs.