数据迁移实战

# 数据迁移实战

# 概述

数据迁移是企业信息化建设中的重要环节,涉及从旧系统到新系统的数据转移、数据格式转换、数据清洗等多个方面。本章节通过实际案例,展示如何使用SQL进行高效、安全的数据迁移。

# 迁移策略与规划

# 1. 迁移类型

# 完全迁移 (Big Bang)

  • 一次性迁移所有数据
  • 适用于小型系统或可接受停机的场景
  • 风险较高但实施简单

# 增量迁移 (Incremental)

  • 分批次迁移数据
  • 适用于大型系统或不能停机的场景
  • 风险较低但实施复杂

# 并行运行 (Parallel)

  • 新旧系统同时运行
  • 逐步切换业务模块
  • 风险最低但成本最高

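# 2. 迁移阶段

每次迁移所处的阶段(待执行 pending、执行中 running、已完成 completed、失败 failed、已回滚 rollback)记录在下面的状态表中,执行过程中产生的事件则写入日志表: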

-- 迁移状态跟踪表
-- One row per migration run, keyed by migration_id. The migration procedures write
-- status/progress here, the migration_progress / migration_alerts views read it, and
-- migration_log rows reference it by migration_id.
CREATE TABLE migration_status (
    -- VARCHAR(100) (was 50): the procedures accept and build ids as VARCHAR(100),
    -- e.g. BatchMigration's 'BATCH_<TABLE>_<unix time>', which exceeds 50 characters
    -- for table names longer than about 33 characters.
    migration_id VARCHAR(100) PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    migration_type ENUM('full', 'incremental', 'delta') NOT NULL,
    status ENUM('pending', 'running', 'completed', 'failed', 'rollback') DEFAULT 'pending',
    total_records BIGINT DEFAULT 0,
    migrated_records BIGINT DEFAULT 0,
    failed_records BIGINT DEFAULT 0,
    start_time TIMESTAMP NULL,
    end_time TIMESTAMP NULL,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_table (status, table_name),
    INDEX idx_migration_type (migration_type)
);

-- 迁移日志表
-- Append-only event log for migrations. Every migration_id written here must
-- already exist in migration_status (foreign key below), so writers have to
-- register their run there before logging.
CREATE TABLE migration_log (
    log_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    -- VARCHAR(100) (was 50) to hold the same ids the procedures accept and build
    -- as VARCHAR(100), e.g. 'BATCH_<TABLE>_<unix time>'.
    migration_id VARCHAR(100) NOT NULL,
    log_level ENUM('INFO', 'WARN', 'ERROR', 'DEBUG') NOT NULL,
    message TEXT NOT NULL,
    details JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_migration_level (migration_id, log_level),
    INDEX idx_created_at (created_at),
    FOREIGN KEY (migration_id) REFERENCES migration_status(migration_id)
);

# 数据迁移实战案例

# 案例1:电商系统升级迁移

# 源系统表结构 (Legacy)

-- 旧系统用户表
-- Legacy user table, read by MigrateUsers. Every column except user_id is nullable
-- here, while the target users table declares username, email and password_hash
-- NOT NULL, so NULLs in those columns make the corresponding user insert fail.
CREATE TABLE old_users (
    user_id INT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    password_hash VARCHAR(255),
    full_name VARCHAR(100), -- split on the first space into users.first_name / last_name
    phone VARCHAR(20),
    address TEXT, -- becomes one user_addresses row (address_type 'both') when non-blank
    registration_date DATETIME, -- becomes users.created_at
    last_login DATETIME, -- becomes users.last_login_at
    status TINYINT -- 0:inactive, 1:active, 2:suspended
);

-- 旧系统订单表
-- Legacy order table, read by MigrateOrders. Only orders whose user_id has a row in
-- migration_user_mapping (i.e. whose user was already migrated) are picked up.
CREATE TABLE old_orders (
    order_id INT PRIMARY KEY, -- becomes orders.order_number: 'ORD' + zero-padded id
    user_id INT,
    order_date DATETIME, -- becomes orders.created_at
    total_amount DECIMAL(10,2), -- copied into both orders.subtotal and orders.total_amount
    order_status VARCHAR(20), -- 'pending', 'paid', 'shipped', 'delivered', 'cancelled'; any other value migrates as 'pending'
    shipping_address TEXT, -- free text; matched/inserted into user_addresses per user
    payment_method VARCHAR(50),
    notes TEXT
);

# 目标系统表结构 (New)

-- 新系统用户表
-- Target user table. user_id is a new AUTO_INCREMENT key; MigrateUsers records the
-- old_users.user_id -> users.user_id link in migration_user_mapping.
-- NOTE: created_at / last_login_at are TIMESTAMP, so legacy DATETIME values outside
-- the TIMESTAMP range (1970-2038) cannot be stored.
CREATE TABLE users (
    user_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_uuid VARCHAR(36) UNIQUE NOT NULL, -- MigrateUsers fills this with UUID()
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL, -- copied unchanged from old_users
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    phone VARCHAR(20),
    status ENUM('active', 'inactive', 'suspended') DEFAULT 'active', -- legacy 0/1/2 -> inactive/active/suspended
    email_verified BOOLEAN DEFAULT FALSE,
    phone_verified BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    last_login_at TIMESTAMP NULL
);

-- 新系统地址表
-- Target address table. The migration scripts store each legacy free-text address
-- whole in street_address; city, state and postal_code are left unset by them.
-- NOTE(review): legacy addresses are TEXT while street_address is VARCHAR(255);
-- longer values may be rejected in strict SQL mode.
CREATE TABLE user_addresses (
    address_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    address_type ENUM('billing', 'shipping', 'both') DEFAULT 'both',
    street_address VARCHAR(255),
    city VARCHAR(100),
    state VARCHAR(100),
    postal_code VARCHAR(20),
    country VARCHAR(100) DEFAULT 'China',
    is_default BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(user_id) -- no ON DELETE action: addresses must be deleted before their user
);

-- 新系统订单表
-- Target order table. MigrateOrders copies the legacy total into both subtotal and
-- total_amount, derives order_number from the legacy order_id, and does not set
-- tax_amount, shipping_amount or payment_status (their defaults apply).
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_uuid VARCHAR(36) UNIQUE NOT NULL,
    user_id BIGINT NOT NULL,
    order_number VARCHAR(50) UNIQUE NOT NULL,
    order_status ENUM('pending', 'confirmed', 'paid', 'shipped', 'delivered', 'cancelled', 'refunded') DEFAULT 'pending',
    subtotal DECIMAL(12,2) NOT NULL,
    tax_amount DECIMAL(12,2) DEFAULT 0,
    shipping_amount DECIMAL(12,2) DEFAULT 0,
    total_amount DECIMAL(12,2) NOT NULL,
    currency VARCHAR(3) DEFAULT 'CNY',
    payment_method VARCHAR(50),
    payment_status ENUM('pending', 'paid', 'failed', 'refunded') DEFAULT 'pending',
    shipping_address_id BIGINT,
    notes TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(user_id),
    FOREIGN KEY (shipping_address_id) REFERENCES user_addresses(address_id)
);

# 数据迁移脚本

-- 1. 用户数据迁移
DELIMITER //
-- Migrates legacy users (old_users) into users, plus one user_addresses row per
-- non-blank legacy address, recording old -> new ids in migration_user_mapping.
--
-- Re-runnable: users that already have a mapping row are skipped, so a repeated
-- call only migrates users not mapped by an earlier run.
-- Progress is tracked in migration_status under 'USER_MIGRATION'
-- (total_records = users still unmapped when this call starts).
-- All inserts of one call share a single transaction. On any SQL error the call is
-- rolled back, the run is marked 'failed' with the error text, the error is logged
-- to migration_log, and it is then re-raised to the caller.
--
-- NOTE(review): migration_user_mapping is not created in this script; the code
-- assumes it has (at least) old_user_id and new_user_id columns.
CREATE PROCEDURE MigrateUsers()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_user_id INT;
    DECLARE v_username VARCHAR(50);
    DECLARE v_email VARCHAR(100);
    DECLARE v_password_hash VARCHAR(255);
    DECLARE v_full_name VARCHAR(100);
    DECLARE v_phone VARCHAR(20);
    DECLARE v_address TEXT;
    DECLARE v_registration_date DATETIME;
    DECLARE v_last_login DATETIME;
    DECLARE v_status TINYINT;
    DECLARE v_new_user_id BIGINT;
    DECLARE v_first_name VARCHAR(50);
    DECLARE v_last_name VARCHAR(50);
    DECLARE v_pending BIGINT DEFAULT 0;
    DECLARE v_migrated BIGINT DEFAULT 0;

    -- Unmapped legacy users. The filter matches on the mapping's own column. The
    -- original `user_id NOT IN (SELECT user_id FROM migration_user_mapping)` names a
    -- column the mapping is never written with (it stores old_user_id/new_user_id).
    -- In that case MySQL resolves `user_id` to the OUTER old_users.user_id, and the
    -- filter drops every row as soon as the mapping is non-empty.
    DECLARE user_cursor CURSOR FOR
        SELECT ou.user_id, ou.username, ou.email, ou.password_hash, ou.full_name,
               ou.phone, ou.address, ou.registration_date, ou.last_login, ou.status
        FROM old_users AS ou
        WHERE NOT EXISTS (
            SELECT 1
            FROM migration_user_mapping AS m
            WHERE m.old_user_id = ou.user_id
        );

    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        -- Read the diagnostics first, before any other statement can replace them.
        GET DIAGNOSTICS CONDITION 1
            @error_message = MESSAGE_TEXT;
        ROLLBACK;
        -- Without this the run stays 'running' and never shows as failed in the alerts.
        UPDATE migration_status
        SET status = 'failed', end_time = NOW(), error_message = @error_message
        WHERE migration_id = 'USER_MIGRATION';
        INSERT INTO migration_log (migration_id, log_level, message)
        VALUES ('USER_MIGRATION', 'ERROR', @error_message);
        RESIGNAL;
    END;

    SELECT COUNT(*) INTO v_pending
    FROM old_users AS ou
    WHERE NOT EXISTS (
        SELECT 1
        FROM migration_user_mapping AS m
        WHERE m.old_user_id = ou.user_id
    );

    -- Register / reset the run. This is autocommitted, outside the data transaction,
    -- so it survives a rollback.
    INSERT INTO migration_status (migration_id, table_name, migration_type, status,
                                  total_records, start_time)
    VALUES ('USER_MIGRATION', 'users', 'full', 'running', v_pending, NOW())
    ON DUPLICATE KEY UPDATE
        status = 'running',
        total_records = v_pending,
        start_time = NOW(),
        end_time = NULL,
        error_message = NULL,
        migrated_records = 0,
        failed_records = 0;

    START TRANSACTION;

    OPEN user_cursor;

    user_loop: LOOP
        FETCH user_cursor INTO v_user_id, v_username, v_email, v_password_hash,
                              v_full_name, v_phone, v_address, v_registration_date,
                              v_last_login, v_status;

        IF done THEN
            LEAVE user_loop;
        END IF;

        -- Split the trimmed full name on its first space. A name without spaces
        -- (e.g. most Chinese names) goes entirely into first_name.
        SET v_full_name = TRIM(COALESCE(v_full_name, ''));
        SET v_first_name = SUBSTRING_INDEX(v_full_name, ' ', 1);
        SET v_last_name = CASE
            WHEN LOCATE(' ', v_full_name) > 0
            THEN TRIM(SUBSTRING(v_full_name, LOCATE(' ', v_full_name) + 1))
            ELSE ''
        END;

        INSERT INTO users (
            user_uuid, username, email, password_hash, first_name, last_name,
            phone, status, created_at, last_login_at
        ) VALUES (
            UUID(),
            v_username,
            v_email,
            v_password_hash,
            v_first_name,
            v_last_name,
            v_phone,
            CASE v_status
                WHEN 0 THEN 'inactive'
                WHEN 1 THEN 'active'
                WHEN 2 THEN 'suspended'
                ELSE 'inactive'
            END,
            v_registration_date,
            v_last_login
        );

        SET v_new_user_id = LAST_INSERT_ID();

        -- Record the old -> new id link; later steps (orders, validation, rollback) rely on it.
        INSERT INTO migration_user_mapping (old_user_id, new_user_id)
        VALUES (v_user_id, v_new_user_id);

        -- The legacy address is free text; store it whole as the default address.
        IF v_address IS NOT NULL AND TRIM(v_address) != '' THEN
            INSERT INTO user_addresses (
                user_id, address_type, street_address, is_default
            ) VALUES (
                v_new_user_id, 'both', v_address, TRUE
            );
        END IF;

        SET v_migrated = v_migrated + 1;

    END LOOP;

    CLOSE user_cursor;
    COMMIT;

    UPDATE migration_status
    SET status = 'completed', migrated_records = v_migrated, end_time = NOW()
    WHERE migration_id = 'USER_MIGRATION';

    INSERT INTO migration_log (migration_id, log_level, message)
    VALUES ('USER_MIGRATION', 'INFO', 'User migration completed successfully');

END //
DELIMITER ;
-- 2. 订单数据迁移
DELIMITER //
-- Migrates legacy orders (old_orders) of already-migrated users into orders,
-- recording old -> new ids in migration_order_mapping. Run it after MigrateUsers:
-- orders whose user has no migration_user_mapping row are skipped.
--
-- Each non-blank legacy shipping address is matched against that user's existing
-- user_addresses rows by exact street_address text. It is inserted as a new
-- 'shipping' address when no match exists.
-- Re-runnable: already-mapped orders are skipped. Progress is tracked in
-- migration_status under 'ORDER_MIGRATION'.
-- Each call is one transaction. On error the call is rolled back, the run is marked
-- 'failed', the error is logged, and it is re-raised.
--
-- NOTE(review): migration_user_mapping / migration_order_mapping are not created in
-- this script; assumed columns are old_user_id/new_user_id and old_order_id/new_order_id.
CREATE PROCEDURE MigrateOrders()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_order_id INT;
    DECLARE v_new_user_id BIGINT;
    DECLARE v_order_date DATETIME;
    DECLARE v_total_amount DECIMAL(10,2);
    DECLARE v_order_status VARCHAR(20);
    DECLARE v_shipping_address TEXT;
    DECLARE v_payment_method VARCHAR(50);
    DECLARE v_notes TEXT;
    DECLARE v_address_id BIGINT DEFAULT NULL;
    DECLARE v_pending BIGINT DEFAULT 0;
    DECLARE v_migrated BIGINT DEFAULT 0;

    -- Unmapped orders of mapped users. The new user id comes straight from the join,
    -- so the loop needs no SELECT ... INTO lookup for it.
    DECLARE order_cursor CURSOR FOR
        SELECT o.order_id, m.new_user_id, o.order_date, o.total_amount,
               o.order_status, o.shipping_address, o.payment_method, o.notes
        FROM old_orders AS o
        INNER JOIN migration_user_mapping AS m ON o.user_id = m.old_user_id
        WHERE NOT EXISTS (
            SELECT 1
            FROM migration_order_mapping AS om
            WHERE om.old_order_id = o.order_id
        );

    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        -- Read the diagnostics first, before any other statement can replace them.
        GET DIAGNOSTICS CONDITION 1
            @error_message = MESSAGE_TEXT;
        ROLLBACK;
        UPDATE migration_status
        SET status = 'failed', end_time = NOW(), error_message = @error_message
        WHERE migration_id = 'ORDER_MIGRATION';
        INSERT INTO migration_log (migration_id, log_level, message)
        VALUES ('ORDER_MIGRATION', 'ERROR', @error_message);
        RESIGNAL;
    END;

    SELECT COUNT(*) INTO v_pending
    FROM old_orders AS o
    INNER JOIN migration_user_mapping AS m ON o.user_id = m.old_user_id
    WHERE NOT EXISTS (
        SELECT 1
        FROM migration_order_mapping AS om
        WHERE om.old_order_id = o.order_id
    );

    -- Register / reset the run. This is autocommitted, outside the data transaction.
    INSERT INTO migration_status (migration_id, table_name, migration_type, status,
                                  total_records, start_time)
    VALUES ('ORDER_MIGRATION', 'orders', 'full', 'running', v_pending, NOW())
    ON DUPLICATE KEY UPDATE
        status = 'running',
        total_records = v_pending,
        start_time = NOW(),
        end_time = NULL,
        error_message = NULL,
        migrated_records = 0,
        failed_records = 0;

    START TRANSACTION;

    OPEN order_cursor;

    order_loop: LOOP
        FETCH order_cursor INTO v_order_id, v_new_user_id, v_order_date, v_total_amount,
                               v_order_status, v_shipping_address, v_payment_method, v_notes;

        IF done THEN
            LEAVE order_loop;
        END IF;

        -- Reset per order. Otherwise the previous order's address is carried into
        -- orders that have none (or belong to another user).
        SET v_address_id = NULL;

        IF v_shipping_address IS NOT NULL AND TRIM(v_shipping_address) != '' THEN
            -- Scalar subquery instead of SELECT ... INTO. An empty SELECT ... INTO
            -- raises NOT FOUND, which the cursor handler turns into done = TRUE,
            -- ending the loop after the first order that needs a new address.
            SET v_address_id = (
                SELECT ua.address_id
                FROM user_addresses AS ua
                WHERE ua.user_id = v_new_user_id
                  AND ua.street_address = v_shipping_address
                ORDER BY ua.address_id
                LIMIT 1
            );

            IF v_address_id IS NULL THEN
                INSERT INTO user_addresses (
                    user_id, address_type, street_address, is_default
                ) VALUES (
                    v_new_user_id, 'shipping', v_shipping_address, FALSE
                );
                SET v_address_id = LAST_INSERT_ID();
            END IF;
        END IF;

        INSERT INTO orders (
            order_uuid, user_id, order_number, order_status,
            subtotal, total_amount, payment_method, shipping_address_id,
            notes, created_at
        ) VALUES (
            UUID(),
            v_new_user_id,
            -- Zero-pad to at least 8 digits. LPAD(x, 8, ...) alone truncates ids with
            -- more than 8 digits, producing colliding order numbers.
            CONCAT('ORD', LPAD(v_order_id, GREATEST(8, CHAR_LENGTH(v_order_id)), '0')),
            CASE v_order_status
                WHEN 'pending' THEN 'pending'
                WHEN 'paid' THEN 'paid'
                WHEN 'shipped' THEN 'shipped'
                WHEN 'delivered' THEN 'delivered'
                WHEN 'cancelled' THEN 'cancelled'
                ELSE 'pending'
            END,
            -- The legacy schema has no tax/shipping breakdown, so subtotal = total.
            v_total_amount,
            v_total_amount,
            v_payment_method,
            v_address_id,
            v_notes,
            v_order_date
        );

        INSERT INTO migration_order_mapping (old_order_id, new_order_id)
        VALUES (v_order_id, LAST_INSERT_ID());

        SET v_migrated = v_migrated + 1;

    END LOOP;

    CLOSE order_cursor;
    COMMIT;

    UPDATE migration_status
    SET status = 'completed', migrated_records = v_migrated, end_time = NOW()
    WHERE migration_id = 'ORDER_MIGRATION';

    INSERT INTO migration_log (migration_id, log_level, message)
    VALUES ('ORDER_MIGRATION', 'INFO', 'Order migration completed successfully');

END //
DELIMITER ;

# 案例2:增量数据同步

# 增量同步策略

基于时间戳列(如 updated_at)的增量同步只能捕获新增和修改的行,无法感知源表中的物理删除;如需同步删除,可改用软删除标记或基于 binlog 的 CDC 工具。

-- 创建增量同步配置表
-- Per-table configuration for IncrementalSync.
-- table_name is UNIQUE: IncrementalSync looks the configuration up and writes its
-- watermark by table_name. A second row for the same table (e.g. from re-running
-- the seed INSERT) would make both operations ambiguous.
CREATE TABLE sync_config (
    config_id INT PRIMARY KEY AUTO_INCREMENT,
    table_name VARCHAR(100) NOT NULL,
    sync_type ENUM('insert', 'update', 'delete', 'all') DEFAULT 'all',
    timestamp_column VARCHAR(100),
    primary_key_column VARCHAR(100),
    last_sync_timestamp TIMESTAMP NULL, -- watermark written by IncrementalSync
    sync_interval_minutes INT DEFAULT 60,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uk_sync_config_table_name (table_name)
);

-- 插入同步配置
-- Seed one sync configuration per table; intervals are in minutes.
-- NOTE: IncrementalSync only implements 'users' and 'orders'; 'products' is
-- configured here but rejected by it as an unsupported table.
INSERT INTO sync_config
    (table_name, primary_key_column, timestamp_column, sync_type, sync_interval_minutes)
VALUES
    ('users',    'user_id',    'updated_at', 'all', 30),
    ('orders',   'order_id',   'updated_at', 'all', 15),
    ('products', 'product_id', 'updated_at', 'all', 60);
-- 增量同步存储过程
DELIMITER //
-- Incrementally upserts rows changed since the last run from source_<table> into
-- target_<table>. Only 'users' and 'orders' are implemented; other names raise
-- SQLSTATE 45000.
--
-- Window: rows with updated_at in [last_sync_timestamp, sync start).
--  * The upper bound is captured BEFORE copying and stored as the new watermark.
--    Rows modified while the copy runs are therefore picked up by the next call.
--    The original stored NOW() taken after the copy, which skipped them.
--  * The lower bound is inclusive because updated_at has one-second precision.
--    Copying a boundary row twice is harmless because the copy is an upsert.
--  * With no watermark yet, the window starts one day ago.
--
-- Requires an active sync_config row for p_table_name, otherwise SQLSTATE 45000 is
-- raised. The original silently synced the last day even for inactive entries.
-- Run state is written to migration_status / migration_log under 'SYNC_<TABLE>'.
--
-- NOTE: sync_config.timestamp_column / primary_key_column are not consulted; the
-- column lists below are hard-coded per table. Source deletes are not propagated.
-- NOTE: records_synced is ROW_COUNT() of an upsert (1 per inserted row, 2 per
-- updated row, 0 per unchanged row), not a plain row count.
CREATE PROCEDURE IncrementalSync(
    IN p_table_name VARCHAR(100)
)
BEGIN
    DECLARE v_migration_id VARCHAR(100);
    DECLARE v_config_rows INT DEFAULT 0;
    DECLARE v_last_sync TIMESTAMP DEFAULT NULL;
    DECLARE v_sync_start TIMESTAMP DEFAULT NULL;
    DECLARE v_count INT DEFAULT 0;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        -- Read the diagnostics first, before any other statement can replace them.
        GET DIAGNOSTICS CONDITION 1
            @error_message = MESSAGE_TEXT;
        ROLLBACK;
        UPDATE migration_status
        SET status = 'failed', end_time = NOW(), error_message = @error_message
        WHERE migration_id = v_migration_id;
        INSERT INTO migration_log (migration_id, log_level, message, details)
        VALUES (v_migration_id, 'ERROR', @error_message,
                JSON_OBJECT('table', p_table_name, 'timestamp', NOW()));
        RESIGNAL;
    END;

    SET v_migration_id = CONCAT('SYNC_', UPPER(p_table_name));

    -- migration_log.migration_id references migration_status, so the run must exist
    -- there before anything (including the error handler) logs under this id.
    INSERT INTO migration_status (migration_id, table_name, migration_type, status, start_time)
    VALUES (v_migration_id, p_table_name, 'incremental', 'running', NOW())
    ON DUPLICATE KEY UPDATE
        status = 'running',
        start_time = NOW(),
        end_time = NULL,
        error_message = NULL;

    -- Aggregates always return one row, so a missing config is detected via the
    -- count instead of a NOT FOUND warning that leaves the variables untouched.
    SELECT COUNT(*), MAX(last_sync_timestamp)
    INTO v_config_rows, v_last_sync
    FROM sync_config
    WHERE table_name = p_table_name AND is_active = TRUE;

    IF v_config_rows = 0 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No active sync_config row for table';
    END IF;

    IF v_last_sync IS NULL THEN
        SET v_last_sync = DATE_SUB(NOW(), INTERVAL 1 DAY);
    END IF;

    SET v_sync_start = NOW();

    START TRANSACTION;

    CASE p_table_name
        WHEN 'users' THEN
            -- VALUES(col) is deprecated from MySQL 8.0.20 but still supported.
            INSERT INTO target_users (
                user_id, username, email, first_name, last_name,
                phone, status, created_at, updated_at
            )
            SELECT
                user_id, username, email, first_name, last_name,
                phone, status, created_at, updated_at
            FROM source_users
            WHERE updated_at >= v_last_sync
              AND updated_at < v_sync_start
            ON DUPLICATE KEY UPDATE
                username = VALUES(username),
                email = VALUES(email),
                first_name = VALUES(first_name),
                last_name = VALUES(last_name),
                phone = VALUES(phone),
                status = VALUES(status),
                updated_at = VALUES(updated_at);

            SET v_count = ROW_COUNT();

        WHEN 'orders' THEN
            INSERT INTO target_orders (
                order_id, user_id, order_number, order_status,
                total_amount, payment_method, created_at, updated_at
            )
            SELECT
                order_id, user_id, order_number, order_status,
                total_amount, payment_method, created_at, updated_at
            FROM source_orders
            WHERE updated_at >= v_last_sync
              AND updated_at < v_sync_start
            ON DUPLICATE KEY UPDATE
                order_status = VALUES(order_status),
                total_amount = VALUES(total_amount),
                payment_method = VALUES(payment_method),
                updated_at = VALUES(updated_at);

            SET v_count = ROW_COUNT();

        ELSE
            SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Unsupported table for sync';
    END CASE;

    -- Advance the watermark to the window's upper bound, in the same transaction as the copy.
    UPDATE sync_config
    SET last_sync_timestamp = v_sync_start,
        updated_at = NOW()
    WHERE table_name = p_table_name AND is_active = TRUE;

    COMMIT;

    UPDATE migration_status
    SET status = 'completed', end_time = NOW()
    WHERE migration_id = v_migration_id;

    INSERT INTO migration_log (migration_id, log_level, message, details)
    VALUES (v_migration_id, 'INFO',
            CONCAT('Incremental sync completed for ', p_table_name),
            JSON_OBJECT('table', p_table_name, 'records_synced', v_count,
                        'last_sync', v_last_sync, 'current_sync', v_sync_start));

END //
DELIMITER ;

# 案例3:数据清洗与转换

# 数据质量检查

-- 数据质量检查函数
DELIMITER //
-- Returns 1 when `email` has the shape local@domain.tld (TLD of 2+ letters), 0 when
-- it does not, and NULL for a NULL argument.
--
-- The dot before the TLD is written as the bracket expression [.]. In a MySQL string
-- literal '\.' is an unknown escape that collapses to '.', so the original pattern
-- matched ANY character there and accepted e.g. 'user@examplecom'. [.] is a literal
-- dot regardless of the NO_BACKSLASH_ESCAPES SQL mode.
-- NO SQL: the function only evaluates its argument and reads no tables.
CREATE FUNCTION ValidateEmail(email VARCHAR(255))
RETURNS BOOLEAN
NO SQL
DETERMINISTIC
BEGIN
    RETURN email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$';
END //

-- Simple mainland-China mobile number check. Returns 1 for exactly 11 digits
-- starting with 1 and a second digit 3-9, 0 otherwise, and NULL for a NULL argument.
-- Separators and country codes (spaces, dashes, '+86') are not accepted, so strip
-- them before calling.
-- Declared NO SQL (was READS SQL DATA): the function reads no tables.
CREATE FUNCTION ValidatePhone(phone VARCHAR(20))
RETURNS BOOLEAN
NO SQL
DETERMINISTIC
BEGIN
    RETURN phone REGEXP '^1[3-9][0-9]{9}$';
END //
DELIMITER ;

-- 数据清洗存储过程
DELIMITER //
-- Normalises user contact data in place and deactivates suspect accounts:
--   1. emails are lower-cased and trimmed; active users whose email then fails
--      ValidateEmail are set to 'inactive' (reported as invalid_records)
--   2. phones are reduced to digits; phone_verified is cleared when the result
--      fails ValidatePhone
--   3. first_name / last_name are trimmed
--   4. among users sharing an email, all but the highest user_id are set to
--      'inactive' (reported as cleaned_records)
-- Runs as one transaction tracked under 'DATA_CLEANING' in migration_status. On
-- error it is rolled back, marked 'failed', logged and re-raised.
--
-- NOTE(review): users.email is UNIQUE, so step 4 can only find anything if that
-- constraint is relaxed. Step 1 can also fail with a duplicate-key error when two
-- stored emails become equal after trimming.
-- NOTE(review): stripping non-digits turns '+86 138...' into '86138...', which
-- ValidatePhone then rejects.
CREATE PROCEDURE CleanUserData()
BEGIN
    DECLARE v_cleaned_count INT DEFAULT 0;
    DECLARE v_error_count INT DEFAULT 0;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        -- Read the diagnostics first, before any other statement can replace them.
        GET DIAGNOSTICS CONDITION 1
            @error_message = MESSAGE_TEXT;
        ROLLBACK;
        DROP TEMPORARY TABLE IF EXISTS duplicate_users;
        UPDATE migration_status
        SET status = 'failed', end_time = NOW(), error_message = @error_message
        WHERE migration_id = 'DATA_CLEANING';
        INSERT INTO migration_log (migration_id, log_level, message)
        VALUES ('DATA_CLEANING', 'ERROR', @error_message);
        RESIGNAL;
    END;

    -- migration_log.migration_id references migration_status, so the run must be
    -- registered before anything is logged under 'DATA_CLEANING'.
    INSERT INTO migration_status (migration_id, table_name, migration_type, status, start_time)
    VALUES ('DATA_CLEANING', 'users', 'full', 'running', NOW())
    ON DUPLICATE KEY UPDATE
        status = 'running',
        start_time = NOW(),
        end_time = NULL,
        error_message = NULL;

    -- A temp table left behind by an earlier failed call in this session would make
    -- CREATE TEMPORARY TABLE fail.
    DROP TEMPORARY TABLE IF EXISTS duplicate_users;

    START TRANSACTION;

    -- 1. Normalise emails, then deactivate active users with an invalid one.
    UPDATE users
    SET email = LOWER(TRIM(email))
    WHERE email IS NOT NULL;

    UPDATE users
    SET status = 'inactive'
    WHERE email IS NOT NULL
      AND NOT ValidateEmail(email)
      AND status = 'active';

    SET v_error_count = ROW_COUNT();

    -- 2. Keep digits only, then clear the verified flag on invalid numbers.
    UPDATE users
    SET phone = REGEXP_REPLACE(phone, '[^0-9]', '')
    WHERE phone IS NOT NULL;

    UPDATE users
    SET phone_verified = FALSE
    WHERE phone IS NOT NULL
      AND NOT ValidatePhone(phone);

    -- 3. Trim name parts.
    UPDATE users
    SET first_name = TRIM(first_name),
        last_name = TRIM(last_name)
    WHERE first_name IS NOT NULL OR last_name IS NOT NULL;

    -- 4. Duplicate emails. The id to keep is computed up front: MySQL rejects an
    --    UPDATE whose subquery reads the table being updated (ER_UPDATE_TABLE_USED,
    --    error 1093), which is what the original NOT IN (SELECT MAX(...)) did.
    CREATE TEMPORARY TABLE duplicate_users AS
    SELECT email, MAX(user_id) AS keep_user_id
    FROM users
    WHERE email IS NOT NULL
    GROUP BY email
    HAVING COUNT(*) > 1;

    UPDATE users AS u
    INNER JOIN duplicate_users AS du ON u.email = du.email
    SET u.status = 'inactive'
    WHERE u.user_id <> du.keep_user_id;

    SET v_cleaned_count = ROW_COUNT();

    DROP TEMPORARY TABLE duplicate_users;

    COMMIT;

    UPDATE migration_status
    SET status = 'completed', end_time = NOW()
    WHERE migration_id = 'DATA_CLEANING';

    INSERT INTO migration_log (migration_id, log_level, message, details)
    VALUES ('DATA_CLEANING', 'INFO', 'Data cleaning completed',
            JSON_OBJECT('cleaned_records', v_cleaned_count,
                        'invalid_records', v_error_count));

END //
DELIMITER ;

# 性能优化策略

# 1. 批量处理

-- 批量迁移优化
DELIMITER //
-- Copies source_<p_table_name> into target_<p_table_name> in chunks and records
-- progress in migration_status / migration_log under a generated id
-- 'BATCH_<TABLE>_<unix time>'.
--
-- p_table_name  base table name. Only [A-Za-z0-9_] is accepted because the name is
--               spliced into dynamic SQL (identifiers cannot be bound).
-- p_batch_size  rows per chunk; NULL or <= 0 uses 1000. MySQL procedure parameters
--               cannot have defaults (the original `INT DEFAULT 1000` is a syntax
--               error), so callers always pass both arguments.
--
-- Chunks are read in primary-key order of the source table. LIMIT/OFFSET without
-- ORDER BY does not guarantee disjoint pages, so rows could be copied twice or
-- skipped. A source table without a primary key is rejected.
-- Each chunk autocommits, which releases locks between chunks. On error the run is
-- marked 'failed' (chunks already copied stay in the target) and the error is re-raised.
-- NOTE: `INSERT ... SELECT *` assumes source_ and target_ have the same column order.
-- NOTE: OFFSET paging re-reads the skipped rows for every chunk, so total cost grows
-- quadratically with table size.
CREATE PROCEDURE BatchMigration(
    IN p_table_name VARCHAR(100),
    IN p_batch_size INT
)
BEGIN
    DECLARE v_batch_size INT;
    DECLARE v_offset BIGINT DEFAULT 0;
    DECLARE v_total_count BIGINT DEFAULT 0;
    DECLARE v_batch_count BIGINT DEFAULT 0;
    DECLARE v_migrated BIGINT DEFAULT 0;
    DECLARE v_order_by TEXT DEFAULT NULL;
    DECLARE v_migration_id VARCHAR(100) DEFAULT NULL;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        GET DIAGNOSTICS CONDITION 1
            @error_message = MESSAGE_TEXT;
        UPDATE migration_status
        SET status = 'failed', end_time = NOW(), error_message = @error_message
        WHERE migration_id = v_migration_id;
        -- Log only once the run row exists (migration_log references migration_status).
        INSERT INTO migration_log (migration_id, log_level, message)
        SELECT migration_id, 'ERROR', @error_message
        FROM migration_status
        WHERE migration_id = v_migration_id;
        RESIGNAL;
    END;

    IF p_table_name IS NULL OR p_table_name NOT REGEXP '^[A-Za-z0-9_]+$' THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid table name';
    END IF;

    SET v_batch_size = CASE
        WHEN p_batch_size IS NULL OR p_batch_size <= 0 THEN 1000
        ELSE p_batch_size
    END;

    SET v_migration_id = CONCAT('BATCH_', UPPER(p_table_name), '_', UNIX_TIMESTAMP());

    -- Primary-key columns of the source table, backtick-quoted, in key order.
    SELECT GROUP_CONCAT(CONCAT('`', REPLACE(COLUMN_NAME, '`', '``'), '`')
                        ORDER BY ORDINAL_POSITION SEPARATOR ', ')
    INTO v_order_by
    FROM information_schema.KEY_COLUMN_USAGE
    WHERE TABLE_SCHEMA = DATABASE()
      AND TABLE_NAME = CONCAT('source_', p_table_name)
      AND CONSTRAINT_NAME = 'PRIMARY';

    IF v_order_by IS NULL THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Source table has no primary key';
    END IF;

    -- Count the table that is actually paged over. The original counted p_table_name
    -- itself while copying from source_<p_table_name>.
    SET @sql = CONCAT('SELECT COUNT(*) INTO @total_count FROM `source_', p_table_name, '`');
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
    SET v_total_count = @total_count;

    INSERT INTO migration_status (migration_id, table_name, migration_type,
                                  status, total_records, start_time)
    VALUES (v_migration_id, p_table_name, 'full', 'running', v_total_count, NOW());

    WHILE v_offset < v_total_count DO

        SET @sql = CONCAT(
            'INSERT INTO `target_', p_table_name, '` ',
            'SELECT * FROM `source_', p_table_name, '` ',
            'ORDER BY ', v_order_by, ' ',
            'LIMIT ', v_batch_size, ' OFFSET ', v_offset
        );

        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        -- Capture right after EXECUTE. The original read ROW_COUNT() after
        -- DEALLOCATE PREPARE, i.e. for a different statement.
        SET v_batch_count = ROW_COUNT();
        DEALLOCATE PREPARE stmt;

        SET v_migrated = v_migrated + v_batch_count;
        SET v_offset = v_offset + v_batch_size;

        -- Store rows actually copied. The offset overshoots on the last, partial batch.
        UPDATE migration_status
        SET migrated_records = v_migrated,
            updated_at = NOW()
        WHERE migration_id = v_migration_id;

        INSERT INTO migration_log (migration_id, log_level, message, details)
        VALUES (v_migration_id, 'INFO', 'Batch completed',
                JSON_OBJECT('offset', v_offset - v_batch_size,
                            'batch_size', v_batch_count,
                            'progress_percent',
                            ROUND(LEAST(v_offset, v_total_count) * 100.0 / v_total_count, 2)));

        -- Short pause between chunks to avoid holding locks for long stretches.
        DO SLEEP(0.1);

    END WHILE;

    UPDATE migration_status
    SET status = 'completed', end_time = NOW()
    WHERE migration_id = v_migration_id;

END //
DELIMITER ;

# 2. 并行处理

-- 并行迁移配置
-- Work queue filled by CreateParallelTasks: one row per inclusive key range
-- [start_value, end_value] of partition_key in table_name, created as 'pending'.
-- NOTE(review): worker_id, start_time, end_time and records_processed are not
-- written by any procedure shown here; presumably external workers claim and
-- update rows — confirm.
CREATE TABLE parallel_migration_tasks (
    task_id INT PRIMARY KEY AUTO_INCREMENT,
    migration_id VARCHAR(100) NOT NULL,
    table_name VARCHAR(100) NOT NULL,
    partition_key VARCHAR(100),
    start_value VARCHAR(100), -- range bounds are stored as text; CreateParallelTasks writes integers, so CAST before comparing
    end_value VARCHAR(100),
    status ENUM('pending', 'running', 'completed', 'failed') DEFAULT 'pending',
    worker_id VARCHAR(50),
    start_time TIMESTAMP NULL,
    end_time TIMESTAMP NULL,
    records_processed INT DEFAULT 0,
    INDEX idx_migration_status (migration_id, status)
);

-- 创建并行任务
DELIMITER //
-- Splits the key range [MIN(p_partition_key), MAX(p_partition_key)] of p_table_name
-- into at most p_num_partitions contiguous, inclusive sub-ranges. It queues one
-- 'pending' parallel_migration_tasks row per sub-range under p_migration_id.
--
-- p_num_partitions  number of ranges; NULL or <= 0 uses 4. MySQL procedure
--                   parameters cannot have defaults (the original `INT DEFAULT 4`
--                   is a syntax error), so callers pass all four arguments.
-- p_table_name / p_partition_key are spliced into dynamic SQL and must match
-- [A-Za-z0-9_].
-- An empty table queues no tasks. A key range narrower than p_num_partitions yields
-- fewer tasks, instead of tasks whose start lies past MAX.
-- NOTE(review): assumes the partition key is an integer column (bounds are held in
-- BIGINT variables) — confirm for each caller.
CREATE PROCEDURE CreateParallelTasks(
    IN p_migration_id VARCHAR(100),
    IN p_table_name VARCHAR(100),
    IN p_partition_key VARCHAR(100),
    IN p_num_partitions INT
)
BEGIN
    DECLARE v_num_partitions INT;
    DECLARE v_min_value BIGINT;
    DECLARE v_max_value BIGINT;
    DECLARE v_partition_size BIGINT;
    DECLARE v_start_value BIGINT;
    DECLARE v_end_value BIGINT;

    IF p_table_name IS NULL OR p_table_name NOT REGEXP '^[A-Za-z0-9_]+$'
       OR p_partition_key IS NULL OR p_partition_key NOT REGEXP '^[A-Za-z0-9_]+$' THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid table or partition key name';
    END IF;

    SET v_num_partitions = CASE
        WHEN p_num_partitions IS NULL OR p_num_partitions <= 0 THEN 4
        ELSE p_num_partitions
    END;

    SET @min_val = NULL;
    SET @max_val = NULL;
    SET @sql = CONCAT('SELECT MIN(`', p_partition_key, '`), MAX(`', p_partition_key, '`) ',
                      'INTO @min_val, @max_val FROM `', p_table_name, '`');
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    SET v_min_value = @min_val;
    SET v_max_value = @max_val;

    -- MIN/MAX are NULL for an empty table: nothing to queue.
    IF v_min_value IS NOT NULL THEN
        -- At least 1, since the range size and the partition count are both >= 1.
        SET v_partition_size = CEIL((v_max_value - v_min_value + 1) / v_num_partitions);
        SET v_start_value = v_min_value;

        -- Emit consecutive ranges until MAX is covered.
        WHILE v_start_value <= v_max_value DO
            SET v_end_value = LEAST(v_start_value + v_partition_size - 1, v_max_value);

            INSERT INTO parallel_migration_tasks (
                migration_id, table_name, partition_key, start_value, end_value
            ) VALUES (
                p_migration_id, p_table_name, p_partition_key,
                v_start_value, v_end_value
            );

            SET v_start_value = v_end_value + 1;
        END WHILE;
    END IF;

END //
DELIMITER ;

# 数据验证与回滚

# 1. 数据一致性验证

-- 数据验证存储过程
DELIMITER //
-- Validates the user migration and logs the result to migration_log under
-- p_migration_id:
--   record_count_match          COUNT(old_users) equals the number of users that
--                               have a migration_user_mapping row
--   checksum_match              SUM of CRC32 over (old user id, username, email) is
--                               equal on both sides; migrated users are keyed by
--                               their OLD id through the mapping
--   business_validation_passed  no row in users has a NULL, empty or invalid email
-- If any check fails, p_migration_id is marked 'failed' and an ERROR is logged.
--
-- NOTE: only the users migration is checked, whatever p_migration_id names.
-- CleanUserData rewrites emails (lower-case, trim), so the checksum differs after it runs.
CREATE PROCEDURE ValidateMigration(
    IN p_migration_id VARCHAR(100)
)
BEGIN
    DECLARE v_source_count BIGINT DEFAULT 0;
    DECLARE v_target_count BIGINT DEFAULT 0;
    DECLARE v_source_checksum DECIMAL(30,0) DEFAULT 0;
    DECLARE v_target_checksum DECIMAL(30,0) DEFAULT 0;
    DECLARE v_checksum_match BOOLEAN DEFAULT TRUE;
    DECLARE v_business_ok BOOLEAN DEFAULT TRUE;
    DECLARE v_validation_result JSON;

    -- '|' separators keep ('ab','c') and ('a','bc') apart. COALESCE keeps a NULL
    -- column from dropping the whole row out of the sum.
    SELECT COUNT(*),
           COALESCE(SUM(CRC32(CONCAT_WS('|', ou.user_id,
                                        COALESCE(ou.username, ''),
                                        COALESCE(ou.email, '')))), 0)
    INTO v_source_count, v_source_checksum
    FROM old_users AS ou;

    -- Key target rows by the OLD id. users.user_id is a fresh AUTO_INCREMENT value,
    -- so the original checksum over it could never equal the source checksum.
    -- Counting only mapped users keeps users created natively in the new system
    -- out of the comparison.
    SELECT COUNT(*),
           COALESCE(SUM(CRC32(CONCAT_WS('|', m.old_user_id,
                                        COALESCE(u.username, ''),
                                        COALESCE(u.email, '')))), 0)
    INTO v_target_count, v_target_checksum
    FROM users AS u
    INNER JOIN migration_user_mapping AS m ON u.user_id = m.new_user_id;

    SET v_checksum_match = (v_source_checksum = v_target_checksum);

    SELECT NOT EXISTS (
        SELECT 1
        FROM users
        WHERE email IS NULL OR email = ''
           OR NOT ValidateEmail(email)
    ) INTO v_business_ok;

    SET v_validation_result = JSON_OBJECT(
        'record_count_match', (v_source_count = v_target_count),
        'source_count', v_source_count,
        'target_count', v_target_count,
        'checksum_match', v_checksum_match,
        'business_validation_passed', v_business_ok,
        'validation_time', NOW()
    );

    INSERT INTO migration_log (migration_id, log_level, message, details)
    VALUES (p_migration_id, 'INFO', 'Migration validation completed', v_validation_result);

    IF v_source_count != v_target_count OR NOT v_checksum_match OR NOT v_business_ok THEN
        UPDATE migration_status
        SET status = 'failed'
        WHERE migration_id = p_migration_id;

        INSERT INTO migration_log (migration_id, log_level, message)
        VALUES (p_migration_id, 'ERROR', 'Migration validation failed');
    END IF;

END //
DELIMITER ;

# 2. 回滚机制

-- 回滚存储过程
DELIMITER //
-- Rolls back a MigrateUsers ('users') or MigrateOrders ('orders') run:
--   1. copies the current target table to <table>_backup_<yyyymmdd_hhmmss>
--   2. in one transaction:
--      - deletes every target row that has a mapping entry (for users, also their
--        user_addresses rows)
--      - removes mapping rows that no longer point at a target row
--      - marks the run 'rollback'
-- Raises SQLSTATE 45000 when the run is unknown or its table is not supported.
-- On error: the transaction is rolled back, the failure is logged (when the run
-- exists), and the error is re-raised.
--
-- Roll back 'orders' before 'users'. orders references users and user_addresses, so
-- deleting users or addresses that still have orders fails on the foreign key.
-- NOTE: only the target table itself is backed up; deleted user_addresses rows are
-- not. Addresses created by MigrateOrders are kept when orders are rolled back.
CREATE PROCEDURE RollbackMigration(
    IN p_migration_id VARCHAR(100)
)
BEGIN
    DECLARE v_table_name VARCHAR(100) DEFAULT NULL;
    DECLARE v_backup_table VARCHAR(100);

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        -- Read the diagnostics first, before any other statement can replace them.
        GET DIAGNOSTICS CONDITION 1
            @error_message = MESSAGE_TEXT;
        ROLLBACK;
        -- migration_log references migration_status; only log for a known run.
        INSERT INTO migration_log (migration_id, log_level, message)
        SELECT migration_id, 'ERROR', CONCAT('Rollback failed: ', @error_message)
        FROM migration_status
        WHERE migration_id = p_migration_id;
        RESIGNAL;
    END;

    SET v_table_name = (
        SELECT table_name
        FROM migration_status
        WHERE migration_id = p_migration_id
    );

    IF v_table_name IS NULL OR v_table_name NOT IN ('users', 'orders') THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'Unknown migration or unsupported table for rollback';
    END IF;

    SET v_backup_table = CONCAT(v_table_name, '_backup_', DATE_FORMAT(NOW(), '%Y%m%d_%H%i%s'));

    -- CREATE TABLE ... AS SELECT commits implicitly, so it runs before the
    -- transaction. Inside it (as originally written) it ended the transaction and
    -- left the deletes below running in autocommit mode.
    SET @sql = CONCAT('CREATE TABLE `', v_backup_table, '` AS SELECT * FROM `', v_table_name, '`');
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    START TRANSACTION;

    CASE v_table_name
        WHEN 'users' THEN
            -- user_addresses.user_id references users with no ON DELETE action.
            DELETE ua FROM user_addresses AS ua
            INNER JOIN migration_user_mapping AS m ON ua.user_id = m.new_user_id;

            DELETE u FROM users AS u
            INNER JOIN migration_user_mapping AS m ON u.user_id = m.new_user_id;

            -- The mapping rows are inserted without a migration_id, so remove the
            -- ones whose new user is gone instead of filtering on migration_id.
            DELETE m FROM migration_user_mapping AS m
            LEFT JOIN users AS u ON u.user_id = m.new_user_id
            WHERE u.user_id IS NULL;

        WHEN 'orders' THEN
            DELETE o FROM orders AS o
            INNER JOIN migration_order_mapping AS m ON o.order_id = m.new_order_id;

            DELETE m FROM migration_order_mapping AS m
            LEFT JOIN orders AS o ON o.order_id = m.new_order_id
            WHERE o.order_id IS NULL;
    END CASE;

    UPDATE migration_status
    SET status = 'rollback', end_time = NOW()
    WHERE migration_id = p_migration_id;

    COMMIT;

    INSERT INTO migration_log (migration_id, log_level, message, details)
    VALUES (p_migration_id, 'INFO', 'Migration rollback completed',
            JSON_OBJECT('backup_table', v_backup_table, 'rollback_time', NOW()));

END //
DELIMITER ;

# 监控与告警

# 1. 迁移监控

-- 迁移进度监控视图
-- Per-run progress report over migration_status.
--   progress_percent             migrated_records as a percentage of total_records
--                                (NULL while total_records is 0)
--   duration_minutes             whole minutes from start_time to end_time, or to
--                                now while end_time is unset
--   estimated_remaining_minutes  remaining records divided by the average rate so
--                                far; only for running migrations with progress,
--                                NULL otherwise
CREATE VIEW migration_progress AS
SELECT
    s.migration_id,
    s.table_name,
    s.migration_type,
    s.status,
    s.total_records,
    s.migrated_records,
    s.failed_records,
    ROUND(s.migrated_records * 100.0 / NULLIF(s.total_records, 0), 2) AS progress_percent,
    TIMESTAMPDIFF(MINUTE, s.start_time, COALESCE(s.end_time, NOW())) AS duration_minutes,
    CASE
        WHEN s.status = 'running' AND s.migrated_records > 0 THEN
            ROUND(
                (s.total_records - s.migrated_records)
                    * TIMESTAMPDIFF(MINUTE, s.start_time, NOW())
                    / s.migrated_records,
                0
            )
    END AS estimated_remaining_minutes,
    s.start_time,
    s.end_time
FROM migration_status AS s;

-- 迁移异常监控
-- Migrations that need attention, derived from migration_progress:
--   CRITICAL  status 'failed'
--   WARNING   running for more than 120 minutes, or running for more than 30
--             minutes with progress below 10%
-- Only rows matching one of these conditions are returned. The original WHERE began
-- with `status IN ('running', 'failed')`, which made the remaining conditions
-- redundant and listed every running migration with alert_level 'NORMAL'.
-- NOTE: progress_percent is NULL while total_records is 0, so such runs can only
-- trigger the 120-minute warning.
CREATE VIEW migration_alerts AS
SELECT
    mp.migration_id,
    mp.table_name,
    mp.status,
    mp.progress_percent,
    mp.duration_minutes,
    CASE
        WHEN mp.status = 'failed' THEN 'CRITICAL'
        WHEN mp.status = 'running' AND mp.duration_minutes > 120 THEN 'WARNING'
        WHEN mp.status = 'running' AND mp.progress_percent < 10 AND mp.duration_minutes > 30 THEN 'WARNING'
        ELSE 'NORMAL'
    END AS alert_level,
    CASE
        WHEN mp.status = 'failed' THEN '迁移失败'
        WHEN mp.status = 'running' AND mp.duration_minutes > 120 THEN '迁移时间过长'
        WHEN mp.status = 'running' AND mp.progress_percent < 10 AND mp.duration_minutes > 30 THEN '迁移进度缓慢'
        ELSE '正常'
    END AS alert_message
FROM migration_progress mp
WHERE mp.status = 'failed'
   OR (mp.status = 'running' AND (mp.duration_minutes > 120 OR
       (mp.progress_percent < 10 AND mp.duration_minutes > 30)));

# 2. 自动化告警

-- 告警检查存储过程
DELIMITER //
-- Writes one migration_log entry for each CRITICAL or WARNING row currently in
-- migration_alerts, with the alert level and check time in details.
-- CRITICAL alerts (failed migrations) are logged at 'ERROR' so that error-level log
-- queries surface them; the original logged every alert as 'WARN'. WARNING alerts
-- stay 'WARN'.
-- Each call logs again; repeated alerts are not de-duplicated.
CREATE PROCEDURE CheckMigrationAlerts()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_migration_id VARCHAR(100);
    DECLARE v_alert_level VARCHAR(20);
    DECLARE v_alert_message TEXT;

    DECLARE alert_cursor CURSOR FOR
        SELECT migration_id, alert_level, alert_message
        FROM migration_alerts
        WHERE alert_level IN ('CRITICAL', 'WARNING');

    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    OPEN alert_cursor;

    alert_loop: LOOP
        FETCH alert_cursor INTO v_migration_id, v_alert_level, v_alert_message;

        IF done THEN
            LEAVE alert_loop;
        END IF;

        -- Notification hook: integrate e-mail/SMS delivery here if needed.
        INSERT INTO migration_log (migration_id, log_level, message, details)
        VALUES (v_migration_id,
                CASE v_alert_level WHEN 'CRITICAL' THEN 'ERROR' ELSE 'WARN' END,
                v_alert_message,
                JSON_OBJECT('alert_level', v_alert_level, 'check_time', NOW()));

    END LOOP;

    CLOSE alert_cursor;

END //
DELIMITER ;

# 最佳实践总结

# 1. 迁移前准备

  • 充分的需求分析和方案设计
  • 详细的数据映射和转换规则
  • 完整的测试环境和测试数据
  • 制定详细的回滚计划

# 2. 迁移执行

  • 采用分批次、增量的迁移策略
  • 实施严格的数据验证机制
  • 建立完善的监控和告警系统
  • 保持详细的操作日志

# 3. 迁移后维护

  • 持续的数据质量监控
  • 定期的一致性检查
  • 及时的性能优化
  • 完整的文档和知识传递

# 4. 风险控制

  • 制定详细的风险评估
  • 建立多层次的备份机制
  • 实施渐进式的切换策略
  • 准备应急响应预案

通过这些实战案例和最佳实践,可以帮助团队成功完成各种复杂的数据迁移项目,确保数据的完整性、一致性和业务的连续性。