实战:基于 BRPC+Etcd 打造轻量级 RPC 服务 —— 从注册到调用的完整实现 - 教程

news/2025/9/25 18:08:54/文章来源:https://www.cnblogs.com/slgkaifa/p/19111704

实战:基于 BRPC+Etcd 打造轻量级 RPC 服务 —— 从注册到调用的完整实现 - 教程

在实际开发中,当多个服务需要跨进程通信时,直接用 HTTP 或自定义协议会面临很多问题:比如服务地址硬编码导致扩容困难、服务下线后客户端还在调用僵尸节点、缺乏统一的日志和监控等。为了解决这些痛点,我基于百度的 BRPC 框架和 Etcd 服务注册中心,搭了一套轻量级 RPC 服务,支持服务自动注册、实时发现和负载均衡,今天就把整个实现过程拆解开讲清楚,代码可直接复用。

一、项目背景:为什么要造这个轮子?

先说说为什么选 BRPC 和 Etcd:

  • BRPC:相比 gRPC,BRPC 更贴合国内场景,支持百度_std、HTTP 等多种协议,性能稳定,而且自带连接池、重试等机制,不用重复造轮子;

  • Etcd:作为分布式键值存储,天生适合做服务注册中心 —— 支持租约(避免僵尸节点)、Watcher(实时感知服务变化),轻量且容易部署;

  • 场景适配:这套方案针对中小型项目设计,没有过度封装,核心逻辑清晰,后续加熔断、监控都很方便。

整个服务的核心目标很简单:让客户端能 “无感” 调用服务端,不用关心服务端在哪、有多少个节点,服务下线后自动剔除

二、整体架构:从注册到调用的闭环

先画个简单的架构图(文字描述更直观),整个流程分四步:

  1. 服务端启动:初始化 BRPC 服务 → 实现 Echo 业务逻辑 → 把自己的地址注册到 Etcd(带 3 秒租约);

  2. 客户端启动:连接 Etcd → 拉取已注册的服务节点 → 初始化信道管理;

  3. 实时发现:Etcd 通过 Watcher 监控服务节点变化,新节点上线时客户端自动加信道,节点下线时自动删信道;

  4. 客户端调用:通过轮询(RR)策略从信道池选一个节点,发起 RPC 调用,拿到响应后打印结果。

核心模块分为 4 个:日志模块(统一日志输出)、Etcd 注册发现模块(连接 Etcd)、BRPC 服务模块(业务逻辑载体)、信道管理模块(负载均衡 + 资源复用)。

三、核心模块实现:一步步拆轮子

1. 先搞定基础:日志模块(logger.hpp)

日志是排查问题的关键,我用了轻量级的 spdlog 库,支持调试 / 发布两种模式,还能自动打印文件名和行号。

1.1 日志模块代码
#pragma once
#include
#include
#include
#include
// 全局日志器,所有模块共用
std::shared_ptr g_default_logger;
/**
* @brief 初始化日志器
* @param mode 运行模式:false-调试(输出控制台),true-发布(输出文件)
* @param file 发布模式下的日志文件路径
* @param level 日志等级(0-trace,1-debug,2-info,3-warn,4-error,5-critical)
*/
void init_logger(bool mode, const std::string &file, int32_t level) {
// 调试模式:控制台输出,最低等级(trace),方便开发定位
if (!mode) {
g_default_logger = spdlog::stdout_color_mt("rpc-debug");
g_default_logger->set_level(spdlog::level::trace);
g_default_logger->flush_on(spdlog::level::trace);
}
// 发布模式:文件输出,按参数控制等级,避免控制台刷屏
else {
if (file.empty()) {
std::cerr  5) ? spdlog::level::info : (spdlog::level::level_enum)level;
g_default_logger->set_level(log_level);
g_default_logger->flush_on(log_level);
}
// 日志格式:[日志器名][时间][线程ID][等级] 内容(文件名:行号)
g_default_logger->set_pattern("[%n][%H:%M:%S][%t][%-8l] %v (%s:%#)");
}
// 日志宏定义,简化调用
#define LOG_TRACE(fmt, ...) g_default_logger->trace(fmt, ##__VA_ARGS__)
#define LOG_DEBUG(fmt, ...) g_default_logger->debug(fmt, ##__VA_ARGS__)
#define LOG_INFO(fmt, ...)  g_default_logger->info(fmt, ##__VA_ARGS__)
#define LOG_WARN(fmt, ...)  g_default_logger->warn(fmt, ##__VA_ARGS__)
#define LOG_ERROR(fmt, ...) g_default_logger->error(fmt, ##__VA_ARGS__)
#define LOG_FATAL(fmt, ...) g_default_logger->critical(fmt, ##__VA_ARGS__)
1.2 关键逻辑解释
  • 全局日志器g_default_logger让所有模块不用重复创建日志器,避免资源浪费;

  • 模式区分:调试时输出控制台,带颜色区分等级(spdlog 的 stdout_color_sink),发布时输出文件,避免线上服务器控制台堆积日志;

  • 格式设计:包含线程 ID 和文件名行号,比如[rpc-debug][15:30:00][1234][debug] 初始化信道成功 (channel.hpp:45),定位问题时一眼就能找到代码位置。

