C++负载均衡远程调用学习之订阅功能与发布功能

目录

 

1.lars-DnsV0.1回顾

2.Lars-DnsV0.2-订阅功能的订阅模块分析

3.Lars-DnsV0.2-订阅模块的类的单例创建及方法属性初始化

4.Lars-DnsV0.2-发布功能的实现

5.Lars-DnsV0.2-发布功能的总结

6.Lars-DnsV0.2-订阅流程复习

7.Lars-DnsV0.2-订阅模块的集成

8.Lars-DnsV0.2订阅模块的测试

9.Lars-DnsV0.2订阅模块测试2

10.Lars-DnsV0.2的发布问题bug修复

11.Lars-DnsV0.2订阅发布流程梳理


 

1.lars-DnsV0.1回顾

6) Route订阅模式

### 6.1 订阅模块的设计与实现        

​        订阅模式整体的设计.

> lars_dns/include/subscribe.h

```c
#pragma once

#include <vector>
#include <pthread.h>
#include <ext/hash_set>
#include <ext/hash_map>
#include "lars_reactor.h"
#include "lars.pb.h"
#include "dns_route.h"

using namespace __gnu_cxx;


//定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;

//定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;

2.Lars-DnsV0.2-订阅功能的订阅模块分析

class SubscribeList {
public:
    //设计单例
    static void init()  {
        _instance = new SubscribeList();
    }

    static SubscribeList *instance() {
        //保证init方法在这个进程执行中,只执行一次
        pthread_once(&_once, init);
        return _instance;
    }

    //订阅
    void subscribe(uint64_t mod, int fd);
    
    //取消订阅
    void unsubscribe(uint64_t mod, int fd);
    
    //发布
    void publish(std::vector<uint64_t> &change_mods);

    //根据在线用户fd得到需要发布的列表
    void make_publish_map(listen_fd_set &online_fds, 
                          publish_map &need_publish);
    
    
private:
    //设计单例
    SubscribeList();
    SubscribeList(const SubscribeList &);
    const SubscribeList& operator=(const SubscribeList);

    static SubscribeList *_instance;
    static pthread_once_t _once;


    subscribe_map _book_list; //订阅清单
    pthread_mutex_t _book_list_lock;

    publish_map _push_list; //发布清单
    pthread_mutex_t _push_list_lock;
};
```

​        首先`SubscribeList`采用单例设计。这里面定义了两种数据类型

3.Lars-DnsV0.2-订阅模块的类的单例创建及方法属性初始化

``c
//定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;

//定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;
```

​        `subscribe_map`是目前dns系统的总体订阅列表,记录了订阅的modid/cmdid都有哪些fds已经订阅了,启动一个fd就代表一个客户端。

​        `publish_map`是即将发布的表,启动这里面是subscribe_map的一个反表,可以是订阅的客户端fd,而value是该客户端需要接收的订阅modid/cmdid数据。

**属性**:

`_book_list`:目前dns已经全部的订阅信息清单。

`_push_list`:目前dns即将发布的客户端及订阅信息清单。

**方法**

`void subscribe(uint64_t mod, int fd)`: 加入modid/cmdid 和订阅的客户端fd到_book_list中。

`void unsubscribe(uint64_t mod, int fd)`:取消一条订阅数据。

`void publish(std::vector<uint64_t> &change_mods)`: 发布订阅数据,其中change_mods是需要发布的那些modid/cmdid组合。

`void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish)`: 根据目前在线的订阅用户,得到需要通信的发布订阅列表。

4.Lars-DnsV0.2-发布功能的实现

//根据在线用户fd得到需要发布的列表
void SubscribeList::make_publish_map(
            listen_fd_set &online_fds, 
            publish_map &need_publish)
{
    publish_map::iterator it;

    pthread_mutex_lock(&_push_list_lock);
    //遍历_push_list 找到 online_fds匹配的数据,放到need_publish中
    for (it = _push_list.begin(); it != _push_list.end(); it++)  {
        //it->first 是 fd
        //it->second 是 modid/cmdid
        if (online_fds.find(it->first) != online_fds.end()) {
            //匹配到
            //当前的键值对移动到need_publish中
            need_publish[it->first] = _push_list[it->first];
            //当该组数据从_push_list中删除掉
            _push_list.erase(it);
        }
    }

    pthrea

5.Lars-DnsV0.2-发布功能的总结

//发布
void SubscribeList::publish(std::vector<uint64_t> &change_mods)
{
    //1 将change_mods已经修改的mod->fd 
    //  放到 发布清单_push_list中 
    pthread_mutex_lock(&_book_list_lock);
    pthread_mutex_lock(&_push_list_lock);

    std::vector<uint64_t>::iterator it;

    for (it = change_mods.begin(); it != change_mods.end(); it++) {
        uint64_t mod = *it;
        if (_book_list.find(mod) != _book_list.end()) {
            //将mod下面的fd set集合拷迁移到 _push_list中
            hash_set<int>::iterator fds_it;
            for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {
                int fd = *fds_it;
                _push_list[fd].insert(mod);
            }
        }
    }

    pthread_mutex_unlock(&_push_list_lock);
    pthread_mutex_unlock(&_book_list_lock);

    //2 通知各个线程去执行推送任务
    server->thread_poll()->send_task(push_change_task, this);
}
```

