基于redis实现分布式锁方案实战

分布式锁的进阶实现与优化方案

作为Java高级开发工程师,我将为您提供更完善的Redis分布式锁实现方案,包含更多生产级考量。

1. 生产级Redis分布式锁实现

1.1 完整实现类(支持可重入、自动续约)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class RedisDistributedLock {private final JedisPool jedisPool;private final ScheduledExecutorService scheduler;// 本地存储锁的持有计数(用于可重入)private final ThreadLocal<Map<String, LockEntry>> lockHoldCounts = ThreadLocal.withInitial(HashMap::new);// 锁续约的Future集合private final ConcurrentMap<String, ScheduledFuture<?>> renewalFutures = new ConcurrentHashMap<>();private static class LockEntry {final String requestId;final AtomicInteger holdCount;LockEntry(String requestId) {this.requestId = requestId;this.holdCount = new AtomicInteger(1);}}public RedisDistributedLock(JedisPool jedisPool) {this.jedisPool = jedisPool;this.scheduler = Executors.newScheduledThreadPool(4);}public boolean tryLock(String lockKey, String requestId, int expireTime) {return tryLock(lockKey, requestId, expireTime, 0, 0);}public boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime, long retryInterval) {long startTime = System.currentTimeMillis();try {// 检查是否已经持有锁(可重入)LockEntry entry = lockHoldCounts.get().get(lockKey);if (entry != null && entry.requestId.equals(requestId)) {entry.holdCount.incrementAndGet();return true;}// 尝试获取锁while (true) {if (acquireLock(lockKey, requestId, expireTime)) {// 获取成功,记录持有信息lockHoldCounts.get().put(lockKey, new LockEntry(requestId));// 启动自动续约scheduleRenewal(lockKey, requestId, expireTime);return true;}// 检查是否超时if (maxWaitTime > 0 && System.currentTimeMillis() - startTime > maxWaitTime) {return false;}// 等待重试if (retryInterval > 0) {try {Thread.sleep(retryInterval);} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}}} catch (Exception e) {// 异常处理return false;}}private boolean acquireLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {SetParams params = SetParams.setParams().nx().px(expireTime);return "OK".equals(jedis.set(lockKey, requestId, params));}}private void scheduleRenewal(String lockKey, String requestId, int expireTime) {// 计算续约间隔(通常是过期时间的1/3)long renewalInterval = expireTime * 2 / 3;ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {try {renewLock(lockKey, requestId, expireTime);} catch (Exception e) {// 续约失败,取消任务renewalFutures.remove(lockKey);}}, renewalInterval, renewalInterval, TimeUnit.MILLISECONDS);renewalFutures.put(lockKey, future);}public boolean renewLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('pexpire', KEYS[1], ARGV[2]) " +"else " +"return 0 " +"end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Arrays.asList(requestId, String.valueOf(expireTime)));return "OK".equals(result);}}public boolean releaseLock(String lockKey, String requestId) {try {LockEntry entry = lockHoldCounts.get().get(lockKey);if (entry == null || !entry.requestId.equals(requestId)) {return false;}// 减少持有计数if (entry.holdCount.decrementAndGet() > 0) {return true;}// 完全释放锁lockHoldCounts.get().remove(lockKey);// 取消续约任务ScheduledFuture<?> future = renewalFutures.remove(lockKey);if (future != null) {future.cancel(false);}// 释放Redis锁try (Jedis jedis = jedisPool.getResource()) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));return Long.valueOf(1L).equals(result);}} catch (Exception e) {return false;}}public void shutdown() {scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}}
}

1.2 锁工厂模式(支持多种锁类型)

