数据库分片技术实现

# 数据库分片技术实现

# 概述

数据库分片(Database Sharding)是一种水平扩展数据库的技术,通过将大型数据库分割成多个较小的、更易管理的片段(分片),分布在不同的服务器上,以提高性能和可扩展性。

# 分片策略

# 1. 水平分片(Horizontal Sharding)

按行分割数据,将表中的不同行分布到不同的分片中。

// 基于用户ID的水平分片示例
// Horizontal sharding strategy: routes each user to one of SHARD_COUNT shards
// based on the user id.
public class UserShardingStrategy {
    
    private static final int SHARD_COUNT = 4;
    
    /**
     * Computes the shard index for a user id.
     *
     * FIX: uses Math.floorMod instead of the % operator so that a negative
     * userId still maps into [0, SHARD_COUNT); (userId % SHARD_COUNT) would be
     * negative for negative ids and would produce an invalid shard name below.
     *
     * @param userId the user id (must not be null; unboxing throws NPE otherwise)
     * @return a shard index in the range [0, SHARD_COUNT)
     */
    public int getShardIndex(Long userId) {
        return Math.floorMod(userId, SHARD_COUNT);
    }
    
    /**
     * Resolves the DataSource of the shard that owns this user.
     *
     * @param userId the user id
     * @return the data source registered under the name "shard_&lt;index&gt;"
     */
    public DataSource getDataSource(Long userId) {
        int shardIndex = getShardIndex(userId);
        return DataSourceManager.getDataSource("shard_" + shardIndex);
    }
}

# 2. 垂直分片(Vertical Sharding)

按列分割数据,将表中的不同列分布到不同的分片中。

// 垂直分片示例:用户基本信息和扩展信息分离
// Vertical sharding example: a user's frequently-read basic columns and the
// colder profile columns are stored in two separate tables (which can then be
// placed in different databases).
public class VerticalShardingExample {
    
    // Basic user info (hot data).
    // FIX: nested JPA entities must be static — a non-static inner class holds
    // a hidden reference to the enclosing instance and cannot be instantiated
    // by the persistence provider. An @Id is also mandatory for every @Entity.
    // (In real code these belong in their own top-level files.)
    @Entity
    @Table(name = "user_basic")
    public static class UserBasic {
        @Id
        private Long id;
        private String username;
        private String email;
        private Date createTime;
    }
    
    // Extended user profile (cold data), linked back to user_basic by userId.
    @Entity
    @Table(name = "user_profile")
    public static class UserProfile {
        @Id
        private Long userId;
        private String avatar;
        private String bio;
        private String address;
    }
}

# 3. 功能分片(Functional Sharding)

按功能模块分割数据库,不同的业务功能使用不同的数据库。

/**
 * Functional sharding: each business domain (users, orders, products) gets its
 * own physical database, exposed to Spring as a separate DataSource bean.
 */
@Configuration
public class FunctionalShardingConfig {
    
    /** All three data sources differ only in the target database name. */
    private DataSource mysqlDataSource(String database) {
        return DataSourceBuilder.create()
            .url("jdbc:mysql://localhost:3306/" + database)
            .build();
    }
    
    /** Primary data source: user domain. */
    @Bean
    @Primary
    public DataSource userDataSource() {
        return mysqlDataSource("user_db");
    }
    
    /** Order domain data source. */
    @Bean
    public DataSource orderDataSource() {
        return mysqlDataSource("order_db");
    }
    
    /** Product domain data source. */
    @Bean
    public DataSource productDataSource() {
        return mysqlDataSource("product_db");
    }
}

# MySQL分片实现

# 1. 基于ShardingSphere的分片配置

