数据迁移实战
# 数据迁移实战
# 概述
数据迁移是企业信息化建设中的重要环节,涉及从旧系统到新系统的数据转移、数据格式转换、数据清洗等多个方面。本章节通过实际案例,展示如何使用SQL进行高效、安全的数据迁移。
# 迁移策略与规划
# 1. 迁移类型
# 完全迁移 (Big Bang)
- 一次性迁移所有数据
- 适用于小型系统或可接受停机的场景
- 风险较高但实施简单
# 增量迁移 (Incremental)
- 分批次迁移数据
- 适用于大型系统或不能停机的场景
- 风险较低但实施复杂
# 并行运行 (Parallel)
- 新旧系统同时运行
- 逐步切换业务模块
- 风险最低但成本最高
# 2. 迁移阶段
-- Migration status tracking table: one row per migration run.
-- migration_id is VARCHAR(100) so it agrees with parallel_migration_tasks.migration_id
-- and with the VARCHAR(100) migration ids used by the procedures in this file
-- (generated ids such as 'BATCH_<TABLE>_<unix timestamp>' can exceed 50 characters).
CREATE TABLE migration_status (
    migration_id     VARCHAR(100) PRIMARY KEY,
    table_name       VARCHAR(100) NOT NULL,
    migration_type   ENUM('full', 'incremental', 'delta') NOT NULL,
    status           ENUM('pending', 'running', 'completed', 'failed', 'rollback') DEFAULT 'pending',
    total_records    BIGINT DEFAULT 0,      -- rows expected for this run
    migrated_records BIGINT DEFAULT 0,      -- rows copied so far
    failed_records   BIGINT DEFAULT 0,
    start_time       TIMESTAMP NULL,
    end_time         TIMESTAMP NULL,
    error_message    TEXT,
    created_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_table (status, table_name),
    INDEX idx_migration_type (migration_type)
);
-- Migration log table: free-form log lines attached to a migration run.
-- migration_id is VARCHAR(100) to match the VARCHAR(100) ids passed around by the
-- procedures in this file. Every log row must reference an existing
-- migration_status row (foreign key below), so writers have to create the status
-- row before logging.
CREATE TABLE migration_log (
    log_id       BIGINT PRIMARY KEY AUTO_INCREMENT,
    migration_id VARCHAR(100) NOT NULL,
    log_level    ENUM('INFO', 'WARN', 'ERROR', 'DEBUG') NOT NULL,
    message      TEXT NOT NULL,
    details      JSON,                      -- optional structured payload
    created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_migration_level (migration_id, log_level),
    INDEX idx_created_at (created_at),
    FOREIGN KEY (migration_id) REFERENCES migration_status(migration_id)
);
# 数据迁移实战案例
# 案例1:电商系统升级迁移
# 源系统表结构 (Legacy)
-- Legacy system: user table (migration source).
CREATE TABLE old_users (
    user_id           INT NOT NULL,
    username          VARCHAR(50),
    email             VARCHAR(100),
    password_hash     VARCHAR(255),
    full_name         VARCHAR(100),
    phone             VARCHAR(20),
    address           TEXT,
    registration_date DATETIME,
    last_login        DATETIME,
    status            TINYINT, -- 0:inactive, 1:active, 2:suspended
    PRIMARY KEY (user_id)
);
-- Legacy system: order table (migration source).
CREATE TABLE old_orders (
    order_id         INT NOT NULL,
    user_id          INT,
    order_date       DATETIME,
    total_amount     DECIMAL(10,2),
    order_status     VARCHAR(20), -- 'pending', 'paid', 'shipped', 'delivered', 'cancelled'
    shipping_address TEXT,
    payment_method   VARCHAR(50),
    notes            TEXT,
    PRIMARY KEY (order_id)
);
# 目标系统表结构 (New)
-- New system: user table (migration target).
CREATE TABLE users (
    user_id        BIGINT       NOT NULL AUTO_INCREMENT,
    user_uuid      VARCHAR(36)  NOT NULL UNIQUE,
    username       VARCHAR(50)  NOT NULL UNIQUE,
    email          VARCHAR(100) NOT NULL UNIQUE,
    password_hash  VARCHAR(255) NOT NULL,
    first_name     VARCHAR(50),
    last_name      VARCHAR(50),
    phone          VARCHAR(20),
    status         ENUM('active', 'inactive', 'suspended') DEFAULT 'active',
    email_verified BOOLEAN DEFAULT FALSE,
    phone_verified BOOLEAN DEFAULT FALSE,
    created_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    last_login_at  TIMESTAMP NULL,
    PRIMARY KEY (user_id)
);
-- New system: address table; each address belongs to one user.
CREATE TABLE user_addresses (
    address_id     BIGINT NOT NULL AUTO_INCREMENT,
    user_id        BIGINT NOT NULL,
    address_type   ENUM('billing', 'shipping', 'both') DEFAULT 'both',
    street_address VARCHAR(255),
    city           VARCHAR(100),
    state          VARCHAR(100),
    postal_code    VARCHAR(20),
    country        VARCHAR(100) DEFAULT 'China',
    is_default     BOOLEAN DEFAULT FALSE,
    created_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (address_id),
    FOREIGN KEY (user_id) REFERENCES users(user_id)
);
-- New system: order table (migration target); references users and user_addresses.
CREATE TABLE orders (
    order_id            BIGINT        NOT NULL AUTO_INCREMENT,
    order_uuid          VARCHAR(36)   NOT NULL UNIQUE,
    user_id             BIGINT        NOT NULL,
    order_number        VARCHAR(50)   NOT NULL UNIQUE,
    order_status        ENUM('pending', 'confirmed', 'paid', 'shipped', 'delivered', 'cancelled', 'refunded') DEFAULT 'pending',
    subtotal            DECIMAL(12,2) NOT NULL,
    tax_amount          DECIMAL(12,2) DEFAULT 0,
    shipping_amount     DECIMAL(12,2) DEFAULT 0,
    total_amount        DECIMAL(12,2) NOT NULL,
    currency            VARCHAR(3)    DEFAULT 'CNY',
    payment_method      VARCHAR(50),
    payment_status      ENUM('pending', 'paid', 'failed', 'refunded') DEFAULT 'pending',
    shipping_address_id BIGINT,
    notes               TEXT,
    created_at          TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at          TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (order_id),
    FOREIGN KEY (user_id) REFERENCES users(user_id),
    FOREIGN KEY (shipping_address_id) REFERENCES user_addresses(address_id)
);
# 数据迁移脚本
-- 1. User data migration

-- Mapping from legacy user ids to the ids generated in the new users table.
-- Created here because MigrateUsers/MigrateOrders depend on it; IF NOT EXISTS keeps
-- an already existing definition untouched. migration_id has a default so that
-- inserts which only supply (old_user_id, new_user_id) still record their origin.
CREATE TABLE IF NOT EXISTS migration_user_mapping (
    old_user_id  INT          NOT NULL,
    new_user_id  BIGINT       NOT NULL,
    migration_id VARCHAR(100) NOT NULL DEFAULT 'USER_MIGRATION',
    created_at   TIMESTAMP    DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (old_user_id),
    UNIQUE KEY uk_user_mapping_new (new_user_id)
);

