网站建设 北京有什么公司建网站
web/
2025/9/26 4:39:57/
文章来源:
网站建设 北京,有什么公司建网站,好看个人网页模板,优设网logo文章目录 一、使用Zookeeper的意义二、Zookeeper基础1 文件系统2 通知机制3 原生zkclient API存在的问题4 服务配置中心Zookeeper模块 三、Zk类实现Start方法创建节点、get节点值方法 四、框架应用rpc提供端框架rpc调用端#xff08;客户端#xff09;框架 总结 一、使用Zook… 文章目录 一、使用Zookeeper的意义二、Zookeeper基础1 文件系统2 通知机制3 原生zkclient API存在的问题4 服务配置中心Zookeeper模块 三、Zk类实现Start方法创建节点、get节点值方法 四、框架应用rpc提供端框架rpc调用端客户端框架 总结 一、使用Zookeeper的意义
分布式系统存在的问题 为了支持高并发每个客户端都保存了一份服务提供者的列表。但是如果列表有更新想要得到最新的URL列表rpc服务的ip和端口号必须要手动更新配置文件很不方便。 如图所示实例3挂掉了但是列表并没有得到更新。 故需要动态的更新URL列表由此引入Zookeeper服务配置中心。
二、Zookeeper基础
zookeeper是为分布式应用提供一致性协调服务的中间件
本质类似linux的文件系统 调用者真实运行的情况下其实并不知道自已想要调用的函数在哪台机器上不知道ip和端口 所以调用者需要在调用前先去服务配置中心去问一下想调用API所在机器上的ip和端口。 同时他还提供全局分布式锁起到协调控制管理各个分布式节点的功能。
1 文件系统
Zookeeper提供一个多层级的节点命名空间节点称为znode。与文件系统不同的是这些节点都可以设置关联的数据而文件系统中只有文件节点可以存放数据目录节点不能存放数据。 Zookeeper为了保证高吞吐和低延迟在内存中维护了这个树状的目录结构这种特性使得Zookeeper不能用于存放大量的数据每个节点的存放数据上限为1M。
一个节点可以存储1Mb的数据
znode节点操作指令
ls 罗列节点get 查看节点create 创建节点set 修改节点的值delete 删除节点要先删子节点
2 通知机制
client端会对某个znode建立一个watcher事件当该znode发生变化时这些client会收到zk的通知然后client可以根据znode变化来做出业务上的改变等。
zk的watcher机制 客户端通过watcher机制监听zk节点变化 客户端维护的map键是节点的名字值是节点的内容ip端口 通过通知和回调机制由zk主动向客户端汇报节点的变化节点死掉、新节点加入
3 原生zkclient API存在的问题
1、设置监听的watcher是一次性的 2、znode 节点只存储简单的byte字节数组如果想存对象需要转换对象生成字节数组 注意 原生zkclient会自动发送心跳消息维护session源码会在1/3的Timeout时间发送ping心跳。 抓包验证 sudo tcpdump -I lo port 2181 // 抓2181这个端口的所有包
4 服务配置中心Zookeeper模块
所以需要一个项目注册节点中心配置维护session会话相当于检测tcp连接的心跳消息以此来确定链接是否断开。 项目中的由每个服务创建的节点为临时性节点。 总结每个rpc服务器端都会向zookeeper服务注册配置中心 传入 ip port 服务名字客户端进程查询获得ipport
三、Zk类实现
封装的ZkClient客户端类头文件
#pragma onceclass ZkClient
{
public:ZkClient();~ZkClient();void Start(); // zkclient启动连接zkservervoid Create(const char *path, const char *data, int datalen, int state0); // 在zkserver上根据指定的path创建znode节点std::string GetData(const char *path); // 根据参数指定的znode节点路径或者znode节点的值
private:zhandle_t *m_zhandle; // zk的客户端句柄
};构造、析构实现
ZkClient::ZkClient() : m_zhandle(nullptr)
{
}ZkClient::~ZkClient()
{if (m_zhandle ! nullptr){zookeeper_close(m_zhandle); // 关闭句柄释放资源 MySQL_Conn}
}Start方法
首先从配置文件中读取zookeeper客户端ip和port。
std::string host MprpcApplication::GetInstance().GetConfig().Load(zookeeperip);
std::string port MprpcApplication::GetInstance().GetConfig().Load(zookeeperport);
std::string connstr host : port;使用zookeeper_mt的多线程版本。
zk的客户端提供了三个线程 1、API调用线程zookeeper_init直接导致下面两个线程的开辟 2、网络I/O收发线程pthread_create底层为poll且会在1/3的Timeout时间发送ping心跳保持与zkserver的通信 3、watcher回调线程pthread_create当zkclient接收zkserver的响应后zkserver给zkclient通知 zookeeper是异步连接过程需要绑定一个全局回调函数global_watcher新线程连接
m_zhandle zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
之后检查创建的m_zhandle是否为空指针输入(127.0.0.1: 2181, 回调函数, session超时时间30s, null, null, 0)
注
zk的端口号2181。上述代码只是成功创建句柄资源并不代表zkserver的连接成功与否。全局回调函数global_watcher决定是否连接成功。
全局的watcher观察器
// 全局的watcher观察器 zkserver给zkclient的通知
void global_watcher(zhandle_t *zh, int type,int state, const char *path, void *watcherCtx)
{if (type ZOO_SESSION_EVENT) // 回调的消息类型是和会话相关的消息类型{if (state ZOO_CONNECTED_STATE) // zkclient和zkserver连接成功{sem_t *sem (sem_t*)zoo_get_context(zh);sem_post(sem);}}
}直到收到state ZOO_CONNECTED_STATE消息才算连接成功。这时sem信号量置1。 也就说sem的值是由全局观察者在连接状态变为已连接时通过调用sem_post()或类似的函数来增加的。
Start方法中等待信号量sem为1连接成功打印信息。
sem_t sem;
sem_init(sem, 0, 0);
zoo_set_context(m_zhandle, sem);sem_wait(sem); // 阻塞直到sem为1
std::cout zookeeper_init success! std::endl;创建节点、get节点值方法
void ZkClient::Create(const char *path, const char *data, int datalen, int state)
{char path_buffer[128];int bufferlen sizeof(path_buffer);int flag;// 先判断path表示的znode节点是否存在如果存在就不再重复创建了flag zoo_exists(m_zhandle, path, 0, nullptr);if (ZNONODE flag) // 表示path的znode节点不存在{// 创建指定path的znode节点了flag zoo_create(m_zhandle, path, data, datalen,ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);if (flag ZOK){std::cout znode create success... path: path std::endl;}else{std::cout flag: flag std::endl;std::cout znode create error... path: path std::endl;exit(EXIT_FAILURE);}}
}// 根据指定的path获取znode节点的值
std::string ZkClient::GetData(const char *path)
{char buffer[64];int bufferlen sizeof(buffer);int flag zoo_get(m_zhandle, path, 0, buffer, bufferlen, nullptr);if (flag ! ZOK){std::cout get znode error... path: path std::endl;return ;}else{return buffer;}
}四、框架应用
rpc提供端框架
首先调用Start方法。 之后将当前rpc节点上要发布的服务全部注册到zk上让rpc调用端可以从zk上发现服务。
先创建永久父节点/UserServiceRpc再根据提供端维护的rpc方法map表创建临时子节点/UserServiceRpc/LoginLogin方法…也就是将要发布的服务全部注册到zk上。
// session timeout 30s zkclient 网络I/O线程 1/3 * timeout 时间发送ping消息
ZkClient zkCli;
zkCli.Start();
// service_name为永久性节点 method_name为临时性节点
for (auto sp : m_serviceMap)
{// /service_name /UserServiceRpcstd::string service_path / sp.first;zkCli.Create(service_path.c_str(), nullptr, 0);for (auto mp : sp.second.m_methodMap){// /service_name/method_name /UserServiceRpc/Login 存储当前这个rpc服务节点主机的ip和portstd::string method_path service_path / mp.first;char method_path_data[128] {0};sprintf(method_path_data, %s:%d, ip.c_str(), port);// ZOO_EPHEMERAL表示znode是一个临时性节点zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);}
}rpc调用端客户端框架
首先同样是调用Start方法
ZkClient zkCli;
zkCli.Start();然后CallMethod中通过zk获取ip:port。也就是说通过要调用方法的名称Login在zk的节点中寻找对应的ip和port。
// /UserServiceRpc/Login
std::string method_path / service_name / method_name;
// 127.0.0.1:8000
std::string host_data zkCli.GetData(method_path.c_str());因为读取的地址是host_data 127.0.0.1:8000所以将其分离
if (host_data )
{controller-SetFailed(method_path is not exist!);return;
}
int idx host_data.find(:);
if (idx -1)
{controller-SetFailed(method_path address is invalid!);return;
}
std::string ip host_data.substr(0, idx);
uint16_t port atoi(host_data.substr(idx1, host_data.size()-idx).c_str()); 总结
Zookeeper功能如下
master节点选举, 主节点down掉后, 从节点就会接手工作, 并且保证这个节点是唯一的,这也就是所谓首脑模式,从而保证我们集群是高可用的统一配置文件管理, 即只需要部署一台服务器, 则可以把相同的配置文件同步更新到其他所有服务器, 此操作在云计算中用的特别多(例如修改了redis统一配置)数据发布与订阅, 类似消息队列MQ分布式锁,分布式环境中不同进程之间争夺资源,类似于多进程中的锁集群管理, 保证集群中数据的强一致性
服务配置中心用法
每个rpc服务器端都会向zookeeper服务注册配置中心 传入 ip port 服务名字 客户端进程查询获得ipport
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/81299.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!