SQL Server分片技术实现
# SQL Server分片技术实现
# 概述
SQL Server本身不提供内置的自动分片功能,水平扩展通常通过以下方式组合实现:分区表(在单个实例内按范围拆分数据)、数据库分片(将数据分布到多个数据库或实例,可借助弹性数据库工具管理分片映射)以及弹性数据库池;Always On可用性组则用于为各分片提供高可用和只读扩展。本文依次介绍表分区、数据库分片及其应用集成、性能优化与运维方案。
# SQL Server分片架构
# 1. 分区表实现
-- Partition function: the five RANGE RIGHT boundaries define six partitions,
-- and each boundary date belongs to the partition on its right.
CREATE PARTITION FUNCTION OrderDatePartitionFunction (datetime2)
AS RANGE RIGHT FOR VALUES
    ('2023-01-01', '2023-04-01', '2023-07-01', '2023-10-01', '2024-01-01');

-- Partition scheme: maps the six partitions, in order, onto six filegroups
-- (the filegroups must already exist in the database).
CREATE PARTITION SCHEME OrderDatePartitionScheme
AS PARTITION OrderDatePartitionFunction
TO (FileGroup1, FileGroup2, FileGroup3, FileGroup4, FileGroup5, FileGroup6);

-- Partitioned table.
-- A unique index on a partitioned table must contain the partitioning column,
-- so the primary key is (OrderId, OrderDate); a key on OrderId alone cannot be
-- created on OrderDatePartitionScheme(OrderDate). OrderId values still come
-- from IDENTITY, but uniqueness is only enforced on the (OrderId, OrderDate) pair.
CREATE TABLE Orders (
    OrderId    BIGINT IDENTITY(1,1) NOT NULL,
    CustomerId BIGINT               NOT NULL,
    OrderDate  DATETIME2            NOT NULL,
    Amount     DECIMAL(10,2),
    Status     NVARCHAR(20),
    Region     NVARCHAR(50),
    CONSTRAINT PK_Orders PRIMARY KEY CLUSTERED (OrderId, OrderDate)
) ON OrderDatePartitionScheme(OrderDate);
-- Nonclustered indexes stored on the same partition scheme as the table,
-- so every index partition lines up with the matching table partition.
CREATE NONCLUSTERED INDEX IX_Orders_CustomerId
    ON Orders (CustomerId)
    ON OrderDatePartitionScheme (OrderDate);

CREATE NONCLUSTERED INDEX IX_Orders_Region
    ON Orders (Region, OrderDate)
    ON OrderDatePartitionScheme (OrderDate);
# 2. 弹性数据库配置
-- Database that holds the shard map (which shard owns which keys).
CREATE DATABASE ShardMapManager;

-- One database per shard.
CREATE DATABASE Shard1;
CREATE DATABASE Shard2;
CREATE DATABASE Shard3;

-- Shard-local schema. Only Shard1 is shown here; the same statements have to
-- be run in Shard2 and Shard3 so that every shard has an identical Orders table.
USE Shard1;

CREATE TABLE Orders (
    OrderId    BIGINT IDENTITY(1,1) PRIMARY KEY,
    CustomerId BIGINT        NOT NULL,
    OrderDate  DATETIME2     NOT NULL,
    Amount     DECIMAL(10,2),
    Status     NVARCHAR(20),
    Region     NVARCHAR(50),
    ShardKey   BIGINT        NOT NULL   -- value the application routes on
);

-- Lookup indexes for the routing key and for per-customer queries.
CREATE INDEX IX_Orders_ShardKey   ON Orders (ShardKey);
CREATE INDEX IX_Orders_CustomerId ON Orders (CustomerId);
# 3. Docker Compose部署
# docker-compose.yml
version: '3.8'

# One master instance (shard map) plus three shard instances, all on one
# bridge network. Each container listens on 1433 internally; the host ports
# 1433-1436 are what the application and the maintenance script connect to.
# MSSQL_SA_PASSWORD replaces the deprecated SA_PASSWORD variable.
services:
  # SQL Server master instance (hosts ShardMapManager)
  sqlserver-master:
    image: mcr.microsoft.com/mssql/server:2022-latest
    environment:
      MSSQL_SA_PASSWORD: "YourStrong@Passw0rd"
      ACCEPT_EULA: "Y"
      MSSQL_PID: "Developer"
    ports:
      - "1433:1433"
    volumes:
      - sqlserver_master_data:/var/opt/mssql
      - ./scripts:/scripts
    hostname: sqlserver-master
    networks:
      - sqlserver-network

  # Shard 1
  sqlserver-shard1:
    image: mcr.microsoft.com/mssql/server:2022-latest
    environment:
      MSSQL_SA_PASSWORD: "YourStrong@Passw0rd"
      ACCEPT_EULA: "Y"
      MSSQL_PID: "Developer"
    ports:
      - "1434:1433"
    volumes:
      - sqlserver_shard1_data:/var/opt/mssql
    hostname: sqlserver-shard1
    networks:
      - sqlserver-network

  # Shard 2
  sqlserver-shard2:
    image: mcr.microsoft.com/mssql/server:2022-latest
    environment:
      MSSQL_SA_PASSWORD: "YourStrong@Passw0rd"
      ACCEPT_EULA: "Y"
      MSSQL_PID: "Developer"
    ports:
      - "1435:1433"
    volumes:
      - sqlserver_shard2_data:/var/opt/mssql
    hostname: sqlserver-shard2
    networks:
      - sqlserver-network

  # Shard 3
  sqlserver-shard3:
    image: mcr.microsoft.com/mssql/server:2022-latest
    environment:
      MSSQL_SA_PASSWORD: "YourStrong@Passw0rd"
      ACCEPT_EULA: "Y"
      MSSQL_PID: "Developer"
    ports:
      - "1436:1433"
    volumes:
      - sqlserver_shard3_data:/var/opt/mssql
    hostname: sqlserver-shard3
    networks:
      - sqlserver-network

# Named volumes keep each instance's data across container restarts.
volumes:
  sqlserver_master_data:
  sqlserver_shard1_data:
  sqlserver_shard2_data:
  sqlserver_shard3_data:

networks:
  sqlserver-network:
    driver: bridge
# Java应用集成
# 1. Maven依赖
<dependencies>
    <!-- Microsoft JDBC driver for SQL Server -->
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>12.4.2.jre11</version>
    </dependency>

    <!-- Elastic database client library -->
    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>elastic-db-tools</artifactId>
        <version>1.0.0</version>
    </dependency>

    <!-- Spring Data JPA starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <!-- HikariCP connection pool -->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
    </dependency>

    <!-- Core Spring Boot starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>
