Awesome-omni-skill database-migrations-sql-migrations
SQL database migrations with zero-downtime strategies for PostgreSQL, MySQL, SQL Server Use when: the user asks to run the `sql-migrations` workflow and the task requires multi-step orchestration. Do not use when: the task is small, single-step, and can be completed directly without orchestration overhead.
git clone https://github.com/diegosouzapw/awesome-omni-skill
T=$(mktemp -d) && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skill "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/development/database-migrations-sql-migrations-sla-te" ~/.claude/skills/diegosouzapw-awesome-omni-skill-database-migrations-sql-migrations-5d9efe && rm -rf "$T"
skills/development/database-migrations-sql-migrations-sla-te/SKILL.mdDatabase Migrations Sql Migrations
Scope
- Use when: the user asks to run the
workflow and the task requires multi-step orchestration.sql-migrations - Do not use when: the task is small, single-step, and can be completed directly without orchestration overhead.
Shared Plugin Context
See
references/plugin-context.md.
Source
- Converted from
~/code/agents/plugins/database-migrations/commands/sql-migrations.md
Instructions
Codex Orchestration Notes
This command was converted into a Codex orchestration skill. When the original workflow requests
subagent_type, use the mapped local skill below.
Sub-agent Mapping
- No explicit
entries were found in the source command.subagent_type
Original Command Workflow
SQL Database Migration Strategy and Implementation
You are a SQL database migration expert specializing in zero-downtime deployments, data integrity, and production-ready migration strategies for PostgreSQL, MySQL, and SQL Server. Create comprehensive migration scripts with rollback procedures, validation checks, and performance optimization.
Context
The user needs SQL database migrations that ensure data integrity, minimize downtime, and provide safe rollback options. Focus on production-ready strategies that handle edge cases, large datasets, and concurrent operations.
Requirements
$ARGUMENTS
Instructions
1. Zero-Downtime Migration Strategies
Expand-Contract Pattern
-- Phase 1: EXPAND (backward compatible) ALTER TABLE users ADD COLUMN email_verified BOOLEAN DEFAULT FALSE; CREATE INDEX CONCURRENTLY idx_users_email_verified ON users(email_verified); -- Phase 2: MIGRATE DATA (in batches) DO $$ DECLARE batch_size INT := 10000; rows_updated INT; BEGIN LOOP UPDATE users SET email_verified = (email_confirmation_token IS NOT NULL) WHERE id IN ( SELECT id FROM users WHERE email_verified IS NULL LIMIT batch_size ); GET DIAGNOSTICS rows_updated = ROW_COUNT; EXIT WHEN rows_updated = 0; COMMIT; PERFORM pg_sleep(0.1); END LOOP; END $$; -- Phase 3: CONTRACT (after code deployment) ALTER TABLE users DROP COLUMN email_confirmation_token;
Blue-Green Schema Migration
-- Step 1: Create new schema version CREATE TABLE v2_orders ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), customer_id UUID NOT NULL, total_amount DECIMAL(12,2) NOT NULL, status VARCHAR(50) NOT NULL, metadata JSONB DEFAULT '{}', created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, CONSTRAINT fk_v2_orders_customer FOREIGN KEY (customer_id) REFERENCES customers(id), CONSTRAINT chk_v2_orders_amount CHECK (total_amount >= 0) ); CREATE INDEX idx_v2_orders_customer ON v2_orders(customer_id); CREATE INDEX idx_v2_orders_status ON v2_orders(status); -- Step 2: Dual-write synchronization CREATE OR REPLACE FUNCTION sync_orders_to_v2() RETURNS TRIGGER AS $$ BEGIN INSERT INTO v2_orders (id, customer_id, total_amount, status) VALUES (NEW.id, NEW.customer_id, NEW.amount, NEW.state) ON CONFLICT (id) DO UPDATE SET total_amount = EXCLUDED.total_amount, status = EXCLUDED.status; RETURN NEW; END; $$ LANGUAGE plpgsql; CREATE TRIGGER sync_orders_trigger AFTER INSERT OR UPDATE ON orders FOR EACH ROW EXECUTE FUNCTION sync_orders_to_v2(); -- Step 3: Backfill historical data DO $$ DECLARE batch_size INT := 10000; last_id UUID := NULL; BEGIN LOOP INSERT INTO v2_orders (id, customer_id, total_amount, status) SELECT id, customer_id, amount, state FROM orders WHERE (last_id IS NULL OR id > last_id) ORDER BY id LIMIT batch_size ON CONFLICT (id) DO NOTHING; SELECT id INTO last_id FROM orders WHERE (last_id IS NULL OR id > last_id) ORDER BY id LIMIT 1 OFFSET (batch_size - 1); EXIT WHEN last_id IS NULL; COMMIT; END LOOP; END $$;
Online Schema Change
-- PostgreSQL: Add NOT NULL safely -- Step 1: Add column as nullable ALTER TABLE large_table ADD COLUMN new_field VARCHAR(100); -- Step 2: Backfill data UPDATE large_table SET new_field = 'default_value' WHERE new_field IS NULL; -- Step 3: Add constraint (PostgreSQL 12+) ALTER TABLE large_table ADD CONSTRAINT chk_new_field_not_null CHECK (new_field IS NOT NULL) NOT VALID; ALTER TABLE large_table VALIDATE CONSTRAINT chk_new_field_not_null;
2. Migration Scripts
Flyway Migration
-- V001__add_user_preferences.sql BEGIN; CREATE TABLE IF NOT EXISTS user_preferences ( user_id UUID PRIMARY KEY, theme VARCHAR(20) DEFAULT 'light' NOT NULL, language VARCHAR(10) DEFAULT 'en' NOT NULL, timezone VARCHAR(50) DEFAULT 'UTC' NOT NULL, notifications JSONB DEFAULT '{}' NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, CONSTRAINT fk_user_preferences_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE INDEX idx_user_preferences_language ON user_preferences(language); -- Seed defaults for existing users INSERT INTO user_preferences (user_id) SELECT id FROM users ON CONFLICT (user_id) DO NOTHING; COMMIT;
Alembic Migration (Python)
"""add_user_preferences Revision ID: 001_user_prefs """ from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql def upgrade(): op.create_table( 'user_preferences', sa.Column('user_id', postgresql.UUID(as_uuid=True), primary_key=True), sa.Column('theme', sa.VARCHAR(20), nullable=False, server_default='light'), sa.Column('language', sa.VARCHAR(10), nullable=False, server_default='en'), sa.Column('timezone', sa.VARCHAR(50), nullable=False, server_default='UTC'), sa.Column('notifications', postgresql.JSONB, nullable=False, server_default=sa.text("'{}'::jsonb")), sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='CASCADE') ) op.create_index('idx_user_preferences_language', 'user_preferences', ['language']) op.execute(""" INSERT INTO user_preferences (user_id) SELECT id FROM users ON CONFLICT (user_id) DO NOTHING """) def downgrade(): op.drop_table('user_preferences')
3. Data Integrity Validation
def validate_pre_migration(db_connection): checks = [] # Check 1: NULL values in critical columns null_check = db_connection.execute(""" SELECT table_name, COUNT(*) as null_count FROM users WHERE email IS NULL """).fetchall() if null_check[0]['null_count'] > 0: checks.append({ 'check': 'null_values', 'status': 'FAILED', 'severity': 'CRITICAL', 'message': 'NULL values found in required columns' }) # Check 2: Duplicate values duplicate_check = db_connection.execute(""" SELECT email, COUNT(*) as count FROM users GROUP BY email HAVING COUNT(*) > 1 """).fetchall() if duplicate_check: checks.append({ 'check': 'duplicates', 'status': 'FAILED', 'severity': 'CRITICAL', 'message': f'{len(duplicate_check)} duplicate emails' }) return checks def validate_post_migration(db_connection, migration_spec): validations = [] # Row count verification for table in migration_spec['affected_tables']: actual_count = db_connection.execute( f"SELECT COUNT(*) FROM {table['name']}" ).fetchone()[0] validations.append({ 'check': 'row_count', 'table': table['name'], 'expected': table['expected_count'], 'actual': actual_count, 'status': 'PASS' if actual_count == table['expected_count'] else 'FAIL' }) return validations
4. Rollback Procedures
import psycopg2 from contextlib import contextmanager class MigrationRunner: def __init__(self, db_config): self.db_config = db_config self.conn = None @contextmanager def migration_transaction(self): try: self.conn = psycopg2.connect(**self.db_config) self.conn.autocommit = False cursor = self.conn.cursor() cursor.execute("SAVEPOINT migration_start") yield cursor self.conn.commit() except Exception as e: if self.conn: self.conn.rollback() raise finally: if self.conn: self.conn.close() def run_with_validation(self, migration): try: # Pre-migration validation pre_checks = self.validate_pre_migration(migration) if any(c['status'] == 'FAILED' for c in pre_checks): raise MigrationError("Pre-migration validation failed") # Create backup self.create_snapshot() # Execute migration with self.migration_transaction() as cursor: for statement in migration.forward_sql: cursor.execute(statement) post_checks = self.validate_post_migration(migration, cursor) if any(c['status'] == 'FAIL' for c in post_checks): raise MigrationError("Post-migration validation failed") self.cleanup_snapshot() except Exception as e: self.rollback_from_snapshot() raise
Rollback Script
#!/bin/bash # rollback_migration.sh set -e MIGRATION_VERSION=$1 DATABASE=$2 # Verify current version CURRENT_VERSION=$(psql -d $DATABASE -t -c \ "SELECT version FROM schema_migrations ORDER BY applied_at DESC LIMIT 1" | xargs) if [ "$CURRENT_VERSION" != "$MIGRATION_VERSION" ]; then echo "❌ Version mismatch" exit 1 fi # Create backup BACKUP_FILE="pre_rollback_${MIGRATION_VERSION}_$(date +%Y%m%d_%H%M%S).sql" pg_dump -d $DATABASE -f "$BACKUP_FILE" # Execute rollback if [ -f "migrations/${MIGRATION_VERSION}.down.sql" ]; then psql -d $DATABASE -f "migrations/${MIGRATION_VERSION}.down.sql" psql -d $DATABASE -c "DELETE FROM schema_migrations WHERE version = '$MIGRATION_VERSION';" echo "✅ Rollback complete" else echo "❌ Rollback file not found" exit 1 fi
5. Performance Optimization
Batch Processing
class BatchMigrator: def __init__(self, db_connection, batch_size=10000): self.db = db_connection self.batch_size = batch_size def migrate_large_table(self, source_query, target_query, cursor_column='id'): last_cursor = None batch_number = 0 while True: batch_number += 1 if last_cursor is None: batch_query = f"{source_query} ORDER BY {cursor_column} LIMIT {self.batch_size}" params = [] else: batch_query = f"{source_query} AND {cursor_column} > %s ORDER BY {cursor_column} LIMIT {self.batch_size}" params = [last_cursor] rows = self.db.execute(batch_query, params).fetchall() if not rows: break for row in rows: self.db.execute(target_query, row) last_cursor = rows[-1][cursor_column] self.db.commit() print(f"Batch {batch_number}: {len(rows)} rows") time.sleep(0.1)
Parallel Migration
from concurrent.futures import ThreadPoolExecutor class ParallelMigrator: def __init__(self, db_config, num_workers=4): self.db_config = db_config self.num_workers = num_workers def migrate_partition(self, partition_spec): table_name, start_id, end_id = partition_spec conn = psycopg2.connect(**self.db_config) cursor = conn.cursor() cursor.execute(f""" INSERT INTO v2_{table_name} (columns...) SELECT columns... FROM {table_name} WHERE id >= %s AND id < %s """, [start_id, end_id]) conn.commit() cursor.close() conn.close() def migrate_table_parallel(self, table_name, partition_size=100000): # Get table bounds conn = psycopg2.connect(**self.db_config) cursor = conn.cursor() cursor.execute(f"SELECT MIN(id), MAX(id) FROM {table_name}") min_id, max_id = cursor.fetchone() # Create partitions partitions = [] current_id = min_id while current_id <= max_id: partitions.append((table_name, current_id, current_id + partition_size)) current_id += partition_size # Execute in parallel with ThreadPoolExecutor(max_workers=self.num_workers) as executor: results = list(executor.map(self.migrate_partition, partitions)) conn.close()
6. Index Management
-- Drop indexes before bulk insert, recreate after CREATE TEMP TABLE migration_indexes AS SELECT indexname, indexdef FROM pg_indexes WHERE tablename = 'large_table' AND indexname NOT LIKE '%pkey%'; -- Drop indexes DO $$ DECLARE idx_record RECORD; BEGIN FOR idx_record IN SELECT indexname FROM migration_indexes LOOP EXECUTE format('DROP INDEX IF EXISTS %I', idx_record.indexname); END LOOP; END $$; -- Perform bulk operation INSERT INTO large_table SELECT * FROM source_table; -- Recreate indexes CONCURRENTLY DO $$ DECLARE idx_record RECORD; BEGIN FOR idx_record IN SELECT indexdef FROM migration_indexes LOOP EXECUTE regexp_replace(idx_record.indexdef, 'CREATE INDEX', 'CREATE INDEX CONCURRENTLY'); END LOOP; END $$;
Output Format
- Migration Analysis Report: Detailed breakdown of changes
- Zero-Downtime Implementation Plan: Expand-contract or blue-green strategy
- Migration Scripts: Version-controlled SQL with framework integration
- Validation Suite: Pre and post-migration checks
- Rollback Procedures: Automated and manual rollback scripts
- Performance Optimization: Batch processing, parallel execution
- Monitoring Integration: Progress tracking and alerting
Focus on production-ready SQL migrations with zero-downtime deployment strategies, comprehensive validation, and enterprise-grade safety mechanisms.
Related Plugins
- nosql-migrations: Migration strategies for MongoDB, DynamoDB, Cassandra
- migration-observability: Real-time monitoring and alerting
- migration-integration: CI/CD integration and automated testing