Curator实现Zookeeper分布式锁
1、介绍
Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-framework和curator-recipes。
2、锁分类
InterProcessLock
实现类 | 特征 |
---|---|
InterprocessMutex | 互斥锁 可重入、独占锁 |
InterProcessMultiLock | 可重入锁 多重共享锁 (将多个锁作为单个实体管理的容器) |
InterProcessReadWriteLock | 读写锁 读锁可以重入,写锁互斥。类似ReentrantReadWriteLock |
InterProcessSemaphoreMutex | 共享信号量。不可重入、独占锁 |
InterProcessSemaphoreV2 | 共享信号量 |
相关拓展:
2.1、 可重入锁 (Shared Reentrant Lock)
Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。
2.2、不可重入锁(Shared Lock)
使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入。
2.3、可重入读写锁(Shared Reentrant Read Write Lock)
类似JDK的ReentrantReadWriteLock. 一个读写锁管理一对相关的锁。 一个负责读操作,另外一个负责写操作。 读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。 此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。 这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。 从读锁升级成写锁是不成的。
2.4、信号量(Shared Semaphore)
一个计数的信号量类似JDK的Semaphore。 JDK中Semaphore维护的一组许可(permits),而Cubator中称之为租约(Lease)。注意,所有的实例必须使用相同的numberOfLeases值。 调用acquire会返回一个租约对象。 客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。
2.5、多锁对象(Multi Shared Lock)
Multi Shared Lock是一个锁的容器。 当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。 基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。
锁内容转载自:在代码世界游走,没几把“锁”防身可不行 | 京东云技术团队
3、使用实例
3.1、引入依赖
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version>
</dependency>
3.2、构建工具类
package com.springbootcli.zookeeper.utils;import lombok.extern.log4j.Log4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;/*** Curator实现分布式锁工具类** @author liangtl* @Date 2023/9/3*/@Log4j
public class CuratorUtils {//定义锁节点的路径private static final String LOCK_PATH = "/zkLock/test";//设置zookeeper连接,多台使用逗号分隔private static final String CONNECT_STRING = "localhost1:2181,localhost2:2181,localhost3:2181";//设置会话存活时间,根据业务灵活指定 单位:毫秒private static final int SESSION_TIME_OUT = 3000;//设置超时时间private static final int CONNECTION_TIME_OUT = 3000;//设置重试次数private static final int RETRY_TIMES = 3;///设置失败重试间隔时间 单位:毫秒private static final int BASE_SLEEP_TIME_MS = 3000;/*** 获取锁对象(可重入锁)** @return 锁对象*/public static InterProcessMutex getMutexLock() {return new InterProcessMutex(getCuratorFramework(), LOCK_PATH);}/*** 对分布式锁进行初始化** @return*/private static CuratorFramework getCuratorFramework() {//重试策略,定义初试时间3s,重试3次ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS , RETRY_TIMES);//初始化客户端CuratorFramework client = CuratorFrameworkFactory.builder()//配置zookeeper连接地址.connectString(CONNECT_STRING)//设置超时时间.sessionTimeoutMs(SESSION_TIME_OUT)//设置连接超时时间.connectionTimeoutMs(CONNECTION_TIME_OUT)//重试策略.retryPolicy(exponentialBackoffRetry).build();//开启连接client.start();log.info("zookeeper初始化完成...");return client;}}
3.3、使用
@Test
void contextLoads() {// 通过工具类初始化锁对象InterProcessMutex lock = CuratorUtils.getMutexLock();try {// 获取锁lock.acquire();log.info(Thread.currentThread().getName() + "获取zk锁成功");} catch (Exception e) {log.info("zk锁获取异常:", e);}/*业务处理......*/try {lock.release();log.info(Thread.currentThread().getName() + "释放zk锁成功");} catch (Exception e) {log.info("zk锁释放异常:", e);}
}