# application-sharding.yml
# ShardingSphere sharding configuration: 4 physical MySQL schemas
# (shard_0..shard_3), each holding tables user_0..user_3; both the target
# database and the target table are chosen by (id % 4).
# NOTE(review): plaintext credentials in config — move to environment
# variables or a secret store before production use.
spring:
  shardingsphere:
    datasource:
      # Logical names of the four physical data sources
      names: ds0,ds1,ds2,ds3
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_0
        username: root
        password: password
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_1
        username: root
        password: password
      ds2:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_2
        username: root
        password: password
      ds3:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_3
        username: root
        password: password
    rules:
      sharding:
        tables:
          user:
            # Enumerates all 16 physical nodes: ds0..ds3 x user_0..user_3
            actual-data-nodes: ds$->{0..3}.user_$->{0..3}
            # Which physical table inside the chosen database
            table-strategy:
              standard:
                sharding-column: id
                sharding-algorithm-name: user-table-inline
            # Which physical database
            database-strategy:
              standard:
                sharding-column: id
                sharding-algorithm-name: user-database-inline
        sharding-algorithms:
          # INLINE algorithms: Groovy expressions evaluated against the id column
          user-database-inline:
            type: INLINE
            props:
              algorithm-expression: ds$->{id % 4}
          user-table-inline:
            type: INLINE
            props:
              algorithm-expression: user_$->{id % 4}

# 2. 自定义分片算法

/**
 * Custom ShardingSphere standard sharding algorithm that routes by fixed id
 * ranges (one million ids per data source).
 */
@Component
public class CustomShardingAlgorithm implements StandardShardingAlgorithm<Long> {
    
    /**
     * Precise (equality predicate) routing: maps an id to a data-source name.
     *
     * NOTE(review): the returned names are hard-coded and never checked against
     * availableTargetNames — confirm they always match the configured
     * data-source names (ds0..ds3).
     */
    @Override
    public String doSharding(Collection<String> availableTargetNames, 
                           PreciseShardingValue<Long> shardingValue) {
        Long value = shardingValue.getValue();
        
        // Range-based routing: [0, 1M) -> ds0, [1M, 2M) -> ds1, [2M, 3M) -> ds2,
        // everything else (including negative ids) -> ds3.
        if (value < 1000000) {
            return "ds0";
        } else if (value < 2000000) {
            return "ds1";
        } else if (value < 3000000) {
            return "ds2";
        } else {
            return "ds3";
        }
    }
    
    /**
     * Range (BETWEEN / comparison predicate) routing: returns every target
     * whose id range intersects the queried range.
     */
    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames,
                                        RangeShardingValue<Long> shardingValue) {
        // Collect every candidate target that may contain rows in the range.
        Set<String> result = new HashSet<>();
        Range<Long> range = shardingValue.getValueRange();
        
        for (String targetName : availableTargetNames) {
            if (isInRange(targetName, range)) {
                result.add(targetName);
            }
        }
        
        return result;
    }
    
    // Stub: always returns true, i.e. range queries currently fan out to EVERY
    // shard. Implement the real intersection test (mirroring the ranges used in
    // the precise doSharding above) before production use.
    private boolean isInRange(String targetName, Range<Long> range) {
        return true;
    }
}

# 3. 分片事务处理

/**
 * Demonstrates a cross-shard transaction: user and order rows may live on
 * different physical shards, so an XA (two-phase-commit) transaction is used.
 */
@Service
@Transactional
public class ShardingTransactionService {
    
    @Autowired
    private UserRepository userRepository;
    
    @Autowired
    private OrderRepository orderRepository;
    
    /**
     * Creates a user and an order atomically across shards.
     */
    @ShardingTransactionType(TransactionType.XA)
    public void createUserAndOrder(User user, Order order) {
        try {
            // The user row may land on one shard...
            userRepository.save(user);
            // ...and the order row, linked by the freshly assigned id, on another.
            order.setUserId(user.getId());
            orderRepository.save(order);
        } catch (Exception e) {
            // Surface an unchecked exception so the transaction rolls back.
            throw new RuntimeException("跨分片事务失败", e);
        }
    }
}

# PostgreSQL分片实现

# 1. 使用Postgres-XL集群