​        这里需要注意的是`publish()`里的server变量是全局变量,全局唯一的server句柄。

6.Lars-DnsV0.2-订阅流程复习

### 6.2 开启订阅

​        那么订阅功能实现了,该如何是调用触发订阅功能能,我们可以在一个客户端建立连接成功之后来调用.



>  lars_dns/src/dns_service.cpp

```c
#include <ext/hash_set>
#include "lars_reactor.h"
#include "subscribe.h"
#include "dns_route.h"
#include "lars.pb.h"

tcp_server *server;

using __gnu_cxx::hash_set;

typedef hash_set<uint64_t> client_sub_mod_list;

// ...

//订阅route 的modid/cmdid
void create_subscribe(net_connection * conn, void *args)
{
    conn->param = new client_sub_mod_list;
}

//退订route 的modid/cmdid
void clear_subscribe(net_connection * conn, void *args)
{
    client_sub_mod_list::iterator it;
    client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;

    for (it = sub_list->begin(); it  != sub_list->end(); it++) {
        uint64_t mod = *it;
        SubscribeList::instance()->unsubscribe(mod, conn->get_fd());
    }

    delete sub_list;

    conn->param = NULL;
}

int main(int argc, char **argv)
{
    event_loop loop;

    //加载配置文件
    config_file::setPath("conf/lars_dns.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 7778);


    //创建tcp服务器
    server = new tcp_server(&loop, ip.c_str(), port);

    //==========注册链接创建/销毁Hook函数============
    server->set_conn_start(create_subscribe);
    server->set_conn_close(clear_subscribe);
      //============================================

    //注册路由业务
    server->add_msg_router(lars::ID_GetRouteRequest, get_route);

    //开始事件监听    
    printf("lars dns service ....\n");
    loop.event_process();

    return 0;
}
```

​        这里注册了两个链接Hook。`create_subscribe()`和`clear_subscribe()`。

`client_sub_mod_list`为当前客户端链接所订阅的route信息列表。主要存放当前客户订阅的modid/cmdid的集合。因为不同的客户端订阅的信息不同,所以要将该列表与每个conn进行绑定。

7.Lars-DnsV0.2-订阅模块的集成

## 7) Backend Thread实时监控

​            Backend Thread的后台总业务流程如下:

![11-Lars-dns_BackendThread](./pictures/11-Lars-dns_BackendThread.png)

### 7.1 数据库表相关查询方法实现

​        我们先实现一些基本的数据表达查询方法:

> lars_dns/src/dns_route.cpp