public interface DistributedLock {boolean tryLock(String lockKey, String requestId, int expireTime);boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime, long retryInterval);boolean releaseLock(String lockKey, String requestId);
}public class RedisLockFactory {private final JedisPool jedisPool;public RedisLockFactory(JedisPool jedisPool) {this.jedisPool = jedisPool;}public DistributedLock createSimpleLock() {return new SimpleRedisLock(jedisPool);}public DistributedLock createReentrantLock() {return new ReentrantRedisLock(jedisPool);}public DistributedLock createReadWriteLock() {return new RedisReadWriteLock(jedisPool);}private static class SimpleRedisLock implements DistributedLock {// 简单实现(同前面基础实现)}private static class ReentrantRedisLock extends RedisDistributedLock {// 可重入实现(同前面完整实现)}private static class RedisReadWriteLock implements DistributedLock {// 读写锁实现}
}

2. 高级特性实现

2.1 读写锁实现

public class RedisReadWriteLock {private final JedisPool jedisPool;private static final String READ_LOCK_PREFIX = "READ_LOCK:";private static final String WRITE_LOCK_PREFIX = "WRITE_LOCK:";public RedisReadWriteLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}public boolean tryReadLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {// 检查是否有写锁if (jedis.exists(WRITE_LOCK_PREFIX + lockKey)) {return false;}// 获取读锁String readLockKey = READ_LOCK_PREFIX + lockKey;Long count = jedis.incr(readLockKey);if (count == 1L) {// 第一次获取读锁,设置过期时间jedis.pexpire(readLockKey, expireTime);}return true;}}public boolean tryWriteLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {// 检查是否有读锁if (jedis.exists(READ_LOCK_PREFIX + lockKey)) {return false;}// 获取写锁SetParams params = SetParams.setParams().nx().px(expireTime);return "OK".equals(jedis.set(WRITE_LOCK_PREFIX + lockKey, requestId, params));}}// 释放方法类似...
}

2.2 公平锁实现(基于Redis列表)

public class RedisFairLock {private final JedisPool jedisPool;private static final String QUEUE_PREFIX = "LOCK_QUEUE:";private static final String LOCK_PREFIX = "LOCK:";public RedisFairLock(JedisPool jedisPool) {this.jedisPool = jedisPool;}public boolean tryLock(String lockKey, String requestId, int expireTime, long maxWaitTime) {long startTime = System.currentTimeMillis();String queueKey = QUEUE_PREFIX + lockKey;String lockRealKey = LOCK_PREFIX + lockKey;try (Jedis jedis = jedisPool.getResource()) {// 加入等待队列jedis.rpush(queueKey, requestId);try {while (true) {// 检查是否轮到自己String firstRequestId = jedis.lindex(queueKey, 0);if (requestId.equals(firstRequestId)) {// 尝试获取锁SetParams params = SetParams.setParams().nx().px(expireTime);if ("OK".equals(jedis.set(lockRealKey, requestId, params))) {return true;}}// 检查超时if (System.currentTimeMillis() - startTime > maxWaitTime) {// 从队列中移除自己jedis.lrem(queueKey, 0, requestId);return false;}// 短暂等待Thread.sleep(100);}} finally {// 确保最终从队列中移除(防止异常情况)jedis.lrem(queueKey, 0, requestId);}} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}public void releaseLock(String lockKey, String requestId) {try (Jedis jedis = jedisPool.getResource()) {String lockRealKey = LOCK_PREFIX + lockKey;String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";jedis.eval(script, Collections.singletonList(lockRealKey), Collections.singletonList(requestId));}}
}

3. 生产环境最佳实践

3.1 配置建议

  1. 锁过期时间

    • 根据业务操作的最长时间设置,通常为业务平均耗时的3倍
    • 例如:业务平均耗时200ms,可设置锁过期时间为600ms
  2. 续约间隔

    • 设置为过期时间的1/3到1/2
    • 例如:过期时间600ms,续约间隔200-300ms
  3. 重试策略

    • 初始重试间隔50-100ms
    • 可考虑指数退避策略

3.2 监控与告警

public class LockMonitor {private final JedisPool jedisPool;private final MeterRegistry meterRegistry; // 假设使用Micrometerpublic LockMonitor(JedisPool jedisPool, MeterRegistry meterRegistry) {this.jedisPool = jedisPool;this.meterRegistry = meterRegistry;}public void monitorLockStats() {// 监控锁获取成功率Timer lockAcquireTimer = Timer.builder("redis.lock.acquire.time").description("Time taken to acquire redis lock").register(meterRegistry);// 监控锁等待时间DistributionSummary waitTimeSummary = DistributionSummary.builder("redis.lock.wait.time").description("Time spent waiting for redis lock").register(meterRegistry);// 监控锁竞争情况Gauge.builder("redis.lock.queue.size", () -> {try (Jedis jedis = jedisPool.getResource()) {return jedis.llen("LOCK_QUEUE:important_lock");}}).description("Number of clients waiting for lock").register(meterRegistry);}
}

3.3 异常处理策略

  1. Redis不可用时的降级策略
    • 本地降级锁(仅适用于单机或可以接受短暂不一致的场景)
    • 快速失败,避免系统雪崩
public class DegradableRedisLock implements DistributedLock {private final DistributedLock redisLock;private final ReentrantLock localLock = new ReentrantLock();private final CircuitBreaker circuitBreaker;public DegradableRedisLock(DistributedLock redisLock) {this.redisLock = redisLock;this.circuitBreaker = CircuitBreaker.ofDefaults("redisLock");}@Overridepublic boolean tryLock(String lockKey, String requestId, int expireTime) {return CircuitBreaker.decorateSupplier(circuitBreaker, () -> {try {return redisLock.tryLock(lockKey, requestId, expireTime);} catch (Exception e) {// Redis不可用,降级到本地锁return localLock.tryLock();}}).get();}// 其他方法实现类似...
}

4. 测试方案

4.1 单元测试

public class RedisDistributedLockTest {private RedisDistributedLock lock;private JedisPool jedisPool;@BeforeEachvoid setUp() {jedisPool = new JedisPool("localhost");lock = new RedisDistributedLock(jedisPool);}@Testvoid testLockAndUnlock() {String lockKey = "test_lock";String requestId = UUID.randomUUID().toString();assertTrue(lock.tryLock(lockKey, requestId, 10000));assertTrue(lock.releaseLock(lockKey, requestId));}@Testvoid testReentrantLock() {String lockKey = "test_reentrant_lock";String requestId = UUID.randomUUID().toString();assertTrue(lock.tryLock(lockKey, requestId, 10000));assertTrue(lock.tryLock(lockKey, requestId, 10000)); // 可重入assertTrue(lock.releaseLock(lockKey, requestId));assertTrue(lock.releaseLock(lockKey, requestId)); // 需要释放两次}// 更多测试用例...
}

4.2 并发测试

@Test
void testConcurrentLock() throws InterruptedException {String lockKey = "concurrent_test_lock";int threadCount = 10;CountDownLatch latch = new CountDownLatch(threadCount);AtomicInteger successCount = new AtomicInteger();for (int i = 0; i < threadCount; i++) {new Thread(() -> {String requestId = UUID.randomUUID().toString();if (lock.tryLock(lockKey, requestId, 1000, 5000, 100)) {try {successCount.incrementAndGet();Thread.sleep(100); // 模拟业务处理} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {lock.releaseLock(lockKey, requestId);}}latch.countDown();}).start();}latch.await();assertEquals(1, successCount.get()); // 确保只有一个线程获取到锁
}

5. 性能优化建议

  1. 连接池优化

    • 合理配置Jedis连接池大小
    • 使用try-with-resources确保连接释放
  2. Lua脚本优化

    • 预加载常用Lua脚本
    • 减少脚本复杂度
  3. 批量操作

    • 对于RedLock等多节点场景,考虑使用管道(pipeline)
  4. 本地缓存

    • 对于频繁使用的锁信息,可考虑本地缓存

6. 替代方案对比

方案优点缺点适用场景
Redis单节点实现简单,性能高单点故障,可靠性较低对可靠性要求不高的场景
Redis集群+RedLock可靠性较高实现复杂,性能较低对可靠性要求高的场景
Zookeeper可靠性高,原生支持临时节点性能较低,依赖Zookeeper强一致性要求的场景
数据库实现无需额外组件性能差,容易成为瓶颈简单场景,并发量低

这个进阶方案提供了生产环境所需的完整功能,包括可重入锁、读写锁、公平锁等高级特性,以及监控、降级等生产级考量。您可以根据实际项目需求选择合适的实现方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/81170.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

XML简要介绍

实际上现在的Java Web项目中更多的是基于springboot开发的&#xff0c;所以很少再使用xml去配置项目。所以我们的目的就是尽可能快速的去了解如何读懂和使用xml文件&#xff0c;对于DTD&#xff0c;XMLSchema这类约束的学习可以放松&#xff0c;主要是确保自己知道这里面的大致…

UI自动化测试中,一个完整的断言应所需要考虑的问题

在UI自动化测试中,一个完整的断言应全面覆盖用户界面(UI)的功能性、交互性和视觉正确性。以下是断言需要包含的核心内容及详细说明: 一、基础元素验证 存在性断言 验证元素存在于DOM中示例代码(Python + Selenium):assert driver.find_element(By.ID, "submit_btn&…

[Java][Leetcode middle] 238. 除自身以外数组的乘积

第一个想法是&#xff1a; 想求出所有元素乘积&#xff0c;然后除以i对应的元素本书&#xff1b;这个想法是完全错误的&#xff1a; nums[I] 可能有0题目要求了不能用除法 第二个想法是&#xff1a; 其实写之前就知道会超时&#xff0c;但是我什么都做不到啊&#xff01; 双…

5.16本日总结

一、英语 背诵list30&#xff0c;复习list1 二、数学 学习14讲部分内容&#xff0c;订正30讲13讲题目 三、408 学习计网5.3知识点&#xff0c;完成5.1&#xff0c;5.2题目并订正 四、总结 高数对于基本定义概念类题目掌握不好&#xff0c;做题时往往不会下手&#xff0c…

深度学习---常用优化器

优化器一&#xff1a;Adam&#xff08;Adaptive Moment Estimation&#xff09; 一、适用场景总结&#xff08;实践导向&#xff09; 场景是否推荐用 Adam说明小模型训练&#xff08;如 MLP、CNN&#xff09;✅✅✅稳定、无需复杂调参&#xff0c;适合快速实验初学者使用或结构…

SparkSQL 连接 MySQL 并添加新数据:实战指南

SparkSQL 连接 MySQL 并添加新数据&#xff1a;实战指南 在大数据处理中&#xff0c;SparkSQL 作为 Apache Spark 的重要组件&#xff0c;能够方便地与外部数据源进行交互。MySQL 作为广泛使用的关系型数据库&#xff0c;与 SparkSQL 的结合可以充分发挥两者的优势。本文将详细…

基于对抗性后训练的快速文本到音频生成:stable-audio-open-small 模型论文速读

Fast Text-to-Audio Generation with Adversarial Post-Training 论文解析 一、引言与背景 文本到音频系统的局限性&#xff1a;当前文本到音频生成系统性能虽佳&#xff0c;但推理速度慢&#xff08;需数秒至数分钟&#xff09;&#xff0c;限制了其在创意领域的应用。 研究…

AI画图Stable Diffusion web UI学习笔记(中)

本文记录讲解AI画图工具Stable Diffusion web UI的部分基本使用方法&#xff0c;以便进行学习。AI画图Stable Diffusion web UI学习笔记分为上、中、下三篇文章。 我在 AI画图Stable Diffusion web UI学习笔记&#xff08;上&#xff09;_webui-CSDN博客 这篇文章中介绍了Stabl…

安全与智能的双向奔赴,安恒信息先行一步

人类文明发展的长河中&#xff0c;每一次技术变革都重新书写了安全的定义。 从蒸汽机的轰鸣到电力的普及&#xff0c;从互联网的诞生到人工智能的崛起&#xff0c;技术创新与变革从未停止对于安全的挑战。今天&#xff0c;我们又站在一个关键的历史节点&#xff1a;AI大模型的…

【Reality Capture 】02:Reality Capture1.5中文版软件设置与介绍

文章目录 一、如何设置中文二、如何设置界面分区三、如何切换二三维窗口四、工具栏有多个视图选项卡RealityCapture是虚幻引擎旗下一款三维建模软件,跟我们常用的三维建模软件一样,可以从图像或激光扫描中创建实景三维模型和正射影像等产品。可用于建筑、测绘、游戏和视觉特效…

真题卷001——算法备赛

蓝桥杯2024年C/CB组国赛卷 1.合法密码 问题描述 小蓝正在开发自己的OJ网站。他要求用户的密码必须符合一下条件&#xff1a; 长度大于等于8小于等于16必须包含至少一个数字字符和至少一个符号字符 请计算一下字符串&#xff0c;有多少个子串可以当作合法密码。字符串为&am…

17.three官方示例+编辑器+AI快速学习webgl_buffergeometry_lines

本实例主要讲解内容 这个Three.js示例展示了如何使用BufferGeometry创建大量线段&#xff0c;并通过**变形目标(Morph Targets)**实现动态变形效果。通过随机生成的点云数据&#xff0c;结合顶点颜色和变形动画&#xff0c;创建出一个视觉效果丰富的3D线条场景。 核心技术包括…

InfluxDB 2.7 连续查询实战指南:Task 替代方案详解

InfluxDB 2.7 引入了 Task 功能&#xff0c;作为连续查询&#xff08;CQ&#xff09;的现代替代方案。本文详细介绍了如何使用 Task 实现传统 CQ 的功能&#xff0c;包括语法解析、示例代码、参数对比以及典型应用场景。通过实际案例和最佳实践&#xff0c;帮助开发者高效迁移并…

Pytorch张量和损失函数

文章目录 张量张量类型张量例子使用概率分布创建张量正态分布创建张量 (torch.normal)正态分布创建张量示例标准正态分布创建张量标准正态分布创建张量示例均匀分布创建张量均匀分布创建张量示例 激活函数常见激活函数 损失函数(Pytorch API)L1范数损失函数均方误差损失函数交叉…

大模型在数据分析领域的研究综述

大模型在业务指标拆解中的应用场景与方法研究 随着人工智能技术的快速发展&#xff0c;大模型&#xff08;Large Language Models, LLMs&#xff09;在数据分析领域的应用日益广泛。尤其是在业务指标拆解这一复杂任务中&#xff0c;大模型展现了其独特的价值和潜力。通过对多维…

JAVA:ResponseBodyEmitter 实现异步流式推送的技术指南

1、简述 在许多场景下,我们希望后端能够以流式、实时的方式推送数据给前端,比如消息通知、日志实时展示、进度条更新等。Spring Boot 提供了 ResponseBodyEmitter 机制,可以让我们在 Controller 中异步地推送数据,从而实现实时流式输出。 样例代码:https://gitee.com/lh…

Spring Boot循环依赖的陷阱与解决方案:如何打破“Bean创建死循环”?

引言 在Spring Boot开发中&#xff0c;你是否遇到过这样的错误信息&#xff1f; The dependencies of some of the beans in the application context form a cycle 这表示你的应用出现了循环依赖。尽管Spring框架通过巧妙的机制解决了部分循环依赖问题&#xff0c;但在实际开…

如何阅读、学习 Tcc (Tiny C Compiler) 源代码?如何解析 Tcc 源代码?

阅读和解析 TCC&#xff08;Tiny C Compiler&#xff09; 的源代码需要对编译器的基本工作原理和代码结构有一定的了解。以下是分步骤的指南&#xff0c;帮助你更高效地学习和理解 TCC 的源代码&#xff1a; 1. 前置知识准备 C 语言基础&#xff1a;TCC 是用 C 语言编写的&…

Java Set系列集合详解:HashSet、LinkedHashSet、TreeSet底层原理与使用场景

Java Set系列集合详解&#xff1a;HashSet、LinkedHashSet、TreeSet底层原理与使用场景 一、Set系列集合概述 1. 核心特点 无序性&#xff1a;存取顺序不一致&#xff08;LinkedHashSet除外&#xff09;。唯一性&#xff1a;元素不重复。无索引&#xff1a;无法通过索引直接访…

解决 CentOS 7 镜像源无法访问的问题

在国内使用 CentOS 系统时&#xff0c;经常会遇到镜像源无法访问或者下载速度慢的问题。尤其是默认的 CentOS 镜像源通常是国外的&#xff0c;如果你的网络环境无法直接访问国外服务器&#xff0c;就会出现无法下载包的情况。本文将介绍如何修改 CentOS 7 的镜像源为国内镜像源…