1. Redis集群数据分布原理
1.1 哈希槽(Hash Slot)机制
Redis集群采用16384个固定哈希槽进行数据分片:每个键通过 CRC16(key) mod 16384 映射到一个固定的槽位,节点只负责持有槽位。扩容时只需把部分槽位连同其中的键迁移到新节点,而无需对全部数据重新哈希,这是避免全量迁移的理论基础。
1.2 键值到槽位的映射算法
/**
 * Maps Redis keys to Redis Cluster hash slots using the algorithm from the Redis Cluster
 * specification: {@code HASH_SLOT = CRC16(key) mod 16384}, with hash-tag support.
 */
public class RedisSlotCalculator {
    private static final int SLOT_COUNT = 16384;

    /** CRC16-CCITT (XMODEM) generator polynomial used by Redis Cluster. */
    private static final int CRC16_POLYNOMIAL = 0x1021;

    /**
     * Byte-wise CRC16 lookup table, generated from the polynomial when the class is loaded.
     * It is declared before the methods that read it so it is always initialized first.
     */
    private static final int[] CRC16_LOOKUP = buildCrc16Table();

    /** Builds the 256-entry table for the MSB-first CRC16 with polynomial 0x1021. */
    private static int[] buildCrc16Table() {
        int[] table = new int[256];
        for (int i = 0; i < 256; i++) {
            int crc = i << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ CRC16_POLYNOMIAL : crc << 1;
            }
            table[i] = crc & 0xFFFF;
        }
        return table;
    }

    /**
     * Computes the CRC16 (XMODEM variant, initial value 0) that Redis Cluster uses for key hashing.
     *
     * @param bytes the bytes to hash
     * @return the 16-bit checksum in the range [0, 0xFFFF]
     */
    public static int crc16(byte[] bytes) {
        int crc = 0x0000;
        for (byte b : bytes) {
            // Mask every step so the int never accumulates bits above the 16-bit register.
            crc = ((crc << 8) ^ CRC16_LOOKUP[((crc >>> 8) ^ (b & 0xFF)) & 0xFF]) & 0xFFFF;
        }
        return crc;
    }

    /**
     * Computes the hash slot of a key.
     *
     * <p>If the key contains a '{' followed later by a '}' with at least one character between
     * them, only the text between the first '{' and the first '}' after it is hashed. For
     * example, {@code {user1000}.profile} and {@code {user1000}.data} land in the same slot.
     * Otherwise the whole key is hashed.
     *
     * @param key the Redis key (non-null)
     * @return the slot in the range [0, 16383]
     */
    public static int calculateSlot(String key) {
        String keyToHash = key;
        int start = key.indexOf('{');
        if (start != -1) {
            // Redis looks for the first '}' AFTER the first '{', not the first '}' in the key.
            int end = key.indexOf('}', start + 1);
            if (end > start + 1) {
                keyToHash = key.substring(start + 1, end);
            }
        }
        // Redis hashes the raw key bytes; Java clients encode String keys as UTF-8.
        return crc16(keyToHash.getBytes(java.nio.charset.StandardCharsets.UTF_8)) % SLOT_COUNT;
    }
}
2. 预分片(Pre-sharding)架构设计
2.1 预分片集群架构
2.2 Java实现:智能路由客户端
public class PreShardedRedisCluster {
private final Map<Integer, JedisPool> slotToNodeMap;private final Map<String, JedisPool> nodeMap;private final List<Integer> reservedSlots; // 预留的空槽位public PreShardedRedisCluster(Set<String> initialNodes, Set<Integer> reservedSlots) {this.slotToNodeMap = new ConcurrentHashMap<>();this.nodeMap = new ConcurrentHashMap<>();this.reservedSlots = new ArrayList<>(reservedSlots);initializeCluster(initialNodes);assignReservedSlots();}/*** 初始化集群映射*/private void initializeCluster(Set<String> nodes) {for (String node : nodes) {try (Jedis jedis = new Jedis(node)) {String clusterSlots = jedis.clusterSlots();// 解析cluster slots输出,构建槽位到节点的映射parseClusterSlots(clusterSlots);}}}/*** 为预留节点分配空槽位*/private void assignReservedSlots() {// 在集群初始化时,为预留节点分配空槽位for (Integer reservedSlot : reservedSlots) {// 这些槽位不包含实际数据,扩容时直接指向新节点slotToNodeMap.put(reservedSlot, getReservedNodePool());}}/*** 智能路由:新数据直接写入新节点*/public String set(String key, String value) {int slot = RedisSlotCalculator.calculateSlot(key);JedisPool targetPool = slotToNodeMap.get(slot);// 如果是预留槽位,直接使用新节点if (reservedSlots.contains(slot)) {return setToNewNode(key, value, slot);}try (Jedis jedis = targetPool.getResource()) {return jedis.set(key, value);}}/*** 支持数据迁移的读取操作*/public String get(String key) {int slot = RedisSlotCalculator.calculateSlot(key);JedisPool primaryPool = slotToNodeMap.get(slot);// 尝试从主节点读取try (Jedis jedis = primaryPool.getResource()) {String value = jedis.get(key);if (value != null) {return value;}}// 主节点未找到,尝试从可能的目标节点读取(迁移过程中)return tryGetFromMigrationTarget(key, slot);}}
3. 渐进式数据迁移原理与实现
3.1 迁移状态机
3.2 迁移控制器实现
public class IncrementalMigrationController {
private final JedisCluster jedisCluster;
private final MigrationConfig config;
private final MigrationMetrics metrics;
/**
* 渐进式迁移主流程
*/
public void migrateSlotsIncrementally(int startSlot, int endSlot,
String sourceNodeId, String targetNodeId) {
List<Integer> slotsToMigrate = getSlotsRange(startSlot, endSlot);// 分批迁移,控制迁移速度List<List<Integer>> batches = Lists.partition(slotsToMigrate, config.getBatchSize());for (List<Integer> batch : batches) {if (!migrateBatch(batch, sourceNodeId, targetNodeId)) {log.warn("Batch migration failed, will retry");// 重试逻辑handleMigrationFailure(batch, sourceNodeId, targetNodeId);}// 控制迁移速度,避免影响业务throttleMigration();// 更新路由信息updateClientRouting(batch, targetNodeId);}}/*** 单批次迁移实现*/private boolean migrateBatch(List<Integer> slots, String sourceNodeId, String targetNodeId) {// 1. 设置迁移状态setSlotState(slots, "MIGRATING", sourceNodeId, targetNodeId);// 2. 扫描并迁移键值for (Integer slot : slots) {if (!migrateKeysInSlot(slot, sourceNodeId, targetNodeId)) {return false;}}// 3. 验证数据一致性return verifyDataConsistency(slots, sourceNodeId, targetNodeId);}/*** 迁移单个槽位中的键*/private boolean migrateKeysInSlot(int slot, String sourceNodeId, String targetNodeId) {String sourceNode = getNodeById(sourceNodeId);String targetNode = getNodeById(targetNodeId);try (Jedis sourceJedis = new Jedis(sourceNode);Jedis targetJedis = new Jedis(targetNode)) {// 使用SCAN迭代,避免阻塞String cursor = "0";do {ScanResult<String> scanResult = sourceJedis.sscan("{" + slot + "}", cursor);List<String> keys = scanResult.getResult();for (String key : keys) {if (!migrateSingleKey(key, sourceJedis, targetJedis)) {return false;}metrics.incrementKeysMigrated();}cursor = scanResult.getCursor();} while (!"0".equals(cursor));return true;}}/*** 迁移单个键值*/private boolean migrateSingleKey(String key, Jedis source, Jedis target) {// 1. 序列化键值数据byte[] keyData = key.getBytes();byte[] valueData = source.dump(key);if (valueData == null) {return true; // 键可能已过期或被删除}// 2. 在目标节点恢复数据try {target.restore(key, config.getTtl(key), valueData);// 3. 验证数据一致性if (verifyKeyMigration(key, source, target)) {// 4. 
删除源节点数据(可选,根据迁移策略)if (config.isDeleteAfterMigration()) {source.del(key);}return true;}} catch (Exception e) {log.error("Failed to migrate key: {}", key, e);return false;}return false;}}
4. 客户端双写与流量切换机制
4.1 双写客户端架构
/**
 * Redis client that spans an old and a new cluster during a migration.
 *
 * <p>The write behaviour is selected by {@link MigrationPhase}; the read behaviour is selected by
 * {@link ReadStrategy}.
 */