DELIMITER //
-- MigrateUsers: copies every not-yet-migrated row of old_users into users
-- (plus one user_addresses row when the legacy address is non-blank) and records
-- the old->new id pair in migration_user_mapping.
-- The whole copy runs in ONE transaction: any SQL error rolls everything back,
-- marks the 'USER_MIGRATION' status row as failed, logs the error and re-raises it.
-- Re-running is safe: users already present in the mapping table are skipped.
CREATE PROCEDURE MigrateUsers()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_user_id INT;
    DECLARE v_username VARCHAR(50);
    DECLARE v_email VARCHAR(100);
    DECLARE v_password_hash VARCHAR(255);
    DECLARE v_full_name VARCHAR(100);
    DECLARE v_phone VARCHAR(20);
    DECLARE v_address TEXT;
    DECLARE v_registration_date DATETIME;
    DECLARE v_last_login DATETIME;
    DECLARE v_status TINYINT;
    DECLARE v_new_user_id BIGINT;
    DECLARE v_first_name VARCHAR(50);
    DECLARE v_last_name VARCHAR(50);
    DECLARE v_total_records BIGINT DEFAULT 0;
    DECLARE v_migrated_count BIGINT DEFAULT 0;
    DECLARE v_error_message TEXT;

    -- NOT EXISTS on old_user_id: the mapping table has no user_id column, so the
    -- former "NOT IN (SELECT user_id ...)" silently compared each row with itself.
    DECLARE user_cursor CURSOR FOR
        SELECT ou.user_id, ou.username, ou.email, ou.password_hash, ou.full_name,
               ou.phone, ou.address, ou.registration_date, ou.last_login, ou.status
        FROM old_users AS ou
        WHERE NOT EXISTS (
            SELECT 1
            FROM migration_user_mapping AS m
            WHERE m.old_user_id = ou.user_id
        );

    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        -- Must be the first statement: any other statement (including ROLLBACK)
        -- clears the current diagnostics area and the message would be lost.
        GET DIAGNOSTICS CONDITION 1 v_error_message = MESSAGE_TEXT;
        ROLLBACK;
        UPDATE migration_status
        SET status = 'failed',
            end_time = NOW(),
            error_message = v_error_message
        WHERE migration_id = 'USER_MIGRATION';
        INSERT INTO migration_log (migration_id, log_level, message)
        VALUES ('USER_MIGRATION', 'ERROR', COALESCE(v_error_message, 'Unknown error'));
        RESIGNAL;
    END;

    -- Number of users this run is expected to copy (feeds progress reporting).
    SELECT COUNT(*) INTO v_total_records
    FROM old_users AS ou
    WHERE NOT EXISTS (
        SELECT 1
        FROM migration_user_mapping AS m
        WHERE m.old_user_id = ou.user_id
    );

    -- Create or reset the status row (committed immediately, outside the data transaction).
    INSERT INTO migration_status (migration_id, table_name, migration_type, status,
                                  total_records, start_time)
    VALUES ('USER_MIGRATION', 'users', 'full', 'running', v_total_records, NOW())
    ON DUPLICATE KEY UPDATE
        status = 'running',
        total_records = VALUES(total_records),
        start_time = NOW(),
        end_time = NULL,
        error_message = NULL,
        migrated_records = 0,
        failed_records = 0;

    START TRANSACTION;
    OPEN user_cursor;

    user_loop: LOOP
        FETCH user_cursor INTO v_user_id, v_username, v_email, v_password_hash,
                               v_full_name, v_phone, v_address, v_registration_date,
                               v_last_login, v_status;
        IF done THEN
            LEAVE user_loop;
        END IF;

        -- Split full_name at the first space: text before it is the first name,
        -- everything after it the last name ('' when there is no space).
        SET v_first_name = SUBSTRING_INDEX(COALESCE(v_full_name, ''), ' ', 1);
        SET v_last_name = CASE
            WHEN LOCATE(' ', COALESCE(v_full_name, '')) > 0
                THEN SUBSTRING(v_full_name, LOCATE(' ', v_full_name) + 1)
            ELSE ''
        END;

        INSERT INTO users (
            user_uuid, username, email, password_hash, first_name, last_name,
            phone, status, created_at, last_login_at
        ) VALUES (
            UUID(),
            v_username,
            v_email,
            v_password_hash,
            v_first_name,
            v_last_name,
            v_phone,
            -- Legacy numeric status -> new enum; unknown codes become 'inactive'.
            CASE v_status
                WHEN 0 THEN 'inactive'
                WHEN 1 THEN 'active'
                WHEN 2 THEN 'suspended'
                ELSE 'inactive'
            END,
            v_registration_date,
            v_last_login
        );
        SET v_new_user_id = LAST_INSERT_ID();

        -- Record the old -> new id mapping.
        INSERT INTO migration_user_mapping (old_user_id, new_user_id)
        VALUES (v_user_id, v_new_user_id);

        -- The legacy free-text address becomes the user's default address.
        IF v_address IS NOT NULL AND TRIM(v_address) != '' THEN
            INSERT INTO user_addresses (
                user_id, address_type, street_address, is_default
            ) VALUES (
                v_new_user_id, 'both', v_address, TRUE
            );
        END IF;

        -- Counted locally: a per-row UPDATE of migration_status inside this
        -- transaction would not be visible to other sessions before COMMIT anyway.
        SET v_migrated_count = v_migrated_count + 1;
    END LOOP;

    CLOSE user_cursor;

    -- Final status is committed atomically with the migrated data.
    UPDATE migration_status
    SET status = 'completed',
        migrated_records = v_migrated_count,
        end_time = NOW()
    WHERE migration_id = 'USER_MIGRATION';

    COMMIT;

    INSERT INTO migration_log (migration_id, log_level, message)
    VALUES ('USER_MIGRATION', 'INFO', 'User migration completed successfully');
END //
DELIMITER ;
-- 2. Order data migration

-- Mapping from legacy order ids to the ids generated in the new orders table.
-- Created here because MigrateOrders depends on it; IF NOT EXISTS keeps an already
-- existing definition untouched.
CREATE TABLE IF NOT EXISTS migration_order_mapping (
    old_order_id INT          NOT NULL,
    new_order_id BIGINT       NOT NULL,
    migration_id VARCHAR(100) NOT NULL DEFAULT 'ORDER_MIGRATION',
    created_at   TIMESTAMP    DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (old_order_id),
    UNIQUE KEY uk_order_mapping_new (new_order_id)
);

