【RabbitMQ 项目】服务端:服务器模块

文章目录

  • 一.编写思路
  • 二.代码实践
  • 三.服务端模块关系总结

一.编写思路

成员变量:

  1. muduo 库中的 TCP 服务器
  2. EventLoop 对象:用于主线程循环监控连接事件
  3. 协议处理句柄
  4. 分发器:用于初始化协议处理器,便于把不同请求派发给不同的业务处理函数
  5. 连接管理句柄
  6. 虚拟机句柄
  7. 消费者管理句柄
  8. 线程池管理句柄
    成员方法:
    向外提供的只有 2 个方法:
  9. start:启动服务
  10. 构造函数:
  • 完成各项成员的初始化,
  • 注册 TCP 服务器的两个回调函数:
    OnMessage:从接收缓冲区把数据读到用户缓冲区后的回调函数
    OnConnection:Tcp 连接建立或断开时的回调函数
  • 给分发器注册业务处理函数(私有成员方法,共 12 个)
    信道打开与与关闭;交换机,队列,绑定添加与删除,订阅与取消订阅,发布与确认消息
    私有成员(业务处理函数):
    如果是创建或关闭信道,直接用连接管理句柄新增或删除信道,然后构建响应返回
    如果是其他请求,先用连接管理句柄找到信道(请求中携带了信道 id),再使用信道提供的服务

二.代码实践

BrokerServer.hpp:

#pragma once
#include "muduo/protobuf/codec.h"
#include "muduo/protobuf/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"#include "VirtualHost.hpp"
#include "Connection.hpp"
#include "Consumer.hpp"
#include <functional>
#include <stdio.h>
#include <unistd.h>namespace ns_server
{using ConnectionManagerPtr = std::shared_ptr<ns_connection::ConnectionManager>;using VirtualHostPtr = std::shared_ptr<ns_data::VirtualHost>;using ConsumerManagerPtr = std::shared_ptr<ns_consumer::ConsumerManager>;using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;/************* 定义协议的结构化数据的智能指针(在分发器中注册时需要的格式)* *************/using OpenChannelRequestPtr = std::shared_ptr<ns_protocol::OpenChannelRequest>;using CloseChannelRequestPtr = std::shared_ptr<ns_protocol::CloseChannelRequest>;using DeclareExchangeRequestPtr = std::shared_ptr<ns_protocol::DeclareExchangeRequest>;using DeleteExchangeRequestPtr = std::shared_ptr<ns_protocol::DeleteExchangeRequest>;using DeclareMsgQueueRequestPtr = std::shared_ptr<ns_protocol::DeclareMsgQueueRequest>;using DeleteMsgQueueRequestPtr = std::shared_ptr<ns_protocol::DeleteMsgQueueRequest>;using BindRequestPtr = std::shared_ptr<ns_protocol::BindRequest>;using UnbindRequestPtr = std::shared_ptr<ns_protocol::UnbindRequest>;using PublishMessageRequestPtr = std::shared_ptr<ns_protocol::PublishMessageRequest>;using SubscribeQueueRequestPtr = std::shared_ptr<ns_protocol::SubscribeQueueRequest>;using CancelSubscribeRequestPtr = std::shared_ptr<ns_protocol::CancelSubscribeRequest>;using AckRequestPtr = std::shared_ptr<ns_protocol::AckRequest>;class BrokerServer{public:private:muduo::net::EventLoop _baseLoop;muduo::net::TcpServer _server;ProtobufDispatcher _dispatcher;ProtobufCodecPtr _codecPtr;VirtualHostPtr _vhPtr;ConsumerManagerPtr _consumerManagerPtr;ConnectionManagerPtr _connManagerPtr;ThreadPoolPtr _threadPoolPtr;public:BrokerServer(int serverPort, const std::string &dbName, const std::string &msgDir): _baseLoop(),_server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", serverPort), "TcpServer", muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&BrokerServer::onUnknownMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3)){// 初始化成员_codecPtr = std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));_vhPtr = std::make_shared<ns_data::VirtualHost>(dbName, msgDir);_threadPoolPtr = std::make_shared<ns_tp::ThreadPool>();_threadPoolPtr->start();std::vector<std::string> qnames;_vhPtr->getAllQueueName(&qnames);_consumerManagerPtr = std::make_shared<ns_consumer::ConsumerManager>(qnames);_connManagerPtr = std::make_shared<ns_connection::ConnectionManager>();// 给_server注册两个回调函数_server.setConnectionCallback(std::bind(&BrokerServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codecPtr.get(), std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));// 给分发器注册业务处理函数_dispatcher.registerMessageCallback<ns_protocol::OpenChannelRequest>(std::bind(&BrokerServer::onOpenChannel,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::CloseChannelRequest>(std::bind(&BrokerServer::onCloseChannel,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeclareExchangeRequest>(std::bind(&BrokerServer::onDeclareExchange,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeleteExchangeRequest>(std::bind(&BrokerServer::onDeleteExchange,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeclareMsgQueueRequest>(std::bind(&BrokerServer::onDeclareMsgQueue,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeleteMsgQueueRequest>(std::bind(&BrokerServer::onDeleteMsgQueue,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::BindRequest>(std::bind(&BrokerServer::onBind,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::UnbindRequest>(std::bind(&BrokerServer::onUnbind,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::SubscribeQueueRequest>(std::bind(&BrokerServer::onSubScribe,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::CancelSubscribeRequest>(std::bind(&BrokerServer::onCancelSubScribe,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::PublishMessageRequest>(std::bind(&BrokerServer::onPublishMessage,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::AckRequest>(std::bind(&BrokerServer::onAck,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void start(){// 开启监听状态_server.start();// 开始循环监控事件_baseLoop.loop();}private:// 给TcpServer设置的回调函数void onConnection(const muduo::net::TcpConnectionPtr &connPtr){if (connPtr->connected()){_connManagerPtr->newConnection(connPtr, _codecPtr, _vhPtr, _consumerManagerPtr, _threadPoolPtr);}else{_connManagerPtr->deleteConnection(connPtr);}}// 业务处理函数void onUnknownMessage(const muduo::net::TcpConnectionPtr &connPtr, MessagePtr msgPtr, muduo::Timestamp time){cout << "未知消息" << endl;connPtr->shutdown();}/************* 信道创建与删除* ***************/void onOpenChannel(const muduo::net::TcpConnectionPtr &connPtr, const OpenChannelRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "打开信道时, 未找到Connection" << endl;return;}myConnPtr->openChannel(*reqPtr);LOG(DEBUG) << "create new channel, channelId: " << reqPtr->channel_id() << endl;}void onCloseChannel(const muduo::net::TcpConnectionPtr &connPtr, const CloseChannelRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "关闭信道时, 未找到Connection" << endl;return;}myConnPtr->closeChannel(*reqPtr);LOG(DEBUG) << "close channel, channelId: " << reqPtr->channel_id() << endl;}/********** 交换机声明与删除* ********/void onDeclareExchange(const muduo::net::TcpConnectionPtr &connPtr, const DeclareExchangeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "声明交换机时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->declareExchange(*reqPtr);LOG(DEBUG) << "声明交换机, exchangeName: " << reqPtr->exchange_name() << endl;}void onDeleteExchange(const muduo::net::TcpConnectionPtr &connPtr, const DeleteExchangeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "删除交换机时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->deleteExchange(*reqPtr);LOG(DEBUG) << "删除信道, exchangeName: " << reqPtr->exchange_name() << endl;}/************* 队列声明与删除* ***************/void onDeclareMsgQueue(const muduo::net::TcpConnectionPtr &connPtr, const DeclareMsgQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "声明队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->declareMsgQueue(*reqPtr);LOG(DEBUG) << "声明队列, queueName: " << reqPtr->queue_name() << endl;}void onDeleteMsgQueue(const muduo::net::TcpConnectionPtr &connPtr, const DeleteMsgQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "删除队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->deleteMsgQueue(*reqPtr);LOG(DEBUG) << "删除队列, queueName: " << reqPtr->queue_name() << endl;}/*********** 绑定与解绑* ***********/void onBind(const muduo::net::TcpConnectionPtr &connPtr, const BindRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "添加绑定时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->bind(*reqPtr);LOG(DEBUG) << "绑定: " << reqPtr->ename() << "->" << reqPtr->qname() << ": " << reqPtr->binding_key() << endl;}void onUnbind(const muduo::net::TcpConnectionPtr &connPtr, const UnbindRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "删除绑定时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->unbind(*reqPtr);LOG(DEBUG) << "解绑: " << reqPtr->ename() << "->" << reqPtr->qname() << endl;}/************** 订阅与取消订阅* ************/void onSubScribe(const muduo::net::TcpConnectionPtr &connPtr, const SubscribeQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "订阅队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->subscribeQueue(*reqPtr);LOG(DEBUG) << "订阅队列" << ", qname: " << reqPtr->qname() << endl;}void onCancelSubScribe(const muduo::net::TcpConnectionPtr &connPtr, const CancelSubscribeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "取消订阅队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->cancelSubscribe(*reqPtr);LOG(DEBUG) << "取消订阅队列" << ", qname: " << reqPtr->qname() << endl;}/********* 发布与应答* **************/void onPublishMessage(const muduo::net::TcpConnectionPtr &connPtr, const PublishMessageRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "发布消息时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->publishMessage(*reqPtr);LOG(DEBUG) << "publish message: " << reqPtr->msg().saved_info().body() << endl;}void onAck(const muduo::net::TcpConnectionPtr &connPtr, const AckRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "确认消息时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->ackMessage(*reqPtr);LOG(DEBUG) << "应答消息, msgId: " << reqPtr->msg_id() << endl;}void sendCommonResponse(const muduo::net::TcpConnectionPtr &connPtr, const std::string &channelId,const std::string &responseId, bool ok){ns_protocol::CommomResponse resp;resp.set_channel_id(channelId);resp.set_response_id(responseId);resp.set_ok(ok);_codecPtr->send(connPtr, resp);}};
}

三.服务端模块关系总结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/54761.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【FastAPI】使用FastAPI和Redis实现实时通知(SSE)

在当今快速发展的Web应用程序中&#xff0c;实时通知已成为用户体验的重要组成部分。无论是社交媒体更新、消息通知&#xff0c;还是系统状态提醒&#xff0c;实时数据推送可以极大地提升用户互动性。本文将详细介绍如何使用FastAPI和Redis实现Server-Sent Events (SSE) 来推送…

模拟电路工程师面试题

一、基础知识题 描述三极管的基本工作原理及其三种工作状态。 分析:此题考察对三极管(NPN或PNP)基本工作原理的理解,包括截止区、放大区和饱和区的特点及其条件。解释什么是反馈,并说明正反馈和负反馈的区别。 分析:反馈是放大器设计中的重要概念,正反馈会增加放大器的增…

【AI】简单了解AIGC与ChatGPT

● AIGC&#xff08;AI-Generated Content&#xff0c;人工智能生成内容&#xff09;指的是利用人工智能技术自动生成内容&#xff0c;包括文本、图像、音频、视频等。AIGC的应用非常广泛。AIGC的核心在于利用AI技术来创造新的内容&#xff0c;提高生产效率&#xff0c;降低成本…

java mybaits oracle插入返回主键

在MyBatis中&#xff0c;要实现在插入数据后返回主键&#xff0c;可以在Mapper的XML文件中使用useGeneratedKeys属性和keyProperty属性。以下是一个示例&#xff1a; 首先&#xff0c;确保你的Oracle表有一个可以自动生成主键的字段&#xff0c;比如使用Oracle的序列。 CREAT…

JMeter(需要补充请在留言区发给我,谢谢)

一、学习工具 1、CinfigElement(HTTP Request Defaults、HTTP Header Manager、HTTP Authorization、CSV Data Set Config、User Defined Variables、JDBC Connection Configuration、HTTP Cookie Manager、Random Variable) 二、协议 1、HTTP协议&#xff08;消息体数据&am…

S开头的词根词缀:se-+sub-+suc/suf/supsur+sur-+super+sym/syn+

提到s这个词缀&#xff0c; 表异同&#xff0c;既表近似syn/syn&#xff0c;又表示分离se。 表方位&#xff0c;既表示上&#xff0c;又表示下。 se 70.se-表示"分开&#xff0c;离开&#xff0c;区别开" secede正式退出&#xff08;组织&#xff09;&#xff08;…

oracle 数据库中的异常和游标管理

异常和游标管理 游标&#xff1a; 用来查询数据库&#xff0c;获取记录集合&#xff08;结果集&#xff09;的指针&#xff0c;可以让开发者一次访问一行结果集&#xff0c;在每条结果集上作操作。 分类&#xff1a; 静态游标&#xff1a; 分为显式游标和隐式游标。 REF游标&…

Golang | Leetcode Golang题解之第433题最小基因变化

题目&#xff1a; 题解&#xff1a; func diffOne(s, t string) (diff bool) {for i : range s {if s[i] ! t[i] {if diff {return false}diff true}}return }func minMutation(start, end string, bank []string) int {if start end {return 0}m : len(bank)adj : make([][…

OpenHarmony标准系统mipi摄像头适配

OpenHarmony标准系统mipi摄像头适配 本文档以rk3568为例&#xff0c;讲述如何在OpenHarmony 标准系统rk设备上适配mipi摄像头。 开发环境 OpenHarmony标准系统4.1rrk3568设备摄像头ov5648,ov8858 文档约定&#xff1a;4.1r_3568为OpenHarmony标准系统源码根目录 1.适配准备:得…

C++条件变量详解(一看就懂)

首先&#xff0c;我们先来认识一下条件变量。 条件变量是一种同步原语&#xff0c;通常用于在多线程编程中&#xff0c;使一个线程在特定条件满足之前等待&#xff0c;同时允许其他线程在该条件发生更改时通知等待的线程。 1. “等待”&#xff1a;当条件不满足时&#xff08;…

树莓派pico上手

0 介绍 不同于作为单板计算机的树莓派5&#xff0c;树莓派 pico 是一款低成本、高性能的微控制器板&#xff0c;具有灵活的数字接口。主要功能包括&#xff1a; 英国树莓派公司设计的 RP2040 微控制器芯片双核 Arm Cortex M0 处理器&#xff0c;弹性的时钟频率高达 133 MHz26…

js 如何监听 body 内容是否改变

如果您想监听body内容的变化&#xff0c;并作出响应&#xff0c;可以使用MutationObserver。以下是一个简单的例子&#xff0c;它会在body内容变化时在控制台输出一条消息&#xff1a; // 创建一个观察者对象 const observer new MutationObserver(function(mutations, obser…

Spring AOP的应用

目录 1、maven坐标配置与xml头配置 2、代理方式的选择与配置 3、AOP的三种配置方式 3.1、XML模式 3.1.1 创建目标类和方法 3.1.2 创建切面 3.1.3 切面xml配置与表达式说明 3.1.4 单测 3.2 纯注解模式 3.2.1 开启注解相关配置 3.2.2 创建目标类和方法 3.2.3 创建切面…

FGPA实验——触摸按键

本文系列都基于正点原子新起点开发板 FPGA系列 1&#xff0c;verlog基本语法&#xff08;随时更新&#xff09; 2&#xff0c;流水灯&#xff08;待定&#xff09; 3&#xff0c;FGPA实验——触摸按键 一、触摸操作原理实现 分类&#xff1a;电阻式&#xff08;不耐用&…

二叉树进阶

目录 1. 二叉搜索树实现 1.1 二叉搜索树概念 2.2 二叉搜索树操作 ​编辑 ​编辑 2.3 二叉搜索树的实现 2.3.0 Destroy() 析构 2.3.1 Insert&#xff08;&#xff09;插入 2.3.2 InOrder&#xff08;&#xff09; 打印搜索二叉树 ​编辑​编辑 2.3.3 Find() 查找 …

el-table表格点击该行任意位置时也勾选上其前面的复选框

需求&#xff1a;当双击表格某一行任意位置时&#xff0c;自动勾选上其前面的复选框 1、在el-table 组件的每一行添加row-dblclick事件&#xff0c;用于双击点击 <el-table:data"tableData"ref"tableRef"selection-change"handleSelectionChange&q…

几种主流的`Content-Type`与其对应的数据格式的例子

application/json: 用于发送和接收JSON格式的数据。例如&#xff0c;可以使用以下代码将JSON数据发送到服务器&#xff1a; $.ajax({url: "/api/endpoint",type: "POST",contentType: "application/json",data: JSON.stringify({ key: "va…

如何在Chrome最新浏览器中调用ActiveX控件?

小编最近登陆工商银行网上银行&#xff0c;发现工商银行的个人网银网页&#xff0c;由于使用了ActiveX安全控件&#xff0c;导致不能用高版本Chrome浏览器打开&#xff0c;目前只有使用IE或基于IE内核的浏览器才能正常登录网上银行&#xff0c;而IE已经彻底停止更新了&#xff…

AI绘图网页版工具

https://chat.bushao.info/?inVitecodeCHBEPQQOOM 一款AI绘图工具&#xff0c;很好玩&#xff0c;推荐&#xff1b; 我自己根据文本生成的图&#xff0c;感觉还不错。

深入理解Java中的序列化与反序列化

目录 1. 引言 2. 什么是序列化&#xff1f; 3. 为什么需要序列化&#xff1f; 4. 如何实现序列化&#xff1f; 5. 示例代码 6. 序列化和反序列化操作 7. 注意事项 8. 拓展&#xff1a;Transient关键字 9. 拓展&#xff1a;序列化的性能优化 10. 结论 1. 引言 在软件…