目录
01 lars 系统架构回顾
02 lars-lbAgentV0.4-route_lb处理report业务流程
03 lars-lbAgentV0.4-负责均衡判断参数配置
04 lars-lbAgentV0.4-负载均衡idle节点的失败率判断
05 lars-lbAgentV0.4-负载均衡overload节点的成功率判断
06 lars-lbAgentV0.4-负载均衡上报提交模块
07 lars-lbAgentV0.4-数据问题修改-上报业务测试
08 lars-lbAgentV0.4-模拟器测试工具
09 lars-lbAgentV0.5-idle连续成功-overload连续失败的定期窗口重置机制
10 lars-lbAgentV0.6-loadbalance主动更新主机信息功能
11 昨日回顾
12 Lars-LbAgentV0.7-GetRouteAPI实现
13 Lars-LbAgentV0.7-Udp-server-获取路由服务业务注册
14 Lars-LbAgentV0.7-Udp-server-API和udpserver的测试
01 lars 系统架构回顾
**启动 Lars Reporter**
```bash
$ ./run_lars reporter
▄▄
██
██ ▄█████▄ ██▄████ ▄▄█████▄
██ ▀ ▄▄▄██ ██▀ ██▄▄▄▄ ▀
██ ▄██▀▀▀██ ██ ▀▀▀▀██▄
██▄▄▄▄▄▄ ██▄▄▄███ ██ █▄▄▄▄▄██
▀▀▀▀▀▀▀▀ ▀▀▀▀ ▀▀ ▀▀ ▀▀▀▀▀▀
Load balance And Remote service schedule System
_____ _
| __ \ | |
| |__) |___ _ __ ___ _ __| |_ ___ _ __
| _ // _ \ '_ \ / _ \| '__| __/ _ \ '__|
| | \ \ __/ |_) | (_) | | | || __/ |
|_| \_\___| .__/ \___/|_| \__\___|_|
| |
|_|
ITCAST(https://www.itcast.cn)
------------------------------------
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 3
```
**启动 Lars dns**
```bash
$ ./run_lars dns
▄▄
██
██ ▄█████▄ ██▄████ ▄▄█████▄
██ ▀ ▄▄▄██ ██▀ ██▄▄▄▄ ▀
██ ▄██▀▀▀██ ██ ▀▀▀▀██▄
██▄▄▄▄▄▄ ██▄▄▄███ ██ █▄▄▄▄▄██
▀▀▀▀▀▀▀▀ ▀▀▀▀ ▀▀ ▀▀ ▀▀▀▀▀▀
Load balance And Remote service schedule System
_____
| __ \
| | | |_ __ ___
| | | | '_ \/ __|
| |__| | | | \__ \
|_____/|_| |_|___/
ITCAST(https://www.itcast.cn)
------------------------------------
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
lars dns service ....
now route version is 1574674421
```
**启动 Lars Lb Agent**
```c
$ ./run_lars lbagent
▄▄
██
██ ▄█████▄ ██▄████ ▄▄█████▄
██ ▀ ▄▄▄██ ██▀ ██▄▄▄▄ ▀
██ ▄██▀▀▀██ ██ ▀▀▀▀██▄
██▄▄▄▄▄▄ ██▄▄▄███ ██ █▄▄▄▄▄██
▀▀▀▀▀▀▀▀ ▀▀▀▀ ▀▀ ▀▀ ▀▀▀▀▀▀
Load balance And Remote service schedule System
_ _ _
| | | | /\ | |
| | | |__ / \ __ _ ___ _ __ | |_
| | | '_ \ / /\ \ / _` |/ _ \ '_ \| __|
| |____| |_) | / ____ \ (_| | __/ | | | |_
|______|_.__/ /_/ \_\__, |\___|_| |_|\__|
__/ |
|___/
ITCAST(https://www.itcast.cn)
------------------------------------
```
02 lars-lbAgentV0.4-route_lb处理report业务流程

LB Agent拥有5个线程,一个LB算法:
- UDP Server服务,并运行LB算法,对业务提供节点获取和节点调用结果上报服务;为了增大系统吞吐量,使用3个UDP Server服务互相独立运行LB算法:`modid+cmdid % 3 = i`的那些模块的服务与调度,由第`i+1`个UDP Server线程负责
- Dns Service Client:是dnsserver的客户端线程,负责根据需要,向dnsserver获取一个模块的节点集合(或称为获取路由);UDP Server会按需向此线程的MQ写入获取路由请求,DSS Client将MQ到来的请求转发到dnsserver,之后将dnsserver返回的路由信息更新到对应的UDP Server线程维护的路由信息中
- Report Service Client:是reporter的客户端线程,负责将每个模块下所有节点在一段时间内的调用结果、过载情况上报到reporter Service端,便于观察情况、做报警;本身消费MQ数据,UDP Server会按需向MQ写入上报状态请求
03 lars-lbAgentV0.4-负责均衡判断参数配置
## 2) 构建Lars-Agent项目
### 2.1 构建目录结构
依次创建如下目录:
> Lars/lars_loadbalance_agent
```bash
lars_loadbalance_agent/
├── bin/
├── conf/
│ └── lars_lb_agent.conf
├── include/
├── Makefile
└── src/
```
其中Makefile如下:
```makefile
TARGET= bin/lars_lb_agent
CXX=g++
CFLAGS=-g -O2 -Wall -Wno-deprecated
BASE=../base
BASE_H=$(BASE)/include
PROTO = $(BASE)/proto
PROTO_H = $(BASE)/proto
LARS_REACTOR=../lars_reactor
LARS_REACTOR_H =$(LARS_REACTOR)/include
LARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactor
MYSQL=$(BASE)/mysql-connector-c
MYSQL_H=$(MYSQL)/include
MYSQL_LIB=$(MYSQL)/lib/libmysqlclient.a
OTHER_LIB = -lpthread -ldl -lprotobuf
SRC= ./src
INC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(MYSQL_H) -I$(PROTO_H)
LIB= $(MYSQL_LIB) -L$(LARS_REACTOR_LIB) $(OTHER_LIB)
OBJS = $(addsuffix .o, $(basename $(wildcard $(SRC)/*.cpp)))
OBJS += $(PROTO)/lars.pb.o
$(TARGET): $(OBJS)
mkdir -p bin
$(CXX) $(CFLAGS) -o $(TARGET) $(OBJS) $(INC) $(LIB)
%.o: %.cpp
$(CXX) $(CFLAGS) -c -o $@ $< $(INC)
.PHONY: clean
clean:
-rm -f src/*.o $(PROTO)/lars.pb.o $(TARGET)
```
实际上reporter、dns相似。
> conf/lars_lb_agent.conf
```ini
[reporter]
ip = 127.0.0.1
port = 7779
[dnsserver]
ip = 127.0.0.1
port = 7778
```
目前的基本配置文件,因为loadbalance_agent是充当reporter和dnsserver的客户端,所以需要知道对应的reporter和dnsserver的ip和port信息。
04 lars-lbAgentV0.4-负载均衡idle节点的失败率判断
### 2.2 主模块业务搭建

首先我们要在主线程中,启动3个UDP Server线程,这个是提供业务层/API层的服务。然后分别启动report_client线程,用来和reporter Service进行通信,将请求上报信息发送给Reporter Service。 然后再启动dns_client线程,用来和dns service通信。
> lars_loadbalance_agent/include/main_server.h
```c
#pragma once
#include "lars_reactor.h"
#include "lars.pb.h"
//与report_client通信的thread_queue消息队列
extern thread_queue<lars::ReportStatusRequest>* report_queue;
//与dns_client通信的thread_queue消息队列
extern thread_queue<lars::GetRouteRequest>* dns_queue;
// 启动udp server服务,用来接收业务层(调用者/使用者)的消息
void start_UDP_servers(void);
// 启动lars_reporter client 线程
void start_report_client(void);
// 启动lars_dns client 线程
void start_dns_client(void);
```
05 lars-lbAgentV0.4-负载均衡overload节点的成功率判断
> lars_loadbalance_agent/src/main_server.cpp
```c
#include "main_server.h"
#include "lars.pb.h"
//与report_client通信的thread_queue消息队列
thread_queue<lars::ReportStatusRequest>* report_queue = NULL;
//与dns_client通信的thread_queue消息队列
thread_queue<lars::GetRouteRequest>* dns_queue = NULL;
int main(int argc, char **argv)
{
//1 加载配置文件
//2 启动udp server服务,用来接收业务层(调用者/使用者)的消息
start_UDP_servers();
//3 启动lars_reporter client 线程
report_queue = new thread_queue<lars::ReportStatusRequest>();
if (report_queue == NULL) {
fprintf(stderr, "create report queue error!\n");
exit(1);
}
start_report_client();
//4 启动lars_dns client 线程
dns_queue = new thread_queue<lars::GetRouteRequest>();
if (dns_queue == NULL) {
fprintf(stderr, "create dns queue error!\n");
exit(1);
}
start_dns_client();
std::cout <<"done!" <<std::endl;
while (1) {
sleep(10);
}
return 0;
}
```
06 lars-lbAgentV0.4-负载均衡上报提交模块
这里我们分别在main()中 ,开启以上线程。
其中`report_client`线程需要携带`thread_queue<lars::ReportStatusRequest>`消息队列通道。`agent`负责将上报请求消息`lars::ReportStatusRequest`通过thread_queue发送给reporter service。
其中`dns_client`线程需要携带`thread_queue<lars::GetRouteRequest>`。`agent`负责将请求modid/cmdid的route消息`lars::GetRouteRequest`通过thread_queue发送给dns service。
3个udp server的线程开辟实现如下:
> lars_loadbalance_agent/src/agent_udp_server.cpp
```c
#include "lars_reactor.h"
#include "main_server.h"
void * agent_server_main(void * args)
{
int *index = (int*)args;
short port = *index + 8888;
event_loop loop;
udp_server server(&loop, "0.0.0.0", port);
//TODO 给server注册消息分发路由业务
printf("agent UDP server :port %d is started...\n", port);
loop.event_process();
return NULL;
}
07 lars-lbAgentV0.4-数据问题修改-上报业务测试
void start_UDP_servers(void)
{
for (int i = 0; i < 3; i ++) {
pthread_t tid;
int ret = pthread_create(&tid, NULL, agent_server_main, &i);
if (ret == -1) {
perror("pthread_create");
exit(1);
}
pthread_detach(tid);
}
}
```
reporter thread创建实现如下:
> lars_loadbalance_agent/src/reporter_client.cpp
```c
#include "lars_reactor.h"
#include "main_server.h"
#include <pthread.h>
void *report_client_thread(void* args)
{
printf("report client thread start\n");
#if 0
event_loop loop;
//1 加载配置文件得到repoter ip + port
std::string ip = config_file::instance()->GetString("reporter", "ip", "");
short port = config_file::instance()->GetNumber("reporter", "port", 0);
//2 创建客户端
tcp_client client(&loop, ip.c_str(), port, "reporter client");
//3 将 thread_queue消息回调事件,绑定到loop中
report_queue->set_loop(&loop);
report_queue->set_callback()
//4 启动事件监听
loop.event_process();
#endif
return NULL;
}
08 lars-lbAgentV0.4-模拟器测试工具
void start_report_client()
{
//开辟一个线程
pthread_t tid;
//启动线程业务函数
int ret = pthread_create(&tid, NULL, report_client_thread, NULL);
if (ret == -1) {
perror("pthread_create");
exit(1);
}
//设置分离模式
pthread_detach(tid);
}
```
dns thread创建实现如下:
> lars_loadbalance_agent/src/dns_client.cpp
```c
#include "lars_reactor.h"
#include "main_server.h"
#include <pthread.h>
void *dns_client_thread(void* args)
{
printf("dns client thread start\n");
return NULL;
}
void start_dns_client()
{
//开辟一个线程
pthread_t tid;
//启动线程业务函数
int ret = pthread_create(&tid, NULL, dns_client_thread, NULL);
if (ret == -1) {
perror("pthread_create");
exit(1);
}
//设置分离模式
pthread_detach(tid);
}
```
09 lars-lbAgentV0.5-idle连续成功-overload连续失败的定期窗
口重置机制
### 2.3 测试lb_agentV0.1开发
编译,然后我们简单启动一下`./bin/lars_lb_agent`
```bash
$ ./bin/lars_lb_agent
dns client thread start
report client thread start
done!
msg_router init...
server on 0.0.0.0:8888 is running...
agent UDP server :port 8888 is started...
msg_router init...
server on 0.0.0.0:8888 is running...
agent UDP server :port 8888 is started...
msg_router init...
server on 0.0.0.0:8888 is running...
agent UDP server :port 8888 is started...
...
```
10 lars-lbAgentV0.6-loadbalance主动更新主机信息功能
## 3) Report Client设计与实现
report client主要是实现thread_queue的回调业务,udp server会定期的上传上报数据到reporter,那么请求对于report client就是透传给reporter serivce即可。
> lars_loadbalance_agent/src/reporter_client.cpp
```c
#include "lars_reactor.h"
#include "main_server.h"
#include <string>
#include <pthread.h>
//typedef void io_callback(event_loop *loop, int fd, void *args);
//只要thread_queue有数据,loop就会触发此回调函数来处理业务
void new_report_request(event_loop *loop, int fd, void *args)
{
tcp_client *client = (tcp_client*)args;
//1. 将请求数据从thread_queue中取出,
std::queue<lars::ReportStatusRequest> msgs;
//2. 将数据放在queue队列中
report_queue->recv(msgs);
//3. 遍历队列,通过client依次将每个msg发送给reporter service
while (!msgs.empty()) {
lars::ReportStatusRequest req = msgs.front();
msgs.pop();
std::string requestString;
req.SerializeToString(&requestString);
//client 发送数据
client->send_message(requestString.c_str(), requestString.size(), lars::ID_ReportStatusRequest);
}
}
11 昨日回顾
void *report_client_thread(void* args)
{
printf("report client thread start\n");
event_loop loop;
//1 加载配置文件得到repoter ip + port
std::string ip = config_file::instance()->GetString("reporter", "ip", "");
short port = config_file::instance()->GetNumber("reporter", "port", 0);
//2 创建客户端
tcp_client client(&loop, ip.c_str(), port, "reporter client");
//3 将 thread_queue消息回调事件,绑定到loop中
report_queue->set_loop(&loop);
report_queue->set_callback(new_report_request, &client);
//4 启动事件监听
loop.event_process();
return NULL;
}
12 Lars-LbAgentV0.7-GetRouteAPI实现
void start_report_client()
{
//开辟一个线程
pthread_t tid;
//启动线程业务函数
int ret = pthread_create(&tid, NULL, report_client_thread, NULL);
if (ret == -1) {
perror("pthread_create");
exit(1);
}
//设置分离模式
pthread_detach(tid);
}
```
## 4) Dns Client设计与实现
dns client 和report client的业务十分相似,只是针对的协议不同了。dns client的thread_queue 回调业务主要是透传`lars::GetRouteRequest`数据包。
> lars_loadbalance_agent/src/dns_client.cpp
```c
#include "lars_reactor.h"
#include "main_server.h"
#include <pthread.h>
//typedef void io_callback(event_loop *loop, int fd, void *args);
//只要thread_queue有数据,loop就会触发此回调函数来处理业务
void new_dns_request(event_loop *loop, int fd, void *args)
{
tcp_client *client = (tcp_client*)args;
//1. 将请求数据从thread_queue中取出,
std::queue<lars::GetRouteRequest> msgs;
//2. 将数据放在queue队列中
dns_queue->recv(msgs);
//3. 遍历队列,通过client依次将每个msg发送给reporter service
while (!msgs.empty()) {
lars::GetRouteRequest req = msgs.front();
msgs.pop();
std::string requestString;
req.SerializeToString(&requestString);
//client 发送数据
client->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
}
}
13 Lars-LbAgentV0.7-Udp-server-获取路由服务业务注册
void *dns_client_thread(void* args)
{
printf("dns client thread start\n");
event_loop loop;
//1 加载配置文件得到dns service ip + port
std::string ip = config_file::instance()->GetString("dnsserver", "ip", "");
short port = config_file::instance()->GetNumber("dnsserver", "port", 0);
//2 创建客户端
tcp_client client(&loop, ip.c_str(), port, "dns client");
//3 将thread_queue消息回调事件,绑定到loop中
dns_queue->set_loop(&loop);
dns_queue->set_callback(new_dns_request, &client);
//4 启动事件监听
loop.event_process();
return NULL;
}
void start_dns_client()
{
//开辟一个线程
pthread_t tid;
//启动线程业务函数
int ret = pthread_create(&tid, NULL, dns_client_thread, NULL);
if (ret == -1) {
perror("pthread_create");
exit(1);
}
//设置分离模式
pthread_detach(tid);
}
```
14 Lars-LbAgentV0.7-Udp-server-API和udpserver的测试
## 5) 负载均衡模块基础设计(V0.2)
### 5.1 基础
每个模块`modid/cmdid`下有若干节点,节点的集合称为此模块的路由; 对于每个节点,有两种状态:
- `idle`:此节点可用,可作为API**(相当于Agent的客户端)**请求的节点使用;
- `overload`:此节点过载,暂时不可作为API请求的节点使用
在请求节点时,有几个关键属性:
- 虚拟成功次数`vsucc`,API汇报节点调用结果是成功时,该值+1
- 虚拟失败次数`verr`,API汇报节点调用结果是失败时,该值+1
- 连续成功次数`contin_succ`,连续请求成功的次数
- 连续失败次数`contin_err`,连续请求失败的次数
这4个字段,在节点状态改变时(idle<—>overload),会被重置。