-- Create a distributed (sharded) table: rows are hash-partitioned across the
-- Postgres-XL data nodes by id.
CREATE TABLE user_info (
    id BIGSERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) DISTRIBUTE BY HASH(id);

-- Create a replicated table (small lookup table): a full copy is kept on every
-- node so joins against it never need to cross nodes.
CREATE TABLE user_status (
    status_id INT PRIMARY KEY,
    status_name VARCHAR(20)
) DISTRIBUTE BY REPLICATION;

# 2. 使用Citus扩展

-- Enable the Citus extension (run on the coordinator node).
CREATE EXTENSION citus;

-- Turn regular tables into distributed tables, sharded by the given column.
SELECT create_distributed_table('user_info', 'id');
SELECT create_distributed_table('user_orders', 'user_id');

-- Reference table: replicated in full to every worker (for small lookup tables).
SELECT create_reference_table('categories');

# 3. Java代码实现

/**
 * Citus/PostgreSQL sharding data sources: one coordinator plus two workers,
 * each bound to its own "spring.datasource.*" property prefix.
 */
@Configuration
public class PostgreSQLShardingConfig {
    
    /** Shared construction; connection details are bound from the bean's prefix. */
    private DataSource buildDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    /** Coordinator node — the only node the application talks to directly. */
    @Bean
    @ConfigurationProperties("spring.datasource.coordinator")
    public DataSource coordinatorDataSource() {
        return buildDataSource();
    }
    
    /** Worker node 1. */
    @Bean
    @ConfigurationProperties("spring.datasource.worker1")
    public DataSource worker1DataSource() {
        return buildDataSource();
    }
    
    /** Worker node 2. */
    @Bean
    @ConfigurationProperties("spring.datasource.worker2")
    public DataSource worker2DataSource() {
        return buildDataSource();
    }
}

/**
 * Repository that queries through the Citus coordinator; the coordinator fans
 * the query out to the worker shards and merges the results.
 */
@Repository
public class PostgreSQLShardingRepository {
    
    @Autowired
    @Qualifier("coordinatorDataSource")
    private DataSource coordinatorDataSource;
    
    /**
     * Distributed range query joining user_info and user_profile.
     *
     * FIX: the ResultSet is now closed via try-with-resources instead of being
     * left for Statement.close() to clean up implicitly.
     *
     * @param minAge inclusive lower bound
     * @param maxAge inclusive upper bound
     * @return the matching users
     * @throws RuntimeException wrapping any SQLException from the query
     */
    public List<User> findUsersByAgeRange(int minAge, int maxAge) {
        String sql = """
            SELECT u.*, p.address 
            FROM user_info u 
            JOIN user_profile p ON u.id = p.user_id 
            WHERE u.age BETWEEN ? AND ?
            """;
        
        try (Connection conn = coordinatorDataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            
            stmt.setInt(1, minAge);
            stmt.setInt(2, maxAge);
            
            try (ResultSet rs = stmt.executeQuery()) {
                return mapResultSetToUsers(rs);
            }
        } catch (SQLException e) {
            throw new RuntimeException("分布式查询失败", e);
        }
    }
}

# MongoDB分片实现

# 1. MongoDB分片集群配置

# Start the config server (stores the cluster metadata)
# FIX: shell comments use '#', not the '//' used previously.
mongod --configsvr --replSet configReplSet --port 27019 --dbpath /data/configdb

# Start the shard servers (one replica set per shard)
mongod --shardsvr --replSet shard1ReplSet --port 27018 --dbpath /data/shard1
mongod --shardsvr --replSet shard2ReplSet --port 27020 --dbpath /data/shard2

# Start the mongos query router (the only endpoint applications connect to)
mongos --configdb configReplSet/localhost:27019 --port 27017

# 2. 分片配置脚本

// Connect to mongos and switch to the admin database
use admin

// Register each shard replica set with the cluster
sh.addShard("shard1ReplSet/localhost:27018")
sh.addShard("shard2ReplSet/localhost:27020")