DELIMITER //
-- MigrateOrders: copies every not-yet-migrated row of old_orders whose user has
-- already been migrated (row present in migration_user_mapping) into orders,
-- reusing or creating a user_addresses row for the shipping address, and records
-- the old->new id pair in migration_order_mapping.
-- Orders of users that are not in migration_user_mapping are skipped.
-- The whole copy runs in ONE transaction: any SQL error rolls everything back,
-- marks the 'ORDER_MIGRATION' status row as failed, logs the error and re-raises it.
CREATE PROCEDURE MigrateOrders()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_order_id INT;
    DECLARE v_new_user_id BIGINT;
    DECLARE v_order_date DATETIME;
    DECLARE v_total_amount DECIMAL(10,2);
    DECLARE v_order_status VARCHAR(20);
    DECLARE v_shipping_address TEXT;
    DECLARE v_payment_method VARCHAR(50);
    DECLARE v_notes TEXT;
    DECLARE v_address_id BIGINT DEFAULT NULL;
    DECLARE v_total_records BIGINT DEFAULT 0;
    DECLARE v_migrated_count BIGINT DEFAULT 0;
    DECLARE v_error_message TEXT;

    -- The new user id comes straight from the join, so no per-row lookup is needed.
    DECLARE order_cursor CURSOR FOR
        SELECT o.order_id, m.new_user_id, o.order_date, o.total_amount,
               o.order_status, o.shipping_address, o.payment_method, o.notes
        FROM old_orders AS o
        INNER JOIN migration_user_mapping AS m
            ON o.user_id = m.old_user_id
        WHERE NOT EXISTS (
            SELECT 1
            FROM migration_order_mapping AS om
            WHERE om.old_order_id = o.order_id
        );

    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        -- Must be the first statement: any other statement (including ROLLBACK)
        -- clears the current diagnostics area and the message would be lost.
        GET DIAGNOSTICS CONDITION 1 v_error_message = MESSAGE_TEXT;
        ROLLBACK;
        UPDATE migration_status
        SET status = 'failed',
            end_time = NOW(),
            error_message = v_error_message
        WHERE migration_id = 'ORDER_MIGRATION';
        INSERT INTO migration_log (migration_id, log_level, message)
        VALUES ('ORDER_MIGRATION', 'ERROR', COALESCE(v_error_message, 'Unknown error'));
        RESIGNAL;
    END;

    -- Number of orders this run is expected to copy (feeds progress reporting).
    SELECT COUNT(*) INTO v_total_records
    FROM old_orders AS o
    INNER JOIN migration_user_mapping AS m
        ON o.user_id = m.old_user_id
    WHERE NOT EXISTS (
        SELECT 1
        FROM migration_order_mapping AS om
        WHERE om.old_order_id = o.order_id
    );

    -- Create or reset the status row (committed immediately, outside the data transaction).
    INSERT INTO migration_status (migration_id, table_name, migration_type, status,
                                  total_records, start_time)
    VALUES ('ORDER_MIGRATION', 'orders', 'full', 'running', v_total_records, NOW())
    ON DUPLICATE KEY UPDATE
        status = 'running',
        total_records = VALUES(total_records),
        start_time = NOW(),
        end_time = NULL,
        error_message = NULL,
        migrated_records = 0,
        failed_records = 0;

    START TRANSACTION;
    OPEN order_cursor;

    order_loop: LOOP
        FETCH order_cursor INTO v_order_id, v_new_user_id, v_order_date, v_total_amount,
                                v_order_status, v_shipping_address, v_payment_method, v_notes;
        IF done THEN
            LEAVE order_loop;
        END IF;

        -- Reset per order; otherwise the previous order's address would leak into
        -- orders that have no shipping address or a different one.
        SET v_address_id = NULL;

        IF v_shipping_address IS NOT NULL AND TRIM(v_shipping_address) != '' THEN
            -- Scalar subquery instead of SELECT ... INTO: a miss yields NULL and does
            -- NOT raise NOT FOUND, which would set `done` and end the loop early.
            SET v_address_id = (
                SELECT a.address_id
                FROM user_addresses AS a
                WHERE a.user_id = v_new_user_id
                  AND a.street_address = v_shipping_address
                ORDER BY a.address_id
                LIMIT 1
            );

            -- No matching address yet: create a shipping address for this user.
            IF v_address_id IS NULL THEN
                INSERT INTO user_addresses (
                    user_id, address_type, street_address, is_default
                ) VALUES (
                    v_new_user_id, 'shipping', v_shipping_address, FALSE
                );
                SET v_address_id = LAST_INSERT_ID();
            END IF;
        END IF;

        INSERT INTO orders (
            order_uuid, user_id, order_number, order_status,
            subtotal, total_amount, payment_method, shipping_address_id,
            notes, created_at
        ) VALUES (
            UUID(),
            v_new_user_id,
            -- Zero-pad to at least 8 digits. LPAD truncates values longer than the
            -- target length, so the length is widened for ids above 99,999,999.
            CONCAT('ORD', LPAD(v_order_id, GREATEST(8, CHAR_LENGTH(v_order_id)), '0')),
            -- Known legacy statuses map 1:1; anything else becomes 'pending'.
            CASE v_order_status
                WHEN 'pending' THEN 'pending'
                WHEN 'paid' THEN 'paid'
                WHEN 'shipped' THEN 'shipped'
                WHEN 'delivered' THEN 'delivered'
                WHEN 'cancelled' THEN 'cancelled'
                ELSE 'pending'
            END,
            v_total_amount,   -- legacy data has no tax/shipping split: subtotal = total
            v_total_amount,
            v_payment_method,
            v_address_id,
            v_notes,
            v_order_date
        );

        -- Record the old -> new id mapping (LAST_INSERT_ID() is the new order id).
        INSERT INTO migration_order_mapping (old_order_id, new_order_id)
        VALUES (v_order_id, LAST_INSERT_ID());

        -- Counted locally: a per-row UPDATE of migration_status inside this
        -- transaction would not be visible to other sessions before COMMIT anyway.
        SET v_migrated_count = v_migrated_count + 1;
    END LOOP;

    CLOSE order_cursor;

    -- Final status is committed atomically with the migrated data.
    UPDATE migration_status
    SET status = 'completed',
        migrated_records = v_migrated_count,
        end_time = NOW()
    WHERE migration_id = 'ORDER_MIGRATION';

    COMMIT;

    INSERT INTO migration_log (migration_id, log_level, message)
    VALUES ('ORDER_MIGRATION', 'INFO', 'Order migration completed successfully');
END //
DELIMITER ;
# 案例2:增量数据同步
# 增量同步策略
-- Incremental sync configuration: one row per table to synchronise.
CREATE TABLE sync_config (
    config_id             INT NOT NULL AUTO_INCREMENT,
    table_name            VARCHAR(100) NOT NULL,
    sync_type             ENUM('insert', 'update', 'delete', 'all') DEFAULT 'all',
    timestamp_column      VARCHAR(100),
    primary_key_column    VARCHAR(100),
    last_sync_timestamp   TIMESTAMP NULL,
    sync_interval_minutes INT DEFAULT 60,
    is_active             BOOLEAN DEFAULT TRUE,
    created_at            TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at            TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (config_id)
);

-- Seed the sync configuration for the three synchronised tables.
INSERT INTO sync_config
    (table_name, sync_type, timestamp_column, primary_key_column, sync_interval_minutes)
