目录
1.lars-DnsV0.1回顾
2.Lars-DnsV0.2-订阅功能的订阅模块分析
3.Lars-DnsV0.2-订阅模块的类的单例创建及方法属性初始化
4.Lars-DnsV0.2-发布功能的实现
5.Lars-DnsV0.2-发布功能的总结
6.Lars-DnsV0.2-订阅流程复习
7.Lars-DnsV0.2-订阅模块的集成
8.Lars-DnsV0.2订阅模块的测试
9.Lars-DnsV0.2订阅模块测试2
10.Lars-DnsV0.2的发布问题bug修复
11.Lars-DnsV0.2订阅发布流程梳理
1.lars-DnsV0.1回顾
6) Route订阅模式
 
 ### 6.1 订阅模块的设计与实现        
 
         订阅模式整体的设计.
 
 > lars_dns/include/subscribe.h
 
 ```c
 #pragma once
 
 #include <vector>
 #include <pthread.h>
 #include <ext/hash_set>
 #include <ext/hash_map>
 #include "lars_reactor.h"
 #include "lars.pb.h"
 #include "dns_route.h"
 
 using namespace __gnu_cxx;
 
 
 //定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
 typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
 
 //定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
 typedef hash_map<int, hash_set<uint64_t>> publish_map;
2.Lars-DnsV0.2-订阅功能的订阅模块分析
class SubscribeList {
 public:
     //设计单例
     static void init()  {
         _instance = new SubscribeList();
     }
 
     static SubscribeList *instance() {
         //保证init方法在这个进程执行中,只执行一次
         pthread_once(&_once, init);
         return _instance;
     }
 
     //订阅
     void subscribe(uint64_t mod, int fd);
     
     //取消订阅
     void unsubscribe(uint64_t mod, int fd);
     
     //发布
     void publish(std::vector<uint64_t> &change_mods);
 
     //根据在线用户fd得到需要发布的列表
     void make_publish_map(listen_fd_set &online_fds, 
                           publish_map &need_publish);
     
     
 private:
     //设计单例
     SubscribeList();
     SubscribeList(const SubscribeList &);
     const SubscribeList& operator=(const SubscribeList);
 
     static SubscribeList *_instance;
     static pthread_once_t _once;
 
 
     subscribe_map _book_list; //订阅清单
     pthread_mutex_t _book_list_lock;
 
     publish_map _push_list; //发布清单
     pthread_mutex_t _push_list_lock;
 };
 ```
 
         首先`SubscribeList`采用单例设计。这里面定义了两种数据类型
3.Lars-DnsV0.2-订阅模块的类的单例创建及方法属性初始化
``c
 //定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
 typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
 
 //定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
 typedef hash_map<int, hash_set<uint64_t>> publish_map;
 ```
 
         `subscribe_map`是目前dns系统的总体订阅列表,记录了订阅的modid/cmdid都有哪些fds已经订阅了,启动一个fd就代表一个客户端。
 
         `publish_map`是即将发布的表,启动这里面是subscribe_map的一个反表,可以是订阅的客户端fd,而value是该客户端需要接收的订阅modid/cmdid数据。
 
 **属性**:
 
 `_book_list`:目前dns已经全部的订阅信息清单。
 
 `_push_list`:目前dns即将发布的客户端及订阅信息清单。
 
 **方法**
 
 `void subscribe(uint64_t mod, int fd)`: 加入modid/cmdid 和订阅的客户端fd到_book_list中。
 
 `void unsubscribe(uint64_t mod, int fd)`:取消一条订阅数据。
 
 `void publish(std::vector<uint64_t> &change_mods)`: 发布订阅数据,其中change_mods是需要发布的那些modid/cmdid组合。
 
 `void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish)`: 根据目前在线的订阅用户,得到需要通信的发布订阅列表。
4.Lars-DnsV0.2-发布功能的实现
//根据在线用户fd得到需要发布的列表
 void SubscribeList::make_publish_map(
             listen_fd_set &online_fds, 
             publish_map &need_publish)
 {
     publish_map::iterator it;
 
     pthread_mutex_lock(&_push_list_lock);
     //遍历_push_list 找到 online_fds匹配的数据,放到need_publish中
     for (it = _push_list.begin(); it != _push_list.end(); it++)  {
         //it->first 是 fd
         //it->second 是 modid/cmdid
         if (online_fds.find(it->first) != online_fds.end()) {
             //匹配到
             //当前的键值对移动到need_publish中
             need_publish[it->first] = _push_list[it->first];
             //当该组数据从_push_list中删除掉
             _push_list.erase(it);
         }
     }
 
     pthrea
5.Lars-DnsV0.2-发布功能的总结
//发布
 void SubscribeList::publish(std::vector<uint64_t> &change_mods)
 {
     //1 将change_mods已经修改的mod->fd 
     //  放到 发布清单_push_list中 
     pthread_mutex_lock(&_book_list_lock);
     pthread_mutex_lock(&_push_list_lock);
 
     std::vector<uint64_t>::iterator it;
 
     for (it = change_mods.begin(); it != change_mods.end(); it++) {
         uint64_t mod = *it;
         if (_book_list.find(mod) != _book_list.end()) {
             //将mod下面的fd set集合拷迁移到 _push_list中
             hash_set<int>::iterator fds_it;
             for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {
                 int fd = *fds_it;
                 _push_list[fd].insert(mod);
             }
         }
     }
 
     pthread_mutex_unlock(&_push_list_lock);
     pthread_mutex_unlock(&_book_list_lock);
 
     //2 通知各个线程去执行推送任务
     server->thread_poll()->send_task(push_change_task, this);
 }
 ```
 
         这里需要注意的是`publish()`里的server变量是全局变量,全局唯一的server句柄。
