数据库分片技术实现
# 数据库分片技术实现
# 概述
数据库分片(Database Sharding)是一种水平扩展数据库的技术,通过将大型数据库分割成多个较小的、更易管理的片段(分片),分布在不同的服务器上,以提高性能和可扩展性。
# 分片策略
# 1. 水平分片(Horizontal Sharding)
按行分割数据,将表中的不同行分布到不同的分片中。
// 基于用户ID的水平分片示例
public class UserShardingStrategy {

    /** Number of physical shards; data sources are named {@code shard_0} .. {@code shard_<SHARD_COUNT-1>}. */
    private static final int SHARD_COUNT = 4;

    /**
     * Maps a user ID to a shard index in {@code [0, SHARD_COUNT)}.
     *
     * <p>{@link Math#floorMod(long, long)} is used instead of {@code %} because Java's remainder
     * keeps the sign of the dividend: {@code -5 % 4 == -1}, which would route to a non-existent
     * {@code shard_-1}. {@code floorMod} always yields a non-negative index.
     *
     * @param userId the user ID; must not be {@code null}
     * @return the shard index, always in {@code [0, SHARD_COUNT)}
     * @throws NullPointerException if {@code userId} is {@code null}
     */
    public int getShardIndex(Long userId) {
        java.util.Objects.requireNonNull(userId, "userId");
        return (int) Math.floorMod(userId.longValue(), (long) SHARD_COUNT);
    }

    /**
     * Returns the data source of the shard that owns {@code userId}.
     *
     * @param userId the user ID; must not be {@code null}
     * @return the data source registered as {@code "shard_" + getShardIndex(userId)}
     * @throws NullPointerException if {@code userId} is {@code null}
     */
    public DataSource getDataSource(Long userId) {
        int shardIndex = getShardIndex(userId);
        return DataSourceManager.getDataSource("shard_" + shardIndex);
    }
}
# 2. 垂直分片(Vertical Sharding)
按列分割数据:将一张宽表的不同列拆分到不同的表(可位于不同的数据库/分片)中,各表通过相同的主键关联。例如下面的示例将用户基本信息与扩展信息分开存储。
// 垂直分片示例:用户基本信息和扩展信息分离
public class VerticalShardingExample {

    /**
     * User basic-info entity, mapped to table {@code user_basic}.
     *
     * <p>Declared {@code static}: a non-static inner class has no usable no-arg constructor
     * (its implicit constructor takes the enclosing instance), so a JPA provider cannot
     * instantiate it as an entity.
     *
     * <p>NOTE(review): JPA also requires an identifier (e.g. {@code @Id} on {@code id}), and the
     * JPA spec expects entities to be top-level classes. Consider moving this class to its own file.
     */
    @Entity
    @Table(name = "user_basic")
    public static class UserBasic {
        private Long id;
        private String username;
        private String email;
        private Date createTime;
    }

    /**
     * User extended-info (profile) entity, mapped to table {@code user_profile}.
     *
     * <p>{@code userId} presumably references {@link UserBasic}'s {@code id} (one-to-one split of
     * the same user row). TODO confirm. Declared {@code static} for the same reason as
     * {@link UserBasic}; it likewise still needs an {@code @Id} (likely {@code userId}).
     */
    @Entity
    @Table(name = "user_profile")
    public static class UserProfile {
        private Long userId;
        private String avatar;
        private String bio;
        private String address;
    }
}
# 3. 功能分片(Functional Sharding)
按功能模块分割数据库,不同的业务功能使用不同的数据库。
@Configuration
public class FunctionalShardingConfig {

    /**
     * Data source for the user domain ({@code user_db}).
     *
     * <p>Marked {@code @Primary}, so it is injected wherever a single {@code DataSource} is
     * requested without a qualifier.
     */
    @Bean
    @Primary
    public DataSource userDataSource() {
        return mysqlDataSource("jdbc:mysql://localhost:3306/user_db");
    }