VALUES
    ('users',    'all', 'updated_at', 'user_id',    30),
    ('orders',   'all', 'updated_at', 'order_id',   15),
    ('products', 'all', 'updated_at', 'product_id', 60);
-- Incremental sync procedure
DELIMITER //
-- IncrementalSync: upserts rows changed since the last sync from source_<table>
-- into target_<table>. Only 'users' and 'orders' are supported; any other value
-- raises SQLSTATE 45000 'Unsupported table for sync' before anything is written.
--
-- p_table_name: table to synchronise; must have an active row in sync_config,
--               otherwise the run is skipped with a WARN log entry.
--
-- The run is tracked under migration id 'SYNC_<TABLE>' in migration_status; that
-- row is created first because migration_log.migration_id is a foreign key to it.
-- On any SQL error the data transaction is rolled back, the status row is marked
-- failed, the error is logged and then re-raised.
CREATE PROCEDURE IncrementalSync(
    IN p_table_name VARCHAR(100)
)
sync_proc: BEGIN
    DECLARE v_sync_id VARCHAR(100);
    DECLARE v_last_sync TIMESTAMP;
    DECLARE v_current_sync TIMESTAMP;
    DECLARE v_config_count INT DEFAULT 0;
    DECLARE v_count INT DEFAULT 0;
    DECLARE v_error_message TEXT;

    -- Reject unsupported tables up front (outside the logging handler: there is
    -- no status row to attach a log entry to yet).
    IF p_table_name IS NULL OR p_table_name NOT IN ('users', 'orders') THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Unsupported table for sync';
    END IF;

    SET v_sync_id = CONCAT('SYNC_', UPPER(p_table_name));
    -- Upper bound of this sync window, fixed BEFORE reading any data. Storing a
    -- later NOW() as the watermark would skip rows modified while the copy runs.
    SET v_current_sync = NOW();

    -- Parent row for the log entries written below.
    INSERT INTO migration_status (migration_id, table_name, migration_type, status, start_time)
    VALUES (v_sync_id, p_table_name, 'incremental', 'running', v_current_sync)
    ON DUPLICATE KEY UPDATE
        status = 'running',
        start_time = VALUES(start_time),
        end_time = NULL,
        error_message = NULL,
        migrated_records = 0,
        failed_records = 0;

    BEGIN
        DECLARE EXIT HANDLER FOR SQLEXCEPTION
        BEGIN
            -- Must be the first statement: any other statement (including ROLLBACK)
            -- clears the current diagnostics area and the message would be lost.
            GET DIAGNOSTICS CONDITION 1 v_error_message = MESSAGE_TEXT;
            ROLLBACK;
            UPDATE migration_status
            SET status = 'failed',
                end_time = NOW(),
                error_message = v_error_message
            WHERE migration_id = v_sync_id;
            INSERT INTO migration_log (migration_id, log_level, message, details)
            VALUES (v_sync_id, 'ERROR', COALESCE(v_error_message, 'Unknown error'),
                    JSON_OBJECT('table', p_table_name, 'timestamp', NOW()));
            RESIGNAL;
        END;

        -- Read the sync configuration. Aggregates always return exactly one row,
        -- so a missing/inactive configuration shows up as v_config_count = 0.
        SELECT COUNT(*), MAX(last_sync_timestamp)
        INTO v_config_count, v_last_sync
        FROM sync_config
        WHERE table_name = p_table_name
          AND is_active = TRUE;

        IF v_config_count = 0 THEN
            UPDATE migration_status
            SET status = 'pending',
                end_time = NOW()
            WHERE migration_id = v_sync_id;
            INSERT INTO migration_log (migration_id, log_level, message, details)
            VALUES (v_sync_id, 'WARN',
                    CONCAT('Incremental sync skipped for ', p_table_name,
                           ': no active sync configuration'),
                    JSON_OBJECT('table', p_table_name, 'timestamp', NOW()));
            LEAVE sync_proc;
        END IF;

        -- First run: fall back to a one-day look-back window.
        IF v_last_sync IS NULL THEN
            SET v_last_sync = DATE_SUB(v_current_sync, INTERVAL 1 DAY);
        END IF;

        START TRANSACTION;

        -- Table-specific upsert of rows changed in (v_last_sync, v_current_sync].
        CASE p_table_name
            WHEN 'users' THEN
                INSERT INTO target_users (
                    user_id, username, email, first_name, last_name,
                    phone, status, created_at, updated_at
                )
                SELECT
                    user_id, username, email, first_name, last_name,
                    phone, status, created_at, updated_at
                FROM source_users
                WHERE updated_at > v_last_sync
                  AND updated_at <= v_current_sync
                ON DUPLICATE KEY UPDATE
                    username = VALUES(username),
                    email = VALUES(email),
                    first_name = VALUES(first_name),
                    last_name = VALUES(last_name),
                    phone = VALUES(phone),
                    status = VALUES(status),
                    updated_at = VALUES(updated_at);
                -- Affected-row count: an updated existing row counts as 2, a new row as 1.
                SET v_count = ROW_COUNT();

            WHEN 'orders' THEN
                INSERT INTO target_orders (
                    order_id, user_id, order_number, order_status,
                    total_amount, payment_method, created_at, updated_at
                )
                SELECT
                    order_id, user_id, order_number, order_status,
                    total_amount, payment_method, created_at, updated_at
                FROM source_orders
                WHERE updated_at > v_last_sync
                  AND updated_at <= v_current_sync
                ON DUPLICATE KEY UPDATE
                    order_status = VALUES(order_status),
                    total_amount = VALUES(total_amount),
                    payment_method = VALUES(payment_method),
                    updated_at = VALUES(updated_at);
                -- Affected-row count: an updated existing row counts as 2, a new row as 1.
                SET v_count = ROW_COUNT();

            ELSE
                SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Unsupported table for sync';
        END CASE;

        -- Advance the watermark to the upper bound of the window just processed.
        UPDATE sync_config
        SET last_sync_timestamp = v_current_sync,
            updated_at = NOW()
        WHERE table_name = p_table_name;

        UPDATE migration_status
        SET status = 'completed',
            migrated_records = v_count,
            end_time = NOW()
        WHERE migration_id = v_sync_id;

        COMMIT;

        INSERT INTO migration_log (migration_id, log_level, message, details)
        VALUES (v_sync_id, 'INFO',
                CONCAT('Incremental sync completed for ', p_table_name),
                JSON_OBJECT('table', p_table_name, 'records_synced', v_count,
                            'last_sync', v_last_sync, 'current_sync', v_current_sync));
    END;
END //
DELIMITER ;
# 案例3:数据清洗与转换
# 数据质量检查
-- Data quality check functions
DELIMITER //
-- ValidateEmail: returns 1 when `email` looks like local@domain.tld, 0 otherwise,
-- and NULL for NULL input (REGEXP propagates NULL).
-- The literal dot before the TLD is written as [.]: inside a MySQL string literal
-- '\.' collapses to '.', which would match ANY character and accept addresses
-- such as 'user@examplecom'. The function reads no tables, hence NO SQL.
CREATE FUNCTION ValidateEmail(email VARCHAR(255))
RETURNS BOOLEAN
NO SQL
DETERMINISTIC
BEGIN
    RETURN email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$';