# 2. Spring Boot配置
# application.yml
spring:
  datasource:
    # Master data source (shard map database)
    master:
      url: jdbc:sqlserver://localhost:1433;databaseName=ShardMapManager;encrypt=false
      username: sa
      password: YourStrong@Passw0rd
      driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
    # Shard data sources, one entry per shard
    shards:
      shard1:
        url: jdbc:sqlserver://localhost:1434;databaseName=Shard1;encrypt=false
        username: sa
        password: YourStrong@Passw0rd
      shard2:
        url: jdbc:sqlserver://localhost:1435;databaseName=Shard2;encrypt=false
        username: sa
        password: YourStrong@Passw0rd
      shard3:
        url: jdbc:sqlserver://localhost:1436;databaseName=Shard3;encrypt=false
        username: sa
        password: YourStrong@Passw0rd
    # Connection pool settings (timeouts in milliseconds)
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      idle-timeout: 300000
      connection-timeout: 30000
      max-lifetime: 1800000
  jpa:
    database-platform: org.hibernate.dialect.SQLServer2012Dialect
    hibernate:
      ddl-auto: validate
    show-sql: true
    properties:
      hibernate:
        format_sql: true
        use_sql_comments: true

# SQL Server sharding settings read by SqlServerShardingConfig via @Value
sqlserver:
  sharding:
    shard-map-manager-server: localhost:1433
    shard-map-manager-database: ShardMapManager
    shard-map-name: OrderShardMap
    connection-string-template: "Server={0};Database={1};User Id=sa;Password=YourStrong@Passw0rd;Encrypt=false;"
# 3. 分片数据源配置
/**
 * Spring configuration that creates the master connection pool, one pool per
 * shard, a JdbcTemplate for the master database and the JPA entity manager
 * factory (bound to the master data source).
 */
@Configuration
@EnableJpaRepositories(basePackages = "com.example.repository")
public class SqlServerShardingConfig {

    private static final String SQLSERVER_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver";

    // Injected from the sqlserver.sharding.* properties; not read by the beans below.
    @Value("${sqlserver.sharding.shard-map-manager-server}")
    private String shardMapManagerServer;

    @Value("${sqlserver.sharding.shard-map-manager-database}")
    private String shardMapManagerDatabase;

    @Value("${sqlserver.sharding.shard-map-name}")
    private String shardMapName;

    /**
     * Primary data source: a Hikari pool pointing at the ShardMapManager database.
     */
    @Bean
    @Primary
    public DataSource masterDataSource() {
        HikariConfig poolSettings = new HikariConfig();
        poolSettings.setDriverClassName(SQLSERVER_DRIVER);
        poolSettings.setJdbcUrl("jdbc:sqlserver://localhost:1433;databaseName=ShardMapManager;encrypt=false");
        poolSettings.setUsername("sa");
        poolSettings.setPassword("YourStrong@Passw0rd");
        // Pool sizing and timeouts (milliseconds)
        poolSettings.setMaximumPoolSize(20);
        poolSettings.setMinimumIdle(5);
        poolSettings.setConnectionTimeout(30000);
        poolSettings.setIdleTimeout(300000);
        poolSettings.setMaxLifetime(1800000);
        return new HikariDataSource(poolSettings);
    }

    /**
     * Shard data sources keyed by shard name ("shard1".."shard3").
     */
    @Bean
    public Map<String, DataSource> shardDataSources() {
        Map<String, DataSource> pools = new HashMap<>();
        pools.put("shard1", openShardPool("jdbc:sqlserver://localhost:1434;databaseName=Shard1;encrypt=false"));
        pools.put("shard2", openShardPool("jdbc:sqlserver://localhost:1435;databaseName=Shard2;encrypt=false"));
        pools.put("shard3", openShardPool("jdbc:sqlserver://localhost:1436;databaseName=Shard3;encrypt=false"));
        return pools;
    }

    /** Builds a Hikari pool for one shard; only the JDBC URL differs between shards. */
    private static DataSource openShardPool(String jdbcUrl) {
        HikariConfig poolSettings = new HikariConfig();
        poolSettings.setJdbcUrl(jdbcUrl);
        poolSettings.setUsername("sa");
        poolSettings.setPassword("YourStrong@Passw0rd");
        poolSettings.setDriverClassName(SQLSERVER_DRIVER);
        poolSettings.setMaximumPoolSize(20);
        return new HikariDataSource(poolSettings);
    }

    /** JdbcTemplate bound to the master data source. */
    @Bean
    public JdbcTemplate masterJdbcTemplate(@Qualifier("masterDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    /**
     * JPA entity manager factory over the master data source, scanning
     * com.example.entity and validating the schema at start-up.
     */
    @Bean
    public EntityManagerFactory entityManagerFactory(@Qualifier("masterDataSource") DataSource dataSource) {
        Properties hibernateSettings = new Properties();
        hibernateSettings.setProperty("hibernate.dialect", "org.hibernate.dialect.SQLServer2012Dialect");
        hibernateSettings.setProperty("hibernate.hbm2ddl.auto", "validate");
        hibernateSettings.setProperty("hibernate.show_sql", "true");

        LocalContainerEntityManagerFactoryBean emfBean = new LocalContainerEntityManagerFactoryBean();
        emfBean.setDataSource(dataSource);
        emfBean.setPackagesToScan("com.example.entity");
        emfBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
        emfBean.setJpaProperties(hibernateSettings);
        // The factory bean is initialised by hand because the built factory itself is returned.
        emfBean.afterPropertiesSet();
        return emfBean.getObject();
    }
}
# 4. 分片实体类
/**
 * JPA entity for a row of the Orders table in a shard database.
 * The shard key mirrors the customer id: it is set by the five-argument
 * constructor and by {@link #setCustomerId(Long)}.
 */
@Entity
@Table(name = "Orders")
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "OrderId")
    private Long orderId;

    @Column(name = "CustomerId")
    private Long customerId;

    @Column(name = "OrderDate")
    private LocalDateTime orderDate;

    @Column(name = "Amount")
    private BigDecimal amount;

    @Column(name = "Status")
    private String status;

    @Column(name = "Region")
    private String region;

    @Column(name = "ShardKey")
    private Long shardKey;

    /** No-argument constructor. */
    public Order() {
    }