    /** Data source for the order domain ({@code order_db}). */
    @Bean
    public DataSource orderDataSource() {
        return mysqlDataSource("jdbc:mysql://localhost:3306/order_db");
    }

    /** Data source for the product domain ({@code product_db}). */
    @Bean
    public DataSource productDataSource() {
        return mysqlDataSource("jdbc:mysql://localhost:3306/product_db");
    }

    /**
     * Builds a data source for one functional database.
     *
     * @param jdbcUrl JDBC URL of the database
     * @return a data source built with {@link DataSourceBuilder} defaults plus the given URL
     */
    private static DataSource mysqlDataSource(String jdbcUrl) {
        return DataSourceBuilder.create()
                .url(jdbcUrl)
                .build();
    }
}
# MySQL分片实现
# 1. 基于ShardingSphere的分片配置
# application-sharding.yml
spring:
  shardingsphere:
    datasource:
      names: ds0,ds1,ds2,ds3
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_0
        username: root
        password: password
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_1
        username: root
        password: password
      ds2:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_2
        username: root
        password: password
      ds3:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_3
        username: root
        password: password
    rules:
      sharding:
        tables:
          user:
            # 4 databases x 4 tables = 16 physical tables.
            actual-data-nodes: ds$->{0..3}.user_$->{0..3}
            table-strategy:
              standard:
                sharding-column: id
                sharding-algorithm-name: user-table-inline
            database-strategy:
              standard:
                sharding-column: id
                sharding-algorithm-name: user-database-inline
        sharding-algorithms:
          user-database-inline:
            type: INLINE
            props:
              # Low two bits of id select the database.
              algorithm-expression: ds$->{id % 4}
          user-table-inline:
            type: INLINE
            props:
              # The table must use different bits than the database. With "id % 4" here too,
              # every row in ds_k would land in user_k, so 12 of the 16 tables would stay empty.
              # intdiv (Groovy integer division) picks the next two bits instead.
              # Assumes non-negative ids.
              algorithm-expression: user_$->{id.intdiv(4) % 4}
# 2. 自定义分片算法
@Component
public class CustomShardingAlgorithm implements StandardShardingAlgorithm<Long> {

    /** Width of the ID range owned by each data source except the last, open-ended one. */
    private static final long RANGE_SIZE = 1_000_000L;

    /** Number of data sources: {@code ds0} .. {@code ds3}. */
    private static final int SHARD_COUNT = 4;

    /** Prefix of the target (data source) names; the numeric suffix is the shard index. */
    private static final String TARGET_PREFIX = "ds";

    /**
     * Routes a single sharding value (equality / IN condition) by ID range.
     *
     * <ul>
     *   <li>{@code < 1,000,000} (including negative values) → {@code ds0}</li>
     *   <li>{@code [1,000,000, 2,000,000)} → {@code ds1}</li>
     *   <li>{@code [2,000,000, 3,000,000)} → {@code ds2}</li>
     *   <li>{@code >= 3,000,000} → {@code ds3}</li>
     * </ul>
     *
     * @param availableTargetNames targets offered by the framework
     * @param shardingValue the value to route; its value must not be {@code null}
     * @return the owning target name
     * @throws IllegalStateException if the owning target is not among {@code availableTargetNames}
     */
    @Override
    public String doSharding(Collection<String> availableTargetNames,
                             PreciseShardingValue<Long> shardingValue) {
        String target = TARGET_PREFIX + shardIndexOf(shardingValue.getValue());
        // Fail fast here instead of returning a name the framework cannot resolve.
        if (!availableTargetNames.contains(target)) {
            throw new IllegalStateException(
                    "Sharding target " + target + " is not in available targets " + availableTargetNames);
        }
        return target;
    }

