目录
什么是幂等?
解决幂等的常见解决方案:
唯一标识符案例
数据库唯一约束 案例
乐观锁案例
分布式锁(Distributed Locking)
实践精选方案
首先 为什么不直接使用分布式锁呢?
自定义实现幂等组件!
RepeatExecuteLimitAutoConfiguration
repeatExecuteLimitAspect防重复幂等 切面
本地锁
localLockCache是本地锁缓存,可根据锁名和锁类型(公平锁/非公平锁)来获得ReentrantLock的实例
本地锁的作用:
分布式锁
设置幂等标识的作用
注意
什么是幂等?
在web项目中,幂等性同样是一个重要的概念。这主要是因为在网络和分布式系统中,由于网络的不稳定性和其他潜在问题,可能会导致请求被重复发送。如果一个操作不是幂等的,那么重复执行该操作可能会产生不一致的结果或副作用。例如,一个非幂等的操作可能会导致数据被重复添加、更新或删除,从而破坏数据的一致性。
因此,JavaWeb项目需要保证幂等性,主要是为了确保无论请求被发送一次还是多次,系统都能产生相同的结果。这有助于避免由于重复请求导致的数据不一致或其他潜在问题。实现幂等性的方法有很多种,包括但不限于使用数据库唯一索引、乐观锁、分布式锁、令牌等技术。
总的来说,幂等性是JavaWeb项目中一个非常重要的概念,它有助于确保系统的稳定性和数据的一致性。通过实现幂等性,我们可以有效地处理重复请求,并减少由于网络不稳定或其他原因导致的潜在问题。
解决幂等的常见解决方案:
-
唯一标识符(Unique Identifiers): 为每个请求生成一个唯一的标识符(如UUID),并将其作为请求的一部分发送。当接收到请求时,服务器可以检查该标识符是否已处理过。如果已处理,则拒绝或忽略该请求;如果未处理,则处理该请求并记录标识符。
-
数据库唯一约束: 使用数据库的唯一约束(如主键或唯一索引)来确保即使多次尝试插入相同的数据,也只有一条记录会被保存。如果尝试插入重复的数据,数据库会抛出异常,服务器可以捕获这个异常并返回幂等性的响应。
-
乐观锁(Optimistic Locking): 使用版本号或时间戳来检查数据是否已被其他操作修改过。在更新数据时,如果版本号或时间戳与预期的不符,则拒绝更新并返回冲突信息。这样,即使多次尝试更新相同的数据,也只有一次会成功。
-
分布式锁(Distributed Locking): 在分布式系统中,可以使用分布式锁来确保同一时间只有一个节点能够执行某个操作。这可以防止多个节点同时处理相同的请求,从而实现幂等性。
唯一标识符案例
-
使用 setex 命令存储ID并设置过期时间,避免内存泄漏。
-
适用于高并发场景,如支付、订单创建等。
// 生成唯一ID(客户端)
String requestId = UUID.randomUUID().toString();// 服务端校验(基于Redis)
public class IdempotencyService {private Jedis jedis; // Redis客户端public boolean checkRequest(String requestId) {if (jedis.exists(requestId)) {return false; // 已处理,拒绝重复请求}jedis.setex(requestId, 3600, "processed"); // 存储ID并设置过期时间return true;}
}
数据库唯一约束 案例
-
场景:用户注册防重复。
实现思路:数据库表中为关键字段(如用户名、邮箱)添加唯一索引,插入重复数据时抛出异常。
-
数据库自动拒绝重复数据,无需额外代码判断
-
适用于新增操作(如注册、订单号生成)
// JPA实体类定义
@Entity
@Table(name = "users", uniqueConstraints = @UniqueConstraint(columnNames = "email"))
public class User {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String email; // 唯一字段
}// 插入逻辑
try {userRepository.save(user); // 尝试插入
} catch (DataIntegrityViolationException e) {throw new DuplicateKeyException("邮箱已存在"); // 捕获唯一约束异常
}
乐观锁案例
场景:库存扣减防超卖。
实现思路:通过版本号字段控制并发更新,仅当版本匹配时才允许操作。
-
JPA的
@Version
注解自动管理版本号 -
更新失败时抛出
OptimisticLockingFailureException
,需重试或提示用户。
// 实体类添加版本号字段
@Entity
public class Product {@Idprivate Long id;private int stock;@Versionprivate int version; // 乐观锁版本号
}// 更新逻辑(JPA)
@Transactional
public void deductStock(Long productId, int quantity) {Product product = productRepository.findById(productId).orElseThrow();if (product.getStock() >= quantity) {product.setStock(product.getStock() - quantity);productRepository.save(product); // 自动检查版本号} else {throw new InsufficientStockException();}
}
分布式锁(Distributed Locking)
场景:分布式系统中全局资源操作(如优惠券发放)。
实现思路:使用Redisson实现分布式锁,确保同一时刻仅一个节点执行关键逻辑。
-
tryLock
设置超时时间防止死锁 -
适用于跨服务或集群环境下的资源竞争场景
// 基于Redisson的分布式锁
public class CouponService {private RedissonClient redissonClient;public void grantCoupon(String userId) {RLock lock = redissonClient.getLock("COUPON_GRANT_" + userId);try {if (lock.tryLock(0, 10, TimeUnit.SECONDS)) { // 尝试获取锁// 执行业务逻辑(如发放优惠券)grantCouponToUser(userId);}} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}
}
方案 | 适用场景 | 优点 | 缺点 |
唯一标识符 | 高并发请求(如支付) | 实现简单,适合分布式环境 | 需维护ID存储(如Redis) |
数据库唯一约束 | 数据唯一性要求(如注册) | 依赖数据库特性,无需额外逻辑 | 不适用于更新操作 |
乐观锁 | 低冲突更新(如库存扣减) | 无锁竞争,性能较高 | 需处理版本冲突和重试逻辑 |
分布式锁 | 跨服务资源竞争(如秒杀) | 强一致性保证 | 实现复杂,可能影响性能 |
实践精选方案
除了单纯实现幂等,而是要在保证实现幂等的前提下,还要考虑高并发下的高效率执行,不能影响程序的性能和吞吐量
基于上述这些要求,最终选择利用redis来实现,而在对使用redis上,Redisson又是非常优秀的开源中间件,其中的分布式锁是非常的经典,项目中也对分布式锁做了封装,使用起来灵活而方便,而这次幂等组件也是对Redisson基础上进行封装,保证了性能,支持MQ中间件和用户请求的幂等。
首先 为什么不直接使用分布式锁呢?
为什么还要额外设计出幂等组件?首先直接使用分布式锁是可以实现幂等的,当然业务逻辑验证也要做验证,但其实分布式锁会浪费一些性能。
分布式锁的特点是多个请求并发执行,这些请求是来自不同的用户,也就是这些请求虽然要依次等待锁执行,但最终还是要把这些请求都执行完的(执行时间太长超时的异常情况排除),总结起来就是都要获得锁,没有获得锁的请求,也要争取获得锁接着执行。
幂等的特点也是多个请求并发执行,但这些请求是来自同一个用户,也就是说这些请求只要保证第一个请求能执行,其余的请求要直接拒绝掉,总结起来就是只有第一个请求获得锁执行就可以,其余的请求看到已经上了锁,那么就要直接结束掉。
自定义实现幂等组件!
通过定义注解实现哦!
@Target(value = {ElementType.TYPE, ElementType.METHOD})
@Retention(value = RetentionPolicy.RUNTIME)
public @interface RepeatExecuteLimit {/*** 业务名称** @return name*/String name() default "";/*** key设置** @return key*/String[] keys();/*** 在多长时间内一直保持幂等,如果不配置则以执行方法为准*/long durationTime() default 0L;/*** 当消息执行已经出发防重复执行的限制时,提示信息*/String message() default "提交频繁,请稍后重试";}
RepeatExecuteLimitAutoConfiguration
public class RepeatExecuteLimitAutoConfiguration {@Bean(LockInfoType.REPEAT_EXECUTE_LIMIT)public LockInfoHandle repeatExecuteLimitHandle(){return new RepeatExecuteLimitLockInfoHandle();}@Beanpublic RepeatExecuteLimitAspect repeatExecuteLimitAspect(LocalLockCache localLockCache,LockInfoHandleFactory lockInfoHandleFactory,ServiceLockFactory serviceLockFactory,RedissonDataHandle redissonDataHandle){return new RepeatExecuteLimitAspect(localLockCache, lockInfoHandleFactory,serviceLockFactory,redissonDataHandle);}
}
RepeatExecuteLimitAutoConfiguration是自动装配类,用于加载需要的对象,repeatExecuteLimitHandle是锁键名处理器、repeatExecuteLimitAspect是幂等切面。
repeatExecuteLimitAspect防重复幂等 切面
@Slf4j
@Aspect
@Order(-11)
@AllArgsConstructor
public class RepeatExecuteLimitAspect {private final LocalLockCache localLockCache;private final LockInfoHandleFactory lockInfoHandleFactory;private final ServiceLockFactory serviceLockFactory;private final RedissonDataHandle redissonDataHandle;@Around("@annotation(repeatLimit)")public Object around(ProceedingJoinPoint joinPoint, RepeatExecuteLimit repeatLimit) throws Throwable {// 指定保持幂等的时间long durationTime = repeatLimit.durationTime();// 提示信息String message = repeatLimit.message();Object obj;// 获取锁信息LockInfoHandle lockInfoHandle = lockInfoHandleFactory.getLockInfoHandle(LockInfoType.REPEAT_EXECUTE_LIMIT);// 获取锁名String lockName = lockInfoHandle.getLockName(joinPoint,repeatLimit.name(), repeatLimit.keys());// 幂等标识String repeatFlagName = PREFIX_NAME + lockName;// 获得幂等标识String flagObject = redissonDataHandle.get(repeatFlagName);//如果幂等标识的值为success,说明已经有请求在执行了,这次请求直接结束if (SUCCESS_FLAG.equals(flagObject)) {throw new DaMaiFrameException(message);}// 获得本地锁ReentrantLock localLock = localLockCache.getLock(lockName,true);// 本地锁尝试获取boolean localLockResult = localLock.tryLock();// 如果本地锁获取失败,说明有其他请求在执行,这次请求直接结束if (!localLockResult) {throw new DaMaiFrameException(message);}try {// 获得分布式锁ServiceLocker lock = serviceLockFactory.getLock(LockType.Fair);// 尝试获取分布式锁boolean result = lock.tryLock(lockName, TimeUnit.SECONDS, 0);// 加锁成功执行if (result) {try{// 再次获取幂等标识flagObject = redissonDataHandle.get(repeatFlagName);// 如果幂等标识的值为success,说明已经有请求在执行了,这次请求直接结束if (SUCCESS_FLAG.equals(flagObject)) {throw new DaMaiFrameException(message);}obj = joinPoint.proceed();if (durationTime > 0) {try {// 业务逻辑执行成功后,设置 指定幂等保持时间 设置请求标识redissonDataHandle.set(repeatFlagName,SUCCESS_FLAG,durationTime,TimeUnit.SECONDS);}catch (Exception e) {log.error("getBucket error",e);}}return obj;} finally {lock.unlock(lockName);}}else{// 获得锁失败,说明有其他请求在执行,这次请求直接结束throw new DaMaiFrameException(message);}}finally {localLock.unlock();}}
}
RepeatExecuteLimitAspect是负责幂等执行的切面也是核心流程。
负责幂等的切面顺序要优先于分布式锁前,所以这里是-11。
这个实践中给我最大的震撼是本地锁、分布式锁、还有本地缓存的使用!
本地锁
ReentrantLock localLock = localLockCache.getLock(lockName,true);
boolean localLockResult = localLock.tryLock();
if (!localLockResult) {throw new DaMaiFrameException(message);
}
localLockCache是本地锁缓存,可根据锁名和锁类型(公平锁/非公平锁)来获得ReentrantLock的实例
public class LocalLockCache {/*** 本地锁缓存* */private Cache<String, ReentrantLock> localLockCache;/*** 本地锁的过期时间(小时单位)* */@Value("${durationTime:2}")private Integer durationTime;@PostConstructpublic void localLockCacheInit(){localLockCache = Caffeine.newBuilder().expireAfterWrite(durationTime, TimeUnit.HOURS).build();}/*** 获得锁,Caffeine的get是线程安全的* */public ReentrantLock getLock(String lockKey,boolean fair){return localLockCache.get(lockKey, key -> new ReentrantLock(fair));}
}
LocalLockCache其实是用Caffeine缓存来保存的锁信息,并可以设置锁实例的保存时间,默认是2小时,这个时间可以根据durationTime
来进行配置,如果时间过大,那么锁的实例就会过多,对项目的内存就会有压力。如果时间过小,那么构建锁的频率就会增加,性能就会受到影响,使用时,可根据业务特点进行灵活配置
Caffeine是基于Java 1.8的高性能本地缓存库,由Guava改进而来,而且在Spring5开始的默认缓存实现就将Caffeine代替原来的Google Guava,官方说明指出,其缓存命中率已经接近最优值。实际上Caffeine这样的本地缓存和ConcurrentMap很像,即支持并发,并且支持O(1)时间复杂度的数据存取。二者的主要区别在于:
-
ConcurrentMap将存储所有存入的数据,直到你显式将其移除;
-
Caffeine将通过给定的配置,自动移除“不常用”的数据,以保持内存的合理占用。
因此,一种更好的理解方式是:Cache是一种带有存储和移除策略的Map。
本地锁的作用:
之所以先使用本地锁去加锁的原因是,可以很大程度上节省分布式锁的资源,虽然分布式锁是利用reids实现的,redis的性能又非常的高,但是它再高,依旧存在网络损耗,而本地锁的操作都是基于内存中,一个是内存中操作,一个是网络操作,前者的效率可是后者的几十倍差距。
如果一秒内有100个请求,服务的实例有5个,那么每个实例就有20个请求,这20个请求就可以靠本地锁来拦截掉,那么到分布式锁那里,就有5个请求来获得锁了,其余的95个请求都可以被提前结束掉。
这是一个经典的思想,优先考虑本地内存操作,经过本地内存操作后,再去操作第三方中间件。
分布式锁
当本地锁获得了锁之后,还要用分布式锁去尝试获得锁,因为本地锁只能保证当前自己的实例范围内能锁住请求,微服务多个实例部署的话,就需要分布式锁了
//加锁成功执行
if (result) {try{//再次获取幂等标识flagObject = redissonDataHandle.get(repeatFlagName);//如果幂等标识的值为success,说明已经有请求在执行了,这次请求直接结束if (SUCCESS_FLAG.equals(flagObject)) {throw new DaMaiFrameException(message);}//执行业务逻辑obj = joinPoint.proceed();if (durationTime > 0) {try {//业务逻辑执行成功 并且 指定了设置幂等保持时间 设置请求标识redissonDataHandle.set(repeatFlagName,SUCCESS_FLAG,durationTime,TimeUnit.SECONDS);}catch (Exception e) {log.error("getBucket error",e);}}return obj;} finally {lock.unlock(lockName);}
}else{//获取锁失败,说明已经有请求在执行了,这次请求直接结束throw new DaMaiFrameException(message);
}
当通过分布式锁工厂获取到客户端实例后,就会尝试去获取分布式锁了,如果加锁失败,说明之前已经有请求获得了锁在执行中没有释放掉,那么这次请求直接结束
如果加锁成功则执行业务逻辑joinPoint.proceed()
-
如果执行业务逻辑成功,如果设置了幂等保持时间,那么设置幂等标识
-
如果执行业务逻辑失败,那么直接释放锁
设置幂等标识的作用
有的小伙伴可能会好奇,为什么要设置幂等标识?直接使用本地锁+分布式锁不就可以实现幂等了吗?为什么多此一举?只使用本地锁+分布式锁的方法确实能实现幂等,但此项目是为了高并发的,考虑的细节要全面。
幂等包括用户请求幂等和MQ消息幂等
要介绍这两种的特点
-
用户请求的幂等特点是用户在短时间内多次的点击,比如说一秒内发出了10次请求,那么就第1次请求正常执行,而剩余9次请求要全部被拦截将请求直接结束掉,特点是短时间内的多次请求
-
MQ消息幂等特点是MQ为了保证消息的可靠性。在有些异常情况确实会重复投递的,比如说 某个服务监听到了消息,接着执行业务逻辑,但在执行过程中,这个服务宕机了,没有给MQ发送消息提交机制,MQ就会认为消息没有消费成功,就会再次投递。但其实这个逻辑都执行成功了,就差给MQ提交确认了。
这就需要保证幂等。而MQ的重试就没有用户多次重复请求那么频繁,可能会1分钟 5分钟 10分钟,这种情况就需要幂等标识,当有了标识后逻辑直接结束。
注意
有的小伙伴刚接触开发,对并发产生的细节问题不太清楚,比如幂等和分布式锁,直接使用就觉得没有问题了,但其实并不是这样,我们来举一个例子,比如说添加用户的操作
-
请求获得锁得到执行
-
开启事务
-
向数据库中添加用户
-
提交事务
这个流程其实是有问题的,比如说第一个请求添加了用户后释放了锁,第二个请求是重复提交的,也添加了用户,这两个用户就是重复的。所以要在第2步后,要有验证用户是否存在的步骤,这个属于业务验证,需要业务来实现,组件只能防止并发重复,并不能防止业务重复。正确的流程:
-
请求获得锁得到执行
-
开启事务
-
查询用户是否已存在
-
向数据库中添加用户
-
提交事务