VEC系列-RabbitMQ 入门笔记

消息队列(MQ)对于开发者来说是一个经常听到的词汇,但在实际开发中,大多数人并不会真正用到它。网上已经有很多关于 MQ 概述和原理的详细讲解,官网文档和技术博客也都介绍得很深入,因此,我在这里就不再赘述。

我一直认为,学习一项技术不仅要知道它是什么,更重要的是知道怎么用,以及在哪些场景下应该用。所以这篇文章主要就是站在一个新手的角度进行描述以及实现MQ的实际运用。

使用MQ的常见情景

  1. 系统解耦:比如电商系统,订单系统 → 库存系统 → 物流系统 订单系统发送“新订单”消息到 MQ,库存系统和物流系统各自订阅处理。即使库存系统或物流系统短暂不可用,消息仍然可以暂存,系统整体不会受影响。这一方面说实话不是架构师也没必要太过关注,毕竟系统的底层普通开发也没这个资格去搭建。只是用于了解,不要因为这段话阻拦学习的脚步。

  2. 流量削峰,降低并发:这个比较好理解,也是最能遇到的情况。用户请求先进入 MQ 队列,由后台的消费端按照数据库的最大承载能力逐步处理请求。确保数据库不会被瞬间压垮,提高系统稳定性。还是电商系统常用些。

  3. 异步任务处理:邮件、短信、推送通知,日志处理等。

理论上MQ能做的不止这些,抛砖引玉,一起深入学习吧。

对MQ进行拆分理解

MQ里常说生产者,消费者等。我会通过简单的例子来描述:

  • 生产者:一个游戏,我是GM,我要发送公告,玩家分为普通玩家和VIP玩家等。在这里,发布公告的人就是消息的生产者。应该很好理解嗷?

  • 交换器:如上述,有普通玩家和VIP玩家等,我的公告在普通玩家面前必然是拽的很啊,但是VIP玩家面前还是要舔下的……那么我会发布一条给普通玩家的消息,和一条给VIP玩家的消息。交换器的作用在我看来就是消息的承载体,类似一条运输船,负责把消息运输给玩家们。产生消息的地方很多,但是交换器不用关心是谁发布了消息,他只承载你的消息。

  • 队列:如上述,有了运输船。那么队列有点像是码头了。普通玩家进普通码头,VIP玩家进黄金码头。各自码头停泊各自的船。总不会在普通码头取出黄金码头的货哦?

补充:交换器是有类型的:Direct(直连交换器)Fanout(扇形交换器)Topic(主题交换器)Headers(头交换器)

概念不多说了。比较常用的是Direct,Fanout

Direct:通过路由键进行匹配,运输船是一艘,但是分为普通区和VIP区,玩家凭借船票(路由键)进行取货(取消息)

Fanout:只要是是绑定了某个交换器的队列都能进行取货。玩家进普通码头就拿普通货,进黄金码头拿黄金货。当然这是举例子,玩家的队列还是要看你如何分配的。

  • 消费者:说了这么多,玩家就是消费者嗷。

MQ代码演示 

最新代码是通过 事件总线 来跨方法传递信息和触发动作。通过发布和订阅事件,模块之间能够解耦通信,使得事件的发布和处理不再依赖于直接调用方法的方式,而是通过事件总线进行跨模块、跨方法的异步传递。这种方式提高了系统的灵活性和扩展性,同时保持了模块之间的松耦合。

长代码警告,有兴趣可以fork仓库进行实际操练 VerEasy.Core

必要的知识点大致如此,通过代码+注释的形式来演示更好理解。

我这里是NETCore项目,所以还是接口的形式方便依赖注入。

