【Zookeeper】两种基于原生zk客户端的分布式锁的实现

基于zk的分布式锁的实现主要依赖zk节点的原子性,可以基于原生zk来自己实现分布式锁,更多的是基于Curator这个框架来直接使用基于zk的分布式锁[1]。这里我们仅仅讨论基于原生zk客户端依赖自己实现的zk分布式锁。

原生zk客户端中的一些调用如getChildren方法,可以是同步返回,也可以通过实现AsyncCallback的内部接口来重写异步回调处理逻辑。这里我们举例同步和异步两种方式的实现。

同步实现[1],这篇文章中缺少了关于"Watcher关注的前面节点状态改变后CountDown"的逻辑,即缺少了Watcher的回调。这里我补上了回调并做了一些调整,代码如下:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class SyncZkLock implements Watcher {// zookeeper server 列表private String connectString ="192.168.1.128:2181,192.168.1.129:2181,192.168.1.130:2181";// 超时时间private int sessionTimeout = 2000;private ZooKeeper zk;private String rootNode = "locks";private String subNode = "seq-";// 当前 client 等待的子节点private String waitPath;// ZooKeeper 连接等待private CountDownLatch connectLatch = new CountDownLatch(1);// ZooKeeper 节点等待private CountDownLatch waitLatch = new CountDownLatch(1);// 当前 client 创建的子节点private String currentNode;// 和 zk 服务建立连接,并创建根节点public SyncZkLock() throws IOException, InterruptedException, KeeperException {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程if (event.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// 发生了 waitPath 的删除事件if (event.getType() == Event.EventType.NodeDeleted &&event.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待连接建立connectLatch.await();//获取根节点状态Stat stat = zk.exists("/" + rootNode, false);//如果根节点不存在,则创建根节点,根节点类型为永久节点if (stat == null) {System.out.println("根节点不存在");zk.create("/" + rootNode, new byte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}// 加锁方法public void zkLock() {try {//在根节点下创建临时顺序节点,返回值为创建的节点路径currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);checkAndLockOrAwait(false);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}// 解锁方法public void zkUnlock() {try {zk.delete(this.currentNode, -1);} catch (InterruptedException | KeeperException e) {e.printStackTrace();}}//watch被触发@Overridepublic void process(WatchedEvent event) {switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:checkAndLockOrAwait(true);break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}//查看当前节点状态,Lock结束或者添加Watcher并等待private void checkAndLockOrAwait(boolean flag) {try {// 注意, 没有必要监听"/locks"的子节点的变化情况List<String> childrenNodes = zk.getChildren("/" + rootNode, false);// 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁if (childrenNodes.size() == 1) {return;} else {//对根节点下的所有临时顺序节点进行从小到大排序Collections.sort(childrenNodes);//当前节点名称String thisNode = currentNode.substring(("/" + rootNode + "/").length());//获取当前节点的位置int index = childrenNodes.indexOf(thisNode);if (index == -1) {System.out.println("数据异常");} else if (index == 0) {//刚创建时flag为false,不需要countDown。//watch触发时flag为true,需要countDown。if (flag){waitLatch.countDown();}// index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁return;} else {// 获得排名比 currentNode 前 1 位的节点this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);// 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法zk.getData(waitPath, true, new Stat());//进入等待锁状态waitLatch.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}

异步的实现,代码如下:

public class AsyncZkLock implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {private ZooKeeper zk ;private String threadName;private CountDownLatch cc = new CountDownLatch(1);private String pathName;private final String ctx = "zk_lock";public String getPathName() {return pathName;}public void setPathName(String pathName) {this.pathName = pathName;}public String getThreadName() {return threadName;}public void setThreadName(String threadName) {this.threadName = threadName;}public ZooKeeper getZk() {return zk;}public void setZk(ZooKeeper zk) {this.zk = zk;}public void tryLock(){try {System.out.println(threadName + "  create....");zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this, ctx);cc.await();} catch (InterruptedException e) {e.printStackTrace();}}public void unLock(){try {zk.delete(pathName,-1);System.out.println(threadName + " over work....");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}//给前一个节点加的Watcher被触发的回调@Overridepublic void process(WatchedEvent event) {switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted://这个getChildren是个异步方法,通过重写AsyncCallback.Children2Callback的processResult方法,处理回调zk.getChildren("/",false,this ,ctx);break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}//string callback//zk.create方法的异步回调@Overridepublic void processResult(int rc, String path, Object ctx, String name) {if(name != null ){System.out.println(threadName  +"  create node : " +  name );pathName =  name ;zk.getChildren("/",false,this , ctx);}}//getChildren  call back//zk.getChildren方法的异步回调@Overridepublic void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {Collections.sort(children);int i = children.indexOf(pathName.substring(1));//是不是第一个if(i == 0){//yesSystem.out.println(threadName +" i am first....");try {zk.setData("/",threadName.getBytes(),-1);cc.countDown();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}else{//no//监控前面节点,创建监控前面节点的Watcherzk.exists("/"+children.get(i-1),this,this, ctx);}}//statCallback//zk.exists的异步回调@Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {//这里默认添加watch成功,没有做失败的处理。//假设有依次A B C D E ,C取消了,D接受到回调,取到了children列表 A B D E,//但是此时B也取消了,而D此时给前面节点B添加watch,会出现问题,//因此这里如果添加失败,应该重新获取children列表,// 依靠getChildren的回调逻辑:如果是第一个就结束,不是第一个,就找到前一个节点并给前一个添加监控//来重新添加watch}
}

参考文章:
[1],Zookeeper + Curator实现分布式锁

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/861136.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法课程笔记——蓝桥云课第25次云课

算法课程笔记——蓝桥云课第25次云课

DDD学习笔记二

模型的要素——用例、视图和构造块 模型的构建步骤 1&#xff09;从用例场景开始&#xff0c;给模型输入概念、属性、术语。 2&#xff09;构建静态领域模型&#xff08;类图&#xff09;&#xff0c;发现领域概念和对象属性。 3&#xff09;构建动态领域模型&#xff08;时序图…

Redis 高速性能揭秘:核心原因解析

1. 数据结构设计 Redis 的高性能很大程度上归功于其内部精心设计的数据结构。Redis 支持五种基本数据类型&#xff1a;字符串&#xff08;String&#xff09;、列表&#xff08;List&#xff09;、集合&#xff08;Set&#xff09;、有序集合&#xff08;Sorted Set&#xff0…

Java中的数据结构选择指南

Java中的数据结构选择指南 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们来探讨在Java中如何选择适合的数据结构以提高代码效率和性能。 1. 简介 在…

MySQL中ALTER DATABASE语句的使用

ALTER DATABASE 是一个数据库管理命令&#xff0c;主要用于修改或更改已存在数据库的各种属性和配置。 ALTER {DATABASE | SCHEMA} [db_name]alter_option ... ALTER {DATABASE | SCHEMA} db_nameUPGRADE DATA DIRECTORY NAMEalter_option: {[DEFAULT] CHARACTER SET [] chars…

rollup-plugin-visualizer 打包体积分析插件(vue+vite)

安装&#xff1a; npm install rollup-plugin-visualizer基本用法&#xff1a; vite.config.js import { visualizer } from rollup-plugin-visualizermodule.exports {plugins: [visualizer({open: true, // 注意这里要设置为true&#xff0c;否则无效 gzipSize: true, /…

在数字化转型中,数字孪生技术的作用和价值几何?

引言&#xff1a;随着全球化和市场竞争的加剧&#xff0c;企业需要通过数字化转型来提高生产效率、优化产品质量、降低成本&#xff0c;以增强自身竞争力。企业需要通过数字化转型更好地理解客户需求&#xff0c;提供个性化、定制化的产品和服务&#xff0c;从而满足客户的多样…

无人门店社区拼团小程序系统源码

​打造便捷购物新体验 &#x1f6d2; 引言&#xff1a;社区购物新趋势 随着科技的飞速发展&#xff0c;无人门店和社区拼团已经成为购物的新趋势。而结合这两者的“无人门店社区拼团微信小程序”更是为我们带来了前所未有的便捷购物体验。无需排队、无需现金交易&#xff0c;只…

平面点云格网过程及可视化介绍(python)

1、背景介绍 实际人工构造物中&#xff0c;很多物体表面为平面结构&#xff0c;因此将点云投影在二维平面上进行处理&#xff0c;如进行点云面积计算、点云边缘提取等。 具体案例可以参考博客&#xff1a;详解基于格网法统计平面点云面积_点云格网法计算xy投影面积-CSDN博客、点…

FTP服务器的错误码和异常处理介绍

在FTP服务器中&#xff0c;常见的错误码包括但不限于&#xff1a; 1、421 服务不可用&#xff1a; 原因&#xff1a;服务器无法接受新的连接&#xff0c;可能是因为达到了连接数限制或者服务器正在执行重启操作。 处理&#xff1a;等待一段时间后重试连接&#xff0c;或联系服务…

chatGPT是什么?到底用了什么技术呢?未来apple intelligence会用chatgpt的大模型?

本文尽可能精简的讲解openai的chatgpt 文章目录 前言一、chatgpt是什么&#xff1f;1. 基础架构2. 训练过程3. 应用场景4. 技术特点5. 局限性 二、树形图ChatGPT 大致架构 总结 前言 随着人工智能的不断发展&#xff0c;Ai对话工具的使用也越来越广泛。由国外openai推出的chatg…

【高考志愿】计算机

目录 一、专业概述 二、就业方向 三、选择建议 四、注意事项 高考志愿选择计算机专业&#xff0c;无疑是一个充满挑战与机遇的决策。这个专业以其广泛的应用领域、前沿的技术研究和可观的就业前景&#xff0c;吸引了无数考生的目光。 一、专业概述 计算机专业是一门以计算…

Keka for Mac:轻量级压缩解压神器

Keka for Mac是一款专为Mac用户打造的轻量级压缩解压软件&#xff0c;凭借其强大的功能和简洁易用的界面&#xff0c;赢得了众多用户的喜爱。无论是日常办公还是学习娱乐&#xff0c;Keka都能为您提供高效、安全的文件压缩和解压体验。 Keka for Mac v1.4.2中文版下载 产品特点…

Promise入门详解

文章目录 Promise 的介绍和优点&#xff08;为什么需要 Promise&#xff1f;&#xff09;Promise 的基本使用Promise 的状态和回调函数Promise 对象的 3 种状态 Promise 的回调函数Promise的状态图&#xff1a; new Promise() 是同步代码Promise 封装定时器Promise 封装 Ajax 请…

同步时钟系统为何能成为机场时间管理的好伙伴?

在机场这个分秒必争的环境中&#xff0c;精准的时间管理至关重要。同步时钟系统的出现&#xff0c;成为了机场时间管理的得力助手&#xff0c;为机场的高效运行和服务质量的提升发挥了关键作用。 一、同步时钟系统简介 同步时钟系统是一种通过网络技术实现时间同步的高精度计时…

给前端小白的11个建议(少走弯路)

作为一个编程4年的的前端工程师&#xff0c;一路走来踩过许多坑。希望我的经验能让你少踩些坑&#xff0c;在编程的路上走的更顺些&#xff01; 1. 禁用var声明 只使用const或let声明变量。并且首选const&#xff0c;当一个变量需要重新赋值时&#xff0c;才使用let。并且在创…

队列与循环队列

目录 1. 前言&#xff1a; 2. 队列 2.1 队列的概念 2.2 队列的实现 2.3 队列的声明 2.4 队列的初始化 2.5 队列的入队 2.6 队列的出队 2.7 队列获取队头元素 2.8 队列获取队尾元素 2.9 队列获取有效数据个数 2.10 队列判断是否为空 2.11 打印队列 2.12 销毁队列 …

RK3568技术笔记十七 让Linux支持GPIOLIB

在 Linux 系统中&#xff0c;为了操作和管理 RK3568 的 GPIO 引脚&#xff0c;需要使用 GPIOLIB 这一子系统。关于在 RK3568 上使用 GPIOLIB 的用法如下&#xff1a; 1. 硬件平台初始化 首先&#xff0c;在使用 GPIOLIB 之前&#xff0c;需要确保 RK3568 的硬件平台初始化正确…

大厂面试经验分享,小白如何在面试中脱颖而出

前言 毕业季&#xff0c;对于每一位即将步入社会的学子来说&#xff0c;都是一个充满挑战和机遇的时刻。作为我的一位好朋友也是好学长&#xff0c;他刚刚在一家顶尖科技公司斩获了他梦寐以求的职位。他深知求职路上的艰辛&#xff0c;因此打算把自己的经验分享给大家&#xf…

【GitOps】使用Google工具JIB实现本地无需安装容器推送镜像,加速SpringCloud项目开发

文章目录 一、效果展示二、简介三、安装Jib插件1、区分环境2、安装插件一、效果展示 本地是window系统,无docker环境,没有任何runtime,使用jib工具打包镜像并推送完成,用时20秒 二、简介 Jib 是 Google 开发的一款开源工具,旨在帮助 Java 开发者更高效地将 Java 应用程…