    /**
     * Routes a range condition (BETWEEN, {@code <}, {@code >=}, ...) to every target whose ID
     * range intersects the queried range.
     *
     * @param availableTargetNames targets offered by the framework
     * @param shardingValue the queried value range
     * @return the targets that may contain matching rows (never {@code null})
     */
    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames,
                                         RangeShardingValue<Long> shardingValue) {
        Set<String> result = new HashSet<>();
        Range<Long> range = shardingValue.getValueRange();
        for (String targetName : availableTargetNames) {
            if (isInRange(targetName, range)) {
                result.add(targetName);
            }
        }
        return result;
    }

    /** Maps an ID to its shard index; values below 0 clamp to 0 and values past the last range to the last shard. */
    private static int shardIndexOf(long value) {
        long index = value / RANGE_SIZE;
        return (int) Math.max(0L, Math.min(index, SHARD_COUNT - 1L));
    }

    /**
     * Returns whether {@code targetName}'s ID range intersects {@code range}.
     *
     * <p>Names that do not follow the {@code ds<N>} scheme are conservatively included
     * (broadcast), so results stay correct even if naming changes.
     */
    private boolean isInRange(String targetName, Range<Long> range) {
        if (!targetName.startsWith(TARGET_PREFIX)) {
            return true;
        }
        int index;
        try {
            index = Integer.parseInt(targetName.substring(TARGET_PREFIX.length()));
        } catch (NumberFormatException ignored) {
            // Unknown naming scheme: include the target rather than risk missing rows.
            return true;
        }
        if (index < 0 || index >= SHARD_COUNT) {
            return false;
        }
        Range<Long> owned = ownedRange(index);
        // intersection() requires connected ranges; adjacent ranges are connected but intersect emptily.
        return owned.isConnected(range) && !owned.intersection(range).isEmpty();
    }

    /** The ID range owned by shard {@code index}, mirroring {@link #shardIndexOf(long)}. */
    private static Range<Long> ownedRange(int index) {
        if (index == 0) {
            return Range.lessThan(RANGE_SIZE);
        }
        long lower = index * RANGE_SIZE;
        if (index == SHARD_COUNT - 1) {
            return Range.atLeast(lower);
        }
        return Range.closedOpen(lower, lower + RANGE_SIZE);
    }
}
# 3. 分片事务处理
@Service
@Transactional
public class ShardingTransactionService {

    /** Repository for users (rows may live on any shard). */
    @Autowired
    private UserRepository userRepository;

    /** Repository for orders (rows may live on a different shard than their user). */
    @Autowired
    private OrderRepository orderRepository;

    /**
     * Saves a user and an order for that user within a single transaction annotated as XA.
     *
     * <p>Any failure is rethrown as a {@link RuntimeException} carrying the original cause.
     * Propagating an unchecked exception out of this {@code @Transactional} method is what lets
     * the transaction be rolled back.
     *
     * @param user the user to persist; its ID is copied onto {@code order} after saving
     * @param order the order to persist
     * @throws RuntimeException wrapping any exception raised while saving
     */
    @ShardingTransactionType(TransactionType.XA)
    public void createUserAndOrder(User user, Order order) {
        try {
            persistUserWithOrder(user, order);
        } catch (Exception cause) {
            throw new RuntimeException("跨分片事务失败", cause);
        }
    }

    /** Saves the user first so its ID is available, links the order to it, then saves the order. */
    private void persistUserWithOrder(User user, Order order) {
        userRepository.save(user);
        Long ownerId = user.getId();
        order.setUserId(ownerId);
        orderRepository.save(order);
    }
}
# PostgreSQL分片实现
# 1. 使用Postgres-XL集群
-- Create a distributed table: rows are spread across Postgres-XL data nodes by a hash of id.
-- NOTE(review): this table has no "age" column, but findUsersByAgeRange (Java section)
-- filters on u.age. Confirm the intended schema.
CREATE TABLE user_info (
id BIGSERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) DISTRIBUTE BY HASH(id);
-- Create a replicated table, intended for small lookup tables such as status codes.
CREATE TABLE user_status (
status_id INT PRIMARY KEY,
status_name VARCHAR(20)
) DISTRIBUTE BY REPLICATION;
# 2. 使用Citus扩展
-- Enable the Citus extension.
-- NOTE(review): Citus typically requires the extension on the coordinator and on every
-- worker. Confirm for your deployment.
CREATE EXTENSION citus;
-- Create distributed tables: user_info sharded by id, user_orders sharded by user_id.
-- NOTE(review): these tables must already exist. user_orders is not created anywhere in this
-- document, and presumably its unique constraints must include user_id. Verify.
SELECT create_distributed_table('user_info', 'id');
SELECT create_distributed_table('user_orders', 'user_id');
-- Create a reference table (a small table available on all nodes for joins).
-- NOTE(review): the categories table is also assumed to exist already.
SELECT create_reference_table('categories');
# 3. Java代码实现
@Configuration
public class PostgreSQLShardingConfig {