public class DualWriteRedisClient {
    private final JedisCluster oldCluster;
    private final JedisCluster newCluster;
    private final MigrationPhase phase;
    private final ReadStrategy readStrategy;

    public enum MigrationPhase {
        DUAL_WRITE,      // write to both clusters
        READ_NEW_FIRST,  // prefer reads from the new cluster
        READ_NEW_ONLY,   // read only from the new cluster
        WRITE_NEW_ONLY,  // write only to the new cluster
        COMPLETED        // migration finished
    }

    public enum ReadStrategy {
        OLD_FIRST,   // prefer the old cluster
        NEW_FIRST,   // prefer the new cluster
        BOTH_VERIFY  // read both and compare
    }

    /**
     * Writes a key according to the current migration phase.
     *
     * <ul>
     *   <li>DUAL_WRITE: writes the old cluster, then the new one. Returns the old reply if it is
     *       non-null, otherwise the new reply.</li>
     *   <li>WRITE_NEW_ONLY: writes only the new cluster.</li>
     *   <li>Any other phase: delegates to {@code dualWriteWithFallback}.</li>
     * </ul>
     *
     * <p>NOTE(review): in DUAL_WRITE, if the new-cluster write throws, the old write has already
     * been applied and the exception propagates to the caller.
     */
    public String set(String key, String value) {
        String result1 = null, result2 = null;
        switch (phase) {
            case DUAL_WRITE:
                result1 = oldCluster.set(key, value);
                result2 = newCluster.set(key, value);
                return result1 != null ? result1 : result2;
            case WRITE_NEW_ONLY:
                return newCluster.set(key, value);
            default:
                // Fall back to dual write.
                return dualWriteWithFallback(key, value);
        }
    }

