成都市金牛区建设和交通局网站手机软件下载大全
news/
2025/9/22 21:12:34/
文章来源:
成都市金牛区建设和交通局网站,手机软件下载大全,天津网站建设公司,网站开发可选择方案有哪些背景
最近遇到了一个问题#xff0c;在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题#xff0c;使用的是.net框架#xff0c;背景是高并发压力下的mq消费#xff0c;按理说即使队列中堆了几百条消息#xff0c;我客户端可以同处理5个消息。
原因是多线程…背景
最近遇到了一个问题在使用rabbitmq的时候出现了丢消息、消息重复消费等一系列的问题使用的是.net框架背景是高并发压力下的mq消费按理说即使队列中堆了几百条消息我客户端可以同处理5个消息。
原因是多线程同时处理时导致的内存混乱。
官方文档已经解释的很全面了https://www.rabbitmq.com/dotnet-api-guide.html
一个简易的单线程消费者
注意如下代码这只是一个简易的单线程同步的消费者 每次消费1条消息消息消费完进行手动ack
Task.Run(() {AutoResetEvent autoResetEvent new AutoResetEvent(false);ConnectionFactory factory new ConnectionFactory();// guest/guest by default, limited to localhost connectionsfactory.UserName user;factory.Password pass;factory.VirtualHost vhost;factory.HostName hostName;// this name will be shared by all connections instantiated by// this factoryfactory.ClientProvidedName app:audit component:event-consumer;IConnection conn factory.CreateConnection();using (IModel channel conn .CreateModel()){channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);channel.QueueDeclare(queueName, false, false, false, null);channel.QueueBind(queueName, exchangeName, routingKey, null);consumer.Received (ch, ea) {var body ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(queue: my-queue,autoAck: false,consumer: consumer);}ConsoleUtil.WriteLine(mq started);autoResetEvent.WaitOne();ConsoleUtil.WriteLine(mq shutdown);}
});批量消费
好的那么我现在想要同时消费5条消息想要达到并行的效果需要如何改代码呢看下面的改动
改动1
先看两个概念 prefetchCount预取计数 prefetchCount 是一个用来限制每个消费者一次性从队列中获取的消息数量的参数。当你有多个消费者同时连接到同一个队列时RabbitMQ 可以将消息均匀地分发给这些消费者。通过设置 prefetchCount你可以告诉 RabbitMQ 每个消费者一次最多获取多少条消息。这个参数的目的是确保消息在被消费者处理之前不会全部放到内存中从而提高系统的稳定性和性能。它有助于避免 一个消费者获取了太多消息而导致其他消费者无法获取任何消息的情况。 concurrentConsumers并发消费者 concurrentConsumers 是指在同一队列上允许多少个并发消费者。每个并发消费者都会独立地处理消息这有助于提高系统的处理能力和吞吐量。通过增加 concurrentConsumers 数量你可以增加并发处理消息的能力。注意这个参数不同于 prefetchCount它控制的是同时运行的消费者的数量而不是单个消费者一次性获取的消息数量。
// 参数1prefetchSize:可接收消息的大小,如果设置为0那么表示对消息本身的大小不限制
// 参数2prefetchCount:处理消息最大的数量。相当于消费者能一次接受的队列大小
// 参数3global:是不是针对整个 Connection 的因为一个 Connection 可以有多个 Channel
// globalfalse针对的是这个 Channel
// globalture 针对的是这个 Connection
channel.BasicQos(0, 5, false);
factory.ConsumerDispatchConcurrency 5;好的这时候我配置了同时处理5条消息看起来没问题了但是官网文档有这样一句话 IModel instance usage by more than one thread simultaneously should be avoided. Application code should maintain a clear notion of thread ownership for IModel instances. This is a hard requirement for publishers: sharing a channel (an IModel instance) for concurrent publishing will lead to incorrect frame interleaving at the protocol level. Channel instances must not be shared by threads that publish on them. If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion. One way of achieving this is for all users of an IModel to lock the instance itself: 大概意思就是应该避免多个线程同时使用IModel实例也就是channel对象如果这么做的后果就是高负载情况下导致内存混乱有可能你的线程1消费到了线程5本该消费的消息这听起来后果是很严重的那么我们应该怎么改动呢官网也给方案了就是给channel对象加锁看下面的代码改动
改动2
consumer.Received (ch, ea) { var body ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}
};lock (channel){channel.BasicConsume(queue: my-queue,autoAck: false,consumer: consumer);
}异步支持
新增一个配置
factory.DispatchConsumersAsync true;然后修改消费者
var consumer new AsyncEventingBasicConsumer(channel);consumer.Received async (model, ea)
{await Task.Run(() {var body ea.Body.ToArray();// copy or deserialise the payload// and process the message// ...lock (channel){channel.BasicAck(ea.DeliveryTag, false);}});
};
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/910388.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!