    /**
     * Data source for the coordinator node, the entry point for distributed queries.
     *
     * <p>Marked {@code @Primary}: this class registers three {@code DataSource} beans. Without a
     * primary, any unqualified {@code DataSource} injection is ambiguous. Spring Boot
     * auto-configuration conditioned on a single candidate (e.g. JdbcTemplate/JPA) backs off.
     *
     * <p>NOTE(review): when HikariCP is on the classpath, {@code DataSourceBuilder} builds a
     * {@code HikariDataSource}, which binds {@code jdbc-url} rather than {@code url} under the
     * given prefix. Confirm the property names.
     */
    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.coordinator")
    public DataSource coordinatorDataSource() {
        return DataSourceBuilder.create().build();
    }

    /** Data source for worker node 1, bound from {@code spring.datasource.worker1.*}. */
    @Bean
    @ConfigurationProperties("spring.datasource.worker1")
    public DataSource worker1DataSource() {
        return DataSourceBuilder.create().build();
    }

    /** Data source for worker node 2, bound from {@code spring.datasource.worker2.*}. */
    @Bean
    @ConfigurationProperties("spring.datasource.worker2")
    public DataSource worker2DataSource() {
        return DataSourceBuilder.create().build();
    }
}
@Repository
public class PostgreSQLShardingRepository {

    /** Coordinator connection; the coordinator plans and fans out distributed queries. */
    @Autowired
    @Qualifier("coordinatorDataSource")
    private DataSource coordinatorDataSource;

    /**
     * Finds users whose age is within {@code [minAge, maxAge]} (inclusive, SQL BETWEEN), joined
     * with their profile address.
     *
     * <p>NOTE(review): the {@code user_info} DDL shown in this document has no {@code age} column.
     * For the JOIN to stay shard-local, {@code user_profile} presumably needs to be distributed and
     * co-located on {@code user_id}. Confirm both against the real schema.
     *
     * @param minAge inclusive lower bound
     * @param maxAge inclusive upper bound
     * @return the mapped users; empty if none match
     * @throws RuntimeException wrapping any {@link SQLException}
     */
    public List<User> findUsersByAgeRange(int minAge, int maxAge) {
        String sql = """
                SELECT u.*, p.address
                FROM user_info u
                JOIN user_profile p ON u.id = p.user_id
                WHERE u.age BETWEEN ? AND ?
                """;
        try (Connection conn = coordinatorDataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setInt(1, minAge);
            stmt.setInt(2, maxAge);
            // Manage the ResultSet explicitly, like the connection and statement.
            try (ResultSet rs = stmt.executeQuery()) {
                return mapResultSetToUsers(rs);
            }
        } catch (SQLException e) {
            throw new RuntimeException("分布式查询失败", e);
        }
    }
}
# MongoDB分片实现
# 1. MongoDB分片集群配置
# Create the data directories; mongod refuses to start if --dbpath does not exist.
mkdir -p /data/configdb /data/shard1 /data/shard2

# Start the config server as a one-member replica set.
# --fork/--logpath run the daemon in the background so the following commands can execute.
mongod --configsvr --replSet configReplSet --port 27019 --dbpath /data/configdb \
  --fork --logpath /data/configdb/mongod.log