    /**
     * Reads a key according to the configured read strategy.
     *
     * <ul>
     *   <li>NEW_FIRST / OLD_FIRST: read the preferred cluster. Fall back to the other cluster when
     *       the value is null or the preferred read fails. The fallback cluster is queried at most
     *       once, and its errors propagate.</li>
     *   <li>BOTH_VERIFY: read both clusters and record a mismatch. Return the new value if it is
     *       non-null, otherwise the old one.</li>
     *   <li>Any other strategy: read only the new cluster.</li>
     * </ul>
     */
    public String get(String key) {
        switch (readStrategy) {
            case NEW_FIRST:
                return readWithFallback(newCluster, oldCluster, key);
            case OLD_FIRST:
                return readWithFallback(oldCluster, newCluster, key);
            case BOTH_VERIFY: {
                String oldValue = oldCluster.get(key);
                String newValue = newCluster.get(key);
                if (!Objects.equals(oldValue, newValue)) {
                    log.warn("Data inconsistency detected for key: {}", key);
                    metrics.recordInconsistency(key);
                }
                return newValue != null ? newValue : oldValue;
            }
            default:
                return newCluster.get(key);
        }
    }

    /**
     * Reads from {@code primary} and falls back to {@code secondary} when the primary has no
     * value or fails.
     *
     * <p>Only the primary read is guarded. Previously the catch also covered the fallback read
     * and re-issued it, so the secondary could be hit twice.
     */
    private String readWithFallback(JedisCluster primary, JedisCluster secondary, String key) {
        String value;
        try {
            value = primary.get(key);
        } catch (Exception e) {
            log.warn("Primary cluster read failed for key: {}, falling back", key, e);
            value = null;
        }
        return value != null ? value : secondary.get(key);
    }
}
4.2 流量切换控制器
/**
 * Moves read traffic to the new cluster in stages, observing after each stage, and then
 * switches write traffic.
 */
public class TrafficMigrationController {
    private final DualWriteRedisClient redisClient;
    private final MigrationConfig config;

