Redis分片技术实现
# Redis分片技术实现
# 概述
Redis分片(Redis Sharding)是将数据分布到多个Redis实例中的技术,用于突破单个Redis实例的内存限制,提高系统的整体性能和可用性。
# Redis分片方案
# 1. 客户端分片(Client-side Sharding)
客户端负责决定数据存储在哪个Redis实例中。
@Component
public class RedisClientSharding {
private List<JedisPool> jedisPools;
private ConsistentHash<JedisPool> consistentHash;
@PostConstruct
public void init() {
// 初始化Redis连接池
jedisPools = Arrays.asList(
new JedisPool("redis-node-1:6379"),
new JedisPool("redis-node-2:6379"),
new JedisPool("redis-node-3:6379"),
new JedisPool("redis-node-4:6379")
);
// 初始化一致性哈希环
consistentHash = new ConsistentHash<>(jedisPools);
}
/**
* 根据key获取对应的Redis实例
*/
private JedisPool getJedisPool(String key) {
return consistentHash.get(key);
}
/**
* 设置值
*/
public void set(String key, String value) {
JedisPool pool = getJedisPool(key);
try (Jedis jedis = pool.getResource()) {
jedis.set(key, value);
}
}
/**
* 获取值
*/
public String get(String key) {
JedisPool pool = getJedisPool(key);
try (Jedis jedis = pool.getResource()) {
return jedis.get(key);
}
}
/**
* 批量获取(需要跨分片)
*/
public Map<String, String> mget(String... keys) {
Map<String, String> result = new HashMap<>();
Map<JedisPool, List<String>> poolKeyMap = new HashMap<>();
// 按分片分组keys
for (String key : keys) {
JedisPool pool = getJedisPool(key);
poolKeyMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(key);
}
// 并行查询各分片
poolKeyMap.entrySet().parallelStream().forEach(entry -> {
JedisPool pool = entry.getKey();
List<String> poolKeys = entry.getValue();
try (Jedis jedis = pool.getResource()) {
List<String> values = jedis.mget(poolKeys.toArray(new String[0]));
for (int i = 0; i < poolKeys.size(); i++) {
result.put(poolKeys.get(i), values.get(i));
}
}
});
return result;
}
}
# 2. 一致性哈希实现
public class ConsistentHash<T> {
private final SortedMap<Long, T> circle = new TreeMap<>();
private final int virtualNodes;
private final HashFunction hashFunction;
public ConsistentHash(Collection<T> nodes) {
this(nodes, 150); // 默认150个虚拟节点
}
public ConsistentHash(Collection<T> nodes, int virtualNodes) {
this.virtualNodes = virtualNodes;
this.hashFunction = Hashing.md5();
for (T node : nodes) {
addNode(node);
}
}
/**
* 添加节点
*/
public void addNode(T node) {
for (int i = 0; i < virtualNodes; i++) {
String virtualNodeName = node.toString() + "#" + i;
long hash = hashFunction.hashString(virtualNodeName, StandardCharsets.UTF_8).asLong();
circle.put(hash, node);
}
}
/**
* 移除节点
*/
public void removeNode(T node) {
for (int i = 0; i < virtualNodes; i++) {
String virtualNodeName = node.toString() + "#" + i;
long hash = hashFunction.hashString(virtualNodeName, StandardCharsets.UTF_8).asLong();
circle.remove(hash);
}
}
/**
* 获取key对应的节点
*/
public T get(String key) {
if (circle.isEmpty()) {
return null;
}
long hash = hashFunction.hashString(key, StandardCharsets.UTF_8).asLong();
if (!circle.containsKey(hash)) {
SortedMap<Long, T> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
return circle.get(hash);
}
}
# 3. Redis Cluster(官方集群方案)
@Configuration
public class RedisClusterConfig {
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
List<RedisNode> nodes = Arrays.asList(
new RedisNode("redis-cluster-1", 7000),
new RedisNode("redis-cluster-2", 7000),
new RedisNode("redis-cluster-3", 7000),
new RedisNode("redis-cluster-4", 7000),
new RedisNode("redis-cluster-5", 7000),
new RedisNode("redis-cluster-6", 7000)
);
RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();
clusterConfig.setClusterNodes(nodes);
clusterConfig.setMaxRedirects(3);
LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(2))
.build();
return new LettuceConnectionFactory(clusterConfig, clientConfig);
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory());
// 设置序列化器
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
return template;
}
}
@Service
public class RedisClusterService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 设置值(自动路由到正确的分片)
*/
public void set(String key, Object value, long timeout, TimeUnit unit) {
redisTemplate.opsForValue().set(key, value, timeout, unit);
}
/**
* 获取值
*/
public Object get(String key) {
return redisTemplate.opsForValue().get(key);
}
/**
* 批量操作(可能跨分片)
*/
public List<Object> multiGet(Collection<String> keys) {
return redisTemplate.opsForValue().multiGet(keys);
}
/**
* 使用Pipeline提高性能
*/
public void batchSet(Map<String, Object> keyValues) {
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
for (Map.Entry<String, Object> entry : keyValues.entrySet()) {
byte[] key = entry.getKey().getBytes();
byte[] value = serialize(entry.getValue());
connection.set(key, value);
}
return null;
}
});
}
private byte[] serialize(Object obj) {
// 实现序列化逻辑
return obj.toString().getBytes();
}
}
# Redis Cluster部署
# 1. 集群配置文件
# redis-7000.conf
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 15000
appendonly yes
bind 0.0.0.0
protected-mode no
# 启动Redis实例
redis-server redis-7000.conf
redis-server redis-7001.conf
redis-server redis-7002.conf
redis-server redis-7003.conf
redis-server redis-7004.conf
redis-server redis-7005.conf
# 创建集群
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1
# 2. 集群管理脚本
#!/bin/bash
# redis-cluster-manager.sh
CLUSTER_NODES="127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005"
case "$1" in
start)
echo "Starting Redis Cluster..."
for port in 7000 7001 7002 7003 7004 7005; do
redis-server redis-$port.conf
done
;;
stop)
echo "Stopping Redis Cluster..."
for port in 7000 7001 7002 7003 7004 7005; do
redis-cli -p $port shutdown
done
;;
status)
echo "Redis Cluster Status:"
redis-cli --cluster info 127.0.0.1:7000
;;
add-node)
echo "Adding new node $2 to cluster..."
redis-cli --cluster add-node $2 127.0.0.1:7000
;;
reshard)
echo "Resharding cluster..."
redis-cli --cluster reshard 127.0.0.1:7000
;;
*)
echo "Usage: $0 {start|stop|status|add-node|reshard}"
exit 1
;;
esac
# 分片策略优化
# 1. 热点数据处理
@Service
public class HotDataShardingService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private final LoadingCache<String, AtomicLong> accessCounter = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(key -> new AtomicLong(0));
/**
* 检测热点key
*/
public boolean isHotKey(String key) {
long accessCount = accessCounter.get(key).incrementAndGet();
return accessCount > 1000; // 5分钟内访问超过1000次认为是热点
}
/**
* 热点数据多副本存储
*/
public void setWithHotKeyOptimization(String key, Object value) {
if (isHotKey(key)) {
// 热点数据存储多个副本
for (int i = 0; i < 3; i++) {
String replicaKey = key + "#replica#" + i;
redisTemplate.opsForValue().set(replicaKey, value);
}
} else {
redisTemplate.opsForValue().set(key, value);
}
}
/**
* 热点数据负载均衡读取
*/
public Object getWithHotKeyOptimization(String key) {
if (isHotKey(key)) {
// 随机选择一个副本读取
int replicaIndex = ThreadLocalRandom.current().nextInt(3);
String replicaKey = key + "#replica#" + replicaIndex;
Object value = redisTemplate.opsForValue().get(replicaKey);
if (value == null) {
// 副本不存在,回退到原key
value = redisTemplate.opsForValue().get(key);
}
return value;
} else {
return redisTemplate.opsForValue().get(key);
}
}
}
# 2. 数据倾斜处理
@Component
public class DataSkewHandler {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 监控各分片数据分布
*/
@Scheduled(fixedRate = 300000) // 5分钟检查一次
public void monitorDataDistribution() {
Map<String, Long> shardSizes = new HashMap<>();
// 获取集群信息
RedisClusterConnection clusterConnection =
(RedisClusterConnection) redisTemplate.getConnectionFactory().getConnection();
Iterable<RedisClusterNode> nodes = clusterConnection.clusterGetNodes();
for (RedisClusterNode node : nodes) {
if (node.isMaster()) {
RedisConnection nodeConnection = clusterConnection.getConnection(node);
Properties info = nodeConnection.info("memory");
String usedMemory = info.getProperty("used_memory");
shardSizes.put(node.getId(), Long.parseLong(usedMemory));
}
}
// 检查数据倾斜
checkDataSkew(shardSizes);
}
private void checkDataSkew(Map<String, Long> shardSizes) {
if (shardSizes.isEmpty()) return;
long maxSize = Collections.max(shardSizes.values());
long minSize = Collections.min(shardSizes.values());
// 如果最大分片是最小分片的3倍以上,认为存在数据倾斜
if (maxSize > minSize * 3) {
log.warn("检测到数据倾斜,最大分片: {}MB, 最小分片: {}MB",
maxSize / 1024 / 1024, minSize / 1024 / 1024);
// 触发重新分片
triggerReshard();
}
}
private void triggerReshard() {
// 实现重新分片逻辑
log.info("开始执行重新分片操作");
}
}
# 3. 跨分片事务处理
@Service
public class RedisDistributedTransaction {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 使用Lua脚本实现原子操作
*/
public boolean transferPoints(String fromUser, String toUser, int points) {
String luaScript = """
local fromKey = KEYS[1]
local toKey = KEYS[2]
local points = tonumber(ARGV[1])
local fromPoints = tonumber(redis.call('GET', fromKey) or 0)
if fromPoints >= points then
redis.call('DECRBY', fromKey, points)
redis.call('INCRBY', toKey, points)
return 1
else
return 0
end
""";
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(luaScript);
script.setResultType(Long.class);
List<String> keys = Arrays.asList(
"user:points:" + fromUser,
"user:points:" + toUser
);
Long result = redisTemplate.execute(script, keys, points);
return result != null && result == 1;
}
/**
* 分布式锁实现
*/
public boolean tryLock(String lockKey, String requestId, long expireTime) {
String luaScript = """
if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then
return 1
else
return 0
end
""";
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(luaScript);
script.setResultType(Long.class);
Long result = redisTemplate.execute(script,
Collections.singletonList(lockKey), requestId, expireTime);
return result != null && result == 1;
}
/**
* 释放分布式锁
*/
public boolean releaseLock(String lockKey, String requestId) {
String luaScript = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
""";
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(luaScript);
script.setResultType(Long.class);
Long result = redisTemplate.execute(script,
Collections.singletonList(lockKey), requestId);
return result != null && result == 1;
}
}
# 性能优化
# 1. 连接池优化
@Configuration
public class RedisConnectionPoolConfig {
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
// 连接池配置
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(200); // 最大连接数
poolConfig.setMaxIdle(50); // 最大空闲连接数
poolConfig.setMinIdle(10); // 最小空闲连接数
poolConfig.setMaxWaitMillis(3000); // 获取连接最大等待时间
poolConfig.setTestOnBorrow(true); // 获取连接时检测有效性
poolConfig.setTestOnReturn(true); // 归还连接时检测有效性
poolConfig.setTestWhileIdle(true); // 空闲时检测有效性
// Lettuce客户端配置
LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.poolingClientConfiguration(LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.build())
.commandTimeout(Duration.ofSeconds(2))
.shutdownTimeout(Duration.ofSeconds(5))
.build();
return new LettuceConnectionFactory(redisClusterConfiguration(), clientConfig);
}
}
# 2. 批量操作优化
@Service
public class RedisBatchOptimization {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 批量设置(使用Pipeline)
*/
public void batchSet(Map<String, Object> data) {
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
for (Map.Entry<String, Object> entry : data.entrySet()) {
byte[] key = entry.getKey().getBytes();
byte[] value = serialize(entry.getValue());
connection.set(key, value);
}
return null;
}
});
}
/**
* 批量获取(按分片分组)
*/
public Map<String, Object> batchGet(Set<String> keys) {
// 按分片分组keys
Map<Integer, List<String>> shardKeyMap = keys.stream()
.collect(Collectors.groupingBy(this::getShardIndex));
Map<String, Object> result = new ConcurrentHashMap<>();
// 并行查询各分片
shardKeyMap.entrySet().parallelStream().forEach(entry -> {
List<String> shardKeys = entry.getValue();
List<Object> values = redisTemplate.opsForValue().multiGet(shardKeys);
for (int i = 0; i < shardKeys.size(); i++) {
if (values.get(i) != null) {
result.put(shardKeys.get(i), values.get(i));
}
}
});
return result;
}
private int getShardIndex(String key) {
// 计算key对应的分片索引
return Math.abs(key.hashCode()) % 16; // 假设16个分片
}
private byte[] serialize(Object obj) {
// 实现序列化
return obj.toString().getBytes();
}
}
# 3. 缓存预热策略
@Service
public class RedisCacheWarmup {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserService userService;
/**
* 应用启动时预热缓存
*/
@EventListener(ApplicationReadyEvent.class)
public void warmupCache() {
log.info("开始缓存预热...");
CompletableFuture.runAsync(() -> {
try {
warmupHotUsers();
warmupHotProducts();
warmupConfigData();
log.info("缓存预热完成");
} catch (Exception e) {
log.error("缓存预热失败", e);
}
});
}
/**
* 预热热点用户数据
*/
private void warmupHotUsers() {
List<Long> hotUserIds = userService.getHotUserIds(1000);
Map<String, Object> userData = new HashMap<>();
for (Long userId : hotUserIds) {
User user = userService.getUserById(userId);
userData.put("user:" + userId, user);
}
// 批量写入Redis
batchSetWithExpire(userData, 3600); // 1小时过期
}
/**
* 批量设置带过期时间
*/
private void batchSetWithExpire(Map<String, Object> data, long seconds) {
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
for (Map.Entry<String, Object> entry : data.entrySet()) {
byte[] key = entry.getKey().getBytes();
byte[] value = serialize(entry.getValue());
connection.setEx(key, seconds, value);
}
return null;
}
});
}
private byte[] serialize(Object obj) {
// 实现序列化
return obj.toString().getBytes();
}
}
# 监控与运维
# 1. Redis集群监控
@Component
public class RedisClusterMonitor {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private MeterRegistry meterRegistry;
/**
* 监控集群状态
*/
@Scheduled(fixedRate = 30000)
public void monitorClusterHealth() {
try {
RedisClusterConnection connection =
(RedisClusterConnection) redisTemplate.getConnectionFactory().getConnection();
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
int masterCount = 0;
int slaveCount = 0;
int failedCount = 0;
for (RedisClusterNode node : nodes) {
if (node.isMaster()) {
masterCount++;
} else {
slaveCount++;
}
if (node.getFlags().contains(RedisClusterNode.Flag.FAIL)) {
failedCount++;
}
}
// 记录指标
Gauge.builder("redis.cluster.master.count")
.register(meterRegistry, masterCount);
Gauge.builder("redis.cluster.slave.count")
.register(meterRegistry, slaveCount);
Gauge.builder("redis.cluster.failed.count")
.register(meterRegistry, failedCount);
} catch (Exception e) {
log.error("Redis集群监控失败", e);
}
}
/**
* 监控性能指标
*/
@Scheduled(fixedRate = 60000)
public void monitorPerformanceMetrics() {
RedisClusterConnection connection =
(RedisClusterConnection) redisTemplate.getConnectionFactory().getConnection();
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
for (RedisClusterNode node : nodes) {
if (node.isMaster()) {
try {
RedisConnection nodeConnection = connection.getConnection(node);
Properties info = nodeConnection.info();
// 内存使用率
long usedMemory = Long.parseLong(info.getProperty("used_memory"));
long maxMemory = Long.parseLong(info.getProperty("maxmemory"));
double memoryUsageRatio = maxMemory > 0 ? (double) usedMemory / maxMemory : 0;
// QPS
long totalCommands = Long.parseLong(info.getProperty("total_commands_processed"));
// 记录指标
Gauge.builder("redis.memory.usage.ratio")
.tag("node", node.getId())
.register(meterRegistry, memoryUsageRatio);
Gauge.builder("redis.commands.total")
.tag("node", node.getId())
.register(meterRegistry, totalCommands);
} catch (Exception e) {
log.error("获取节点{}性能指标失败", node.getId(), e);
}
}
}
}
}
# 2. 故障自动恢复
@Service
public class RedisFailoverService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private NotificationService notificationService;
/**
* 检测并处理节点故障
*/
@Scheduled(fixedRate = 15000)
public void detectAndHandleFailures() {
try {
RedisClusterConnection connection =
(RedisClusterConnection) redisTemplate.getConnectionFactory().getConnection();
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
for (RedisClusterNode node : nodes) {
if (node.getFlags().contains(RedisClusterNode.Flag.FAIL)) {
handleNodeFailure(node);
}
}
} catch (Exception e) {
log.error("故障检测失败", e);
}
}
private void handleNodeFailure(RedisClusterNode failedNode) {
log.error("检测到节点故障: {}", failedNode.getId());
// 发送告警
notificationService.sendAlert(
"Redis节点故障",
String.format("节点 %s 发生故障,请及时处理", failedNode.getId())
);
// 如果是主节点故障,检查是否有从节点可以提升
if (failedNode.isMaster()) {
promoteSlaveToMaster(failedNode);
}
}
private void promoteSlaveToMaster(RedisClusterNode failedMaster) {
// 实现从节点提升为主节点的逻辑
log.info("尝试提升从节点为主节点,替换故障节点: {}", failedMaster.getId());
}
}
# 总结
Redis分片技术是构建高性能、高可用缓存系统的关键技术。选择合适的分片方案需要考虑:
- 业务场景:数据访问模式、一致性要求、性能需求
- 运维复杂度:集群管理、故障处理、扩容难度
- 成本考虑:硬件资源、开发成本、维护成本
推荐方案:
- 小规模应用:客户端分片 + 一致性哈希
- 中大规模应用:Redis Cluster官方方案
- 超大规模应用:Redis Cluster + 代理层(如Twemproxy、Codis)
通过合理的分片设计和优化,可以构建出高性能、高可用的Redis集群系统。