# A replica set must be initiated before mongos or sh.addShard can use it.
mongosh --port 27019 --quiet --eval 'rs.initiate({_id: "configReplSet", configsvr: true, members: [{_id: 0, host: "localhost:27019"}]})'

# Start the shard servers, each as its own replica set, and initiate them.
mongod --shardsvr --replSet shard1ReplSet --port 27018 --dbpath /data/shard1 \
  --fork --logpath /data/shard1/mongod.log
mongosh --port 27018 --quiet --eval 'rs.initiate({_id: "shard1ReplSet", members: [{_id: 0, host: "localhost:27018"}]})'
mongod --shardsvr --replSet shard2ReplSet --port 27020 --dbpath /data/shard2 \
  --fork --logpath /data/shard2/mongod.log
mongosh --port 27020 --quiet --eval 'rs.initiate({_id: "shard2ReplSet", members: [{_id: 0, host: "localhost:27020"}]})'

# Start the mongos router (foreground), pointing it at the config server replica set.
mongos --configdb configReplSet/localhost:27019 --port 27017
# 2. 分片配置脚本
// Run against the mongos router, e.g. `mongosh --port 27017 shard-setup.js`.
// `use admin` is an interactive shell helper (the legacy mongo shell rejects it in script
// files); getSiblingDB is the scriptable equivalent.
db = db.getSiblingDB("admin");

// Register each shard replica set with the cluster.
sh.addShard("shard1ReplSet/localhost:27018");
sh.addShard("shard2ReplSet/localhost:27020");

// Enable sharding for the application database.
sh.enableSharding("myapp");

// Shard the collections: hashed _id for users, ranged userId for orders.
// NOTE(review): sharding a non-empty collection requires an index on the shard key first.
sh.shardCollection("myapp.users", { "_id": "hashed" });
sh.shardCollection("myapp.orders", { "userId": 1 });

// Print the cluster's sharding status.
sh.status();
# 3. Java MongoDB分片操作
@Configuration
public class MongoShardingConfig {

    /** Address of the mongos router; the application talks only to the router, never to shards. */
    private static final String MONGOS_URI = "mongodb://localhost:27017";

    /** Database used by {@link #mongoTemplate()}. */
    private static final String DATABASE_NAME = "myapp";

    /**
     * Client connected to the mongos router.
     *
     * @return a new {@link MongoClient} for {@value #MONGOS_URI}
     */
    @Bean
    public MongoClient mongoClient() {
        return MongoClients.create(MONGOS_URI);
    }

    /**
     * Template bound to the {@value #DATABASE_NAME} database through {@link #mongoClient()}.
     *
     * @return the {@link MongoTemplate}
     */
    @Bean
    public MongoTemplate mongoTemplate() {
        MongoClient client = mongoClient();
        return new MongoTemplate(client, DATABASE_NAME);
    }
}
@Service
public class MongoShardingService {

    /** Collection holding user documents. */
    private static final String USERS_COLLECTION = "users";

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Saves a user into the users collection.
     *
     * <p>No shard selection happens here; the write goes through the template, and routing is
     * left to the cluster (mongos) based on the collection's shard key.
     *
     * @param user the user document to save
     */
    public void insertUser(User user) {
        mongoTemplate.save(user, USERS_COLLECTION);
    }

    /**
     * Finds users with {@code minAge <= age <= maxAge}.
     *
     * <p>The filter is on {@code age}, not the shard key, so this presumably fans out to every
     * shard.
     *
     * @param minAge inclusive lower bound
     * @param maxAge inclusive upper bound
     * @return matching users
     */
    public List<User> findUsersByAge(int minAge, int maxAge) {
        Criteria ageInRange = Criteria.where("age").gte(minAge).lte(maxAge);
        Query byAge = new Query(ageInRange);
        return mongoTemplate.find(byAge, User.class, USERS_COLLECTION);
    }