    /** Creates an order whose shard key is the given customer id. */
    public Order(Long customerId, LocalDateTime orderDate, BigDecimal amount, String status, String region) {
        this.customerId = customerId;
        this.shardKey = customerId;
        this.orderDate = orderDate;
        this.amount = amount;
        this.status = status;
        this.region = region;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public Long getCustomerId() {
        return customerId;
    }

    /** Sets the customer id and keeps the shard key in step with it. */
    public void setCustomerId(Long customerId) {
        this.customerId = customerId;
        this.shardKey = customerId;
    }

    public LocalDateTime getOrderDate() {
        return orderDate;
    }

    public void setOrderDate(LocalDateTime orderDate) {
        this.orderDate = orderDate;
    }

    public BigDecimal getAmount() {
        return amount;
    }

    public void setAmount(BigDecimal amount) {
        this.amount = amount;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getRegion() {
        return region;
    }

    public void setRegion(String region) {
        this.region = region;
    }

    public Long getShardKey() {
        return shardKey;
    }

    /** Overrides the shard key independently of the customer id. */
    public void setShardKey(Long shardKey) {
        this.shardKey = shardKey;
    }
}
# 5. 分片路由服务
/**
 * Routes order reads and writes to the shard that owns a given shard key and
 * fans cross-shard queries out to every shard.
 *
 * Routing is modulo based: shard index = (key mod shard count) + 1, and the
 * shard data sources are expected to be named "shard1".."shardN".
 */
@Service
public class SqlServerShardingService {

    // The qualifier selects the Map bean named "shardDataSources". Without it,
    // Spring resolves a Map<String, DataSource> injection point to every
    // DataSource bean keyed by bean name (here only "masterDataSource"),
    // and no "shardN" entry would ever be found.
    @Autowired
    @Qualifier("shardDataSources")
    private Map<String, DataSource> shardDataSources;

    @Autowired
    @Qualifier("masterJdbcTemplate")
    private JdbcTemplate masterJdbcTemplate;

    private final Map<String, JdbcTemplate> shardJdbcTemplates = new HashMap<>();

    /** Builds one JdbcTemplate per shard data source after injection. */
    @PostConstruct
    public void initShardTemplates() {
        shardDataSources.forEach((shardName, dataSource) ->
                shardJdbcTemplates.put(shardName, new JdbcTemplate(dataSource)));
    }

    /**
     * Returns a read-only snapshot of the shard templates keyed by shard name.
     * Used by services that need to query every shard.
     */
    public Map<String, JdbcTemplate> getShardJdbcTemplates() {
        return Map.copyOf(shardJdbcTemplates);
    }

    /**
     * Maps a shard key to a shard name.
     *
     * @param shardKey routing key; must not be null
     * @return "shard" followed by a 1-based index
     * @throws IllegalArgumentException if shardKey is null
     * @throws IllegalStateException if no shard is configured
     */
    public String determineShardKey(Long shardKey) {
        if (shardKey == null) {
            throw new IllegalArgumentException("Shard key cannot be null");
        }
        int shardCount = shardDataSources.size();
        if (shardCount == 0) {
            throw new IllegalStateException("No shard data sources configured");
        }
        // floorMod keeps the result in [0, shardCount) for negative keys too;
        // the % operator would yield a negative index and an unknown shard name.
        int shardIndex = (int) Math.floorMod(shardKey.longValue(), (long) shardCount) + 1;
        return "shard" + shardIndex;
    }

    /**
     * Returns the JdbcTemplate of the shard that owns the given key.
     *
     * @throws IllegalStateException if the computed shard name has no template
     */
    public JdbcTemplate getShardJdbcTemplate(Long shardKey) {
        String shardName = determineShardKey(shardKey);
        JdbcTemplate template = shardJdbcTemplates.get(shardName);
        if (template == null) {
            throw new IllegalStateException("No data source configured for " + shardName);
        }
        return template;
    }

    /**
     * Inserts the order into its shard and stores the generated OrderId on it.
     *
     * NOTE(review): no shard-specific transaction manager is visible here, so
     * @Transactional presumably binds to the primary (master) data source and
     * does not make the shard write transactional - confirm.
     */
    @Transactional
    public Order createOrder(Order order) {
        JdbcTemplate shardTemplate = getShardJdbcTemplate(order.getShardKey());
        // OUTPUT INSERTED.OrderId returns the identity value as a result set.
        String sql = """
            INSERT INTO Orders (CustomerId, OrderDate, Amount, Status, Region, ShardKey)
            OUTPUT INSERTED.OrderId
            VALUES (?, ?, ?, ?, ?, ?)
            """;
        Long orderId = shardTemplate.queryForObject(sql, Long.class,
                order.getCustomerId(),
                order.getOrderDate(),
                order.getAmount(),
                order.getStatus(),
                order.getRegion(),
                order.getShardKey());
        order.setOrderId(orderId);
        return order;
    }

    /**
     * Returns a customer's orders, newest first. The customer id is used as
     * the routing key, so only one shard is queried.
     */
    public List<Order> findOrdersByCustomerId(Long customerId) {
        JdbcTemplate shardTemplate = getShardJdbcTemplate(customerId);
        String sql = """
            SELECT OrderId, CustomerId, OrderDate, Amount, Status, Region, ShardKey
            FROM Orders
            WHERE CustomerId = ?
            ORDER BY OrderDate DESC
            """;
        return shardTemplate.query(sql, this::mapRowToOrder, customerId);
    }

    /**
     * Queries every shard for orders whose OrderDate lies between startDate
     * and endDate (both bounds inclusive) and returns them newest first.
     *
     * @throws RuntimeException if any shard query fails
     */
    public List<Order> findOrdersByDateRange(LocalDateTime startDate, LocalDateTime endDate) {
        List<Order> allOrders = new ArrayList<>();
        String sql = """
            SELECT OrderId, CustomerId, OrderDate, Amount, Status, Region, ShardKey
            FROM Orders
            WHERE OrderDate BETWEEN ? AND ?
            ORDER BY OrderDate DESC
            """;
        // Start one asynchronous query per shard.
        List<CompletableFuture<List<Order>>> futures = shardJdbcTemplates.values().stream()
                .map(template -> CompletableFuture.supplyAsync(() ->
                        template.query(sql, this::mapRowToOrder, startDate, endDate)))
                .collect(Collectors.toList());
        // Collect the per-shard results.
        futures.forEach(future -> {
            try {
                allOrders.addAll(future.get());
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                throw new RuntimeException("Failed to query shard", e);
            }
        });
        // Each shard is sorted on its own; the merged list needs a final sort.
        return allOrders.stream()
                .sorted((o1, o2) -> o2.getOrderDate().compareTo(o1.getOrderDate()))
                .collect(Collectors.toList());
    }

    /**
     * Aggregates order counts and amounts across all shards.
     *
     * @return map with total_orders, total_amount, avg_amount (total_amount /
     *         total_orders, two decimals) and region_statistics, a collection
     *         of per-region maps carrying Region, total_orders, region_count,
     *         total_amount and avg_amount
     */
    public Map<String, Object> getOrderStatistics() {
        // Only additive measures are fetched per shard; averages are derived
        // after merging because per-shard averages cannot be combined.
        String sql = """
            SELECT
            Region,
            COUNT(*) as total_orders,
            COALESCE(SUM(Amount), 0) as total_amount
            FROM Orders
            GROUP BY Region
            """;
        List<Map<String, Object>> allRegionStats = new ArrayList<>();
        shardJdbcTemplates.values().parallelStream().forEach(template -> {
            List<Map<String, Object>> shardStats = template.queryForList(sql);
            synchronized (allRegionStats) {
                allRegionStats.addAll(shardStats);
            }
        });

        // Sum counts and amounts per region across shards.
        Map<String, Map<String, Object>> regionAggregates = new HashMap<>();
        for (Map<String, Object> stat : allRegionStats) {
            String region = (String) stat.get("Region");
            long orders = ((Number) stat.get("total_orders")).longValue();
            BigDecimal amount = toBigDecimal(stat.get("total_amount"));
            Map<String, Object> aggregate = regionAggregates.computeIfAbsent(region, key -> {
                Map<String, Object> fresh = new HashMap<>();
                fresh.put("Region", key);
                fresh.put("total_orders", 0L);
                fresh.put("total_amount", BigDecimal.ZERO);
                return fresh;
            });
            aggregate.put("total_orders", (Long) aggregate.get("total_orders") + orders);
            aggregate.put("total_amount", ((BigDecimal) aggregate.get("total_amount")).add(amount));
        }

        // Derive the per-region values that depend on the merged totals.
        long totalOrders = 0L;
        BigDecimal totalAmount = BigDecimal.ZERO;
        for (Map<String, Object> aggregate : regionAggregates.values()) {
            long orders = (Long) aggregate.get("total_orders");
            BigDecimal amount = (BigDecimal) aggregate.get("total_amount");
            aggregate.put("region_count", orders);
            aggregate.put("avg_amount", average(amount, orders));
            totalOrders += orders;
            totalAmount = totalAmount.add(amount);
        }

        Map<String, Object> result = new HashMap<>();
        result.put("total_orders", totalOrders);
        result.put("total_amount", totalAmount);
        result.put("avg_amount", average(totalAmount, totalOrders));
        result.put("region_statistics", regionAggregates.values());
        return result;
    }

    /** Converts a JDBC numeric value to BigDecimal, treating null as zero. */
    private static BigDecimal toBigDecimal(Object value) {
        if (value == null) {
            return BigDecimal.ZERO;
        }
        if (value instanceof BigDecimal) {
            return (BigDecimal) value;
        }
        return new BigDecimal(value.toString());
    }

    /** Returns total / count rounded to two decimals, or zero when count is zero. */
    private static BigDecimal average(BigDecimal total, long count) {
        if (count <= 0) {
            return BigDecimal.ZERO;
        }
        return total.divide(BigDecimal.valueOf(count), 2, RoundingMode.HALF_UP);
    }

    /**
     * Groups the orders by target shard and batch-inserts each group.
     *
     * NOTE(review): the per-shard batches run on parallel-stream worker
     * threads, so they presumably do not take part in the caller's
     * transaction; a failure on one shard does not roll back the others -
     * confirm the intended guarantees.
     */
    @Transactional
    public void batchInsertOrders(List<Order> orders) {
        Map<String, List<Order>> ordersByShards = orders.stream()
                .collect(Collectors.groupingBy(order -> determineShardKey(order.getShardKey())));
        String sql = """
            INSERT INTO Orders (CustomerId, OrderDate, Amount, Status, Region, ShardKey)
            VALUES (?, ?, ?, ?, ?, ?)
            """;
        ordersByShards.entrySet().parallelStream().forEach(entry -> {
            String shardName = entry.getKey();
            List<Order> shardOrders = entry.getValue();
            JdbcTemplate template = shardJdbcTemplates.get(shardName);
            if (template == null) {
                throw new IllegalStateException("No data source configured for " + shardName);
            }
            List<Object[]> batchArgs = shardOrders.stream()
                    .map(order -> new Object[]{
                            order.getCustomerId(),
                            order.getOrderDate(),
                            order.getAmount(),
                            order.getStatus(),
                            order.getRegion(),
                            order.getShardKey()
                    })
                    .collect(Collectors.toList());
            template.batchUpdate(sql, batchArgs);
        });
    }

    /**
     * Maps the current result-set row to an Order. Public so that other
     * services can pass it as a RowMapper method reference.
     */
    public Order mapRowToOrder(ResultSet rs, int rowNum) throws SQLException {
        Order order = new Order();
        order.setOrderId(rs.getLong("OrderId"));
        order.setCustomerId(rs.getLong("CustomerId"));
        java.sql.Timestamp orderDate = rs.getTimestamp("OrderDate");
        order.setOrderDate(orderDate == null ? null : orderDate.toLocalDateTime());
        order.setAmount(rs.getBigDecimal("Amount"));
        order.setStatus(rs.getString("Status"));
        order.setRegion(rs.getString("Region"));
        // Set last: setCustomerId also writes the shard key.
        order.setShardKey(rs.getLong("ShardKey"));
        return order;
    }
}
# 6. 分片管理服务
/**
 * Administrative operations on the shard set: shard-map bootstrap, shard
 * listing, per-shard statistics, health checks and a balance report.
 */
@Service
public class SqlServerShardManagementService {

    @Autowired
    @Qualifier("masterJdbcTemplate")
    private JdbcTemplate masterJdbcTemplate;

    // The qualifier selects the Map bean named "shardDataSources"; an
    // unqualified Map<String, DataSource> injection point would instead
    // receive every DataSource bean keyed by bean name.
    @Autowired
    @Qualifier("shardDataSources")
    private Map<String, DataSource> shardDataSources;

    /**
     * Creates the ShardMap table in the master database if it is missing and
     * inserts the three shard rows that are not yet present.
     *
     * NOTE(review): the rows describe key ranges, while
     * SqlServerShardingService.determineShardKey routes by modulo; the two
     * schemes do not agree - confirm which one is authoritative.
     */
    public void initializeShardMap() {
        String createShardMapSql = """
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'ShardMap')
            CREATE TABLE ShardMap (
            ShardId INT PRIMARY KEY,
            ShardName NVARCHAR(50) NOT NULL,
            ConnectionString NVARCHAR(500) NOT NULL,
            MinShardKey BIGINT NOT NULL,
            MaxShardKey BIGINT NOT NULL,
            Status NVARCHAR(20) DEFAULT 'Active',
            CreatedDate DATETIME2 DEFAULT GETDATE()
            )
            """;
        masterJdbcTemplate.execute(createShardMapSql);
        // MERGE with only WHEN NOT MATCHED keeps existing rows untouched.
        String insertShardSql = """
            MERGE ShardMap AS target
            USING (VALUES
            (1, 'shard1', 'Server=localhost,1434;Database=Shard1;User Id=sa;Password=YourStrong@Passw0rd;', 0, 333333333),
            (2, 'shard2', 'Server=localhost,1435;Database=Shard2;User Id=sa;Password=YourStrong@Passw0rd;', 333333334, 666666666),
            (3, 'shard3', 'Server=localhost,1436;Database=Shard3;User Id=sa;Password=YourStrong@Passw0rd;', 666666667, 999999999)
            ) AS source (ShardId, ShardName, ConnectionString, MinShardKey, MaxShardKey)
            ON target.ShardId = source.ShardId
            WHEN NOT MATCHED THEN
            INSERT (ShardId, ShardName, ConnectionString, MinShardKey, MaxShardKey)
            VALUES (source.ShardId, source.ShardName, source.ConnectionString, source.MinShardKey, source.MaxShardKey);
            """;
        masterJdbcTemplate.execute(insertShardSql);
    }

    /** Returns all ShardMap rows ordered by ShardId. */
    public List<Map<String, Object>> getShardInfo() {
        String sql = """
            SELECT
            ShardId,
            ShardName,
            ConnectionString,
            MinShardKey,
            MaxShardKey,
            Status,
            CreatedDate
            FROM ShardMap
            ORDER BY ShardId
            """;
        return masterJdbcTemplate.queryForList(sql);
    }

    /**
     * Collects row count, total amount and date range of Orders per shard.
     *
     * @return map with shard_statistics (one map per shard; a failing shard
     *         contributes shard_name and error) and timestamp
     */
    public Map<String, Object> getShardStatistics() {
        List<Map<String, Object>> shardStats = new ArrayList<>();
        // The shard name is bound as a parameter. Text blocks do not support
        // "+" concatenation inside the literal, so embedding the variable
        // there would send the Java expression text to the server.
        // Amount is summed as DECIMAL; casting each row to BIGINT first would
        // drop the fractional part.
        String sql = """
            SELECT
            ? as shard_name,
            COUNT(*) as record_count,
            SUM(Amount) as total_amount,
            MIN(OrderDate) as min_date,
            MAX(OrderDate) as max_date
            FROM Orders
            """;
        shardDataSources.entrySet().parallelStream().forEach(entry -> {
            String shardName = entry.getKey();
            JdbcTemplate template = new JdbcTemplate(entry.getValue());
            Map<String, Object> stat;
            try {
                stat = template.queryForMap(sql, shardName);
            } catch (Exception e) {
                stat = new HashMap<>();
                stat.put("shard_name", shardName);
                stat.put("error", e.getMessage());
            }
            synchronized (shardStats) {
                shardStats.add(stat);
            }
        });
        Map<String, Object> result = new HashMap<>();
        result.put("shard_statistics", shardStats);
        result.put("timestamp", LocalDateTime.now());
        return result;
    }

    /**
     * Runs "SELECT 1" on every shard.
     *
     * @return one map per shard with shard_name, status (HEALTHY/UNHEALTHY)
     *         and either response_time (elapsed milliseconds) or error
     */
    public List<Map<String, Object>> checkShardHealth() {
        List<Map<String, Object>> healthStatus = new ArrayList<>();
        shardDataSources.entrySet().parallelStream().forEach(entry -> {
            String shardName = entry.getKey();
            DataSource dataSource = entry.getValue();
            Map<String, Object> status = new HashMap<>();
            status.put("shard_name", shardName);
            long startedAt = System.nanoTime();
            try {
                JdbcTemplate template = new JdbcTemplate(dataSource);
                template.queryForObject("SELECT 1", Integer.class);
                status.put("status", "HEALTHY");
                // Elapsed time of the probe, not the wall-clock timestamp.
                status.put("response_time", (System.nanoTime() - startedAt) / 1_000_000L);
            } catch (Exception e) {
                status.put("status", "UNHEALTHY");
                status.put("error", e.getMessage());
            }
            synchronized (healthStatus) {
                healthStatus.add(status);
            }
        });
        return healthStatus;
    }

    /**
     * Reports shards whose Orders row count deviates from the average by more
     * than 20%. This only prints a report; no data is moved.
     */
    public void rebalanceShards() {
        Map<String, Long> shardCounts = new HashMap<>();
        shardDataSources.forEach((shardName, dataSource) -> {
            JdbcTemplate template = new JdbcTemplate(dataSource);
            Long count = template.queryForObject("SELECT COUNT(*) FROM Orders", Long.class);
            shardCounts.put(shardName, count == null ? 0L : count);
        });
        if (shardCounts.isEmpty()) {
            return; // nothing to compare; also avoids dividing by zero
        }
        long totalRecords = shardCounts.values().stream().mapToLong(Long::longValue).sum();
        if (totalRecords == 0) {
            return; // every shard is empty, so the shards are trivially balanced
        }
        // Exact average for the deviation, truncated average for the message.
        double exactAverage = (double) totalRecords / shardCounts.size();
        long avgRecords = totalRecords / shardCounts.size();
        shardCounts.forEach((shardName, count) -> {
            double deviation = Math.abs(count - exactAverage) / exactAverage;
            if (deviation > 0.2) { // more than 20% away from the average
                System.out.println(String.format("Shard %s needs rebalancing. Count: %d, Average: %d, Deviation: %.2f%%",
                        shardName, count, avgRecords, deviation * 100));
            }
        });
    }
}
# 性能优化策略
# 1. 分区表优化
-- Partition maintenance.

-- Add a partition: name the filegroup that the next new partition will use,
-- then split the function at the new boundary.
ALTER PARTITION SCHEME OrderDatePartitionScheme NEXT USED FileGroup7;
ALTER PARTITION FUNCTION OrderDatePartitionFunction() SPLIT RANGE ('2024-04-01');

-- Remove an old boundary: the two partitions on either side of '2023-01-01'
-- are merged into one.
ALTER PARTITION FUNCTION OrderDatePartitionFunction() MERGE RANGE ('2023-01-01');
-- Partition switching (metadata-only data movement).
-- The target of ALTER TABLE ... SWITCH must be empty, live on the same
-- filegroup as the partition being switched out, and mirror the source
-- table's columns, nullability and clustered index. The key columns below
-- must therefore match the clustered primary key of Orders, which has to
-- include the partitioning column OrderDate.
CREATE TABLE Orders_Archive (
    OrderId    BIGINT        NOT NULL,
    CustomerId BIGINT        NOT NULL,
    OrderDate  DATETIME2     NOT NULL,
    Amount     DECIMAL(10,2),
    Status     NVARCHAR(20),
    Region     NVARCHAR(50),
    CONSTRAINT PK_Orders_Archive PRIMARY KEY CLUSTERED (OrderId, OrderDate)
) ON FileGroup1;

-- Move partition 1 (the leftmost partition) into the archive table.
ALTER TABLE Orders
    SWITCH PARTITION 1 TO Orders_Archive;
# 2. 索引优化
-- Partition-aligned index for per-customer lookups ordered by date.
CREATE NONCLUSTERED INDEX IX_Orders_CustomerId_Partitioned
    ON Orders (CustomerId, OrderDate)
    ON OrderDatePartitionScheme (OrderDate);

-- Covering index: the INCLUDE columns let status/region queries be answered
-- from the index without touching the base table.
CREATE NONCLUSTERED INDEX IX_Orders_Status_Covering
    ON Orders (Status, Region)
    INCLUDE (OrderId, CustomerId, Amount)
    ON OrderDatePartitionScheme (OrderDate);

-- Columnstore index for analytical queries. Because it lists columns it is a
-- nonclustered columnstore index, despite the CCI_ prefix in its name.
CREATE NONCLUSTERED COLUMNSTORE INDEX CCI_Orders
    ON Orders (OrderId, CustomerId, OrderDate, Amount, Status, Region)
    ON OrderDatePartitionScheme (OrderDate);
# 3. 查询优化
/**
 * Query variants tuned for the sharded Orders table. Relies on
 * SqlServerShardingService for routing, for the per-shard JdbcTemplates
 * (getShardJdbcTemplates()) and for the row mapper (mapRowToOrder).
 */
@Service
public class SqlServerQueryOptimizationService {

    @Autowired
    private SqlServerShardingService shardingService;

    /**
     * Returns orders with startDate <= OrderDate < endDate from every shard,
     * newest first. The half-open range on OrderDate is written so that it
     * can be matched against date-based partition boundaries.
     */
    public List<Order> findOrdersByDateRangeOptimized(LocalDateTime startDate, LocalDateTime endDate) {
        String sql = """
            SELECT OrderId, CustomerId, OrderDate, Amount, Status, Region, ShardKey
            FROM Orders
            WHERE OrderDate >= ? AND OrderDate < ?
            ORDER BY OrderDate DESC
            """;
        List<Order> results = new ArrayList<>();
        // Query all shards in parallel; the shared list is guarded by its monitor.
        shardingService.getShardJdbcTemplates().values().parallelStream().forEach(template -> {
            List<Order> shardResults = template.query(sql, shardingService::mapRowToOrder, startDate, endDate);
            synchronized (results) {
                results.addAll(shardResults);
            }
        });
        // Per-shard ordering is lost when merging, so sort the combined list.
        return results.stream()
                .sorted((o1, o2) -> o2.getOrderDate().compareTo(o1.getOrderDate()))
                .collect(Collectors.toList());
    }

    /**
     * Returns a customer's orders, newest first, forcing the CustomerId index.
     *
     * SQL Server takes index hints as table hints in the WITH clause; an
     * Oracle-style hint comment is ignored. The statement fails if
     * IX_Orders_CustomerId does not exist on the shard. NOLOCK is kept from
     * the original query: it reads uncommitted data, so results may include
     * rows that are later rolled back.
     */
    public List<Order> findOrdersWithHints(Long customerId) {
        JdbcTemplate template = shardingService.getShardJdbcTemplate(customerId);
        String sql = """
            SELECT OrderId, CustomerId, OrderDate, Amount, Status, Region, ShardKey
            FROM Orders WITH (INDEX(IX_Orders_CustomerId), NOLOCK)
            WHERE CustomerId = ?
            ORDER BY OrderDate DESC
            """;
        return template.query(sql, shardingService::mapRowToOrder, customerId);
    }

    /**
     * Loads the orders of many customers with one query per shard.
     *
     * @param customerIds customer ids; null or empty yields an empty map
     * @return orders grouped by customer id; customers without orders are absent
     */
    public Map<Long, List<Order>> findOrdersByCustomerIds(List<Long> customerIds) {
        Map<Long, List<Order>> results = new ConcurrentHashMap<>();
        if (customerIds == null || customerIds.isEmpty()) {
            return results;
        }
        // Group the distinct ids by owning shard.
        Map<String, List<Long>> customerIdsByShards = customerIds.stream()
                .distinct()
                .collect(Collectors.groupingBy(shardingService::determineShardKey));
        customerIdsByShards.entrySet().parallelStream().forEach(entry -> {
            String shardName = entry.getKey();
            List<Long> shardCustomerIds = entry.getValue();
            JdbcTemplate template = shardingService.getShardJdbcTemplates().get(shardName);
            // One "?" per id; only placeholders are formatted into the SQL,
            // the ids themselves are bound as parameters. Very large id lists
            // can exceed the server's per-statement parameter limit and
            // should be chunked by the caller.
            String placeholders = shardCustomerIds.stream()
                    .map(id -> "?")
                    .collect(Collectors.joining(","));
            String sql = """
                SELECT OrderId, CustomerId, OrderDate, Amount, Status, Region, ShardKey
                FROM Orders
                WHERE CustomerId IN (%s)
                ORDER BY CustomerId, OrderDate DESC
                """.formatted(placeholders);
            List<Order> orders = template.query(sql, shardingService::mapRowToOrder, shardCustomerIds.toArray());
            // A customer lives on exactly one shard, so keys never collide.
            Map<Long, List<Order>> customerOrders = orders.stream()
                    .collect(Collectors.groupingBy(Order::getCustomerId));
            results.putAll(customerOrders);
        });
        return results;
    }
}
# 监控和运维
# 1. 性能监控
/**
 * Collects monitoring data (performance counters, wait statistics, index
 * usage) from every shard by querying SQL Server dynamic management views.
 */
@Component
public class SqlServerShardMonitor {

    // The qualifier selects the Map bean named "shardDataSources"; an
    // unqualified Map<String, DataSource> injection point would instead
    // receive every DataSource bean keyed by bean name.
    @Autowired
    @Qualifier("shardDataSources")
    private Map<String, DataSource> shardDataSources;

    /**
     * Reads selected performance counters and session/request counts per shard.
     *
     * @return map with shard_metrics (one map per shard; a failing shard
     *         contributes shard_name and error) and timestamp
     */
    public Map<String, Object> getPerformanceMetrics() {
        List<Map<String, Object>> shardMetrics = new ArrayList<>();
        // The shard name is bound as a parameter: "+" inside a text block is
        // literal text, not string concatenation.
        // 'Page life expectancy' is reported by more than one counter object,
        // so the subquery is narrowed to Buffer Manager to stay scalar.
        // The "/sec" counters hold cumulative values; a rate needs two samples.
        String sql = """
            SELECT
            ? as shard_name,
            (SELECT cntr_value FROM sys.dm_os_performance_counters
            WHERE counter_name = 'Batch Requests/sec') as batch_requests_per_sec,
            (SELECT cntr_value FROM sys.dm_os_performance_counters
            WHERE counter_name = 'SQL Compilations/sec') as compilations_per_sec,
            (SELECT cntr_value FROM sys.dm_os_performance_counters
            WHERE counter_name = 'Page life expectancy'
            AND object_name LIKE '%Buffer Manager%') as page_life_expectancy,
            (SELECT COUNT(*) FROM sys.dm_exec_sessions WHERE is_user_process = 1) as active_sessions,
            (SELECT COUNT(*) FROM sys.dm_exec_requests) as active_requests
            """;
        shardDataSources.entrySet().parallelStream().forEach(entry -> {
            String shardName = entry.getKey();
            JdbcTemplate template = new JdbcTemplate(entry.getValue());
            Map<String, Object> metrics;
            try {
                metrics = template.queryForMap(sql, shardName);
            } catch (Exception e) {
                metrics = new HashMap<>();
                metrics.put("shard_name", shardName);
                metrics.put("error", e.getMessage());
            }
            synchronized (shardMetrics) {
                shardMetrics.add(metrics);
            }
        });
        Map<String, Object> result = new HashMap<>();
        result.put("shard_metrics", shardMetrics);
        result.put("timestamp", LocalDateTime.now());
        return result;
    }

    /**
     * Returns the ten wait types with the highest wait time on each shard.
     * A shard that fails is logged to stderr and skipped.
     */
    public List<Map<String, Object>> getWaitStatistics() {
        List<Map<String, Object>> allWaitStats = new ArrayList<>();
        String sql = """
            SELECT TOP 10
            ? as shard_name,
            wait_type,
            waiting_tasks_count,
            wait_time_ms,
            max_wait_time_ms,
            signal_wait_time_ms
            FROM sys.dm_os_wait_stats
            WHERE wait_time_ms > 0
            ORDER BY wait_time_ms DESC
            """;
        shardDataSources.entrySet().parallelStream().forEach(entry -> {
            String shardName = entry.getKey();
            JdbcTemplate template = new JdbcTemplate(entry.getValue());
            try {
                List<Map<String, Object>> waitStats = template.queryForList(sql, shardName);
                synchronized (allWaitStats) {
                    allWaitStats.addAll(waitStats);
                }
            } catch (Exception e) {
                // Log and continue with the remaining shards.
                System.err.println("Error getting wait stats for " + shardName + ": " + e.getMessage());
            }
        });
        return allWaitStats;
    }

    /**
     * Returns usage counters for the indexes of the Orders table on each
     * shard, most used first. A shard that fails is logged to stderr and skipped.
     */
    public List<Map<String, Object>> getIndexUsageStats() {
        List<Map<String, Object>> allIndexStats = new ArrayList<>();
        // sys.dm_db_index_usage_stats covers every database on the instance;
        // database_id = DB_ID() limits it to the connected shard database so
        // that object ids are resolved in the right catalog.
        String sql = """
            SELECT
            ? as shard_name,
            OBJECT_NAME(ius.object_id) as table_name,
            i.name as index_name,
            ius.user_seeks,
            ius.user_scans,
            ius.user_lookups,
            ius.user_updates,
            ius.last_user_seek,
            ius.last_user_scan,
            ius.last_user_lookup
            FROM sys.dm_db_index_usage_stats ius
            INNER JOIN sys.indexes i ON ius.object_id = i.object_id AND ius.index_id = i.index_id
            WHERE ius.database_id = DB_ID()
            AND OBJECT_NAME(ius.object_id) = 'Orders'
            ORDER BY (ius.user_seeks + ius.user_scans + ius.user_lookups) DESC
            """;
        shardDataSources.entrySet().parallelStream().forEach(entry -> {
            String shardName = entry.getKey();
            JdbcTemplate template = new JdbcTemplate(entry.getValue());
            try {
                List<Map<String, Object>> indexStats = template.queryForList(sql, shardName);
                synchronized (allIndexStats) {
                    allIndexStats.addAll(indexStats);
                }
            } catch (Exception e) {
                System.err.println("Error getting index stats for " + shardName + ": " + e.getMessage());
            }
        });
        return allIndexStats;
    }
}
# 2. 自动化运维脚本
# sqlserver_shard_maintenance.ps1
# Connection settings. The instance at index 0 hosts ShardMapManager; the
# instances after it host Shard1..Shard3, in that order.
$ServerInstances = "localhost,1433", "localhost,1434", "localhost,1435", "localhost,1436"
$Username = "sa"
$Password = "YourStrong@Passw0rd"
$LogPath  = "C:\Logs\SQLServer"

# Make sure the log directory exists before anything is logged.
if (-not (Test-Path $LogPath)) {
    New-Item -ItemType Directory -Path $LogPath -Force
}
# Writes a timestamped message to the console and appends it to the daily log
# file shard_maintenance_<yyyyMMdd>.log under $LogPath.
function Write-Log {
    param([string]$Message)

    $Timestamp  = Get-Date -Format "yyyy-MM-dd HH:mm:ss"
    $LogMessage = '{0} - {1}' -f $Timestamp, $Message
    $DailyFile  = "$LogPath\shard_maintenance_$(Get-Date -Format 'yyyyMMdd').log"

    Write-Host $LogMessage
    Add-Content -Path $DailyFile -Value $LogMessage
}
# Backs up ShardMapManager and the shard databases, one database per server
# instance (paired by position), into a dated folder. The backup path is
# resolved by the SQL Server instance, not by the machine running the script.
function Backup-ShardDatabases {
    Write-Log "开始备份分片数据库..."
    $BackupPath = "C:\Backup\SQLServer\$(Get-Date -Format 'yyyyMMdd')"
    if (!(Test-Path $BackupPath)) {
        # Out-Null keeps the DirectoryInfo object out of the function output.
        New-Item -ItemType Directory -Path $BackupPath -Force | Out-Null
    }
    $Databases = @("ShardMapManager", "Shard1", "Shard2", "Shard3")
    # Never index past the shorter of the two parallel arrays.
    $PairCount = [Math]::Min($ServerInstances.Length, $Databases.Length)
    for ($i = 0; $i -lt $PairCount; $i++) {
        $Server = $ServerInstances[$i]
        $Database = $Databases[$i]
        try {
            $BackupFile = "$BackupPath\${Database}_$(Get-Date -Format 'yyyyMMdd_HHmmss').bak"
            $Query = @"
BACKUP DATABASE [$Database]
TO DISK = '$BackupFile'
WITH FORMAT, INIT, COMPRESSION;
"@
            # -ErrorAction Stop turns SQL errors into terminating errors;
            # without it the catch block is skipped and success is logged.
            Invoke-Sqlcmd -ServerInstance $Server -Username $Username -Password $Password -Query $Query -QueryTimeout 3600 -ErrorAction Stop
            Write-Log "数据库 $Database 备份完成: $BackupFile"
        }
        catch {
            Write-Log "数据库 $Database 备份失败: $($_.Exception.Message)"
        }
    }
    Write-Log "分片数据库备份完成"
}
# Queries each server instance for session/request counts and page life
# expectancy, and logs whether the instance answered.
function Check-ShardStatus {
    Write-Log "检查分片状态..."
    foreach ($Server in $ServerInstances) {
        try {
            # 'Page life expectancy' is reported by more than one counter
            # object; the Buffer Manager filter keeps the subquery scalar.
            $Query = @"
SELECT
@@SERVERNAME as server_name,
DB_NAME() as database_name,
(SELECT COUNT(*) FROM sys.dm_exec_sessions WHERE is_user_process = 1) as active_sessions,
(SELECT COUNT(*) FROM sys.dm_exec_requests) as active_requests,
(SELECT cntr_value FROM sys.dm_os_performance_counters WHERE counter_name = 'Page life expectancy' AND object_name LIKE '%Buffer Manager%') as page_life_expectancy
"@
            # -ErrorAction Stop makes SQL and connection errors reach the catch block.
            $Result = Invoke-Sqlcmd -ServerInstance $Server -Username $Username -Password $Password -Query $Query -ErrorAction Stop
            Write-Log "服务器 $Server 状态正常 - 活动会话: $($Result.active_sessions), 活动请求: $($Result.active_requests)"
        }
        catch {
            Write-Log "服务器 $Server 状态检查失败: $($_.Exception.Message)"
        }
    }
    Write-Log "分片状态检查完成"
}
# Refreshes optimizer statistics in every database (paired with the server
# instances by position). Orders gets a full-scan update where it exists.
function Update-Statistics {
    Write-Log "更新统计信息..."
    $Databases = @("ShardMapManager", "Shard1", "Shard2", "Shard3")
    # Never index past the shorter of the two parallel arrays.
    $PairCount = [Math]::Min($ServerInstances.Length, $Databases.Length)
    for ($i = 0; $i -lt $PairCount; $i++) {
        $Server = $ServerInstances[$i]
        $Database = $Databases[$i]
        try {
            # ShardMapManager has no Orders table, so the full-scan update is
            # guarded by an existence check instead of failing the batch.
            $Query = @"
USE [$Database];
EXEC sp_updatestats;
IF OBJECT_ID(N'Orders', N'U') IS NOT NULL
    UPDATE STATISTICS Orders WITH FULLSCAN;
"@
            # -ErrorAction Stop makes SQL errors reach the catch block.
            Invoke-Sqlcmd -ServerInstance $Server -Username $Username -Password $Password -Query $Query -QueryTimeout 1800 -ErrorAction Stop
            Write-Log "数据库 $Database 统计信息更新完成"
        }
        catch {
            Write-Log "数据库 $Database 统计信息更新失败: $($_.Exception.Message)"
        }
    }
    Write-Log "统计信息更新完成"
}
# Deletes script log files older than 30 days and cycles the SQL Server error
# log on every instance.
function Cleanup-Logs {
    Write-Log "清理旧日志文件..."
    # Remove *.log files under $LogPath last written more than 30 days ago.
    $CutoffDate = (Get-Date).AddDays(-30)
    Get-ChildItem -Path $LogPath -Filter "*.log" -File |
        Where-Object { $_.LastWriteTime -lt $CutoffDate } |
        Remove-Item -Force
    # sp_cycle_errorlog closes the current error log and starts a new one.
    foreach ($Server in $ServerInstances) {
        try {
            $Query = "EXEC sp_cycle_errorlog;"
            # -ErrorAction Stop makes SQL errors reach the catch block.
            Invoke-Sqlcmd -ServerInstance $Server -Username $Username -Password $Password -Query $Query -ErrorAction Stop
            Write-Log "服务器 $Server 错误日志已循环"
        }
        catch {
            Write-Log "服务器 $Server 错误日志循环失败: $($_.Exception.Message)"
        }
    }
    Write-Log "日志清理完成"
}
# Counts the Orders rows on every shard and warns about shards whose count
# deviates from the average by more than 20%.
function Monitor-ShardBalance {
    Write-Log "监控分片平衡状态..."
    $ShardCounts = @()
    $ShardDatabases = @("Shard1", "Shard2", "Shard3")
    # Index 0 of $ServerInstances is the master instance, so shards start at 1.
    for ($i = 1; $i -lt $ServerInstances.Length -and ($i - 1) -lt $ShardDatabases.Length; $i++) {
        $Server = $ServerInstances[$i]
        $Database = $ShardDatabases[$i-1]
        try {
            $Query = "USE [$Database]; SELECT COUNT(*) as record_count FROM Orders;"
            # -ErrorAction Stop makes SQL errors reach the catch block.
            $Result = Invoke-Sqlcmd -ServerInstance $Server -Username $Username -Password $Password -Query $Query -ErrorAction Stop
            $ShardCounts += [pscustomobject]@{ Name = $Database; Count = [long]$Result.record_count }
            Write-Log "分片 $Database 记录数: $($Result.record_count)"
        }
        catch {
            # A shard that cannot be queried is left out of the average;
            # counting it as 0 would make the healthy shards look unbalanced.
            Write-Log "分片 $Database 记录数查询失败: $($_.Exception.Message)"
        }
    }
    # Compute the balance only when there is something to compare.
    if ($ShardCounts.Count -gt 0) {
        $TotalRecords = ($ShardCounts | Measure-Object -Property Count -Sum).Sum
        $AvgRecords = $TotalRecords / $ShardCounts.Count
        # An average of 0 means every shard is empty; skip to avoid dividing by zero.
        if ($AvgRecords -gt 0) {
            foreach ($Shard in $ShardCounts) {
                $Deviation = [Math]::Abs($Shard.Count - $AvgRecords) / $AvgRecords
                if ($Deviation -gt 0.2) {
                    Write-Log "警告: 分片 $($Shard.Name) 数据不平衡,偏差: $([Math]::Round($Deviation * 100, 2))%"
                }
            }
        }
    }
    Write-Log "分片平衡监控完成"
}
# Entry point: the first script argument selects the maintenance task.
$Action = $args[0]
switch ($Action) {
    "backup"  { Backup-ShardDatabases }
    "status"  { Check-ShardStatus }
    "stats"   { Update-Statistics }
    "cleanup" { Cleanup-Logs }
    "balance" { Monitor-ShardBalance }
    "all" {
        # Run every task in a fixed order; log cleanup comes last.
        $Steps = 'Backup-ShardDatabases', 'Check-ShardStatus', 'Update-Statistics', 'Monitor-ShardBalance', 'Cleanup-Logs'
        foreach ($Step in $Steps) {
            & $Step
        }
    }
    default {
        # Unknown or missing argument: print usage and exit with an error code.
        Write-Host "用法: .\sqlserver_shard_maintenance.ps1 {backup|status|stats|cleanup|balance|all}"
        exit 1
    }
}
# 最佳实践
# 1. 分片设计原则
- 选择合适的分片键:选择分布均匀且查询频繁的字段
- 避免热点分片:确保数据在分片间均匀分布
- 考虑查询模式:根据业务查询模式设计分片策略
- 规划扩展性:预留足够的分片扩展空间
# 2. 性能优化
- 使用分区表:利用SQL Server原生分区功能
- 优化索引策略:创建分区对齐的索引
- 批量操作:使用批量插入和更新提高性能
- 查询优化:避免跨分片查询,使用查询提示
# 3. 运维管理
- 监控告警:设置关键性能指标监控
- 定期维护:定期更新统计信息和重建索引
- 备份策略:制定完善的备份恢复计划
- 容量规划:定期评估存储和性能需求
# 4. 高可用性
- Always On配置:使用Always On可用性组
- 故障转移:配置自动故障转移
- 读写分离:配置只读副本分担查询负载
- 灾难恢复:建立异地灾备方案
# 总结
SQL Server分片技术通过分区表、弹性数据库工具和Always On等功能,为企业应用提供了强大的水平扩展能力。合理的架构设计、性能优化和运维管理,可以构建高性能、高可用的分布式数据库系统。SQL Server的企业级特性和丰富的管理工具,使其在Windows和Linux(包括本文使用的容器化部署)平台上都是可靠的选择。