项目地址
https://github.com/sansantang/Jonckers.RabbitMQ.HttpApi.Order
1 支持自定义 QoS (默认 PrefetchSize = 0, PrefetchCount = 1, Global = false)
2 支持死信队列
怎么使用
1. 服务注册
appsettings.json
{"RabbitMQConnection": {"HostName": "192.168.49.151", // RabbitMQ 主机"Port": 5672, // 端口(默认5672)"UserName": "admin", // 用户名(默认guest)"Password": "admin123", // 密码(默认guest)"VirtualHost": "/", // 虚拟主机(默认/)"ExchangeName": "DefaultExchange", // 默认交换机"RetryCount": 3, // 重试次数"ConnectionTimeout": 10 // 连接超时时间(秒)}
}
在Program.cs中有以下代码:
builder.Services.AddMyRabbitMQ(builder.Configuration);
//注册消费者
//builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes());
//...
app.UseMyEventHandler();
2. 生产者


using Jonckers.RabbitMQ.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace Jonckers.RabbitMQ.Service.ConsumerMessageModel
{[RabbitMQEvent(queue: "jonckers.enterpriseordering.requestevent")]public class PerryTest{public Guid Id { get; set; }public string? Name { get; set; }public int Count { get; set; }public string? Remark { get; set; }}
}
在WeatherForecastController中:
public IMyPublisher<PerryTest> TestPublisher { get; }public WeatherForecastController(ILogger<WeatherForecastController> logger, IMyPublisher<PerryTest> testPublisher)
{_logger = logger;TestPublisher = testPublisher;
}
发送消息
当TestAsync方法被调用时:
[HttpGet("test")]
public async Task<string> TestAsync()
{var data = new PerryTest(){Id = Guid.NewGuid(),Name = "AAA",Count = 123,Remark = "测试一下"};await TestPublisher.PublishAsync(data);return "发送了一条消息";
}
此时使用的TestPublisher就是通过上述过程创建的MyPublisher<PerryTest>实例,该实例是通过第2个构造函数初始化的。
3. 消费者
注册消费者
builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes());
创建消费者
using Jonckers.RabbitMQ.Core.Service;
using Jonckers.RabbitMQ.Service.ConsumerMessageModel;namespace Jonckers.RabbitMQ.HttpApi.Order.Consumer
{public class PerryTestEventHandler : MyEventHandler<PerryTest>{public override Task OnReceivedAsync(PerryTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}}
}
配置 Qos 参数
| 参数名 | 类型 | 含义 | 推荐值 | 说明 |
|---|---|---|---|---|
| prefetchSize | ushort |
每次预取消息的总大小限制(字节) | 0 (不限制) |
一般用不到,设为 0 表示不限制消息大小 |
| prefetchCount | ushort |
每次预取的消息数量上限(未确认的消息数) | 1 ~ N(根据业务调整) |
最关键参数!控制未 Ack 消息数 |
| global | bool |
是否应用到该连接上的所有消费者 | false (推荐) |
如果为 true ,则对所有消费者生效;一般设为 false ,针对每个消费者单独设置 |
public class PerryTestEventHandler : MyEventHandler<PerryTest>
{public PerryTestEventHandler(){// 配置 QoS 参数Options.PrefetchSize = 0;Options.PrefetchCount = 2; // 每次处理2条消息Options.Global = false;}public override Task OnReceivedAsync(PerryTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}
}

死信队列
过期或拒绝到死信队列
1 注册
启用 isWithDeadLetter = ture, 才能设置过期时间 expirationMilliseconds
using Jonckers.RabbitMQ.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace Jonckers.RabbitMQ.Service.ConsumerMessageModel
{[RabbitMQEvent(queue: "jonckers.enterpriseordering.deadletter", routingkey: "jonckers.enterpriseordering.deadletter", isWithDeadLetter: true, expirationMilliseconds: 60000)]public class DeadLetterTest{public Guid Id { get; set; }public string? Name { get; set; }public int Count { get; set; }public string? Remark { get; set; }}
}
2 生产者

发送消息 PublishWithDeadLetterAsync
[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{public IMyPublisher<DeadLetterTest> TestPublisher { get; }private readonly ILogger<WeatherForecastController> _logger;public WeatherForecastController(ILogger<WeatherForecastController> logger, IMyPublisher<DeadLetterTest> testPublisher){_logger = logger;TestPublisher = testPublisher;}[HttpGet("test")]public async Task<string> TestAsync(){var data = new DeadLetterTest(){Id = Guid.NewGuid(),Name = "AAA",Count = 123,Remark = "哈哈哈"};await TestPublisher.PublishAsync(data);return "发送了一个消息";}
}
3 消费者:
1 注册消费者
// 注册到services
builder.Services.AddMyRabbitMQEventHandlers(typeof(DeadLetterTestEventHandler).Assembly);
2 监听正常的消费者
using Jonckers.RabbitMQ.Core.Service;
using Jonckers.RabbitMQ.Service.ConsumerMessageModel;namespace Jonckers.RabbitMQ.HttpApi.Order.Consumer
{public class DeadLetterTestEventHandler : MyEventHandler<DeadLetterTest>{public override Task OnReceivedAsync(DeadLetterTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}}
}
3 监听死信
需要自己再监听一个死信队列,注意命名
var deadLetterExchangeName = _exchangeName + ".dlx-exchange";
var deadLetterQueueName = _queueName + ".dlx-queue";
var deadLetterRoutingKey = _routingKeyName + ".dlrk-routingKey";
参考
https://gitee.com/wosperry/wosperry-rabbit-mqtest