    /**
     * Counts users per {@code status}, sorted by count descending.
     *
     * @return one result per status value
     */
    public List<AggregationResult> getUserStatistics() {
        Aggregation pipeline = Aggregation.newAggregation(
                Aggregation.group("status").count().as("count"),
                Aggregation.sort(Sort.Direction.DESC, "count"));
        return mongoTemplate
                .aggregate(pipeline, USERS_COLLECTION, AggregationResult.class)
                .getMappedResults();
    }
}
# 分片中间件对比
# 1. ShardingSphere
优点:
- 支持多种数据库
- 丰富的分片算法
- 透明化分片
- 强大的读写分离功能
缺点:
- 学习成本较高
- 配置复杂
@Configuration
@EnableShardingSphereDataSource
public class ShardingSphereConfig {

    /**
     * Builds the sharding data source over two physical databases ({@code ds0}, {@code ds1}).
     *
     * <p>Database routing defaults to {@code user_id % 2}. The {@code user} table rule comes from
     * {@code getUserTableRuleConfiguration()}.
     *
     * @return the sharding-aware {@link DataSource}
     * @throws SQLException if the factory fails to create the data source
     */
    @Bean
    public DataSource dataSource() throws SQLException {
        Map<String, DataSource> dataSourceMap = new HashMap<>();
        dataSourceMap.put("ds0", createDataSource("shard_0"));
        dataSourceMap.put("ds1", createDataSource("shard_1"));

        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        shardingRuleConfig.getTableRuleConfigs().add(getUserTableRuleConfiguration());
        // NOTE(review): a binding group with a single table has no effect. Binding groups relate
        // tables sharded the same way (e.g. "user,user_profile").
        shardingRuleConfig.getBindingTableGroups().add("user");
        shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(
                new InlineShardingStrategyConfiguration("user_id", "ds$->{user_id % 2}"));

        // Pass the rule configuration built above. Previously a fresh, empty
        // ShardingRuleConfiguration was passed, silently discarding every rule.
        return ShardingDataSourceFactory.createDataSource(dataSourceMap,
                shardingRuleConfig, new Properties());
    }
}
# 2. MyCat
优点:
- 基于MySQL协议
- 支持多种分片算法
- 配置相对简单
缺点:
- 主要支持MySQL
- 社区活跃度一般
<!-- schema.xml -->
<mycat:schema xmlns:mycat="http://io.mycat/">
    <schema name="testdb" checkSQLschema="false" sqlMaxLimit="100">
        <!-- auto-sharding-long is a range rule. In MyCat's stock rule.xml it uses
             autopartition-long.txt with 3 partitions (0-500M, 500M-1000M, 1000M-1500M), and MyCat
             rejects a table with fewer dataNodes than partitions, hence dn1..dn3.
             NOTE(review): confirm against the deployed rule.xml / autopartition-long.txt. -->
        <table name="user" primaryKey="id" dataNode="dn1,dn2,dn3"
               rule="auto-sharding-long" />
    </schema>
    <dataNode name="dn1" dataHost="localhost1" database="db1" />
    <dataNode name="dn2" dataHost="localhost1" database="db2" />
    <dataNode name="dn3" dataHost="localhost1" database="db3" />
    <dataHost name="localhost1" maxCon="1000" minCon="10"
              balance="0" writeType="0" dbType="mysql"
              dbDriver="native" switchType="1">
        <heartbeat>select user()</heartbeat>
        <writeHost host="hostM1" url="localhost:3306"
                   user="root" password="123456" />
    </dataHost>
</mycat:schema>
# 分片最佳实践
# 1. 分片键选择原则
public class ShardingKeyBestPractices {

    /*
     * Traits of a good sharding key:
     *   - spreads data evenly across shards
     *   - matches the dominant query patterns
     *   - avoids hot spots
     *   - is strongly tied to the business entity
     */

    /**
     * Recommended: shard by user ID.
     *
     * <p>All operations for one user then land on the same shard, so per-user queries never
     * need to cross shards. Documentation-only placeholder; intentionally does nothing.
     */
    public void goodShardingKey() {
        // Intentionally empty.
    }

