长春火车站到龙嘉机场高铁时刻表企业信息化管理系统
web/
2025/9/26 11:28:39/
文章来源:
长春火车站到龙嘉机场高铁时刻表,企业信息化管理系统,wordpress创建论坛,物流网站开发公司RedLock底层源码分析
一、Redlock红锁算法
https://redis.io/docs/manual/patterns/distributed-locks/官网说明 1、为什么要学习这个#xff1f;怎么产生的#xff1f;
一个很直接的问题#xff0c;当我使用redis锁的那台机器挂了#xff0c;出现了单点故障了#…RedLock底层源码分析
一、Redlock红锁算法
https://redis.io/docs/manual/patterns/distributed-locks/官网说明 1、为什么要学习这个怎么产生的
一个很直接的问题当我使用redis锁的那台机器挂了出现了单点故障了程序该何去何从
官网上的说明 再翻译一下就是客户端A获取到了master中的锁了在从节点slave同步master之前master挂了这个时候slave就会从机上位成为master但是它就没有客户端A获取的那个锁此时客户端B过来了一看没有锁直接获取一个把锁加上这样AB加的就是同一把锁了一锁多写要是A完成了自己的业务把锁给删除了B完成业务之后一看我tm锁没了。
我们加的是排他独占锁同一时间只能有一个建redis锁成功并持有锁严禁出现2个及以上的线程拿到锁这是危险的操作。
2、Redlock算法设计理念 解释这个方案解决了数据不一致的问题直接舍弃了集群或者哨兵的模式只使用master官方建议使用5个案例中使用3台机器演示不存在主从关系大家都是master。
2.1、容错公式
需要奇数个机器N2X1N是最终需要的机器X是容错的机器数
什么是容错
失败了多少个机器实例后我还是可以容忍的所谓的容忍就是数据一致性还是可以Ok的CP数据一致性还是可以满足。
为啥是奇数
因为可以用最少的机器最多的产出效果。
举个例子
使用奇数容错机器是1个则最终需要2*113个实例使用偶数容错机器是1个则最终需要2*214个实例
3、落地实现Redisson
官网
官网的例子
RLock lock redisson.getLock(myLock);// traditional lock method
lock.lock();// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {try {...} finally {lock.unlock();}
}多重锁定
RLock lock1 redisson1.getLock(lock1);
RLock lock2 redisson2.getLock(lock2);
RLock lock3 redisson3.getLock(lock3);RLock multiLock anyRedisson.getMultiLock(lock1, lock2, lock3);// traditional lock method
multiLock.lock();// or acquire lock and automatically unlock it after 10 seconds
multiLock.lock(10, TimeUnit.SECONDS);// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res multiLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {try {...} finally {multiLock.unlock();}
}二、使用Redisson进行编码改造上一节的案例
Redisson官网https://redisson.org/
怎么使用官网查看quick start 先导依赖
dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.24.3/version
/dependency 然后写RedisConfig
Bean
public Redisson redisson()
{Config config new Config();config.useSingleServer().setAddress(redis://192.168.111.27:6379).setDatabase(0).setPassword(123456);return (Redisson) Redisson.create(config);
}再去写service
//使用Redisson对应的官网推荐的RedLock算法实现类
Autowired
private Redisson redisson;
public String saleByRedisson() {String retMessage ;RLock redissonLock redisson.getLock(redisLock);redissonLock.lock();try {// 查询库存信息String result stringRedisTemplate.opsForValue().get(inventory001);// 判断库存够不够如果为空则设置为0有则转化为integgerInteger inventoryNumber result null ? 0 : Integer.parseInt(result);if (inventoryNumber 0){// 减扣库存每次减少一个stringRedisTemplate.opsForValue().set(inventory001,String.valueOf(--inventoryNumber));retMessage 成功卖出一个商品库存剩余inventoryNumber;System.out.println(retMessage\t服务端口号port);}else {retMessage 商品卖完了去别处看看吧0-0ll;}} finally {redissonLock.unlock();}return retMessage\t服务端口号port;
}新加一个controller
ApiOperation(扣减库存saleByRedisson一次卖一个)
GetMapping(value /inventory/saleByRedisson)
public String saleByRedisson()
{return inventoryService.saleByRedisson();
}好了测试一下单机高并发测试 一切正常但是真的就是一切正常吗
当然不是一帆风顺目前会造成解锁的时候找不着锁也就是这个线程的锁被别人删除了所以在释放锁时要进行判断只能删除自己的锁。
但是又区别于上一次我们的判断上一次是因为A线程的业务没有干完锁过期了B线程拿到了锁但是A的活又干完了A以为这是自己的锁就删除了然后B来删除的时候没有锁了。
我们这次使用的是Redisson中redissonLock正在持有锁并且正是该线程持有才能释放锁。这里的判断和解锁是原子性的底层帮我们做了 InvrntoryService进行部分修改
finally {//只能删除自己的进行判断Redisson中redissonLock正在持有锁并且正是该线程持有才能释放锁if (redissonLock.isLocked() redissonLock.isHeldByCurrentThread()){redissonLock.unlock();}
}再次上测试 没有问题
三、Redisdon源码解析
分布式锁的要求加锁、可重入、续期、解锁
守护线程的“续命”
额外起一个线程定期检查线程是否还持有锁如果有则延长过期时间。
Redisson里面就实现了这个方案使用“看门狗”定期检查每1/3的锁时间检查1次如果线程还持有锁则刷新过期时间。
在获取锁成功后给锁加一个watchdog, watchdog会起一个定时任务在锁没有被释放且快要过期的时候会续期。 找到RedissonLock源码文件 这个过期时间就是30秒 所以通过redisson创建的锁默认过期时间就是30秒
再来看一下续期的源码包括尝试加锁加锁后的看门狗缓存续期操作 点进tryLockInnerAsync方法看到加锁的源代码
T RFutureT tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {this.internalLockLeaseTime unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);, Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}可以看到源代码为了保证原子性也是用的Lua脚本分析一下这个lua脚本。
if (redis.call(exists, KEYS[1]) 0) then 先看存不存在这个锁redis.call(hincrby, KEYS[1], ARGV[2], 1); 不存在就hincrby设置它的值redis.call(pexpire, KEYS[1], ARGV[1]); 并且加过期时间return nil;
end;
if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then 如果锁已经存在而且是当前线程的redis.call(hincrby, KEYS[1], ARGV[2], 1); 就进行可重入操作就是通过hincrby加1redis.call(pexpire, KEYS[1], ARGV[1]); 再续个期return nil;
end;
return redis.call(pttl, KEYS[1]);如果锁存在而且不是当前的线程所持有就返回这个所的ttl这时加锁失败watch dog自动延期机制 自动续期lua脚本分析
protected RFutureBoolean renewExpirationAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(pexpire, KEYS[1], ARGV[1]); return 1; end; return 0;, Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then 判断是否存在redis.call(pexpire, KEYS[1], ARGV[1]); 存在就续期return 1;
end;
return 0;unlock脚本分析
protected RFutureBoolean unlockInnerAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil;end; local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); if (counter 0) then redis.call(pexpire, KEYS[1], ARGV[2]); return 0; else redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1; end; return nil;, Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}提取lua脚本
if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil;等于0就是没有这把锁不是同一个线程返回nil
end;
local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); 这里自定义一个变量代表先释放一次
if (counter 0) then 释放一次之后counter还大于0代表它是可重入锁需要刷新过期时间redis.call(pexpire, KEYS[1], ARGV[2]); return 0;
else 如果剩余次数小于0就删除key并发布锁释放的订阅消息解锁成功。redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1;
end;
return nil;多机案例
这里的多机不是集群也不是哨兵模式而是多个主节点每一个都是master。
RedLock算法实现了多redis实例的情况相对于单节点来说其优点在于防止因单点故障造成整个服务停止运行的事故发生且在多节点中锁的设计以及多节点同时崩溃等各种意外情况都有自己的独特设计方法。
Redisson分布式锁还支持MultiLock多重锁机制可以将多个锁合并成一个大锁对一个大锁进行统一的申请加锁以及释放锁。
最低保证分布式锁的有效性以及安全性的要求
互斥任何时候都只能有一个client获取锁释放死锁即使锁定资源的服务器崩溃或者分区仍然可以释放锁容错性只要多数redis节点一半以上在使用client就可以获取和释放锁
网上讲的基于故障转移实现的redis主从无法真正实现Redlock: 因为redis在进行主从复制时是异步完成的比如在clientA获取锁后主redis复制数据到从redis过程中崩溃了导致没有复制到从redis中然后从redis选举出一个升级为主redis,造成新的主redis没有clientA设置的锁这是clientB尝试获取锁并且能够成功获取锁导致互斥失效;
但是现在去官网找RedLock第8.4节会发现这玩意被弃用了官网推荐去使用RLock或者RFencedLock 现在我们要使用官网中的8.3节的多重锁。 官网示例代码
RLock lock1 redisson1.getLock(lock1);
RLock lock2 redisson2.getLock(lock2);
RLock lock3 redisson3.getLock(lock3);RLock multiLock anyRedisson.getMultiLock(lock1, lock2, lock3);// traditional lock method
multiLock.lock();// or acquire lock and automatically unlock it after 10 seconds
multiLock.lock(10, TimeUnit.SECONDS);// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res multiLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {try {...} finally {multiLock.unlock();}
}开始案例
我们使用docker起3个master。 然后分别启动他们
使用命令
docker exec -it master01 /bin/bash 启动后进去再连接
或者
docker exec -it master01 redis-cli 直接连接启动OK启动完成。
接下来我们再去idea新建一个moderedis_redLock.
pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.zm/groupIdartifactIdredis_distributed_lock2/artifactIdversion1.0-SNAPSHOT/versionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.6.10/versionrelativePath/ !-- lookup parent from repository --/parentpropertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetlombok.version1.16.18/lombok.version/propertiesdependencies!--SpringBoot通用依赖模块--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!--SpringBoot与Redis整合依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependencydependencygroupIdorg.apache.commons/groupIdartifactIdcommons-pool2/artifactId/dependency!--swagger2--dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependencydependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger-ui/artifactIdversion2.9.2/version/dependency!--redisson--dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.13.4/version/dependency!--通用基础配置boottest/lombok/hutool--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion${lombok.version}/versionoptionaltrue/optional/dependencydependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.8.8/version/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build
/projectapplication.properties
server.port9090
spring.application.nameredLockspring.swagger2.enabledtruespring.redis.database0
spring.redis.password123456
spring.redis.timeout3000
spring.session.redis.flush-modesinglespring.redis.pool.conn-timeout3000
spring.redis.pool.so-timeout3000
spring.redis.pool.size10spring.redis.single.address1192.168.111.27:6381
spring.redis.single.address2192.168.111.27:6382
spring.redis.single.address3192.168.111.27:6383主启动类
SpringBootApplication
public class RedLockApplication9090 {public static void main(String[] args) {SpringApplication.run(RedLockApplication9090.class,args);}
}RedisSingleProperties单机配置类此类中就定义那三台IP地址。
Data
public class RedisSingleProperties {private String address1;private String address2;private String address3;
}RedisPoolProperties池化技术定义一些超时时间和池的大小变量。
Data
public class RedisPoolProperties {private int maxIdle;private int minIdle;private int maxActive;private int maxWait;private int connTimeout 10000;private int soTimeout;//池的大小private int size;
}RedisProperties读取application配置文件顺便也将池配置和单机信息配置也注入。
ConfigurationProperties(prefix spring.redis,ignoreInvalidFields false)
Data
public class RedisProperties {private int database;//等待节点回复命令的时间该时间从命令发送成功时开始计时private int timout 3000;private String password;private String mode;//池配置private RedisPoolProperties pool;//单机信息配置private RedisSingleProperties single;}CacheConfiguration主配置文件创建redissonClient实例
Configuration
EnableConfigurationProperties(RedisProperties.class)
public class CacheConfiguration {AutowiredRedisProperties redisProperties;BeanRedissonClient redissonClient1(){Config config new Config();String address1 redisProperties.getSingle().getAddress1();address1 address1.startsWith(redis://) ? address1 : redis:// address1;SingleServerConfig serverConfig config.useSingleServer().setAddress(address1).setTimeout(redisProperties.getPool().getConnTimeout()).setConnectionPoolSize(redisProperties.getPool().getSize()).setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());if (StringUtil.isNotBlank(redisProperties.getPassword())){serverConfig.setPassword(redisProperties.getPassword());}return Redisson.create(config);}BeanRedissonClient redissonClient2(){Config config new Config();String address2 redisProperties.getSingle().getAddress2();address2 address2.startsWith(redis://) ? address2 : redis:// address2;SingleServerConfig serverConfig config.useSingleServer().setAddress(address2).setTimeout(redisProperties.getPool().getConnTimeout()).setConnectionPoolSize(redisProperties.getPool().getSize()).setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());if (StringUtil.isNotBlank(redisProperties.getPassword())){serverConfig.setPassword(redisProperties.getPassword());}return Redisson.create(config);}BeanRedissonClient redissonClient3(){Config config new Config();String address3 redisProperties.getSingle().getAddress3();address3 address3.startsWith(redis://) ? address3 : redis:// address3;SingleServerConfig serverConfig config.useSingleServer().setAddress(address3).setTimeout(redisProperties.getPool().getConnTimeout()).setConnectionPoolSize(redisProperties.getPool().getSize()).setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());if (StringUtil.isNotBlank(redisProperties.getPassword())){serverConfig.setPassword(redisProperties.getPassword());}return Redisson.create(config);}}本次就不再写service层了就直接在controller中写逻辑代码了。
RedLockController.java
package com.zm.redLock.controller;import lombok.extern.slf4j.Slf4j;
import org.redisson.RedissonMultiLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.TimeUnit;RestController
Slf4j
public class RedLockController {public static final String CACHE_KEY_REDROCK ATGUIGU_REDLOCK;AutowiredRedissonClient redissonClient1;AutowiredRedissonClient redissonClient2;AutowiredRedissonClient redissonClient3;GetMapping(/getMultiLock)public String getMultiLock(){long threadID Thread.currentThread().getId();RLock lock1 redissonClient1.getLock(CACHE_KEY_REDROCK);RLock lock2 redissonClient2.getLock(CACHE_KEY_REDROCK);RLock lock3 redissonClient3.getLock(CACHE_KEY_REDROCK);RedissonMultiLock redLock new RedissonMultiLock(lock1, lock2, lock3);redLock.lock();try {System.out.println(进入业务逻辑多重锁threadID);//故意停30秒try { TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println(多重锁业务逻辑结束...);} catch (Exception e) {e.printStackTrace();log.error(multilock exception:{},e.getCause()\te.getMessage());} finally {redLock.unlock();System.out.println(释放锁成功!);}return 多重锁已经完成threadID;}
}启动进行测试刚开始它会一直转圈因为我们暂停进程30秒让它自动续期。
看一下自动续期的效果 可以看到确实续期了30秒这个是第二台master可以看出三个master是同步的。 30秒续期之后的时间已结束。 查看后台 现在我们将其中一台机器手动挂机然后再给它打开它默认的时间是30秒但是马上就跟上了大部队与其他的master进行时间同步。容错性贼强
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/81557.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!