```c
/*
 *  return 0, 表示 加载成功,version没有改变
 *         1, 表示 加载成功,version有改变
 *         -1 表示 加载失败
 * */
int Route::load_version()
{
    //这里面只会有一条数据
    snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;");

    int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
    if (ret)
    {
        fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
        return -1;
    }

    MYSQL_RES *result = mysql_store_result(&_db_conn);
    if (!result)
    {
        fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
        return -1;
    }

    long line_num = mysql_num_rows(result);
    if (line_num == 0)
    {
        fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn));
        return -1;
    }

    MYSQL_ROW row = mysql_fetch_row(result);
    //得到version

    long new_version = atol(row[0]);

    if (new_version == this->_version)
    {
        //加载成功但是没有修改
        return 0;
    }
    this->_version = new_version;
    printf("now route version is %ld\n", this->_version);

    mysql_free_result(result);

    return 1;
}

8.Lars-DnsV0.2订阅模块的测试

//加载RouteChange得到修改的modid/cmdid
//将结果放在vector中
void Route::load_changes(std::vector<uint64_t> &change_list) 
{
    //读取当前版本之前的全部修改 
    snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version);

    int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
    if (ret)
    {
        fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn));
        return ;
    }

    MYSQL_RES *result = mysql_store_result(&_db_conn);
    if (!result)
    {
        fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn));
        return ;
    }

    long lineNum = mysql_num_rows(result);
    if (lineNum == 0)
    {
        fprintf(stderr,  "No version in table ChangeLog: %s\n", mysql_error(&_db_conn));
        return ;
    }
    MYSQL_ROW row;
    for (long i = 0;i < lineNum; ++i)
    {
        row = mysql_fetch_row(result);
        int modid = atoi(row[0]);
        int cmdid = atoi(row[1]);
        uint64_t key = (((uint64_t)modid) << 32) + cmdid;
        change_list.push_back(key);
    }
    mysql_free_result(result);    
}

9.Lars-DnsV0.2订阅模块测试2

//将RouteChange
//删除RouteChange的全部修改记录数据,remove_all为全部删除
//否则默认删除当前版本之前的全部修改
void Route::remove_changes(bool remove_all)
{
    if (remove_all == false)
    {
        snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version);
    }
    else
    {
        snprintf(_sql, 1000, "DELETE FROM RouteChange;");
    }
    int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
    if (ret != 0)
    {
        fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn));
        return ;
    } 

    return;
}
```

这里面提供了基本的对一些表的加载和删除操作:

`load_version()`:加载当前route信息版本号。

`load_route_data()`:加载`RouteData`信息表,到_temp_pointer中。

`swap()`:将__temp_pointer的表数据同步到_data_temp表中.

`load_changes()`:加载RouteChange得到修改的modid/cmdid,将结果放在vector中

`remove_changes()`:清空之前的修改记录。

10.Lars-DnsV0.2的发布问题bug修复

### 7.2 Backend Thread业务流程实现



> lars_dns/src/dns_route.cpp

```c
//周期性后端检查db的route信息的更新变化业务
//backend thread完成
void *check_route_changes(void *args)
{
    int wait_time = 10;//10s自动修改一次,也可以从配置文件读取
    long last_load_time = time(NULL);

    //清空全部的RouteChange
    Route::instance()->remove_changes(true);

    //1 判断是否有修改
    while (true) {
        sleep(1);
        long current_time = time(NULL);

        //1.1 加载RouteVersion得到当前版本号
        int ret = Route::instance()->load_version();
        if (ret == 1) {
            //version改版 有modid/cmdid修改
            //2 如果有修改

            //2.1 将最新的RouteData加载到_temp_pointer中
            if (Route::instance()->load_route_data() == 0) {
                //2.2 更新_temp_pointer数据到_data_pointer map中
                Route::instance()->swap();
                last_load_time = current_time;//更新最后加载时间
            }

            //2.3 获取被修改的modid/cmdid对应的订阅客户端,进行推送         
            std::vector<uint64_t> changes;
            Route::instance()->load_changes(changes);

            //推送
            SubscribeList::instance()->publish(changes);

            //2.4 删除当前版本之前的修改记录
            Route::instance()->remove_changes();

        }
        else {
            //3 如果没有修改
            if (current_time - last_load_time >= wait_time) {
                //3.1 超时,加载最新的temp_pointer
                if (Route::instance()->load_route_data() == 0) {
                    //3.2 _temp_pointer数据更新到_data_pointer map中
                    Route::instance()->swap();
                    last_load_time = current_time;
                }
            }
        }
    }

    return NULL;
}
```

11.Lars-DnsV0.2订阅发布流程梳理

完成dns模块的订阅功能测试V0.3

​        我们提供一个修改一个modid/cmdid的sql语句来触发订阅条件,并且让dns service服务器主动给订阅的客户端发送该订阅消息。

> lars_dns/test/test_insert_dns_route.sql

```sql
USE lars_dns;

SET @time = UNIX_TIMESTAMP(NOW());

INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999);
UPDATE RouteVersion SET version = @time WHERE id = 1;

INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time);
```



客户端代码:

> lars_dns/test/lars_dns_test1.cpp