// Enable sharding for the "myapp" database
sh.enableSharding("myapp")

// Choose shard keys: hashed _id spreads users evenly across shards;
// orders use a ranged userId key so one user's orders stay together
sh.shardCollection("myapp.users", {"_id": "hashed"})
sh.shardCollection("myapp.orders", {"userId": 1})

// Print the current sharding status
sh.status()

# 3. Java MongoDB分片操作

/**
 * MongoDB sharding configuration: the application only ever talks to the
 * mongos router, which transparently routes each operation to the right shard.
 */
@Configuration
public class MongoShardingConfig {
    
    /** Connection string of the mongos router (never a shard member directly). */
    private static final String MONGOS_URI = "mongodb://localhost:27017";
    
    private static final String DATABASE_NAME = "myapp";
    
    @Bean
    public MongoClient mongoClient() {
        return MongoClients.create(MONGOS_URI);
    }
    
    @Bean
    public MongoTemplate mongoTemplate() {
        return new MongoTemplate(mongoClient(), DATABASE_NAME);
    }
}

/**
 * Service layer for the sharded "users" collection.
 */
@Service
public class MongoShardingService {
    
    @Autowired
    private MongoTemplate mongoTemplate;
    
    /**
     * Saves a user; mongos routes the write to the owning shard via the
     * collection's shard key.
     */
    public void insertUser(User user) {
        mongoTemplate.save(user, "users");
    }
    
    /**
     * Range query on age. NOTE(review): if age is not the shard key this
     * presumably fans out to all shards — confirm against the cluster's
     * shardCollection setup.
     */
    public List<User> findUsersByAge(int minAge, int maxAge) {
        Criteria ageBetween = Criteria.where("age").gte(minAge).lte(maxAge);
        return mongoTemplate.find(new Query(ageBetween), User.class, "users");
    }
    
    /**
     * Aggregation: counts users per status, ordered by count descending.
     */
    public List<AggregationResult> getUserStatistics() {
        Aggregation pipeline = Aggregation.newAggregation(
            Aggregation.group("status").count().as("count"),
            Aggregation.sort(Sort.Direction.DESC, "count")
        );
        
        return mongoTemplate
            .aggregate(pipeline, "users", AggregationResult.class)
            .getMappedResults();
    }
}

# 分片中间件对比

# 1. ShardingSphere

优点:

  • 支持多种数据库
  • 丰富的分片算法
  • 透明化分片
  • 强大的读写分离功能

缺点:

  • 学习成本较高
  • 配置复杂
@Configuration
@EnableShardingSphereDataSource
public class ShardingSphereConfig {
    
    /**
     * Builds the logical sharded DataSource over two physical shards.
     *
     * FIX: the populated shardingRuleConfig was previously discarded — a fresh,
     * empty {@code new ShardingRuleConfiguration()} was passed to the factory,
     * so none of the table rules, binding groups, or the default database
     * strategy ever took effect. The configured object is now passed through.
     *
     * @return the ShardingSphere data source
     * @throws SQLException if the underlying data sources cannot be created
     */
    @Bean
    public DataSource dataSource() throws SQLException {
        Map<String, DataSource> dataSourceMap = new HashMap<>();
        dataSourceMap.put("ds0", createDataSource("shard_0"));
        dataSourceMap.put("ds1", createDataSource("shard_1"));
        
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        shardingRuleConfig.getTableRuleConfigs().add(getUserTableRuleConfiguration());
        shardingRuleConfig.getBindingTableGroups().add("user");
        shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(
            new InlineShardingStrategyConfiguration("user_id", "ds$->{user_id % 2}"));
        
        // Pass the populated rule configuration (was: new ShardingRuleConfiguration()).
        return ShardingDataSourceFactory.createDataSource(
            dataSourceMap, shardingRuleConfig, new Properties());
    }
}

# 2. MyCat

优点:

  • 基于MySQL协议
  • 支持多种分片算法
  • 配置相对简单