END //

-- ValidatePhone: simple mainland-China mobile check - exactly 11 digits, starting
-- with 1 followed by 3-9. Returns 1/0, or NULL for NULL input. Reads no tables.
CREATE FUNCTION ValidatePhone(phone VARCHAR(20))
RETURNS BOOLEAN
NO SQL
DETERMINISTIC
BEGIN
    RETURN phone REGEXP '^1[3-9][0-9]{9}$';
END //
DELIMITER ;
-- Data cleaning procedure
DELIMITER //
-- CleanUserData: normalises the users table inside one transaction:
--   1. lower-cases and trims e-mail addresses,
--   2. sets active users with an invalid e-mail to 'inactive',
--   3. strips non-digits from phone numbers and clears phone_verified for
--      numbers that fail ValidatePhone,
--   4. trims first/last names,
--   5. for e-mails that occur more than once keeps the row with the highest
--      user_id and sets the others to 'inactive'.
-- The run is tracked under migration id 'DATA_CLEANING'; the status row is created
-- first because migration_log.migration_id is a foreign key to migration_status.
-- On any SQL error everything is rolled back, the status row is marked failed,
-- the error is logged and then re-raised.
CREATE PROCEDURE CleanUserData()
BEGIN
    DECLARE v_cleaned_count INT DEFAULT 0;
    DECLARE v_error_count INT DEFAULT 0;
    DECLARE v_error_message TEXT;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        -- Must be the first statement: any other statement (including ROLLBACK)
        -- clears the current diagnostics area and the message would be lost.
        GET DIAGNOSTICS CONDITION 1 v_error_message = MESSAGE_TEXT;
        ROLLBACK;
        -- Temporary tables are not transactional; remove it so a retry can recreate it.
        DROP TEMPORARY TABLE IF EXISTS duplicate_users;
        UPDATE migration_status
        SET status = 'failed',
            end_time = NOW(),
            error_message = v_error_message
        WHERE migration_id = 'DATA_CLEANING';
        INSERT INTO migration_log (migration_id, log_level, message)
        VALUES ('DATA_CLEANING', 'ERROR', COALESCE(v_error_message, 'Unknown error'));
        RESIGNAL;
    END;

    -- Parent row for the log entries written by this procedure.
    INSERT INTO migration_status (migration_id, table_name, migration_type, status, start_time)
    VALUES ('DATA_CLEANING', 'users', 'full', 'running', NOW())
    ON DUPLICATE KEY UPDATE
        status = 'running',
        start_time = NOW(),
        end_time = NULL,
        error_message = NULL;

    START TRANSACTION;

    -- Normalise e-mail addresses.
    UPDATE users
    SET email = LOWER(TRIM(email))
    WHERE email IS NOT NULL;

    -- Deactivate active users whose e-mail is invalid.
    UPDATE users
    SET status = 'inactive'
    WHERE email IS NOT NULL
      AND NOT ValidateEmail(email)
      AND status = 'active';
    SET v_error_count = ROW_COUNT();

    -- Keep digits only in phone numbers.
    UPDATE users
    SET phone = REGEXP_REPLACE(phone, '[^0-9]', '')
    WHERE phone IS NOT NULL;

    -- Invalid phone numbers cannot stay verified.
    UPDATE users
    SET phone_verified = FALSE
    WHERE phone IS NOT NULL
      AND NOT ValidatePhone(phone);

    -- Trim name fields.
    UPDATE users
    SET first_name = TRIM(first_name),
        last_name = TRIM(last_name)
    WHERE first_name IS NOT NULL OR last_name IS NOT NULL;

    -- Duplicate handling: collect each duplicated e-mail together with the id of
    -- the row to keep (the newest one, i.e. the highest user_id).
    DROP TEMPORARY TABLE IF EXISTS duplicate_users;
    CREATE TEMPORARY TABLE duplicate_users AS
        SELECT email, MAX(user_id) AS keep_user_id, COUNT(*) AS cnt
        FROM users
        WHERE email IS NOT NULL
        GROUP BY email
        HAVING COUNT(*) > 1;

    -- Deactivate every duplicate except the kept row. The keep id comes from the
    -- temporary table because MySQL rejects an UPDATE whose subquery selects from
    -- the table being updated (error 1093).
    UPDATE users AS u1
    INNER JOIN duplicate_users AS du
        ON u1.email = du.email
    SET u1.status = 'inactive'
    WHERE u1.user_id <> du.keep_user_id;
    SET v_cleaned_count = ROW_COUNT();

    DROP TEMPORARY TABLE duplicate_users;

    UPDATE migration_status
    SET status = 'completed',
        end_time = NOW()
    WHERE migration_id = 'DATA_CLEANING';

    COMMIT;

    -- Record the cleaning result.
    INSERT INTO migration_log (migration_id, log_level, message, details)
    VALUES ('DATA_CLEANING', 'INFO', 'Data cleaning completed',
            JSON_OBJECT('cleaned_records', v_cleaned_count,
                        'invalid_records', v_error_count));