接口部分代码

    public interface IRabbitMQPersistentConnection{/// <summary>/// 是否已经连接:判断MQ是否是连接状态/// </summary>bool IsConnected { get; }/// <summary>/// 尝试连接:断连重连方法/// </summary>/// <returns></returns>Task<bool> TryConnectAsync();/// <summary>/// 唯一通道:发布通道可以随时关闭,消费通道需要保持打开状态,否则无法进行消费。/// </summary>IChannel Channel { get; }/// <summary>/// 唯一连接:同理,一个连接可以有N个通道,无需建立过多连接。/// </summary>IConnection Connection { get; }/// <summary>/// 释放/// </summary>/// <returns></returns>Task DisposeAsync();/// <summary>/// 发布:发布消息/// </summary>/// <param name="msg"></param>/// <param name="exChangeName"></param>/// <param name="routeKey"></param>/// <param name="type"></param>/// <returns></returns>Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);/// <summary>/// 订阅:订阅队列。/// </summary>/// <returns></returns>Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);}

 接口实现部分代码

    public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection{//构造函数注入,获取MQ的地址账号密码端口,如果不传就用我默认配置的。public RabbitMQPersistentConnection(IConnectionFactory? connectionFactory = null, int retryCount = 5){_connectionFactory = connectionFactory ?? new ConnectionFactory{HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()};//使用Policy进行重连,这个是重连次数=5_retryCount = retryCount;}//私有变量,获取连接成功时创建的Mq通道。private IChannel _channel = default!;public IChannel Channel{get{return _channel;}}/// <summary>/// RabbitMQ 连接工厂/// </summary>private readonly IConnectionFactory _connectionFactory;/// <summary>/// 私有变量 RabbitMQ 连接上下文/// </summary>private IConnection _connection = default!;/// <summary>/// 重连次数/// </summary>private readonly int _retryCount;/// <summary>/// 标志是否已释放/// </summary>private bool _disposed;/// <summary>/// 是否有效连接/// </summary>public bool IsConnected{get{return _connection != null && _connection.IsOpen && !_disposed;}}public IConnection Connection{get{return _connection;}}/// <summary>/// 手动释放/// </summary>/// <returns></returns>public async Task DisposeAsync(){if (_disposed) return;_disposed = true;try{await _connection.DisposeAsync();}catch (IOException ex){Console.WriteLine(ex.Message);}}/// <summary>/// 重连机制/// </summary>/// <returns></returns>public async Task<bool> TryConnectAsync(){var policy = Policy.Handle<SocketException>()//捕获连接异常.Or<BrokerUnreachableException>()//无法连接异常.WaitAndRetryAsync(_retryCount, x =>TimeSpan.FromSeconds(Math.Pow(2, x)), (ex, time) =>{//日志});try{await policy.ExecuteAsync(async () =>{//重建连接【赋值给私有化变量,通过get同步给接口里的Connection和Channel】_connection = await _connectionFactory.CreateConnectionAsync();_channel = await _connection.CreateChannelAsync();});//如果连接成功if (IsConnected){// 连接成功后,注册连接关闭、异常、阻塞的事件处理程序_connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;_connection.CallbackExceptionAsync += OnCallbackExceptionAsync;_connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;return true;}else{return false;}}catch (Exception ex){Console.WriteLine($"重连失败,最终抛出异常: {ex.Message}");return false;}}private async Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e){if (_disposed) return;Console.WriteLine("RabbitMQ连接关闭,正在尝试重连...");await TryConnectAsync();}private async Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e){if (_disposed) return;Console.WriteLine($"RabbitMQ连接出现异常,正在尝试重连... 异常信息: {e.Exception.Message}");await TryConnectAsync();}private async Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e){if (_disposed) return;Console.WriteLine("RabbitMQ连接被阻塞,正在尝试重连...");await TryConnectAsync();}//发布消息public async Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout){//判断是否连接状态,没有连接就重连if (!IsConnected){await TryConnectAsync();}//创建通道,因为是发布消息,通道不用常打开,所以使用了USINGusing var channel = await _connection.CreateChannelAsync();//【ExchangeDeclareAsync】声明交换机,exchange:交换机名称,type:交换机类型await channel.ExchangeDeclareAsync(exchange: exChangeName, type: type);//msg就是消息,需要传递Byte[]var body = Encoding.UTF8.GetBytes(msg);//启动消息持久化,我的项目里使用MQ来进行公告的推送,使用的Fanout类型交换机,故此消息保持持久化。var properties = new BasicProperties(){Persistent = true,};//发布消息await channel.BasicPublishAsync(exchange: exChangeName,routingKey: routeKey,mandatory: false,basicProperties: properties,body: body);}//订阅消息public async Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout){if (!IsConnected){await TryConnectAsync();}//【queue】队列string queueName = string.IsNullOrWhiteSpace(routeKey) ? exChangeName : routeKey;//【durable】持久化队列,MQ服务器不会删除它。QueueDeclareOk queueDeclareResult = await Channel.QueueDeclareAsync(queue: queueName,durable: true,exclusive: false,autoDelete: false);//根据queue,exchange,routingKey 对 交换机和队列进行绑定,如果是Fanout类型不需要routeKey。await Channel.QueueBindAsync(queue: queueName, exchange: exChangeName, routingKey: routeKey);//创建消费者var consumer = new AsyncEventingBasicConsumer(Channel);//消费者消费后执行方法consumer.ReceivedAsync += async (model, ea) =>{byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);//确认消息已被消费,这样后续该消息就不会被该队列继续消费到了。await Channel.BasicAckAsync(ea.DeliveryTag, multiple: false);};//启动消费者队列,将消费者和队列绑定await Channel.BasicConsumeAsync(queueName, autoAck: false, consumer: consumer);}}

 

MQ服务注入

            if (Appsettings.AppStr("RabbitMQ:Enable").ObjToBool()){services.AddSingleton<IRabbitMQPersistentConnection>(x =>{var connectionFactory = new ConnectionFactory(){HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()};var mq = new RabbitMQPersistentConnection(connectionFactory);return mq;});}

 


我在注入各种服务时,添加了一些日志进行输出,效果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/897127.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

js中??是什么意思

在 JavaScript 中&#xff0c;?? 是一个逻辑运算符&#xff0c;称为 空值合并运算符&#xff08;Nullish Coalescing Operator&#xff09;。它用于检查左侧的值是否为 null 或 undefined&#xff0c;如果是&#xff0c;则返回右侧的值&#xff1b;否则返回左侧的值。 语法 …

常见限流算法

限流是指在高并发、大流量请求的情况下&#xff0c;限制新的流量对系统的访问&#xff0c;以保证系统服务的安全性。常见的限流算法及其详细介绍如下&#xff1a; 计数器算法&#xff08;Fixed Window Counter&#xff09; 原理&#xff1a;使用一个固定时间窗口内的计数器来…

YOLOv12本地部署教程——42%速度提升,让高效目标检测触手可及

YOLOv12 是“你只看一次”&#xff08;You Only Look Once, YOLO&#xff09;系列的最新版本&#xff0c;于 2025 年 2 月发布。它引入了注意力机制&#xff0c;提升了检测精度&#xff0c;同时保持了高效的实时性能。在保持速度的同时&#xff0c;显著提升了检测精度。例如&am…

【原创】C# HttpClient 读取流数据的问题

默认情况下HttpClient中有缓存&#xff0c;在读取流数据的时候&#xff0c;往往要等一小会儿&#xff0c;然后读出一大堆。 我们在请求OpenAI类的大模型的时候&#xff0c;往往要一边读取一边显示&#xff08;输出&#xff09;&#xff0c;这时候需要禁止HttpClient 中内置的缓…

能源行业标杆:信创系统在智能电网中的3个创新应用案例

在当今数字化浪潮汹涌澎湃的时代&#xff0c;信息技术应用创新&#xff08;信创&#xff09;已成为推动我国经济社会发展的重要引擎。智能电网作为能源行业的核心领域&#xff0c;其信息化建设对于保障国家能源安全和促进能源转型具有重要意义。今天&#xff0c;让我们一同探索…

AcWing 蓝桥杯集训·每日一题2025·5526. 平衡细菌

5526. 平衡细菌 题意 给定一个序列 ( a i ) (a_i) (ai​)&#xff0c;每次操作可以选择一个位置 (p)&#xff0c;令从 ( a p ) (a_p) (ap​) 开始的每个数都加上一个以 (1) 或者 (-1) 为公差的从 ( 1 / − 1 ) (1 / -1) (1/−1) 开始的等差数列。求最小化让序列归零的操作…

PTA 7-6 列出连通集

题目详情&#xff1a; 给定一个有 n 个顶点和 m 条边的无向图&#xff0c;请用深度优先遍历&#xff08;DFS&#xff09;和广度优先遍历&#xff08;BFS&#xff09;分别列出其所有的连通集。假设顶点从 0 到 n−1 编号。进行搜索时&#xff0c;假设我们总是从编号最小的顶点出…

ES中数据刷新策略refresh

在 Elasticsearch 中&#xff0c;插入数据时的 refresh 参数控制文档在写入后何时对搜索可见&#xff0c;其行为直接影响数据可见性和系统性能。以下是 refresh 参数的三个可选值&#xff08;true、false、wait_for&#xff09;的详细说明及适用场景&#xff1a; 1. refreshtr…

用Python的Pandas库解锁数据科学:从入门到实战

用Python的Pandas库解锁数据科学&#xff1a;从入门到实战 引言 Python的Pandas库&#xff08;名称源自"Panel Data"&#xff09;作为数据科学生态系统的基石&#xff0c;凭借其强大的数据结构和灵活的操作功能&#xff0c;已成为全球超过90%数据工作者的首选工具。…

如何提高域名解析速度?

在搭建网站或使用在线服务时&#xff0c;许多人会问&#xff1a;“为什么我的网站加载速度这么慢?”“如何提高域名解析速度?”“域名解析速度对网站性能有什么影响?”域名解析速度直接影响用户访问网站的体验&#xff0c;因此&#xff0c;了解如何提高域名解析速度尤为重要…

深度学习语义分割数据集全景解析

一、语义分割任务概述 语义分割是计算机视觉领域的核心任务之一&#xff0c;目标是通过算法将图像中的每个像素精准划分到对应的语义类别&#xff08;如道路、车辆、行人等&#xff09;。高质量标注数据集是推动该领域发展的关键因素。本文将系统梳理主流数据集的技术特征与适…

贪心算法一

> 作者&#xff1a;დ旧言~ > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;了解什么是贪心算法&#xff0c;并且掌握贪心算法。 > 毒鸡汤&#xff1a;有些事情&#xff0c;总是不明白&#xff0c;所以我不会坚持。早安! >…

基于websocket的多用户网页五子棋 --- 测试报告

目录 功能测试自动化测试性能测试 功能测试 1.登录注册页面 2.游戏大厅页面 3.游戏房间页面 自动化测试 1.使用脑图编写web自动化测试用例 2.创建自动化项目&#xff0c;根据用例通过selenium来实现脚本 根据脑图进行测试用例的编写&#xff1a; 每个页面一个测试类&am…

docker学习与使用

一、docker概述 1.docker是什么 是一个开源的应用容器引擎&#xff0c;基于go语言开发并遵循apache2.0协议开源 是在Linux容器里运行应用的开源工具 是一种轻量级的 “虚拟机” Docker的容器技术,可以在一台主机上轻松为任何应用创建一个轻量级的、可移植的、自给自足的容器…

2025-03-04 学习记录--C/C++-C语言 判断是否是素数

合抱之木&#xff0c;生于毫末&#xff1b;九层之台&#xff0c;起于累土&#xff1b;千里之行&#xff0c;始于足下。&#x1f4aa;&#x1f3fb; C语言 判断是否是素数 一、代码 ⭐️ #include <stdio.h> #include <stdbool.h> // 使用 bool 类型// 判断是否是…

如何将飞书多维表格与DeepSeek R1结合使用:效率提升的完美搭档

将飞书的多维表格与DeepSeek R1结合使用&#xff0c;就像为你的数据管理和分析之旅装上一台涡轮增压器。两者的合作&#xff0c;不仅仅在速度上让人耳目一新&#xff0c;更是将智能化分析带入了日常的工作场景。以下是它们如何相辅相成并改变我们工作方式的一些分享。 --- 在…

离散傅里叶变换(Discrete Fourier Transform, DFT)及其在图像处理中的应用

离散傅里叶变换&#xff08;DFT&#xff09;及其在图像处理中的应用 什么是离散傅里叶变换&#xff1f; 离散傅里叶变换&#xff08;Discrete Fourier Transform, DFT&#xff09;是一种强大的数学工具&#xff0c;用于将离散信号从时域&#xff08;或空间域&#xff09;转换…

在 macOS 上使用 CLion 进行 Google Test 单元测试

介绍 Google Test&#xff08;GTest&#xff09;是 Google 开源的 C 单元测试框架&#xff0c;它提供了简单易用的断言、测试夹具&#xff08;Fixtures&#xff09;和测试运行机制&#xff0c;使 C 开发者能够编写高效的单元测试。 本博客将介绍如何在 macOS 上使用 CLion 配…

Oracle SQL优化实战要点解析(11)——索引、相关子查询及NL操作(1)

11.1. 充分利用索引有序特性,避免发生大表上的FTS,以及对中间大数据集的排序。 11.1.1. 适用场景 从一个或多个大表(例如:亿行级或TB级数据量)中过滤出全列大数据集(例如:数百万或千万行数据),对该大数据集按其中某列进行排序,最终,只取最前面的少部分数据(例如:…

软考架构师笔记-计算机网络

1.9 计算机网络 OSI/RM 七层模型 物理层 二进制传输(中继器、集线器) (typedef) 数据链路层 传送以帧为单位的信息(网桥、交换机、网卡) 网络层 分组传输和路由选择(三层交换机、路由器)ARP/RARP/IGMP/ICMP/IP 传输层 端到端的连接(TCP/UDP)在前向纠错系统中&#xff0c;当接…