6.Lars-DnsV0.2-订阅流程复习
### 6.2 开启订阅
 
         那么订阅功能实现了,该如何是调用触发订阅功能能,我们可以在一个客户端建立连接成功之后来调用.
 
 
 
 >  lars_dns/src/dns_service.cpp
 
 ```c
 #include <ext/hash_set>
 #include "lars_reactor.h"
 #include "subscribe.h"
 #include "dns_route.h"
 #include "lars.pb.h"
 
 tcp_server *server;
 
 using __gnu_cxx::hash_set;
 
 typedef hash_set<uint64_t> client_sub_mod_list;
 
 // ...
 
 //订阅route 的modid/cmdid
 void create_subscribe(net_connection * conn, void *args)
 {
     conn->param = new client_sub_mod_list;
 }
 
 //退订route 的modid/cmdid
 void clear_subscribe(net_connection * conn, void *args)
 {
     client_sub_mod_list::iterator it;
     client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;
 
     for (it = sub_list->begin(); it  != sub_list->end(); it++) {
         uint64_t mod = *it;
         SubscribeList::instance()->unsubscribe(mod, conn->get_fd());
     }
 
     delete sub_list;
 
     conn->param = NULL;
 }
 
 int main(int argc, char **argv)
 {
     event_loop loop;
 
     //加载配置文件
     config_file::setPath("conf/lars_dns.conf");
     std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
     short port = config_file::instance()->GetNumber("reactor", "port", 7778);
 
 
     //创建tcp服务器
     server = new tcp_server(&loop, ip.c_str(), port);
 
     //==========注册链接创建/销毁Hook函数============
     server->set_conn_start(create_subscribe);
     server->set_conn_close(clear_subscribe);
       //============================================
 
     //注册路由业务
     server->add_msg_router(lars::ID_GetRouteRequest, get_route);
 
     //开始事件监听    
     printf("lars dns service ....\n");
     loop.event_process();
 
     return 0;
 }
 ```
 
         这里注册了两个链接Hook。`create_subscribe()`和`clear_subscribe()`。
 
 `client_sub_mod_list`为当前客户端链接所订阅的route信息列表。主要存放当前客户订阅的modid/cmdid的集合。因为不同的客户端订阅的信息不同,所以要将该列表与每个conn进行绑定。
7.Lars-DnsV0.2-订阅模块的集成
## 7) Backend Thread实时监控
 
             Backend Thread的后台总业务流程如下:
 
 
 
 ### 7.1 数据库表相关查询方法实现
 
         我们先实现一些基本的数据表达查询方法:
 
 > lars_dns/src/dns_route.cpp
 
 ```c
 /*
  *  return 0, 表示 加载成功,version没有改变
  *         1, 表示 加载成功,version有改变
  *         -1 表示 加载失败
  * */
 int Route::load_version()
 {
     //这里面只会有一条数据
     snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;");
 
     int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
     if (ret)
     {
         fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
         return -1;
     }
 
     MYSQL_RES *result = mysql_store_result(&_db_conn);
     if (!result)
     {
         fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
         return -1;
     }
 
     long line_num = mysql_num_rows(result);
     if (line_num == 0)
     {
         fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn));
         return -1;
     }
 
     MYSQL_ROW row = mysql_fetch_row(result);
     //得到version
 
     long new_version = atol(row[0]);
 
     if (new_version == this->_version)
     {
         //加载成功但是没有修改
         return 0;
     }
     this->_version = new_version;
     printf("now route version is %ld\n", this->_version);
 
     mysql_free_result(result);
 
     return 1;
 }
8.Lars-DnsV0.2订阅模块的测试
//加载RouteChange得到修改的modid/cmdid
 //将结果放在vector中
 void Route::load_changes(std::vector<uint64_t> &change_list) 
 {
     //读取当前版本之前的全部修改 
     snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version);
 
     int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
     if (ret)
     {
         fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn));
         return ;
     }
 
     MYSQL_RES *result = mysql_store_result(&_db_conn);
     if (!result)
     {
         fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn));
         return ;
     }
 
     long lineNum = mysql_num_rows(result);
     if (lineNum == 0)
     {
         fprintf(stderr,  "No version in table ChangeLog: %s\n", mysql_error(&_db_conn));
         return ;
     }
     MYSQL_ROW row;
     for (long i = 0;i < lineNum; ++i)
     {
         row = mysql_fetch_row(result);
         int modid = atoi(row[0]);
         int cmdid = atoi(row[1]);
         uint64_t key = (((uint64_t)modid) << 32) + cmdid;
         change_list.push_back(key);
     }
     mysql_free_result(result);    
 }
9.Lars-DnsV0.2订阅模块测试2
//将RouteChange
 //删除RouteChange的全部修改记录数据,remove_all为全部删除
 //否则默认删除当前版本之前的全部修改
 void Route::remove_changes(bool remove_all)
 {
     if (remove_all == false)
     {
         snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version);
     }
     else
     {
         snprintf(_sql, 1000, "DELETE FROM RouteChange;");
     }
     int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
     if (ret != 0)
     {
         fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn));
         return ;
     } 
 
     return;
 }
 ```
 
 这里面提供了基本的对一些表的加载和删除操作:
 
 `load_version()`:加载当前route信息版本号。
 
 `load_route_data()`:加载`RouteData`信息表,到_temp_pointer中。
 
 `swap()`:将__temp_pointer的表数据同步到_data_temp表中.
 
 `load_changes()`:加载RouteChange得到修改的modid/cmdid,将结果放在vector中
 
 `remove_changes()`:清空之前的修改记录。
