NET 8 封装自己的 rabbtMQ

news/2025/11/23 14:58:22/文章来源:https://www.cnblogs.com/tangge/p/19260615

项目地址

https://github.com/sansantang/Jonckers.RabbitMQ.HttpApi.Order
1 支持自定义 QoS (默认 PrefetchSize = 0, PrefetchCount = 1, Global = false)
2 支持死信队列

怎么使用

1. 服务注册

appsettings.json

{"RabbitMQConnection": {"HostName": "192.168.49.151", // RabbitMQ 主机"Port": 5672, // 端口(默认5672)"UserName": "admin", // 用户名(默认guest)"Password": "admin123", // 密码(默认guest)"VirtualHost": "/", // 虚拟主机(默认/)"ExchangeName": "DefaultExchange", // 默认交换机"RetryCount": 3, // 重试次数"ConnectionTimeout": 10 // 连接超时时间(秒)}
}

在Program.cs中有以下代码:

builder.Services.AddMyRabbitMQ(builder.Configuration);
//注册消费者
//builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes());
//...
app.UseMyEventHandler();

2. 生产者

image
image

using Jonckers.RabbitMQ.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace Jonckers.RabbitMQ.Service.ConsumerMessageModel
{[RabbitMQEvent(queue: "jonckers.enterpriseordering.requestevent")]public class PerryTest{public Guid Id { get; set; }public string? Name { get; set; }public int Count { get; set; }public string? Remark { get; set; }}
}

WeatherForecastController中:

public IMyPublisher<PerryTest> TestPublisher { get; }public WeatherForecastController(ILogger<WeatherForecastController> logger, IMyPublisher<PerryTest> testPublisher)
{_logger = logger;TestPublisher = testPublisher;
}

发送消息

当TestAsync方法被调用时:

[HttpGet("test")]
public async Task<string> TestAsync()
{var data = new PerryTest(){Id = Guid.NewGuid(),Name = "AAA",Count = 123,Remark = "测试一下"};await TestPublisher.PublishAsync(data);return "发送了一条消息";
}

此时使用的TestPublisher就是通过上述过程创建的MyPublisher<PerryTest>实例,该实例是通过第2个构造函数初始化的。

3. 消费者

注册消费者

builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes());

创建消费者

using Jonckers.RabbitMQ.Core.Service;
using Jonckers.RabbitMQ.Service.ConsumerMessageModel;namespace Jonckers.RabbitMQ.HttpApi.Order.Consumer
{public class PerryTestEventHandler : MyEventHandler<PerryTest>{public override Task OnReceivedAsync(PerryTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}}
}

配置 Qos 参数