    /**
     * Not recommended: shard by timestamp.
     *
     * <p>When time ranges map to shards, new rows always go to the newest shard, which becomes
     * a write hot spot. Documentation-only placeholder; intentionally does nothing.
     */
    public void badShardingKey() {
        // Intentionally empty.
    }
}
# 2. 跨分片查询优化
@Service
public class CrossShardQueryOptimization {

    // NOTE(review): userService, orderService and userRepository are used below but are not
    // declared in this class. Inject them (e.g. @Autowired fields) before using this example.

    /**
     * Avoids a cross-shard JOIN by joining in the application layer.
     *
     * <p>Loads the user and the user's orders separately, then pairs each order with the user.
     *
     * @param userId the user whose orders are requested
     * @return one DTO per order; empty if the user has no orders
     */
    public List<UserOrderDTO> getUserOrders(Long userId) {
        User user = userService.getUser(userId);
        List<Order> orders = orderService.getOrdersByUserId(userId);
        return orders.stream()
                .map(order -> new UserOrderDTO(user, order))
                .collect(Collectors.toList());
    }

    /**
     * Denormalized order row: copies user fields onto the order so reads need no cross-shard lookup.
     *
     * <p>Declared {@code static}: a non-static inner class cannot be instantiated by a JPA
     * provider (its constructor needs the enclosing instance).
     *
     * <p>NOTE(review): an {@code @Id} (presumably {@code orderId}) is still required, and the JPA
     * spec expects entities to be top-level classes.
     */
    @Entity
    public static class OrderWithUserInfo {
        private Long orderId;
        private Long userId;
        private String username;   // denormalized copy of the user's name
        private String userEmail;  // denormalized copy of the user's email
        // other order fields
    }

    /**
     * Loads a user through the {@code userCache} cache, keyed by {@code userId}, to reduce cross-shard reads.
     *
     * <p>NOTE(review): if {@code userRepository} is a Spring Data {@code CrudRepository},
     * {@code findById} returns {@code Optional<User>} and this will not compile as written.
     * Unwrap it (e.g. {@code .orElse(null)}) and consider {@code unless = "#result == null"}.
     *
     * @param userId the user ID
     * @return the user
     */
    @Cacheable(value = "userCache", key = "#userId")
    public User getUserFromCache(Long userId) {
        return userRepository.findById(userId);
    }
}
# 3. 分片扩容策略
@Component
public class ShardingExpansionStrategy {

    /**
     * Outline of scaling out with consistent hashing. Placeholder only; performs no work.
     *
     * <ol>
     *   <li>Add the new shard node.</li>
     *   <li>Recompute the data distribution.</li>
     *   <li>Migrate part of the data to the new node.</li>
     *   <li>Update the routing rules.</li>
     * </ol>
     */
    public void consistentHashExpansion() {
        // Documentation-only placeholder.
    }

    /**
     * Outline of scaling out with double writes. Placeholder only; performs no work.
     *
     * <ol>
     *   <li>Add the new shard node.</li>
     *   <li>Write new data to both the old and new shards.</li>
     *   <li>Gradually migrate historical data.</li>
     *   <li>Switch reads to the new shard.</li>
     *   <li>Stop writing to the old shard.</li>
     * </ol>
     *
     * <p>NOTE(review): a data-consistency check between old and new shards before step 4 is
     * commonly advisable.
     */
    public void doubleWriteExpansion() {
        // Documentation-only placeholder.
    }
}
# 监控与运维
# 1. 分片监控指标
@Component
public class ShardingMonitor {

    @Autowired
    private MeterRegistry meterRegistry;