END //
DELIMITER ;
# 性能优化策略
# 1. 批量处理
-- Batch migration
DELIMITER //
-- BatchMigration: copies all rows of source_<p_table_name> into
-- target_<p_table_name> in batches, tracking progress in migration_status and
-- writing one migration_log row per batch. Each batch is its own statement, so
-- completed batches stay committed (under autocommit) if a later one fails.
--
-- p_table_name: base table name; only letters, digits and '_' are accepted because
--               the name is spliced into dynamic SQL (identifiers cannot be bound).
-- p_batch_size: rows per batch; NULL or a value < 1 falls back to 1000.
--               (MySQL stored procedures do not support DEFAULT on parameters.)
--
-- Batches are read in primary-key order so that LIMIT/OFFSET pages are stable;
-- if the source table has no primary key the copy still runs, unordered, and a
-- WARN entry is logged because rows may then be skipped or duplicated.
-- The generated migration id is 'BATCH_<TABLE>_<unix timestamp>'; it must fit
-- migration_status.migration_id.
CREATE PROCEDURE BatchMigration(
    IN p_table_name VARCHAR(100),
    IN p_batch_size INT
)
BEGIN
    DECLARE v_batch_size INT DEFAULT 1000;
    DECLARE v_offset BIGINT DEFAULT 0;
    DECLARE v_total_count BIGINT DEFAULT 0;
    DECLARE v_batch_count BIGINT DEFAULT 0;
    DECLARE v_migrated_count BIGINT DEFAULT 0;
    DECLARE v_pk_columns TEXT;
    DECLARE v_order_clause TEXT DEFAULT '';
    DECLARE v_migration_id VARCHAR(100);
    DECLARE v_error_message TEXT;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        -- First statement in the handler so the original error text is still available.
        GET DIAGNOSTICS CONDITION 1 v_error_message = MESSAGE_TEXT;
        UPDATE migration_status
        SET status = 'failed',
            end_time = NOW(),
            error_message = v_error_message
        WHERE migration_id = v_migration_id;
        RESIGNAL;
    END;

    -- Identifier whitelist: the name ends up inside dynamic SQL.
    IF p_table_name IS NULL OR p_table_name NOT REGEXP '^[A-Za-z0-9_]+$' THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid table name for batch migration';
    END IF;

    IF p_batch_size IS NOT NULL AND p_batch_size >= 1 THEN
        SET v_batch_size = p_batch_size;
    END IF;

    SET v_migration_id = CONCAT('BATCH_', UPPER(p_table_name), '_', UNIX_TIMESTAMP());

    -- Total rows to copy: counted on the table that is actually read.
    SET @total_count = 0;
    SET @sql = CONCAT('SELECT COUNT(*) INTO @total_count FROM `source_', p_table_name, '`');
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
    SET v_total_count = @total_count;

    -- Primary-key columns of the source table, used for a stable batch order.
    SELECT GROUP_CONCAT(CONCAT('`', COLUMN_NAME, '`') ORDER BY ORDINAL_POSITION SEPARATOR ', ')
    INTO v_pk_columns
    FROM information_schema.KEY_COLUMN_USAGE
    WHERE TABLE_SCHEMA = DATABASE()
      AND TABLE_NAME = CONCAT('source_', p_table_name)
      AND CONSTRAINT_NAME = 'PRIMARY';

    IF v_pk_columns IS NOT NULL THEN
        SET v_order_clause = CONCAT(' ORDER BY ', v_pk_columns);
    END IF;

    -- Initialise the migration status row.
    INSERT INTO migration_status (migration_id, table_name, migration_type,
                                  status, total_records, start_time)
    VALUES (v_migration_id, p_table_name, 'full', 'running', v_total_count, NOW());

    IF v_pk_columns IS NULL THEN
        INSERT INTO migration_log (migration_id, log_level, message)
        VALUES (v_migration_id, 'WARN',
                'Source table has no primary key; batches are read without a stable order');
    END IF;

    -- Batch loop.
    WHILE v_offset < v_total_count DO
        SET @sql = CONCAT(
            'INSERT INTO `target_', p_table_name, '` ',
            'SELECT * FROM `source_', p_table_name, '`',
            v_order_clause,
            ' LIMIT ', v_batch_size, ' OFFSET ', v_offset
        );
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        -- Read the row count BEFORE DEALLOCATE PREPARE, which is itself a statement
        -- and replaces the value ROW_COUNT() reports.
        SET v_batch_count = ROW_COUNT();
        DEALLOCATE PREPARE stmt;

        SET v_migrated_count = v_migrated_count + v_batch_count;
        SET v_offset = v_offset + v_batch_size;

        -- Progress is the number of rows actually inserted, not the offset
        -- (the offset overshoots the total on the last batch).
        UPDATE migration_status
        SET migrated_records = v_migrated_count,
            updated_at = NOW()
        WHERE migration_id = v_migration_id;

        INSERT INTO migration_log (migration_id, log_level, message, details)
        VALUES (v_migration_id, 'INFO', 'Batch completed',
                JSON_OBJECT('offset', v_offset - v_batch_size,
                            'batch_size', v_batch_count,
                            'progress_percent',
                            ROUND(LEAST(v_offset, v_total_count) * 100.0 / v_total_count, 2)));

        -- Short pause between batches to reduce sustained load and lock pressure.
        DO SLEEP(0.1);
    END WHILE;

    -- Mark the migration as finished.
    UPDATE migration_status
    SET status = 'completed', end_time = NOW()
    WHERE migration_id = v_migration_id;
END //
DELIMITER ;
# 2. 并行处理
-- Parallel migration task list: one row per key range of a table, to be picked
-- up by a worker.
CREATE TABLE parallel_migration_tasks (
    task_id           INT NOT NULL AUTO_INCREMENT,
    migration_id      VARCHAR(100) NOT NULL,
    table_name        VARCHAR(100) NOT NULL,
    partition_key     VARCHAR(100),
    start_value       VARCHAR(100),
    end_value         VARCHAR(100),
    status            ENUM('pending', 'running', 'completed', 'failed') DEFAULT 'pending',
    worker_id         VARCHAR(50),
    start_time        TIMESTAMP NULL,
    end_time          TIMESTAMP NULL,
    records_processed INT DEFAULT 0,
    PRIMARY KEY (task_id),
    KEY idx_migration_status (migration_id, status)
);
-- Create parallel tasks
DELIMITER //
-- CreateParallelTasks: splits the [MIN, MAX] range of p_partition_key in
-- p_table_name into contiguous, non-overlapping ranges and inserts one
-- parallel_migration_tasks row per range.
--
-- p_migration_id:   id stored on every created task.
-- p_table_name:     table to partition.
-- p_partition_key:  column to partition on; the range arithmetic uses BIGINT, so
--                   the column is expected to hold integer values.
-- p_num_partitions: desired number of ranges; NULL or a value < 1 falls back to 4.
--                   (MySQL stored procedures do not support DEFAULT on parameters.)
--
-- Table and column names may contain only letters, digits and '_' because they
-- are spliced into dynamic SQL. An empty table (or an all-NULL key) creates no
-- tasks; fewer than p_num_partitions tasks are created when the key range is
-- smaller than the requested partition count.
CREATE PROCEDURE CreateParallelTasks(
    IN p_migration_id VARCHAR(100),
    IN p_table_name VARCHAR(100),
    IN p_partition_key VARCHAR(100),
    IN p_num_partitions INT
)
create_tasks: BEGIN
    DECLARE v_num_partitions INT DEFAULT 4;
    DECLARE v_min_value BIGINT;
    DECLARE v_max_value BIGINT;
    DECLARE v_partition_size BIGINT;
    DECLARE v_start_value BIGINT;
    DECLARE v_end_value BIGINT;
    DECLARE v_i INT DEFAULT 0;

    -- Identifier whitelist: both names end up inside dynamic SQL.
    IF p_table_name IS NULL OR p_table_name NOT REGEXP '^[A-Za-z0-9_]+$'
       OR p_partition_key IS NULL OR p_partition_key NOT REGEXP '^[A-Za-z0-9_]+$' THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid table or partition key name';
    END IF;

    IF p_num_partitions IS NOT NULL AND p_num_partitions >= 1 THEN
        SET v_num_partitions = p_num_partitions;
    END IF;

    -- Determine the key range.
    SET @min_val = NULL, @max_val = NULL;
    SET @sql = CONCAT('SELECT MIN(`', p_partition_key, '`), MAX(`', p_partition_key, '`) ',
                      'INTO @min_val, @max_val FROM `', p_table_name, '`');
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    SET v_min_value = @min_val;
    SET v_max_value = @max_val;

    -- Nothing to partition: MIN/MAX are NULL for an empty table.
    IF v_min_value IS NULL OR v_max_value IS NULL THEN
        LEAVE create_tasks;
    END IF;

    SET v_partition_size = CEIL((v_max_value - v_min_value + 1) / v_num_partitions);
    SET v_start_value = v_min_value;

    -- Create one task per range; stop early once the range is exhausted so that
    -- no task with start_value > end_value is created.
    WHILE v_i < v_num_partitions AND v_start_value <= v_max_value DO
        SET v_end_value = LEAST(v_start_value + v_partition_size - 1, v_max_value);

        INSERT INTO parallel_migration_tasks (
            migration_id, table_name, partition_key, start_value, end_value
        ) VALUES (
            p_migration_id, p_table_name, p_partition_key,
            v_start_value, v_end_value
        );

        SET v_i = v_i + 1;
        SET v_start_value = v_min_value + (v_i * v_partition_size);
    END WHILE;