缺点:

  • 主要支持MySQL
  • 社区活跃度一般
<!-- schema.xml : MyCat logical-schema definition -->
<mycat:schema xmlns:mycat="http://io.mycat/">
    <!-- Logical schema "testdb": the user table is split across dn1/dn2 using
         the auto-sharding-long (id range) rule -->
    <schema name="testdb" checkSQLschema="false" sqlMaxLimit="100">
        <table name="user" primaryKey="id" dataNode="dn1,dn2" 
               rule="auto-sharding-long" />
    </schema>
    
    <!-- Each dataNode maps a shard name to a physical database on a dataHost -->
    <dataNode name="dn1" dataHost="localhost1" database="db1" />
    <dataNode name="dn2" dataHost="localhost1" database="db2" />
    
    <!-- Physical host plus connection-pool limits; the heartbeat query is used
         to detect dead backends.
         NOTE(review): plaintext password in config — confirm MyCat's password
         encryption is used in production -->
    <dataHost name="localhost1" maxCon="1000" minCon="10" 
              balance="0" writeType="0" dbType="mysql" 
              dbDriver="native" switchType="1">
        <heartbeat>select user()</heartbeat>
        <writeHost host="hostM1" url="localhost:3306" 
                   user="root" password="123456" />
    </dataHost>
</mycat:schema>

# 分片最佳实践

# 1. 分片键选择原则

// Illustrates shard-key selection principles (bodies are intentionally empty;
// the comments carry the guidance).
public class ShardingKeyBestPractices {
    
    /**
     * Properties of a good shard key:
     * 1. Even data distribution
     * 2. Query friendly
     * 3. Avoids hot spots
     * 4. Strong business relevance
     */
    
    // Recommended: user id as the shard key
    public void goodShardingKey() {
        // All operations for one user land on the same shard,
        // avoiding cross-shard queries
    }
    
    // Not recommended: timestamp as the shard key
    public void badShardingKey() {
        // Causes hot-spot problems:
        // new data is always written to the newest shard
    }
}

# 2. 跨分片查询优化

/**
 * Techniques for avoiding or cheapening cross-shard queries.
 */
@Service
public class CrossShardQueryOptimization {
    
    // FIX: these collaborators were referenced below but never declared/injected.
    @Autowired
    private UserService userService;
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private UserRepository userRepository;
    
    /**
     * Avoids a cross-shard JOIN: fetches the user and the orders with two
     * separate queries (each keyed by userId) and joins them in the application.
     */
    public List<UserOrderDTO> getUserOrders(Long userId) {
        User user = userService.getUser(userId);
        List<Order> orders = orderService.getOrdersByUserId(userId);
        
        return orders.stream()
            .map(order -> new UserOrderDTO(user, order))
            .collect(Collectors.toList());
    }
    
    /**
     * Denormalization: order rows carry copies of user columns so order queries
     * never need to touch the user shard.
     *
     * FIX: made static — a JPA entity must not be a non-static inner class;
     * in real code this belongs in its own top-level file.
     */
    @Entity
    public static class OrderWithUserInfo {
        private Long orderId;
        private Long userId;
        private String username; // denormalized user name
        private String userEmail; // denormalized user email
        // other order fields
    }
    
    /**
     * Caches users to cut repeated cross-shard lookups.
     *
     * NOTE(review): @Cacheable only applies through the Spring proxy — calls
     * made from inside this class bypass the cache.
     */
    @Cacheable(value = "userCache", key = "#userId")
    public User getUserFromCache(Long userId) {
        return userRepository.findById(userId);
    }
}

# 3. 分片扩容策略

// Outlines two shard-expansion strategies (bodies are step-by-step plans,
// not executable logic).
@Component
public class ShardingExpansionStrategy {
    
    /**
     * Consistent-hashing expansion: only the keys adjacent to the new node on
     * the hash ring need to move, so most data stays in place.
     */
    public void consistentHashExpansion() {
        // 1. Add the new shard node
        // 2. Recompute the data distribution on the hash ring
        // 3. Migrate the affected slice of data to the new node
        // 4. Update the routing rules
    }
    