```c
#include <string.h>
#include <unistd.h>
#include <string>
#include "lars_reactor.h"
#include "lars.pb.h"

//命令行参数
struct Option
{
    Option():ip(NULL),port(0) {}

    char *ip;
    short port;
};


Option option;

void Usage() {
    printf("Usage: ./lars_dns_test -h ip -p port\n");
}

//解析命令行
void parse_option(int argc, char **argv)
{
    for (int i = 0; i < argc; i++) {
        if (strcmp(argv[i], "-h") == 0) {
            option.ip = argv[i + 1];
        }
        else if (strcmp(argv[i], "-p") == 0) {
            option.port = atoi(argv[i + 1]);
        }
    }

    if ( !option.ip || !option.port ) {
        Usage();
        exit(1);
    }

}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/904582.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SurfSense开源程序是NotebookLM / Perplexity / Glean的开源替代品,连接到外部来源,如搜索引擎

​一、软件介绍 文末提供程序和源码下载 虽然 NotebookLM 和 Perplexity 等工具令人印象深刻&#xff0c;并且对于对任何主题/查询进行研究都非常有效&#xff0c;但 SurfSense 通过与你的个人知识库集成来提升这种能力。它是一个高度可定制的 AI 研究代理&#xff0c;连接到外…

基于OpenTelemetry的分布式链路追踪Trace‌实现(PHP篇)

目录 引言一、OpenTelemetry是一套可观测性标准协议二、分布式追踪&#xff08;‌Trace‌&#xff09;是OpenTelemetry的核心功能之一三、OpenTelemetry的架构原理四、OpenTelemetry的分布式追踪&#xff08;‌Trace‌&#xff09;实践1、准备PHP环境2、下载SDK3、编写实例代码…

探索智能体的记忆:类型、策略和应用

AI Agent 中的记忆&#xff1a;类型、策略和应用 记忆实现是使智能体能够保持上下文、从过去的交互中学习并做出明智决策的关键组成部分。与人类记忆非常相似&#xff0c;智能体记忆允许 AI 系统随时间存储、检索和利用信息&#xff0c;从而为用户创造更连贯和个性化的体验。 …

leetcode 2395. Find Subarrays With Equal Sum

题目描述 代码&#xff1a; class Solution { public:bool findSubarrays(vector<int>& nums) {int len nums.size();if(len <2)return false;unordered_set<int> table;int sum 0;for(int i 1;i < len;i){sum nums[i-1]nums[i];if(table.contains(…

Kubernetes(k8s)学习笔记(七)--KubeSphere 最小化安装

前情提要 可视化操作面板对于开发、运维绝对是提升工作效率的一大利器&#xff0c;因此很有必要搭建一套可视化操作来管理Kubernetes。 可视化面板有多种&#xff1a; 1.Kubernetes官方提供的默认面板&#xff1a;dashboard&#xff0c;用处不大&#xff0c;放弃&#xff1b…

MCP连接Agent:AI时代的TCP/IP

介绍 2023年&#xff0c;生成式AI爆发。2024年&#xff0c;智能体&#xff08;Agent&#xff09;接棒成为AI新焦点。2025年&#xff0c;智能体似乎已经要开始爆发了。目前的智能体更像一个“单机App”&#xff1a;彼此不了解、无法通信&#xff0c;更不能协作。类似互联网早期…

交换机工作原理(MAC地址表、VLAN)

目录 一、交换机的基本工作原理 数据帧的转发 MAC地址表的作用 交换机的转发方式 二、VLAN&#xff08;虚拟局域网&#xff09; VLAN的定义 VLAN的作用 VLAN的实现方式 VLAN的帧标记 VLAN的通信 三、交换机与VLAN的结合 四、交换机与VLAN的实际应用场景 交换机是局…

eFish-SBC-RK3576工控板外部RTC测试操作指南

备注&#xff1a; 1&#xff09;测试时一定要接电池&#xff0c;否则外部RTC断电后无法工作导致测试失败&#xff1b; 2&#xff09;如果连接了网络&#xff0c;系统会自动同步NTP时钟&#xff0c;所以需要关闭自动同步时钟。 关闭自动同步NTP时钟方法&#xff1a; 先查看是…

淘宝按图搜索商品(拍立淘)Java 爬虫实战指南

在电商领域&#xff0c;按图搜索商品功能为用户提供了更直观、便捷的购物体验。淘宝的拍立淘功能更是凭借其强大的图像识别技术&#xff0c;成为许多开发者和商家关注的焦点。本文将详细介绍如何利用 Java 爬虫技术实现淘宝按图搜索商品功能&#xff0c;包括注册账号、上传图片…

【Redis】List类型

文章目录 List的特点介绍lpush&#xff0c;lpushx&#xff0c;rpush&#xff0c;rpushx命令lrange命令lpop和rpoplindex命令linsert命令llen命令lrem 命令ltrim命令lset命令阻塞版本的命令blpop和brpop 命令小结list的内部编码List的应用场景 List的特点介绍 列表相当于一个数…

QT:qt5调用打开exe程序并获取调用按钮控件实例2025.5.7

为实现在 VS2015 的 Qt 开发环境下打开外部 exe&#xff0c;列出其界面按钮控件的序号与文本名&#xff0c;然后点击包含特定文本的按钮控件。以下是更新后的代码&#xff1a; #include <QCoreApplication> #include <QProcess> #include <QDebug> #include…

基于Jenkins的DevOps工程实践之Jenkins共享库

文章目录 前言Jenkins共享库结构1、共享库演示2、知识点补充3、实践使用共享库格式化输出日志4、groovy基础语法4.1、 什么是 Groovy&#xff1f;4.2、groovy特点4.3、运行方法4.4、标识符4.5、基本数据类型4.5.1、string类型4.5.2、list类型 4.6、函数使用4.7、正则表达式 5、…

【Qt4】Qt4中实现PDF预览

方案一&#xff1a; 在Qt4中预览PDF文件&#xff0c;你可以使用多种方法&#xff0c;但最常见和简单的方法之一是使用第三方库。Qt本身并没有内置直接支持PDF预览的功能&#xff0c;但你可以通过集成如Poppler、MuPDF等库来实现这一功能。下面我将展示如何使用Poppler库在Qt4中…

php artisan resetPass 执行密码重置失败的原因?php artisan resetPass是什么 如何使用?-优雅草卓伊凡

php artisan resetPass 执行密码重置失败的原因&#xff1f;php artisan resetPass是什么 如何使用&#xff1f;-优雅草卓伊凡 可能的原因 命令不存在&#xff1a;如果你没有正确定义这个命令&#xff0c;Laravel 会报错而不是提示”重置密码失败”用户不存在&#xff1a;’a…

ai说什么是注解,并以angular ts为例

在编程中&#xff0c;注解&#xff08;Annotation&#xff09; 是一种特殊的语法结构&#xff0c;用于为代码添加元数据&#xff08;metadata&#xff09;&#xff0c;从而在不修改代码逻辑的情况下&#xff0c;提供额外的信息或指示编译器、框架、工具如何处理这些代码。注解通…

【MySQL】-- 联合查询

文章目录 1. 简介1.1 为什么要使用联合查询1.2 多表联合查询时MySQL内部是如何进行计算的 2. 内连接2.1 语法2.2 示例 3. 外连接3.1 语法3.2 示例 4. 自连接4.1 应用场景4.2 示例4.3 表连接练习 5. 子查询5.1 语法5.2 单行子查询5.3 多行子查询5.4 多列子查询5.5 在from 子句中…

【多线程】六、基于阻塞队列的生产者消费者模型

文章目录 Ⅰ. 生产者消费者模型的概念Ⅱ. 生产者消费者模型的优点Ⅲ. 基于阻塞队列的生产者消费者模型MakefileBlock_queue.hpptask.hpptest.cpp Ⅳ. 如何理解提高了效率❓❓❓ Ⅰ. 生产者消费者模型的概念 ​ 生产者消费者模型是一种常见的并发模式&#xff0c;用于解决生产者…

【Vue】全局事件总线 TodoList 事件总线

目录 一、 实现所有组件看到x事件 二、 实现$on $off 以及 $emit 总结不易~ 本章节对我有很大的收获&#xff0c; 希望对你也是&#xff01;&#xff01;&#xff01; 本节素材已上传至Gitee&#xff1a;yihaohhh/我爱Vue - Gitee.com 全局事件总线图&#xff1a; 本节素材…

Python编程virtualenv库的简介和使用方法

Python编程virtualenv库的简介和使用方法 virtualenv和conda的区别是什么

MySQL的行级锁锁的到底是什么?

大家好&#xff0c;我是锋哥。今天分享关于【MySQL的行级锁锁的到底是什么?】面试题。希望对大家有帮助&#xff1b; MySQL的行级锁锁的到底是什么? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 MySQL的行级锁是数据库管理系统&#xff08;DBMS&#xff09;的一…