10.Lars-DnsV0.2的发布问题bug修复
### 7.2 Backend Thread业务流程实现
 
 
 
 > lars_dns/src/dns_route.cpp
 
 ```c
 //周期性后端检查db的route信息的更新变化业务
 //backend thread完成
 void *check_route_changes(void *args)
 {
     int wait_time = 10;//10s自动修改一次,也可以从配置文件读取
     long last_load_time = time(NULL);
 
     //清空全部的RouteChange
     Route::instance()->remove_changes(true);
 
     //1 判断是否有修改
     while (true) {
         sleep(1);
         long current_time = time(NULL);
 
         //1.1 加载RouteVersion得到当前版本号
         int ret = Route::instance()->load_version();
         if (ret == 1) {
             //version改版 有modid/cmdid修改
             //2 如果有修改
 
             //2.1 将最新的RouteData加载到_temp_pointer中
             if (Route::instance()->load_route_data() == 0) {
                 //2.2 更新_temp_pointer数据到_data_pointer map中
                 Route::instance()->swap();
                 last_load_time = current_time;//更新最后加载时间
             }
 
             //2.3 获取被修改的modid/cmdid对应的订阅客户端,进行推送         
             std::vector<uint64_t> changes;
             Route::instance()->load_changes(changes);
 
             //推送
             SubscribeList::instance()->publish(changes);
 
             //2.4 删除当前版本之前的修改记录
             Route::instance()->remove_changes();
 
         }
         else {
             //3 如果没有修改
             if (current_time - last_load_time >= wait_time) {
                 //3.1 超时,加载最新的temp_pointer
                 if (Route::instance()->load_route_data() == 0) {
                     //3.2 _temp_pointer数据更新到_data_pointer map中
                     Route::instance()->swap();
                     last_load_time = current_time;
                 }
             }
         }
     }
 
     return NULL;
 }
 ```
11.Lars-DnsV0.2订阅发布流程梳理
完成dns模块的订阅功能测试V0.3
 
         我们提供一个修改一个modid/cmdid的sql语句来触发订阅条件,并且让dns service服务器主动给订阅的客户端发送该订阅消息。
 
 > lars_dns/test/test_insert_dns_route.sql
 
 ```sql
 USE lars_dns;
 
 SET @time = UNIX_TIMESTAMP(NOW());
 
 INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999);
 UPDATE RouteVersion SET version = @time WHERE id = 1;
 
 INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time);
 ```
 
 
 
 客户端代码:
 
 > lars_dns/test/lars_dns_test1.cpp
 
 ```c
 #include <string.h>
 #include <unistd.h>
 #include <string>
 #include "lars_reactor.h"
 #include "lars.pb.h"
 
 //命令行参数
 struct Option
 {
     Option():ip(NULL),port(0) {}
 
     char *ip;
     short port;
 };
 
 
 Option option;
 
 void Usage() {
     printf("Usage: ./lars_dns_test -h ip -p port\n");
 }
 
 //解析命令行
 void parse_option(int argc, char **argv)
 {
     for (int i = 0; i < argc; i++) {
         if (strcmp(argv[i], "-h") == 0) {
             option.ip = argv[i + 1];
         }
         else if (strcmp(argv[i], "-p") == 0) {
             option.port = atoi(argv[i + 1]);
         }
     }
 
     if ( !option.ip || !option.port ) {
         Usage();
         exit(1);
     }
 
 }