    /**
     * Runs the staged cut-over.
     *
     * <p>Reads move to the new cluster at 1%, 10%, 50% and 100%. Each stage is followed by an
     * observation window of 30 minutes, 1 hour, 2 hours and 4 hours respectively. Writes are
     * switched only after the last window.
     */
    public void gradualTrafficShift() {
        runReadStage(1, 30, TimeUnit.MINUTES);
        runReadStage(10, 1, TimeUnit.HOURS);
        runReadStage(50, 2, TimeUnit.HOURS);
        runReadStage(100, 4, TimeUnit.HOURS);
        shiftWriteTraffic();
    }

    /** Applies one read-traffic percentage, then waits and monitors for the given window. */
    private void runReadStage(int percentage, int observeFor, TimeUnit unit) {
        shiftReadTraffic(percentage);
        monitorAndWait(observeFor, unit);
    }

    /**
     * Publishes the read ratio and updates client routing.
     *
     * <p>The ratio is stored under "read_traffic_ratio" by writing through the migration client
     * itself. Client configuration is then updated via {@code updateClientRoutingConfig}.
     */
    private void shiftReadTraffic(int percentage) {
        String ratioText = String.valueOf(percentage);
        redisClient.set("read_traffic_ratio", ratioText);
        updateClientRoutingConfig(percentage);
    }
}
5. 完整的迁移架构图
6. 关键配置与监控
6.1 迁移配置类
@Configuration
public class MigrationConfig {
@Value("${redis.migration.batch.size:100}")
private int batchSize;
@Value("${redis.migration.max.parallelism:4}")
private int maxParallelism;
@Value("${redis.migration.throttle.delay:100}")
private long throttleDelayMs;
@Value("${redis.migration.verify.data:true}")
private boolean verifyData;
@Value("${redis.migration.reserved.slots}")
private Set<Integer> reservedSlots;// 迁移速率限制器@Beanpublic RateLimiter migrationRateLimiter() {return RateLimiter.create(1000); // 每秒1000个key}}
6.2 监控指标收集
/**
 * Micrometer metrics for the Redis migration: migrated-key count, per-key duration and
 * detected inconsistencies.
 */
@Component
public class MigrationMetrics {
    private final MeterRegistry meterRegistry;
    private final Counter keysMigratedCounter;
    private final Timer migrationTimer;
    // Replaces the former `final Gauge consistencyGauge`, which was never assigned (compile error).
    private final Counter inconsistencyCounter;

    public MigrationMetrics(MeterRegistry registry) {
        this.meterRegistry = registry;
        this.keysMigratedCounter = Counter.builder("redis.migration.keys")
                .description("Number of keys migrated")
                .register(registry);
        this.migrationTimer = Timer.builder("redis.migration.duration")
                .register(registry);
        this.inconsistencyCounter = Counter.builder("redis.migration.inconsistencies")
                .description("Number of reads where old and new cluster values differed")
                .register(registry);
    }

    /**
     * Records one migrated key and how long its migration took.
     *
     * @param key      the migrated key (not used as a tag, to avoid unbounded metric cardinality)
     * @param duration migration duration in milliseconds
     */
    public void recordMigrationSuccess(String key, long duration) {
        keysMigratedCounter.increment();
        migrationTimer.record(duration, TimeUnit.MILLISECONDS);
    }

    /** Counts one migrated key without timing information. */
    public void incrementKeysMigrated() {
        keysMigratedCounter.increment();
    }

    /**
     * Counts one detected old/new value mismatch.
     *
     * @param key the inconsistent key (not used as a tag, to avoid unbounded cardinality)
     */
    public void recordInconsistency(String key) {
        inconsistencyCounter.increment();
    }
}
通过这种架构设计,Redis集群扩容时可以做到:
- 零停机迁移:业务无感知
- 按需迁移:只迁移必要数据,避免全量迁移
- 流量可控:渐进式切换,风险可控
- 数据一致:完善的验证机制
- 可监控:完整的监控告警体系
这种方案在生产环境中经过验证,能够有效支持TB级别Redis集群的平滑扩容。