2. 核心中的核心:Etcd 注册发现模块(etcd.hpp)

这是解决 “服务在哪” 的关键模块,分两个类:Registry(服务端用,注册服务)和Discovery(客户端用,发现服务)。

2.1 先明确依赖

需要安装 Etcd 的 C++ 客户端库(etcd-cpp-apiv3),Ubuntu 下可以用源码编译:

git clone https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git
cd etcd-cpp-apiv3 && mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local
make -j4 && sudo make install
2.2 Etcd 模块代码
#pragma once
#include
#include
#include
#include
#include
#include
#include "logger.hpp"  // 依赖日志模块
/**
* @brief 服务注册类(服务端用)
* 功能:将服务端地址注册到Etcd,带租约自动续期,服务下线后自动删除注册信息
*/
class Registry {
public:
using ptr = std::shared_ptr;  // 智能指针,避免内存泄漏
/**
* @brief 构造函数:初始化Etcd客户端和租约
* @param etcd_host Etcd服务地址(如http://127.0.0.1:2379)
* @param lease_ttl 租约时间(秒),默认3秒(太短频繁续期,太长僵尸节点存活久)
*/
Registry(const std::string &etcd_host, int lease_ttl = 3) {
// 初始化Etcd客户端
_client = std::make_shared(etcd_host);
// 创建租约:租约过期前会自动续期,服务端下线后续期失败,Etcd删除节点
_keep_alive = _client->leasekeepalive(lease_ttl).get();
if (!_keep_alive) {
LOG_FATAL("创建Etcd租约失败!Etcd地址:{}", etcd_host);
exit(1);
}
_lease_id = _keep_alive->Lease();
LOG_INFO("Etcd租约初始化成功,租约ID:{},TTL:{}秒", _lease_id, lease_ttl);
}
// 析构函数:取消租约,避免服务端正常退出后租约残留
~Registry() {
if (_keep_alive) {
_keep_alive->Cancel();
LOG_INFO("Etcd租约已取消,租约ID:{}", _lease_id);
}
}
/**
* @brief 注册服务到Etcd
* @param service_key 服务唯一标识(如/service/echo/instance1)
* @param service_value 服务访问地址(如127.0.0.1:7070)
* @return true-注册成功,false-失败
*/
bool registry(const std::string &service_key, const std::string &service_value) {
if (service_key.empty() || service_value.empty()) {
LOG_ERROR("服务Key或Value不能为空!Key:{},Value:{}", service_key, service_value);
return false;
}
// 带租约写入Etcd:Key=服务标识,Value=访问地址
auto resp = _client->put(service_key, service_value, _lease_id).get();
if (resp.is_ok()) {
LOG_INFO("服务注册成功!Etcd Key:{},Value:{}", service_key, service_value);
return true;
} else {
LOG_ERROR("服务注册失败!错误信息:{}", resp.error_message());
return false;
}
}
private:
std::shared_ptr _client;       // Etcd客户端
std::shared_ptr _keep_alive;// 租约续期对象
uint64_t _lease_id;                          // 租约ID
};
/**
* @brief 服务发现类(客户端用)
* 功能:拉取Etcd中的服务节点,监控节点变化,触发回调更新信道
*/
class Discovery {
public:
using ptr = std::shared_ptr;
// 回调函数类型:参数1-服务Key,参数2-服务Value(访问地址)
using NotifyCallback = std::function;
/**
* @brief 构造函数:初始化发现并监控服务
* @param etcd_host Etcd地址
* @param base_key 服务根目录(如/service,所有服务都在这个目录下)
* @param put_cb 节点上线回调(新增信道)
* @param del_cb 节点下线回调(删除信道)
*/
Discovery(const std::string &etcd_host,
const std::string &base_key,
const NotifyCallback &put_cb,
const NotifyCallback &del_cb)
: _put_cb(put_cb), _del_cb(del_cb) {
// 初始化Etcd客户端
_client = std::make_shared(etcd_host);
if (!_client) {
LOG_FATAL("创建Etcd客户端失败!地址:{}", etcd_host);
exit(1);
}
// 第一步:先拉取已存在的服务节点(避免客户端启动时漏节点)
auto resp = _client->ls(base_key).get();
if (resp.is_ok()) {
int node_count = resp.keys().size();
LOG_INFO("拉取到{}个服务节点,根目录:{}", node_count, base_key);
for (int i = 0; i (
*_client.get(), base_key,
std::bind(&Discovery::on_etcd_event, this, std::placeholders::_1),
true
);
LOG_INFO("Etcd Watcher启动成功,监控根目录:{}", base_key);
}
// 析构函数:取消Watcher
~Discovery() {
if (_watcher) {
_watcher->Cancel();
LOG_INFO("Etcd Watcher已取消");
}
}
private:
/**
* @brief Etcd事件回调:处理节点上线/下线
* @param resp Etcd事件响应
*/
void on_etcd_event(const etcd::Response &resp) {
if (!resp.is_ok()) {
LOG_ERROR("Etcd事件回调出错!错误:{}", resp.error_message());
return;
}
// 遍历所有事件(可能同时有多个节点变化)
for (const auto &event : resp.events()) {
if (event.event_type() == etcd::Event::EventType::PUT) {
// PUT事件:节点上线或更新
std::string key = event.kv().key();
std::string val = event.kv().as_string();
LOG_DEBUG("Etcd收到PUT事件:Key={},Value={}", key, val);
if (_put_cb) _put_cb(key, val);
} else if (event.event_type() == etcd::Event::EventType::DELETE_) {
// DELETE事件:节点下线
std::string key = event.prev_kv().key();
std::string val = event.prev_kv().as_string();
LOG_DEBUG("Etcd收到DELETE事件:Key={},Value={}", key, val);
if (_del_cb) _del_cb(key, val);
}
}
}
private:
NotifyCallback _put_cb;                      // 节点上线回调
NotifyCallback _del_cb;                      // 节点下线回调
std::shared_ptr _client;       // Etcd客户端
std::shared_ptr _watcher;     // 事件监控对象
};
2.3 关键逻辑拆解
  • Registry 的租约机制:这是避免僵尸节点的核心 —— 服务端正常运行时,KeepAlive会自动续期;如果服务端崩溃,续期失败,3 秒后 Etcd 会删除该节点的注册信息,客户端不会再调用到死节点;

  • Discovery 的两步初始化:先拉取已有节点(避免客户端启动时漏节点),再启动 Watcher(实时监控新变化),这两步缺一不可;

  • 回调解耦put_cbdel_cb由客户端传入,Discovery 只负责 “发现变化”,不关心 “如何处理变化”,这样后续换信道管理逻辑时不用改 Discovery 代码。

3. 连接客户端和服务端:BRPC 服务定义(main.proto)

RPC 服务需要先定义通信协议,用 Protobuf 来定义 Echo 服务的请求和响应结构,然后生成 C++ 代码。

3.1 Proto 文件定义(main.proto)
syntax = "proto2";  // BRPC推荐用proto2,兼容性更好
package example;    // 命名空间,避免类名冲突
// Echo请求:只传一个字符串
message EchoRequest {
required string message = 1;  // required表示必填
}
// Echo响应:返回处理后的字符串
message EchoResponse {
required string message = 1;
}
// Echo服务:只有一个Echo方法
service EchoService {
// 同步RPC方法:客户端发请求,等待响应
rpc Echo(EchoRequest) returns (EchoResponse);
}
3.2 生成 C++ 代码

需要安装 Protobuf 编译器(sudo apt install protobuf-compiler),然后执行:

# 生成main.pb.h和main.pb.cc(C++代码)
protoc --cpp_out=. main.proto
# 生成BRPC服务端/客户端代码(brpc_pb.pl在BRPC安装目录的bin下)
# 注意:如果brpc_pb.pl不在PATH里,需要写全路径(如/usr/local/brpc/bin/brpc_pb.pl)
brpc_pb.pl main.proto

生成的main.pb.h里包含EchoService的基类,服务端需要继承它实现业务逻辑,客户端用它的 Stub 发起调用。

4. 服务端实现:启动 BRPC + 注册到 Etcd(server.cc)

服务端的核心是 “实现 Echo 业务逻辑” 和 “注册服务”,代码里加了详细注释。

4.1 服务端代码
#include "../common/etcd.hpp"    // 自己写的Etcd模块
#include "../common/logger.hpp"  // 自己写的日志模块
#include        // 解析命令行参数
#include
#include          // BRPC服务端头文件
#include        // BRPC自带日志(会被我们的日志覆盖)
#include "main.pb.h"             // Proto生成的代码
// 命令行参数定义(gflags库),--help可以查看所有参数
DEFINE_bool(run_mode, false, "运行模式:false-调试(日志输控制台),true-发布(日志输文件)");
DEFINE_string(log_file, "", "发布模式必填:日志文件路径(如./server.log)");
DEFINE_int32(log_level, 2, "日志等级:0-trace,1-debug,2-info,3-warn,4-error,5-critical");
DEFINE_string(etcd_host, "http://127.0.0.1:2379", "Etcd服务地址");
DEFINE_string(base_service, "/service", "服务根目录(Etcd中的Key前缀)");
DEFINE_string(instance_name, "/echo/instance1", "当前服务实例名(确保唯一)");
DEFINE_string(access_host, "127.0.0.1:7070", "服务外部访问地址(客户端用这个地址调用)");
DEFINE_int32(listen_port, 7070, "BRPC服务监听端口(要和access_host的端口一致)");
/**
* @brief Echo服务实现类:继承Proto生成的EchoService
*/
class EchoServiceImpl : public example::EchoService {
public:
EchoServiceImpl() {}
~EchoServiceImpl() {}
/**
* @brief 实现Echo方法:业务逻辑核心
* @param controller BRPC控制器:用于设置错误信息、超时等
* @param request 客户端请求
* @param response 服务端响应
* @param done 回调对象:必须调用Run(),否则会内存泄漏
*/
void Echo(google::protobuf::RpcController* controller,
const example::EchoRequest* request,
example::EchoResponse* response,
google::protobuf::Closure* done) {
// BRPC的ClosureGuard:自动调用done->Run(),避免漏写
brpc::ClosureGuard rpc_guard(done);
// 转换为BRPC控制器,方便获取更多信息(如客户端IP)
brpc::Controller* brpc_cntl = dynamic_cast(controller);
if (!brpc_cntl) {
LOG_ERROR("转换BRPC控制器失败!");
return;
}
// 打印客户端请求信息
LOG_INFO("收到客户端请求:客户端IP={}:{},消息={}",
butil::ip2str(brpc_cntl->remote_side().ip).c_str(),
brpc_cntl->remote_side().port,
request->message().c_str());
// 业务逻辑:简单拼接字符串(实际项目中这里可以调用其他模块)
std::string resp_msg = request->message() + " -- 服务端已收到,这是响应!";
response->set_message(resp_msg);
// (可选)设置调用成功标识(默认就是成功,失败时可以用brpc_cntl->SetFailed())
brpc_cntl->SetFailed(0, "success");
}
};
int main(int argc, char *argv[]) {
// 第一步:解析命令行参数(gflags库)
google::ParseCommandLineFlags(&argc, &argv, true);
// 第二步:初始化日志
init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
LOG_INFO("日志模块初始化成功,运行模式:{}", FLAGS_run_mode ? "发布" : "调试");
// 第三步:关闭BRPC自带日志(避免和我们的日志冲突)
logging::LoggingSettings settings;
settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
logging::InitLogging(settings);
// 第四步:初始化BRPC服务端
brpc::Server server;
EchoServiceImpl echo_service;  // 实例化服务实现类
// 把服务添加到BRPC Server
// 第二个参数:SERVER_DOESNT_OWN_SERVICE表示服务对象由用户管理(不是Server)
int ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);
if (ret != 0) {
LOG_FATAL("添加Echo服务到BRPC失败!错误码:{}", ret);
return -1;
}
// 配置BRPC Server选项
brpc::ServerOptions options;
options.idle_timeout_sec = -1;  // 连接空闲不超时(长连接,适合频繁调用)
options.num_threads = 4;       // IO线程数:建议设为CPU核数(我的机器是4核)
options.max_concurrency = 100; // 最大并发数:避免服务被压垮
// 启动BRPC Server,监听指定端口
ret = server.Start(FLAGS_listen_port, &options);
if (ret != 0) {
LOG_FATAL("启动BRPC服务失败!端口:{},错误码:{}", FLAGS_listen_port, ret);
return -1;
}
LOG_INFO("BRPC服务启动成功,监听端口:{}", FLAGS_listen_port);
// 第五步:注册服务到Etcd
// 服务Key:base_service + instance_name(如/service/echo/instance1)
std::string service_key = FLAGS_base_service + FLAGS_instance_name;
Registry::ptr registry = std::make_shared(FLAGS_etcd_host);
if (!registry->registry(service_key, FLAGS_access_host)) {
LOG_FATAL("服务注册到Etcd失败!");
server.Stop(0);  // 注册失败,停止服务
return -1;
}
// 第六步:阻塞等待服务停止(按Ctrl+C触发)
LOG_INFO("服务端已就绪,等待客户端调用...(按Ctrl+C退出)");
server.RunUntilAskedToQuit();
// 退出前清理:停止服务
server.Stop(0);
LOG_INFO("服务端已退出");
return 0;
}

5. 客户端实现:发现服务 + 发起 RPC 调用(client.cc)

客户端需要 “管理信道”(避免每次调用创建连接)和 “负载均衡”,所以依赖之前写的信道管理模块(channel.hpp)。

5.1 先补信道管理模块(channel.hpp)
#pragma once
#include
#include
#include
#include
#include
#include "logger.hpp"  // 依赖日志模块
/**
* @brief 单个服务的信道管理类:管理一个服务的所有节点信道,实现轮询负载均衡
*/
class ServiceChannel {
public:
using ptr = std::shared_ptr;
using ChannelPtr = std::shared_ptr;  // BRPC信道智能指针
ServiceChannel(const std::string &service_name)
: _service_name(service_name), _index(0) {}
/**
* @brief 添加节点信道:服务上线时调用
* @param host 节点访问地址(如127.0.0.1:7070)
*/
void append(const std::string &host) {
if (host.empty()) {
LOG_ERROR("添加信道失败:节点地址为空!服务名:{}", _service_name);
return;
}
// 初始化BRPC信道
auto channel = std::make_shared();
brpc::ChannelOptions options;
options.connect_timeout_ms = 3000;  // 连接超时3秒
options.timeout_ms = 5000;          // 调用超时5秒
options.max_retry = 2;              // 重试2次(避免临时网络问题)
options.protocol = "baidu_std";     // BRPC默认协议
int ret = channel->Init(host.c_str(), &options);
if (ret != 0) {
LOG_ERROR("初始化信道失败!服务名:{},节点:{},错误码:{}",
_service_name, host, ret);
return;
}
// 加锁:避免多线程同时修改信道列表
std::unique_lock lock(_mutex);
_host_map[host] = channel;  // 地址→信道映射,方便删除
_channel_list.push_back(channel);  // 信道列表,用于轮询
LOG_DEBUG("添加信道成功!服务名:{},节点:{},当前信道数:{}",
_service_name, host, _channel_list.size());
}
/**
* @brief 删除节点信道:服务下线时调用
* @param host 节点访问地址
*/
void remove(const std::string &host) {
std::unique_lock lock(_mutex);
auto it = _host_map.find(host);
if (it == _host_map.end()) {
LOG_WARN("删除信道失败:节点不存在!服务名:{},节点:{}",
_service_name, host);
return;
}
// 从信道列表中删除
for (auto vit = _channel_list.begin(); vit != _channel_list.end(); ++vit) {
if (*vit == it->second) {
_channel_list.erase(vit);
break;
}
}
_host_map.erase(it);
LOG_DEBUG("删除信道成功!服务名:{},节点:{},当前信道数:{}",
_service_name, host, _channel_list.size());
}
/**
* @brief 轮询选择一个信道:客户端调用时用
* @return 可用的信道(空表示没有可用节点)
*/
ChannelPtr choose() {
std::unique_lock lock(_mutex);
if (_channel_list.empty()) {
LOG_ERROR("没有可用的信道!服务名:{}", _service_name);
return nullptr;
}
// 轮询策略:_index自增取模,保证每个节点调用次数均匀
int idx = _index++ % _channel_list.size();
LOG_DEBUG("轮询选择信道:服务名:{},信道索引:{},当前计数器:{}",
_service_name, idx, _index);
return _channel_list[idx];
}
private:
std::mutex _mutex;                  // 互斥锁:保护信道列表线程安全
int32_t _index;                     // 轮询计数器
std::string _service_name;          // 服务名(如/service/echo)
std::vector _channel_list;  // 信道列表:用于轮询
std::unordered_map _host_map;  // 地址→信道映射
};
/**
* @brief 多服务信道管理类:管理多个服务的ServiceChannel,是客户端的核心
*/
class ServiceManager {
public:
using ptr = std::shared_ptr;
ServiceManager() {}
/**
* @brief 声明要关注的服务:避免处理无关服务的节点变化
* @param service_name 服务名(如/service/echo)
*/
void declared(const std::string &service_name) {
if (service_name.empty()) {
LOG_ERROR("声明服务失败:服务名为空!");
return;
}
std::unique_lock lock(_mutex);
_关注的服务.insert(service_name);
LOG_INFO("已声明关注服务:{}", service_name);
}
/**
* @brief 选择服务的一个信道:客户端发起调用前调用
* @param service_name 服务名
* @return 可用信道(空表示无可用节点)
*/
ServiceChannel::ChannelPtr choose_channel(const std::string &service_name) {
std::unique_lock lock(_mutex);
auto it = _service_channel_map.find(service_name);
if (it == _service_channel_map.end()) {
LOG_ERROR("选择信道失败:服务未初始化!服务名:{}", service_name);
return nullptr;
}
return it->second->choose();
}
/**
* @brief 服务节点上线回调:由Etcd Discovery触发
* @param service_key Etcd中的Key(如/service/echo/instance1)
* @param host 节点地址(如127.0.0.1:7070)
*/
void on_service_online(const std::string &service_key, const std::string &host) {
// 从Key中提取服务名(如/service/echo/instance1 → /service/echo)
std::string service_name = extract_service_name(service_key);
if (service_name.empty()) {
LOG_ERROR("服务上线回调失败:Key格式错误!Key:{}", service_key);
return;
}
// 检查是否关注该服务
std::unique_lock lock(_mutex);
if (_关注的服务.count(service_name) == 0) {
LOG_DEBUG("服务上线但不关注:服务名:{},节点:{}", service_name, host);
return;
}
// 初始化ServiceChannel(不存在则创建)
auto it = _service_channel_map.find(service_name);
if (it == _service_channel_map.end()) {
auto service_channel = std::make_shared(service_name);
_service_channel_map[service_name] = service_channel;
LOG_INFO("初始化服务信道管理:服务名:{}", service_name);
it = _service_channel_map.find(service_name);
}
// 解锁后调用append(避免append中的锁和当前锁嵌套)
lock.unlock();
it->second->append(host);
}
/**
* @brief 服务节点下线回调:由Etcd Discovery触发
* @param service_key Etcd中的Key
* @param host 节点地址
*/
void on_service_offline(const std::string &service_key, const std::string &host) {
std::string service_name = extract_service_name(service_key);
if (service_name.empty()) {
LOG_ERROR("服务下线回调失败:Key格式错误!Key:{}", service_key);
return;
}
std::unique_lock lock(_mutex);
if (_关注的服务.count(service_name) == 0) {
LOG_DEBUG("服务下线但不关注:服务名:{},节点:{}", service_name, host);
return;
}
auto it = _service_channel_map.find(service_name);
if (it == _service_channel_map.end()) {
LOG_WARN("服务下线但信道管理未初始化:服务名:{}", service_name);
return;
}
// 解锁后调用remove
lock.unlock();
it->second->remove(host);
}
private:
/**
* @brief 从Etcd Key中提取服务名
* @param service_key Etcd Key(如/service/echo/instance1)
* @return 服务名(如/service/echo)
*/
std::string extract_service_name(const std::string &service_key) {
// 找到最后一个'/'的位置(如/service/echo/instance1 → 最后一个/在index=13)
size_t last_slash_pos = service_key.find_last_of('/');
if (last_slash_pos == std::string::npos || last_slash_pos == 0) {
LOG_ERROR("Key格式错误,必须是/xxx/xxx格式!Key:{}", service_key);
return "";
}
// 截取从0到last_slash_pos的部分(不含last_slash_pos)
return service_key.substr(0, last_slash_pos);
}
private:
std::mutex _mutex;  // 互斥锁:保护服务列表和信道映射
// 关注的服务集合:只处理这些服务的节点变化
std::unordered_set _关注的服务;
// 服务名→ServiceChannel映射:一个服务对应一个信道管理
std::unordered_map _service_channel_map;
};
5.2 客户端代码(client.cc)
#include "../common/etcd.hpp"    // 自己写的Etcd模块
#include "../common/channel.hpp"  // 自己写的信道管理模块
#include "../common/logger.hpp"  // 自己写的日志模块
#include        // 解析命令行参数
#include
#include "main.pb.h"             // Proto生成的代码
// 命令行参数定义
DEFINE_bool(run_mode, false, "运行模式:false-调试,true-发布");
DEFINE_string(log_file, "", "发布模式必填:日志文件路径(如./client.log)");
DEFINE_int32(log_level, 2, "日志等级:0-trace~5-critical");
DEFINE_string(etcd_host, "http://127.0.0.1:2379", "Etcd服务地址");
DEFINE_string(base_service, "/service", "服务根目录(Etcd中的Key前缀)");
DEFINE_string(call_service, "/service/echo", "要调用的服务名(如/service/echo)");
DEFINE_int32(call_interval, 1, "调用间隔(秒):默认1秒调用一次");
int main(int argc, char *argv[]) {
// 第一步:解析命令行参数
google::ParseCommandLineFlags(&argc, &argv, true);
// 第二步:初始化日志
init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
LOG_INFO("客户端日志模块初始化成功,运行模式:{}", FLAGS_run_mode ? "发布" : "调试");
// 第三步:初始化信道管理器
ServiceManager::ptr service_manager = std::make_shared();
// 声明要调用的服务(只处理这个服务的节点变化)
service_manager->declared(FLAGS_call_service);
// 第四步:初始化Etcd Discovery,绑定节点变化回调
// 节点上线回调:绑定到service_manager的on_service_online
auto on_online = std::bind(
&ServiceManager::on_service_online,
service_manager.get(),
std::placeholders::_1,  // 第一个参数:service_key
std::placeholders::_2   // 第二个参数:host
);
// 节点下线回调:绑定到service_manager的on_service_offline
auto on_offline = std::bind(
&ServiceManager::on_service_offline,
service_manager.get(),
std::placeholders::_1,
std::placeholders::_2
);
// 创建Discovery对象:拉取节点+启动监控
Discovery::ptr discovery = std::make_shared(
FLAGS_etcd_host,
FLAGS_base_service,
on_online,
on_offline
);
// 第五步:循环发起RPC调用
LOG_INFO("客户端已就绪,开始调用服务:{},间隔:{}秒(按Ctrl+C退出)",
FLAGS_call_service, FLAGS_call_interval);
while (true) {
// 5.1 从信道管理器获取一个可用信道
auto channel = service_manager->choose_channel(FLAGS_call_service);
if (!channel) {
LOG_ERROR("获取信道失败,{}秒后重试...", FLAGS_call_interval);
std::this_thread::sleep_for(std::chrono::seconds(FLAGS_call_interval));
continue;
}
// 5.2 创建BRPC Stub(用于发起调用)
example::EchoService_Stub stub(channel.get());
// 5.3 构造请求
example::EchoRequest req;
req.set_message("你好,这是客户端的请求!");  // 实际项目中可以传业务参数
// 5.4 发起同步RPC调用
brpc::Controller cntl;  // BRPC控制器
example::EchoResponse rsp;
stub.Echo(&cntl, &req, &rsp, nullptr);  // 最后一个参数:异步回调(这里用同步)
// 5.5 处理调用结果
if (cntl.Failed()) {
LOG_ERROR("RPC调用失败!错误信息:{},错误码:{}",
cntl.ErrorText(), cntl.ErrorCode());
} else {
LOG_INFO("RPC调用成功!请求:{},响应:{}",
req.message().c_str(), rsp.message().c_str());
}
// 5.6 间隔指定时间后继续调用
std::this_thread::sleep_for(std::chrono::seconds(FLAGS_call_interval));
}
return 0;
}

四、部署运行:从编译到测试

1. 目录结构

先理清楚目录结构,避免编译时路径错误:

rpc-demo/
├── common/                # 公共模块
│   ├── etcd.hpp           # Etcd注册发现
│   ├── channel.hpp        # 信道管理
│   └── logger.hpp         # 日志模块
├── server/                # 服务端
│   └── server.cc          # 服务端代码
├── client/                # 客户端
│   └── client.cc          # 客户端代码
└── proto/                 # Proto文件和生成的代码
├── main.proto         # Proto定义
├── main.pb.h          # 生成的头文件
└── main.pb.cc         # 生成的源文件

2. 编译命令

需要链接的库:BRPC、Etcd 客户端、Protobuf、gflags、spdlog、pthread(线程库)等。

rpc-demo根目录下创建Makefile(方便编译):

# Makefile for RPC Demo
CC = g++
CFLAGS = -std=c++11 -Wall -g  # -g用于调试,发布时可以去掉
INCLUDES = -I./common -I./proto  # 头文件路径
# 依赖库路径(如果库不在默认路径,需要加-L/usr/local/lib)
LIBS = -lbrpc -letcd-cpp-api -lprotobuf -lgflags -lspdlog -lpthread -lz -lssl -lcrypto
# 目标:服务端和客户端
TARGETS = server/client server/server
all: $(TARGETS)
# 编译服务端
server/server: server/server.cc proto/main.pb.cc
$(CC) $(CFLAGS) $(INCLUDES) $^ -o $@ $(LIBS)
# 编译客户端
server/client: client/client.cc proto/main.pb.cc
$(CC) $(CFLAGS) $(INCLUDES) $^ -o $@ $(LIBS)
# 清理生成的文件
clean:
rm -f $(TARGETS)
rm -f proto/*.pb.h proto/*.pb.cc  # 可选:清理生成的Proto代码

然后执行编译:

make -j4  # -j4表示用4个线程编译,速度更快

3. 启动步骤

3.1 先启动 Etcd

如果没有 Etcd,先安装(Ubuntu):

sudo apt install etcd
# 启动Etcd(默认监听2379端口)
etcd
3.2 启动服务端
# 调试模式启动:日志输控制台,监听7070端口
./server/server --run_mode=false --listen_port=7070 --instance_name=/echo/instance1
# 或者发布模式启动:日志输文件
# ./server/server --run_mode=true --log_file=./server.log --listen_port=7070 --instance_name=/echo/instance1

服务端启动成功后,会输出:

[rpc-debug][16:00:00][1234][info] 日志模块初始化成功,运行模式:调试 (logger.hpp:58)
[rpc-debug][16:00:00][1234][info] BRPC服务启动成功,监听端口:7070 (server.cc:103)
[rpc-debug][16:00:00][1234][info] Etcd租约初始化成功,租约ID:123456,TTL:3秒 (etcd.hpp:45)
[rpc-debug][16:00:00][1234][info] 服务注册成功!Etcd Key:/service/echo/instance1,Value:127.0.0.1:7070 (etcd.hpp:78)
[rpc-debug][16:00:00][1234][info] 服务端已就绪,等待客户端调用...(按Ctrl+C退出) (server.cc:115)
3.3 启动客户端

打开新的终端,启动客户端:

# 调试模式启动:1秒调用一次
./server/client --run_mode=false --call_interval=1
# 或者发布模式启动
# ./server/client --run_mode=true --log_file=./client.log --call_interval=1

客户端启动成功后,会输出:

[rpc-debug][16:01:00][5678][info] 客户端日志模块初始化成功,运行模式:调试 (logger.hpp:58)
[rpc-debug][16:01:00][5678][info] 已声明关注服务:/service/echo (channel.hpp:165)
[rpc-debug][16:01:00][5678][info] 拉取到1个服务节点,根目录:/service (etcd.hpp:153)
[rpc-debug][16:01:00][5678][info] Etcd Watcher启动成功,监控根目录:/service (etcd.hpp:172)
[rpc-debug][16:01:00][5678][info] 客户端已就绪,开始调用服务:/service/echo,间隔:1秒(按Ctrl+C退出) (client.cc:75)
[rpc-debug][16:01:00][5678][debug] 轮询选择信道:服务名:/service/echo,信道索引:0,当前计数器:1 (channel.hpp:109)
[rpc-debug][16:01:00][5678][info] RPC调用成功!请求:你好,这是客户端的请求!,响应:你好,这是客户端的请求! -- 服务端已收到,这是响应! (client.cc:98)
3.4 测试负载均衡

可以启动多个服务端(修改instance_namelisten_port),比如:

# 第二个服务端:监听7071端口,实例名instance2
./server/server --run_mode=false --listen_port=7071 --instance_name=/echo/instance2 --access_host=127.0.0.1:7071

此时客户端会自动发现新节点,调用时会轮询两个服务端,日志中会显示 “信道索引 0” 和 “信道索引 1” 交替出现。

五、实战优化:那些容易忽略的细节

  1. Etcd 租约时间:3 秒是比较均衡的选择 —— 太短会导致频繁续期(增加 Etcd 压力),太长会让僵尸节点存活久(客户端可能调用失败);

  2. BRPC 线程数options.num_threads建议设为 CPU 核数(如 4 核设 4),太多会导致线程切换开销,太少会处理不过来;

  3. 日志轮转:发布模式下,日志文件会越来越大,需要用logrotate工具配置轮转(比如每天生成一个新文件,保留 7 天);

  4. 异常处理:代码中加了很多错误检查(如信道初始化失败、Etcd 注册失败),实际项目中还可以加告警(如调用失败次数超过阈值时发邮件);

  5. 服务名规范:建议用/业务/服务名的格式(如/payment/order),避免不同业务的服务名冲突。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/917348.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【2025最新】ArcGIS 点聚合功能实现全教程(进阶版) - 实践

【2025最新】ArcGIS 点聚合功能实现全教程(进阶版) - 实践2025-09-25 18:04 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !importa…

隐藏在众目睽睽之下:从PEB中解除恶意DLL的链接

本文深入探讨了一种恶意软件用于隐藏注入DLL的反取证技术。详细解析了Windows进程环境块(PEB)的结构,并展示了如何通过操作PEB中的三重双向链表来隐藏已加载的恶意DLL,包括具体的代码实现和检测方法。隐藏在众目睽睽…

详细介绍:Java 领域中 Java-EE 的异步编程实现

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

营销型网站公司名称手机wap网站模板 带后台

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到教程。 单点登录SSO(Single Sign On)说得简单点就是在一个多系统共存的环境下,用户在一处登录后,…

网站qq聊天代码深圳网站建设公司服务商

mqtt:轻量级物联网消息推送协议。 目录 一、介绍 1、官方文档 1)npm网 2) 中文网 MQTT中文网_MQTT 物联网接入平台-MQTT.CN 2、官方示例 二、准备工作 1、安装依赖包 2、示例版本 三、使用步骤 1、在单页面引入 mqtt 四、完整示例 tips 一、介…

设计模式六大原则 - 实践

设计模式六大原则 - 实践pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", &q…

徐霞客的《青云志》

徐霞客的《青云志》明代著名的旅行家和地理学家徐霞客的《青云志》在网络上炒得火爆。全文是: “身处低谷不自弃,我命由我不由天。 无人扶我青云志,我自踏雪至山巅。 若是命中无此运,亦可孤身登昆仑。 红尘赠我三尺…

深入解析:豆包Seedream 4.0:全面测评、玩法探索与Prompt解读

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

运营商 API 安全最佳实践、案例与方案推荐(2025)|千万级接口的全链路实战

在 5G、云原生与边缘协同的现实架构里,运营商的 API 安全应遵循一条清晰主线:资产可视 → 行为基线 → 联动处置 → 可审计证据。落地层面需要可度量、可复核的硬指标作为抓手,例如 分钟级增量捕获、告警≤0.5s、MT…

HyperWorks许可与多用户支持

在工程项目中,软件许可管理和多用户支持是确保团队协作顺畅进行的核心要素。HyperWorks作为一款领先的工程仿真软件,不仅提供了灵活的许可管理方案,还具备卓越的多用户支持功能,助力团队高效协作,共创卓越成果。 …

免费素材库短视频素材网站如何做网站知乎

大家都知道光模块是影响整个网络性能的关键因素,特别是在工业以太网中,网络连接控制的多为大型工业设备,光模块的稳定性尤为重要,那么,我们该如何选购工业级光模块呢?接下来就由飞畅科技的小编来为大家详细…

破局与进化:火山引擎Data Agent从落地实践到架构未来

本文为火山引擎技术专家陈硕,在AICon全球人工智能与机器学习技术大会上的演讲分享。本文围绕以下五部分展开:Data Agent整体介绍 智能分析Agent产品演进 智能分析Agent技术架构演进 智能分析Agent落地新进展 Data Ag…

建立网站平台做ppt的软件怎么下载网站

文章目录 写在前面Tag题目来源解题思路方法一:链表转数组方法二:自顶向下归并排序方法三:自底向上的归并排序 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法,两到三天更新一篇文章,欢迎催更…… 专栏内…

使用trace进行排查网络瓶颈

func NewHTTPTraceLogger(ctx context.Context, fileUrl string, fragmentID, attempt int) context.Context {traceStart := time.Now()var dnsStart, connectStart, tlsStart, gotConnTime time.Timetrace := &h…

五项能力斩获满分!天翼云云WAF获IDC权威认可!

近日,国际数据公司(IDC)发布《协同大模型防火墙能力的中国WAAP厂商技术能力评估,2025》报告,围绕Web安全、Bot管理、威胁情报等核心现代应用防护需求,对厂商的产品技术与服务能力展开全面考察。天翼云云WAF产品在…

什么样的代码可以称得上是好代码? - 浪矢

目录 好代码不仅需满足功能需求,还要考虑未来的拓展,问题的排查,用户的体验和成本控制。 在Code Review & 开发行为规范中我从代码开发与实现规范,异常处理规范,日志管理规范以及版本控制与写作规范四个维度梳…

抖胆代理商,DD3118S芯片,USB3.0读卡方案,替代GL3213S方案

抖胆代理商,DD3118S芯片,USB3.0读卡方案,替代GL3213S方案DD3118s是抖胆科技推出的一款读卡器控制芯片,DD3118S基于40nm低功耗工艺设计,集成了USB 3.0、SD 3.0和eMMC 4.5协议支持能力。其最大特点是创新性地采用了…

JavaEE 导读与环境配置 - 实践

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

深圳 响应式网站建设广州网站推广哪家强

在C中,函数参数的传递方式主要有三种:值传递、引用传递和指针传递。下面我会分别解释这三种方式的区别: 值传递(Pass by Value): 值传递是将实际参数的值复制给函数的形式参数。这意味着函数接收的是原始数据的一个副本…

中宁网站建设公司网页美工设计中职期末试卷

【漏洞详情】 漏洞描述:Alibaba Nacos derby 存在远程代码执行漏洞,由于Alibaba Nacos部分版本中derby数据库默认可以未授权访问,恶意攻击者利用此漏洞可以未授权执行SQL语句,从而远程加载恶意构造的jar包,最终导致任意…