    /**
     * Double-write expansion: write to both old and new shards during the
     * migration window, then cut reads over once history is backfilled.
     */
    public void doubleWriteExpansion() {
        // 1. Add the new shard node
        // 2. Write new data to both old and new shards
        // 3. Gradually migrate historical data
        // 4. Switch reads over to the new shard
        // 5. Stop writing to the old shard
    }
}

# 监控与运维

# 1. 分片监控指标

/**
 * Publishes sharding metrics (query latency per shard/table, per-shard row
 * counts) to a Micrometer registry.
 */
@Component
public class ShardingMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    /**
     * Times every sharding query and tags the duration metric with the shard
     * and table it hit.
     */
    @EventListener
    public void onShardingQuery(ShardingQueryEvent event) {
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            // Run the query represented by the event
            event.execute();
        } finally {
            // Record the duration even when the query throws
            sample.stop(Timer.builder("sharding.query.duration")
                .tag("shard", event.getShardName())
                .tag("table", event.getTableName())
                .register(meterRegistry));
        }
    }
    
    /**
     * Samples per-shard row counts once a minute to detect data skew.
     *
     * NOTE(review): verify this Gauge.builder(...).register(registry, value)
     * usage against the Micrometer API — Micrometer gauges normally track a
     * live object/supplier, and re-registering the same meter id each minute
     * may keep reporting the first captured value rather than updating it.
     */
    @Scheduled(fixedRate = 60000)
    public void monitorDataDistribution() {
        for (String shardName : getShardNames()) {
            long recordCount = getRecordCount(shardName);
            
            Gauge.builder("sharding.data.distribution")
                .tag("shard", shardName)
                .register(meterRegistry, recordCount);
        }
    }
}

# 2. 分片故障处理

/**
 * Detects shard failures via periodic probes and drives failover.
 */
@Service
public class ShardingFailoverService {
    
    /**
     * Periodic shard health check (every 30s): runs "SELECT 1" against each
     * shard and marks it healthy or unhealthy.
     *
     * FIX: Connection, PreparedStatement, and ResultSet are now closed via
     * try-with-resources — the previous version never closed them and leaked
     * one connection per shard on every run.
     */
    @Scheduled(fixedRate = 30000)
    public void healthCheck() {
        for (String shardName : getShardNames()) {
            try {
                DataSource dataSource = getDataSource(shardName);
                try (Connection conn = dataSource.getConnection();
                     PreparedStatement stmt = conn.prepareStatement("SELECT 1");
                     ResultSet rs = stmt.executeQuery()) {
                    // The probe succeeded; the shard answered.
                    markShardHealthy(shardName);
                }
            } catch (Exception e) {
                // Probe failed: flag the shard and alert.
                markShardUnhealthy(shardName);
                alertService.sendShardFailureAlert(shardName, e);
            }
        }
    }
    
    /**
     * Fails over away from a broken shard.
     *
     * @param failedShard name of the shard that failed its health check
     */
    public void failover(String failedShard) {
        // 1. Route traffic away from the failed shard
        updateRoutingRules(failedShard, false);
        
        // 2. Kick off the data-recovery process
        startDataRecovery(failedShard);
        
        // 3. Notify operations staff
        notificationService.notifyFailover(failedShard);
    }
}

# 总结

数据库分片是解决大规模数据存储和高并发访问的重要技术手段。选择合适的分片策略和实现方案需要考虑:

  1. 业务特点:数据访问模式、查询类型、事务需求
  2. 技术栈:现有数据库类型、开发框架、运维能力
  3. 性能要求:并发量、响应时间、数据一致性
  4. 扩展性:未来数据增长、业务发展需求

通过合理的分片设计和实现,可以有效提升系统的性能和可扩展性,但同时也要注意分片带来的复杂性,做好监控和运维工作。