    /**
     * Latest record count per shard, backing the {@code sharding.data.distribution} gauges.
     *
     * <p>A gauge samples a long-lived object each time it is read. Re-registering with a fresh
     * primitive every minute returns the already-registered gauge, so the value would never
     * update. Instead, one mutable holder per shard is registered once and updated in place.
     * The map keeps the holders strongly reachable.
     */
    private final java.util.Map<String, java.util.concurrent.atomic.AtomicLong> shardRecordCounts =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Executes a sharded query and records its duration in the {@code sharding.query.duration}
     * timer, tagged by shard and table. The duration is recorded even if execution throws.
     *
     * @param event the query event to execute and time
     */
    @EventListener
    public void onShardingQuery(ShardingQueryEvent event) {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            event.execute();
        } finally {
            sample.stop(Timer.builder("sharding.query.duration")
                    .tag("shard", event.getShardName())
                    .tag("table", event.getTableName())
                    .register(meterRegistry));
        }
    }

    /** Refreshes each shard's record-count gauge every 60 seconds. */
    @Scheduled(fixedRate = 60000)
    public void monitorDataDistribution() {
        for (String shardName : getShardNames()) {
            long recordCount = getRecordCount(shardName);
            recordCountHolder(shardName).set(recordCount);
        }
    }

    /** Returns the count holder for {@code shardName}, registering its gauge on first use. */
    private java.util.concurrent.atomic.AtomicLong recordCountHolder(String shardName) {
        return shardRecordCounts.computeIfAbsent(shardName, name -> {
            java.util.concurrent.atomic.AtomicLong holder = new java.util.concurrent.atomic.AtomicLong();
            Gauge.builder("sharding.data.distribution", holder,
                            java.util.concurrent.atomic.AtomicLong::doubleValue)
                    .tag("shard", name)
                    .register(meterRegistry);
            return holder;
        });
    }
}
# 2. 分片故障处理
@Service
public class ShardingFailoverService {

    /** Upper bound for the probe query, so one hung shard cannot stall the scheduled health check. */
    private static final int HEALTH_CHECK_TIMEOUT_SECONDS = 5;

    /**
     * Probes every shard with {@code SELECT 1} every 30 seconds.
     *
     * <p>A shard is marked healthy if the probe succeeds. Otherwise it is marked unhealthy and an
     * alert is sent. The connection, statement and result set are always closed. Previously they
     * were never closed, leaking one pooled connection per shard on every run.
     */
    @Scheduled(fixedRate = 30000)
    public void healthCheck() {
        for (String shardName : getShardNames()) {
            try {
                DataSource dataSource = getDataSource(shardName);
                try (Connection conn = dataSource.getConnection();
                     PreparedStatement stmt = conn.prepareStatement("SELECT 1")) {
                    stmt.setQueryTimeout(HEALTH_CHECK_TIMEOUT_SECONDS);
                    try (ResultSet ignored = stmt.executeQuery()) {
                        // Reaching this point means the shard answered the probe.
                    }
                }
                markShardHealthy(shardName);
            } catch (Exception e) {
                markShardUnhealthy(shardName);
                alertService.sendShardFailureAlert(shardName, e);
            }
        }
    }

    /**
     * Handles the failure of a shard.
     *
     * <ol>
     *   <li>Updates the routing rules for {@code failedShard} (called with {@code false}).</li>
     *   <li>Starts data recovery for it.</li>
     *   <li>Notifies operators.</li>
     * </ol>
     *
     * @param failedShard name of the shard that failed
     */
    public void failover(String failedShard) {
        updateRoutingRules(failedShard, false);
        startDataRecovery(failedShard);
        notificationService.notifyFailover(failedShard);
    }
}
# 总结
数据库分片是解决大规模数据存储和高并发访问的重要技术手段。选择合适的分片策略和实现方案需要考虑:
- 业务特点:数据访问模式、查询类型、事务需求
- 技术栈:现有数据库类型、开发框架、运维能力
- 性能要求:并发量、响应时间、数据一致性
- 扩展性:未来数据增长、业务发展需求
通过合理的分片设计和实现,可以有效提升系统的性能和可扩展性,但同时也要注意分片带来的复杂性,做好监控和运维工作。