END //
DELIMITER ;
# 数据验证与回滚
# 1. 数据一致性验证
-- Data validation procedure
DELIMITER //
-- ValidateMigration: checks the user migration (old_users -> users) and writes the
-- result to migration_log under p_migration_id; when any check fails the
-- migration_status row is set to 'failed' and an ERROR entry is logged.
-- The checks are hard-wired to the user tables regardless of p_migration_id.
--
-- Checks:
--   * record count: rows in old_users vs. migrated rows in users (users that have
--     a migration_user_mapping entry, so users created natively in the new system
--     do not distort the comparison);
--   * checksum over (legacy user id, username, email) on both sides;
--   * business rule: no user has a missing/empty/invalid e-mail.
-- NOTE: the checksum compares usernames/e-mails byte for byte, so it is expected
-- to differ once CleanUserData has normalised e-mail addresses.
--
-- p_migration_id: must exist in migration_status (foreign key of migration_log).
CREATE PROCEDURE ValidateMigration(
    IN p_migration_id VARCHAR(100)
)
BEGIN
    DECLARE v_source_count BIGINT;
    DECLARE v_target_count BIGINT;
    DECLARE v_checksum_match BOOLEAN DEFAULT TRUE;
    DECLARE v_validation_result JSON;

    -- Record count check.
    SELECT COUNT(*) INTO v_source_count
    FROM old_users;

    SELECT COUNT(*) INTO v_target_count
    FROM users AS u
    INNER JOIN migration_user_mapping AS m
        ON u.user_id = m.new_user_id;

    -- Checksum check. Both sides hash the LEGACY user id: the new user_id is a
    -- fresh auto-increment value and would never match. CONCAT_WS with a separator
    -- keeps adjacent fields apart; COALESCE keeps NULL columns from dropping a
    -- field, and maps the empty-table SUM (NULL) to 0.
    SET @source_checksum = (
        SELECT COALESCE(SUM(CRC32(CONCAT_WS('|', ou.user_id,
                                            COALESCE(ou.username, ''),
                                            COALESCE(ou.email, '')))), 0)
        FROM old_users AS ou
    );
    SET @target_checksum = (
        SELECT COALESCE(SUM(CRC32(CONCAT_WS('|', m.old_user_id,
                                            COALESCE(u.username, ''),
                                            COALESCE(u.email, '')))), 0)
        FROM users AS u
        INNER JOIN migration_user_mapping AS m
            ON u.user_id = m.new_user_id
    );
    SET v_checksum_match = (@source_checksum <=> @target_checksum);

    -- Business rule check: TRUE when no user has a missing or invalid e-mail.
    SET @business_validation = NOT EXISTS (
        SELECT 1
        FROM users
        WHERE email IS NULL OR email = ''
           OR NOT ValidateEmail(email)
    );

    -- Build the validation report.
    SET v_validation_result = JSON_OBJECT(
        'record_count_match', (v_source_count = v_target_count),
        'source_count', v_source_count,
        'target_count', v_target_count,
        'checksum_match', v_checksum_match,
        'business_validation_passed', @business_validation,
        'validation_time', NOW()
    );

    INSERT INTO migration_log (migration_id, log_level, message, details)
    VALUES (p_migration_id, 'INFO', 'Migration validation completed', v_validation_result);

    -- Any failed check marks the migration as failed.
    IF v_source_count != v_target_count OR NOT v_checksum_match OR NOT @business_validation THEN
        UPDATE migration_status
        SET status = 'failed'
        WHERE migration_id = p_migration_id;

        INSERT INTO migration_log (migration_id, log_level, message)
        VALUES (p_migration_id, 'ERROR', 'Migration validation failed');
    END IF;
