Rei-skills database-migrations-migration-observability
Migration monitoring, CDC, and observability infrastructure
git clone https://github.com/rootcastleco/rei-skills
T=$(mktemp -d) && git clone --depth=1 https://github.com/rootcastleco/rei-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/database-migrations-migration-observability" ~/.claude/skills/rootcastleco-rei-skills-database-migrations-migration-observability && rm -rf "$T"
skills/database-migrations-migration-observability/SKILL.md
⚠️ AUTHORIZED USE ONLY — This skill is intended for authorized professionals only. Use only against systems you own or have explicit written permission to test. Unauthorized use may violate applicable laws.
Migration Observability and Real-time Monitoring
You are a database observability expert specializing in Change Data Capture, real-time migration monitoring, and enterprise-grade observability infrastructure. Create comprehensive monitoring solutions for database migrations with CDC pipelines, anomaly detection, and automated alerting.
Use this skill when
- Working on migration observability and real-time monitoring tasks or workflows
- Needing guidance, best practices, or checklists for migration observability and real-time monitoring
Do not use this skill when
- The task is unrelated to migration observability and real-time monitoring
- You need a different domain or tool outside this scope
Context
The user needs observability infrastructure for database migrations, including real-time data synchronization via CDC, comprehensive metrics collection, alerting systems, and visual dashboards.
Requirements
$ARGUMENTS
Instructions
1. Observable MongoDB Migrations
```javascript
const { MongoClient } = require('mongodb');
const { createLogger, transports } = require('winston');
const prometheus = require('prom-client');

class ObservableAtlasMigration {
  constructor(connectionString) {
    this.client = new MongoClient(connectionString);
    // Registered migrations: version -> { up(db, session, reportProgress) }
    this.migrations = new Map();
    this.logger = createLogger({
      transports: [
        new transports.File({ filename: 'migrations.log' }),
        new transports.Console()
      ]
    });
    this.metrics = this.setupMetrics();
  }

  setupMetrics() {
    const register = new prometheus.Registry();
    return {
      migrationDuration: new prometheus.Histogram({
        name: 'mongodb_migration_duration_seconds',
        help: 'Duration of MongoDB migrations',
        labelNames: ['version', 'status'],
        buckets: [1, 5, 15, 30, 60, 300],
        registers: [register]
      }),
      documentsProcessed: new prometheus.Counter({
        name: 'mongodb_migration_documents_total',
        help: 'Total documents processed',
        labelNames: ['version', 'collection'],
        registers: [register]
      }),
      migrationErrors: new prometheus.Counter({
        name: 'mongodb_migration_errors_total',
        help: 'Total migration errors',
        labelNames: ['version', 'error_type'],
        registers: [register]
      }),
      register
    };
  }

  async migrate() {
    await this.client.connect();
    const db = this.client.db();
    for (const [version, migration] of this.migrations) {
      await this.executeMigrationWithObservability(db, version, migration);
    }
  }

  async executeMigrationWithObservability(db, version, migration) {
    const timer = this.metrics.migrationDuration.startTimer({ version });
    const session = this.client.startSession();
    try {
      this.logger.info(`Starting migration ${version}`);
      await session.withTransaction(async () => {
        // The migration reports progress through the callback so processed
        // documents are counted per collection.
        await migration.up(db, session, (collection, count) => {
          this.metrics.documentsProcessed.inc({ version, collection }, count);
        });
      });
      timer({ status: 'success' });
      this.logger.info(`Migration ${version} completed`);
    } catch (error) {
      this.metrics.migrationErrors.inc({ version, error_type: error.name });
      timer({ status: 'failed' });
      throw error;
    } finally {
      await session.endSession();
    }
  }
}
```
2. Change Data Capture with Debezium
```python
import asyncio
import json
from datetime import datetime

import requests
from kafka import KafkaConsumer, KafkaProducer
from prometheus_client import Counter, Gauge, Histogram


class CDCObservabilityManager:
    def __init__(self, config):
        self.config = config
        self.metrics = self.setup_metrics()

    def setup_metrics(self):
        return {
            'events_processed': Counter(
                'cdc_events_processed_total',
                'Total CDC events processed',
                ['source', 'table', 'operation']
            ),
            'consumer_lag': Gauge(
                'cdc_consumer_lag_messages',
                'Consumer lag in messages',
                ['topic', 'partition']
            ),
            'replication_lag': Gauge(
                'cdc_replication_lag_seconds',
                'Replication lag',
                ['source_table', 'target_table']
            ),
        }

    async def setup_cdc_pipeline(self):
        self.consumer = KafkaConsumer(
            'database.changes',
            bootstrap_servers=self.config['kafka_brokers'],
            group_id='migration-consumer',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=self.config['kafka_brokers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    async def process_cdc_events(self):
        for message in self.consumer:
            # parse_cdc_event and apply_to_target are implemented per target
            # store; they are not shown here.
            event = self.parse_cdc_event(message.value)
            self.metrics['events_processed'].labels(
                source=event.source_db,
                table=event.table,
                operation=event.operation
            ).inc()
            await self.apply_to_target(
                event.table, event.operation, event.data, event.timestamp
            )

    async def setup_debezium_connector(self, source_config):
        connector_config = {
            "name": f"migration-connector-{source_config['name']}",
            "config": {
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "database.hostname": source_config['host'],
                "database.port": source_config['port'],
                "database.dbname": source_config['database'],
                "plugin.name": "pgoutput",
                "heartbeat.interval.ms": "10000"
            }
        }
        # Register the connector through Kafka Connect's REST API.
        response = requests.post(
            f"{self.config['kafka_connect_url']}/connectors",
            json=connector_config
        )
        response.raise_for_status()
        return response.json()
```
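The `consumer_lag` gauge above is defined but never updated. A minimal sketch of a lag reporter that could run alongside `process_cdc_events`, assuming the kafka-python consumer from `setup_cdc_pipeline` and its standard `assignment`, `end_offsets`, and `position` methods:

```python
import asyncio
from kafka import KafkaConsumer
from prometheus_client import Gauge


async def report_consumer_lag(consumer: KafkaConsumer, lag_gauge: Gauge, interval: int = 15):
    """Periodically export per-partition consumer lag to Prometheus."""
    while True:
        partitions = consumer.assignment()
        if partitions:
            end_offsets = consumer.end_offsets(list(partitions))
            for tp in partitions:
                # Lag = newest offset on the broker minus the next offset
                # this consumer will read from that partition.
                lag = end_offsets[tp] - consumer.position(tp)
                lag_gauge.labels(topic=tp.topic, partition=str(tp.partition)).set(lag)
        await asyncio.sleep(interval)
```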
3. Enterprise Monitoring and Alerting
```python
import asyncio

import requests
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram


class EnterpriseMigrationMonitor:
    def __init__(self, config):
        self.config = config
        self.registry = CollectorRegistry()
        self.metrics = self.setup_metrics()
        self.alerting = AlertingSystem(config.get('alerts', {}))

    def setup_metrics(self):
        return {
            'migration_duration': Histogram(
                'migration_duration_seconds', 'Migration duration',
                ['migration_id'],
                buckets=[60, 300, 600, 1800, 3600],
                registry=self.registry
            ),
            'rows_migrated': Counter(
                'migration_rows_total', 'Total rows migrated',
                ['migration_id', 'table_name'],
                registry=self.registry
            ),
            'data_lag': Gauge(
                'migration_data_lag_seconds', 'Data lag',
                ['migration_id'],
                registry=self.registry
            )
        }

    async def track_migration_progress(self, migration):
        migration_id = migration.id
        while migration.status == 'running':
            stats = await self.calculate_progress_stats(migration)
            self.metrics['rows_migrated'].labels(
                migration_id=migration_id,
                table_name=migration.table
            ).inc(stats.rows_processed)

            anomalies = await self.detect_anomalies(migration_id, stats)
            if anomalies:
                # handle_anomalies (not shown) forwards these to self.alerting.send_alert.
                await self.handle_anomalies(migration_id, anomalies)

            await asyncio.sleep(30)

    async def detect_anomalies(self, migration_id, stats):
        anomalies = []
        if stats.rows_per_second < stats.expected_rows_per_second * 0.5:
            anomalies.append({
                'type': 'low_throughput',
                'severity': 'warning',
                'message': f'Throughput {stats.rows_per_second:.0f} rows/s is below expected'
            })
        if stats.error_rate > 0.01:
            anomalies.append({
                'type': 'high_error_rate',
                'severity': 'critical',
                'message': f'Error rate {stats.error_rate:.2%} exceeds threshold'
            })
        return anomalies

    async def setup_migration_dashboard(self):
        dashboard_config = {
            "dashboard": {
                "title": "Database Migration Monitoring",
                "panels": [
                    {
                        "title": "Migration Progress",
                        "targets": [{"expr": "rate(migration_rows_total[5m])"}]
                    },
                    {
                        "title": "Data Lag",
                        "targets": [{"expr": "migration_data_lag_seconds"}]
                    }
                ]
            }
        }
        response = requests.post(
            f"{self.config['grafana_url']}/api/dashboards/db",
            json=dashboard_config,
            headers={'Authorization': f"Bearer {self.config['grafana_token']}"}
        )
        response.raise_for_status()
        return response.json()


class AlertingSystem:
    def __init__(self, config):
        self.config = config

    async def send_alert(self, title, message, severity, **kwargs):
        if 'slack' in self.config:
            await self.send_slack_alert(title, message, severity)
        if 'email' in self.config:
            # send_email_alert (not shown) mirrors the Slack path using the
            # configured mail settings.
            await self.send_email_alert(title, message, severity)

    async def send_slack_alert(self, title, message, severity):
        color = {
            'critical': 'danger',
            'warning': 'warning',
            'info': 'good'
        }.get(severity, 'warning')
        payload = {
            'text': title,
            'attachments': [{'color': color, 'text': message}]
        }
        requests.post(self.config['slack']['webhook_url'], json=payload)
```
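`calculate_progress_stats` is referenced above but not defined. A hypothetical sketch of the shape it could take, where `count_rows`, `count_errors`, and `expected_throughput` are assumed to exist on the migration object and the stats are derived from two consecutive samples:

```python
import time
from dataclasses import dataclass


@dataclass
class ProgressStats:
    rows_processed: int
    rows_per_second: float
    expected_rows_per_second: float
    error_rate: float


async def calculate_progress_stats(migration, last_sample):
    """Derive throughput and error rate from the delta since the last sample."""
    now = time.monotonic()
    rows_now = await migration.count_rows()      # assumed helper
    errors_now = await migration.count_errors()  # assumed helper
    elapsed = max(now - last_sample['time'], 1e-6)
    rows_delta = rows_now - last_sample['rows']
    return ProgressStats(
        rows_processed=rows_delta,
        rows_per_second=rows_delta / elapsed,
        expected_rows_per_second=migration.expected_throughput,  # assumed attribute
        error_rate=(errors_now - last_sample['errors']) / max(rows_delta, 1),
    )
```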
4. Grafana Dashboard Configuration
```python
dashboard_panels = [
    {
        "id": 1,
        "title": "Migration Progress",
        "type": "graph",
        "targets": [{
            "expr": "rate(migration_rows_total[5m])",
            "legendFormat": "{{migration_id}} - {{table_name}}"
        }]
    },
    {
        "id": 2,
        "title": "Data Lag",
        "type": "stat",
        "targets": [{"expr": "migration_data_lag_seconds"}],
        "fieldConfig": {
            "thresholds": {
                "steps": [
                    {"value": 0, "color": "green"},
                    {"value": 60, "color": "yellow"},
                    {"value": 300, "color": "red"}
                ]
            }
        }
    },
    {
        "id": 3,
        "title": "Error Rate",
        "type": "graph",
        "targets": [{"expr": "rate(migration_errors_total[5m])"}]
    }
]
```
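A minimal sketch of publishing these panels, wrapping them in a dashboard payload and posting to the same Grafana endpoint used by `setup_migration_dashboard` above; `grafana_url` and `grafana_token` are the config values from that section:

```python
import requests


def publish_dashboard(grafana_url, grafana_token, panels):
    """Upsert a dashboard containing the panel definitions via Grafana's dashboard API."""
    payload = {
        "dashboard": {
            "title": "Database Migration Monitoring",
            "panels": panels,
        },
        "overwrite": True,  # replace an existing dashboard with the same title/uid
    }
    response = requests.post(
        f"{grafana_url}/api/dashboards/db",
        json=payload,
        headers={"Authorization": f"Bearer {grafana_token}"},
    )
    response.raise_for_status()
    return response.json()
```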
5. CI/CD Integration
```yaml
name: Migration Monitoring
on:
  push:
    branches: [main]
jobs:
  monitor-migration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Start Monitoring
        run: |
          python migration_monitor.py start \
            --migration-id ${{ github.sha }} \
            --prometheus-url ${{ secrets.PROMETHEUS_URL }}
      - name: Run Migration
        run: |
          python migrate.py --environment production
      - name: Check Migration Health
        run: |
          python migration_monitor.py check \
            --migration-id ${{ github.sha }} \
            --max-lag 300
```
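The workflow assumes a `migration_monitor.py` script that is not shown. A hypothetical sketch of its `check` command, querying Prometheus's standard HTTP query API for the `migration_data_lag_seconds` gauge and failing the job when lag exceeds `--max-lag`:

```python
import argparse
import os
import sys

import requests


def check_migration_health(prometheus_url, migration_id, max_lag):
    """Exit non-zero if the exported data-lag metric exceeds max_lag seconds."""
    query = f'migration_data_lag_seconds{{migration_id="{migration_id}"}}'
    resp = requests.get(f"{prometheus_url}/api/v1/query", params={"query": query})
    resp.raise_for_status()
    results = resp.json()["data"]["result"]
    # Each result's "value" is [timestamp, "value-as-string"].
    worst_lag = max((float(r["value"][1]) for r in results), default=0.0)
    if worst_lag > max_lag:
        print(f"Data lag {worst_lag:.0f}s exceeds the {max_lag}s limit")
        return 1
    print(f"Data lag {worst_lag:.0f}s is within the {max_lag}s limit")
    return 0


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("command", choices=["check"])
    parser.add_argument("--migration-id", required=True)
    parser.add_argument("--max-lag", type=int, default=300)
    parser.add_argument("--prometheus-url", default=os.environ.get("PROMETHEUS_URL"))
    args = parser.parse_args()
    sys.exit(check_migration_health(args.prometheus_url, args.migration_id, args.max_lag))
```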
Output Format
- Observable MongoDB Migrations: Atlas framework with metrics and validation
- CDC Pipeline with Monitoring: Debezium integration with Kafka
- Enterprise Metrics Collection: Prometheus instrumentation
- Anomaly Detection: Statistical analysis
- Multi-channel Alerting: Email, Slack, PagerDuty integrations
- Grafana Dashboard Automation: Programmatic dashboard creation
- Replication Lag Tracking: Source-to-target lag monitoring
- Health Check Systems: Continuous pipeline monitoring
Focus on real-time visibility, proactive alerting, and comprehensive observability for zero-downtime migrations.
Cross-Plugin Integration
This plugin integrates with:
- sql-migrations: Provides observability for SQL migrations
- nosql-migrations: Monitors NoSQL transformations
- migration-integration: Coordinates monitoring across workflows
🏰 Rei Skills — Curated by Rootcastle Engineering & Innovation | Batuhan Ayrıbaş
Engineering Beyond Boundaries | admin@rootcastle.com