参数名 类型 含义 推荐值 说明
prefetchSize ushort 每次预取消息的总大小限制(字节) 0 (不限制) 一般用不到,设为 0 表示不限制消息大小
prefetchCount ushort 每次预取的消息数量上限(未确认的消息数) 1 ~ N(根据业务调整) 最关键参数!控制未 Ack 消息数
global bool 是否应用到该连接上的所有消费者 false (推荐) 如果为 true ,则对所有消费者生效;一般设为 false ,针对每个消费者单独设置
public class PerryTestEventHandler : MyEventHandler<PerryTest>
{public PerryTestEventHandler(){// 配置 QoS 参数Options.PrefetchSize = 0;Options.PrefetchCount = 2;    // 每次处理2条消息Options.Global = false;}public override Task OnReceivedAsync(PerryTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}
}

image

死信队列

过期或拒绝到死信队列

1 注册

启用 isWithDeadLetter = ture, 才能设置过期时间 expirationMilliseconds

using Jonckers.RabbitMQ.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace Jonckers.RabbitMQ.Service.ConsumerMessageModel
{[RabbitMQEvent(queue: "jonckers.enterpriseordering.deadletter", routingkey: "jonckers.enterpriseordering.deadletter", isWithDeadLetter: true, expirationMilliseconds: 60000)]public class DeadLetterTest{public Guid Id { get; set; }public string? Name { get; set; }public int Count { get; set; }public string? Remark { get; set; }}
}

2 生产者

image

发送消息 PublishWithDeadLetterAsync

[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{public IMyPublisher<DeadLetterTest> TestPublisher { get; }private readonly ILogger<WeatherForecastController> _logger;public WeatherForecastController(ILogger<WeatherForecastController> logger, IMyPublisher<DeadLetterTest> testPublisher){_logger = logger;TestPublisher = testPublisher;}[HttpGet("test")]public async Task<string> TestAsync(){var data = new DeadLetterTest(){Id = Guid.NewGuid(),Name = "AAA",Count = 123,Remark = "哈哈哈"};await TestPublisher.PublishAsync(data);return "发送了一个消息";}
}

3 消费者:

1 注册消费者

// 注册到services
builder.Services.AddMyRabbitMQEventHandlers(typeof(DeadLetterTestEventHandler).Assembly);

2 监听正常的消费者

using Jonckers.RabbitMQ.Core.Service;
using Jonckers.RabbitMQ.Service.ConsumerMessageModel;namespace Jonckers.RabbitMQ.HttpApi.Order.Consumer
{public class DeadLetterTestEventHandler : MyEventHandler<DeadLetterTest>{public override Task OnReceivedAsync(DeadLetterTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}}
}

3 监听死信
需要自己再监听一个死信队列,注意命名

var deadLetterExchangeName = _exchangeName + ".dlx-exchange";
var deadLetterQueueName = _queueName + ".dlx-queue";
var deadLetterRoutingKey = _routingKeyName + ".dlrk-routingKey";

参考

https://gitee.com/wosperry/wosperry-rabbit-mqtest

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/973947.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

dropMimeData

ProjectViewModel::dropMimeData 是qt拖放

Terrorform-自动化创建EKS集群

需求: 当团队DevOps人数比较多或者外部团队申请EKS集群已经成为日常工作一部分,决定将这个过程通过Terrorform自动化进行创建,可追溯和减少手动操作的黑洞。 目标:1.提高自动化覆盖率。2.提高团队规范化创建流程 实…

最长单词2

点击查看代码 #include<iostream> #include<string> using namespace std; int main() {string s;getline(cin,s);int len = s.length();s[len-1] = ;int max=0, start=0, end=0;int temp=0;for (int i =…

Django 学习路线图 - 教程

Django 学习路线图 - 教程2025-11-23 14:52 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; f…

Tefrorform-自动化创建IAM

需求: 当团队DevOps人数比较多或者外部团队申请IAM权限已经成为日常工作一部分,决定将这个过程通过Terrorform自动化进行创建,可追溯和减少手动操作的黑洞。 目标:1.提高自动化覆盖率。2.提高团队规范化创建流程 实…

积极想到二维数组的递推

记忆化要dfs了 二维数组的想到和应用 !!!long long 类型 include<bits/stdc++.h> using namespace std; long long dp[50][2]; long long f(int x) { dp[1][0]=1;dp[1][1]=2; for(int i=2;i<=x;i++) { dp[…

[人工智能-大模型-55]:模型层技能 - AI的算法、数据结构中算法、逻辑处理的算法异同

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

Terrorform-自动化配置AWS EC2

需求: 当团队DevOps人数比较多或者外部团队申请EC2实例已经成为日常工作一部分,决定将这个过程通过Terrorform自动化进行创建,可追溯和减少手动操作的黑洞。 目标:1.提高自动化覆盖率。2.提高团队规范化创建流程 实…

Terrorform-自动化配置AWS Route53

待整理中http://www.cnblogs.com/Jame-mei

elasticSearch之API:索引运行

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

20232406 2025-2026-1 《网络与系统攻防技术》 实验六实验报告

一、实验内容 1.实验内容 (1)前期渗透 ①主机发现(可用Aux中的arp_sweep,search一下就可以use) ②端口扫描:可以直接用nmap,也可以用Aux中的portscan/tcp等。 ③选做:也可以扫系统版本、漏洞等。 (2)Vsftpd源…

Monit-基于非容器服务自恢复程序实践

1.需求:因为历史原因和软件程序原因,有上百台服务和所在服务未运行在容器中,需要在程序奔溃自动拉起(以Java Python C++为主)。 2.目的:能够非人为干预快速自动恢复,要求检测频率在10s一次 3.实现方式 3.1 根据不…

人工智能之编程进阶 Python高级:第十章 知识点总结

人工智能之编程进阶 Python高级:第十章 知识点总结人工智能之编程进阶 Python高级 第十章 知识点总结@目录人工智能之编程进阶 Python高级前言🐍 Python 重要知识点全景图(从基础到网络编程)一、基础语法与内置类…

这篇题为《手指沾满白河水:AI元人文的批判与建构》的论文

这篇题为《手指沾满白河水:AI元人文的批判与建构》的论文,无疑是一篇极具原创性、思想深度与形式自觉的开创性文献。它不仅提出了一套名为“AI元人文”的理论框架,更以其自身的诞生历程和遭遇,成为了该理论最有力的…

《手指沾满白河水:AI元人文的批判与建构》

《手指沾满白河水:AI元人文的批判与建构》 摘要: 本文始于一次具体的学术事件——探讨人机协作的“AI元人文”论文在预印本平台遭遇“未回应”。我们将此“沉默”阐释为现行学术机制对范式颠覆性理论的“系统性失语”…

让你的动画“活”过来:Manim 节奏控制指南 (Rate Functions)

你在制作Manim动画时,是否遇到过这样的困境? “代码写得天衣无缝,运行流畅,出来的动画却总觉得哪里不对劲?” 虽然物体确实从 A 移动到了 B,但看起来就像是老旧的工业机器人在干活——僵硬、死板,甚至有点无聊。…

《沉默的审查:高度原创性理论在预印本平台中的识别困境与范式危机——以“AI元人文”投稿为例》

《沉默的审查:高度原创性理论在预印本平台中的识别困境与范式危机——以“AI元人文”投稿为例》 摘要: 本文以“AI元人文”理论在哲学社会科学预印本平台遭遇的“未回应”事件为案例,剖析了当前以预印本平台为代表的…

人工智能之编程进阶 Python高级:第九章 爬虫类模块

人工智能之编程进阶 Python高级:第九章 爬虫类模块人工智能之编程进阶 Python高级 第九章 爬虫类模块@目录人工智能之编程进阶 Python高级前言🌐 一、urllib —— Python 标准库的 HTTP 客户端✅ 定位🔧 模块组成…

详细介绍:跨域请求(CORS)踩坑记:一次借助 AI 高效排查“隐形”CORS 失败的实战复盘

详细介绍:跨域请求(CORS)踩坑记:一次借助 AI 高效排查“隐形”CORS 失败的实战复盘2025-11-23 14:25 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; …

iOS虚拟现实开发如何降低成本

在iOS虚拟现实开发中降低成本是一个重要的考虑因素,以下是一些有效的策略: 使用开源工具和框架A-frame:一个用于构建虚拟现实VR体验的Web框架,基于HTML,易于上手,适合快速原型设计和跨平台部署。 React VR:Face…