END //
DELIMITER ;
# 2. 回滚机制
-- Rollback procedure
DELIMITER //
-- RollbackMigration: removes the rows a migration created in the new system.
--
-- p_migration_id: id of a row in migration_status; its table_name decides what is
--                 rolled back. Only 'users' and 'orders' are supported.
--
-- Steps:
--   1. Back up the affected table(s) with CREATE TABLE ... AS SELECT. This happens
--      BEFORE the transaction starts, because DDL commits implicitly and would
--      otherwise end the transaction that protects the deletes.
--   2. In one transaction delete the migrated rows and their mapping entries and
--      set the status to 'rollback'.
--   'users':  also deletes the user_addresses rows of migrated users (they
--             reference users by foreign key). Orders that still reference these
--             users block the delete - roll back the order migration first.
--   'orders': addresses created for orders are left in place.
-- Mapping rows are removed when the row they point to no longer exists.
-- On any SQL error the transaction is rolled back (backup tables remain), the
-- error is logged and then re-raised. An unknown migration id raises SQLSTATE
-- 45000 without logging, since a log row needs an existing migration_status row.
CREATE PROCEDURE RollbackMigration(
    IN p_migration_id VARCHAR(100)
)
BEGIN
    DECLARE v_table_name VARCHAR(100);
    DECLARE v_backup_suffix VARCHAR(32);
    DECLARE v_backup_table VARCHAR(100);
    DECLARE v_address_backup_table VARCHAR(100) DEFAULT NULL;
    DECLARE v_error_message TEXT;

    -- Look up the migration (scalar subquery: NULL when the id is unknown).
    SET v_table_name = (
        SELECT ms.table_name
        FROM migration_status AS ms
        WHERE ms.migration_id = p_migration_id
    );

    IF v_table_name IS NULL THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Unknown migration id';
    END IF;

    BEGIN
        DECLARE EXIT HANDLER FOR SQLEXCEPTION
        BEGIN
            -- Must be the first statement: any other statement (including ROLLBACK)
            -- clears the current diagnostics area and the message would be lost.
            GET DIAGNOSTICS CONDITION 1 v_error_message = MESSAGE_TEXT;
            ROLLBACK;
            INSERT INTO migration_log (migration_id, log_level, message)
            VALUES (p_migration_id, 'ERROR',
                    CONCAT('Rollback failed: ', COALESCE(v_error_message, 'Unknown error')));
            RESIGNAL;
        END;

        -- Whitelist: the table name is spliced into dynamic SQL below and only
        -- these two tables have delete logic.
        IF v_table_name NOT IN ('users', 'orders') THEN
            SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Rollback is not supported for this table';
        END IF;

        SET v_backup_suffix = DATE_FORMAT(NOW(), '%Y%m%d_%H%i%s');
        SET v_backup_table = CONCAT(v_table_name, '_backup_', v_backup_suffix);

        -- Back up the current data (DDL: implicit commit, therefore done first).
        SET @sql = CONCAT('CREATE TABLE `', v_backup_table, '` AS SELECT * FROM `', v_table_name, '`');
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        IF v_table_name = 'users' THEN
            -- Addresses are deleted together with users, so back them up as well.
            SET v_address_backup_table = CONCAT('user_addresses_backup_', v_backup_suffix);
            SET @sql = CONCAT('CREATE TABLE `', v_address_backup_table,
                              '` AS SELECT * FROM `user_addresses`');
            PREPARE stmt FROM @sql;
            EXECUTE stmt;
            DEALLOCATE PREPARE stmt;
        END IF;

        START TRANSACTION;

        -- Delete the migrated data.
        CASE v_table_name
            WHEN 'users' THEN
                -- Children first: user_addresses.user_id references users.user_id.
                DELETE a FROM user_addresses AS a
                INNER JOIN migration_user_mapping AS m ON a.user_id = m.new_user_id;

                DELETE u FROM users AS u
                INNER JOIN migration_user_mapping AS m ON u.user_id = m.new_user_id;

                -- Drop mapping entries whose target user is gone.
                DELETE m FROM migration_user_mapping AS m
                LEFT JOIN users AS u ON u.user_id = m.new_user_id
                WHERE u.user_id IS NULL;

            WHEN 'orders' THEN
                DELETE o FROM orders AS o
                INNER JOIN migration_order_mapping AS m ON o.order_id = m.new_order_id;

                -- Drop mapping entries whose target order is gone.
                DELETE m FROM migration_order_mapping AS m
                LEFT JOIN orders AS o ON o.order_id = m.new_order_id
                WHERE o.order_id IS NULL;

            ELSE
                SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Rollback is not supported for this table';
        END CASE;

        -- Update the migration status.
        UPDATE migration_status
        SET status = 'rollback', end_time = NOW()
        WHERE migration_id = p_migration_id;

        COMMIT;

        -- Record the rollback.
        INSERT INTO migration_log (migration_id, log_level, message, details)
        VALUES (p_migration_id, 'INFO', 'Migration rollback completed',
                JSON_OBJECT('backup_table', v_backup_table,
                            'address_backup_table', v_address_backup_table,
                            'rollback_time', NOW()));
    END;
END //
DELIMITER ;
# 监控与告警
# 1. 迁移监控
-- Migration progress view: one row per migration_status row with derived
-- percentage, elapsed minutes and a linear estimate of the remaining minutes.
CREATE VIEW migration_progress AS
SELECT
    migration_id,
    table_name,
    migration_type,
    status,
    total_records,
    migrated_records,
    failed_records,
    -- NULLIF turns a zero total into NULL instead of a division error.
    ROUND(migrated_records * 100.0 / NULLIF(total_records, 0), 2) AS progress_percent,
    -- Elapsed time; still-running migrations are measured up to now.
    TIMESTAMPDIFF(MINUTE, start_time, COALESCE(end_time, NOW())) AS duration_minutes,
    -- remaining rows * (elapsed minutes per migrated row); only for running
    -- migrations that have already copied at least one row.
    CASE
        WHEN status = 'running' AND migrated_records > 0 THEN
            ROUND((total_records - migrated_records) *
                  TIMESTAMPDIFF(MINUTE, start_time, NOW()) / migrated_records, 0)
        ELSE NULL
    END AS estimated_remaining_minutes,
    start_time,
    end_time
FROM migration_status;
-- Migration alert view: running and failed migrations with an alert level and a
-- matching message. Thresholds: running longer than 120 minutes, or below 10%
-- progress after more than 30 minutes.
CREATE VIEW migration_alerts AS
SELECT
    p.migration_id,
    p.table_name,
    p.status,
    p.progress_percent,
    p.duration_minutes,
    CASE
        WHEN p.status = 'failed' THEN 'CRITICAL'
        WHEN p.status = 'running' AND p.duration_minutes > 120 THEN 'WARNING'
        WHEN p.status = 'running' AND p.progress_percent < 10 AND p.duration_minutes > 30 THEN 'WARNING'
        ELSE 'NORMAL'
    END AS alert_level,
    CASE
        WHEN p.status = 'failed' THEN '迁移失败'
        WHEN p.status = 'running' AND p.duration_minutes > 120 THEN '迁移时间过长'
        WHEN p.status = 'running' AND p.progress_percent < 10 AND p.duration_minutes > 30 THEN '迁移进度缓慢'
        ELSE '正常'
    END AS alert_message
FROM migration_progress AS p
-- Every running or failed migration is listed; the slow/long-running cases are a
-- subset of the running ones, so no further condition is needed to include them.
WHERE p.status IN ('running', 'failed');
# 2. 自动化告警
-- Alert check procedure
DELIMITER //
-- CheckMigrationAlerts: writes one migration_log row for every migration that
-- migration_alerts currently rates CRITICAL or WARNING.
-- CRITICAL alerts are logged with level 'ERROR', WARNING alerts with 'WARN'; the
-- original alert level is kept in the JSON details.
-- Done as a single set-based INSERT ... SELECT; no cursor is needed because each
-- alert row maps to exactly one log row.
-- Hook for external notification (mail, SMS, ...): process the rows this call
-- inserts, or add a call such as SendAlert(migration_id, alert_level, alert_message).
CREATE PROCEDURE CheckMigrationAlerts()
BEGIN
    INSERT INTO migration_log (migration_id, log_level, message, details)
    SELECT
        ma.migration_id,
        CASE ma.alert_level
            WHEN 'CRITICAL' THEN 'ERROR'
            ELSE 'WARN'
        END,
        ma.alert_message,
        JSON_OBJECT('alert_level', ma.alert_level, 'check_time', NOW())
    FROM migration_alerts AS ma
    WHERE ma.alert_level IN ('CRITICAL', 'WARNING');
END //
DELIMITER ;
# 最佳实践总结
# 1. 迁移前准备
- 充分的需求分析和方案设计
- 详细的数据映射和转换规则
- 完整的测试环境和测试数据
- 制定详细的回滚计划
# 2. 迁移执行
- 采用分批次、增量的迁移策略
- 实施严格的数据验证机制
- 建立完善的监控和告警系统
- 保持详细的操作日志
# 3. 迁移后维护
- 持续的数据质量监控
- 定期的一致性检查
- 及时的性能优化
- 完整的文档和知识传递
# 4. 风险控制
- 制定详细的风险评估
- 建立多层次的备份机制
- 实施渐进式的切换策略
- 准备应急响应预案
通过这些实战案例和最佳实践,可以帮助团队成功完成各种复杂的数据迁移项目,确保数据的完整性、一致性和业务的连续性。