.Net使用RabbitMQ详解

序言

这几天呢,公司风波再起,去年一年公司CTO换啦4任,CEO换啦三个,这不刚来个新老大,感觉还不错,却没干过3个月又要走,索性趁老大们走来走去的时候,就给自己空出来,稍稍总结一下刚写的一个日志服务组件中用到的RabbitMQ,在.net中的实战中应用。

首先不去讨论我的日志组件怎么样。因为有些日志需要走网络,有的又不需要走网路,也是有性能与业务场景的多般变化在其中,就把他抛开,我们只谈消息RabbitMQ。

那么什么是RabbitMQ,它是用来解决什么问题的,性能如何,又怎么用?我会在下面一一阐述,如有错误,不到之处,还望大家不吝赐教。

RabbitMQ简介

必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成(因此也是继承了这些优点)。

百度百科对RabbitMQ阐述也非常明确,建议去看下,还有amqp协议。

RabbitMQ官网:http://www.rabbitmq.com/ 如果你要下载安装,那么必须先把Erlang语言装上。

RabbitMQ的.net客户端,可以在nuget中输入rabbitmq轻松获得。

RabbitMQ与其他消息队列的对比,早有仙人给写出来。 Message Queue Shootout

这篇文章中的测试案例为:1百万条1k的消息,每秒种的收发情况如下图。

 

如果你安装好啦,rabbitmq,他会提供一个操作监控页面,页面如下,他几乎提供啦,对rabbitmq的所有操作,与监控,所以,你装上后,自己多看看,多操作下。

 

RabbitMQ中的一些名词阐述与消息从投递到消费的整个过程

从上图的标题中可以看到一些陌生的英文单词,让我们感觉一无所知,更无从操作,那么我给大家弄啦一个图片大家可以看下,或许对您理解这些新鲜的单词有所帮助。

 

看过这些名词,之后,或许你还毫无头绪,那么我把消息从生产到消费的整个流程给大家说一下,或许会更深入一点,其中Exchange,与Queue都是可以设置相关属性,队列的持久化,交换器类型制定。

 

Note:首先这个过程走分三个部分,1、客户端(生产消息队列),2、RabbitMQ服务端(负责路由规则的绑定与消息的分发),3、客户端(消费消息队列中的消息)

 

 

Note:由图可以看出,一个消息可以走一次网络却被分发到不同的消息队列中,然后被多个的客户端消费,那么这个过程就是RabbitMQ的核心机制,RabbitMQ的路由类型与消费模式。

RabbitMQ中Exchange的类型

类型有4种,direct,fanout,topic,headers。其中headers不常用,本篇不做介绍,其他三种类型,会做详细介绍。

那么这些类型是什么意思呢?就是Exchange与队列进行绑定后,消息根据exchang的类型,按照不同的绑定规则分发消息到消息队列中,可以是一个消息被分发给多个消息队列,也可以是一个消息分发到一个消息队列。具体请看下文。

介绍之初还要说下RoutingKey,这是个什么玩意呢?他是exchange与消息队列绑定中的一个标识。有些路由类型会按照标识对应消息队列,有些路由类型忽略routingkey。具体看下文。

1、Exchange类型direct

他是根据交换器名称与routingkey来找队列的。

 

Note:消息从client发出,传送给交换器ChangeA,RoutingKey为routingkey.ZLH,那么不管你发送给Queue1,还是Queue2一个消息都会保存在Queue1,Queue2,Queue3,三个队列中。这就是交换器的direct类型的路由规则。只要找到路由器与routingkey绑定的队列,那么他有多少队列,他就分发给多少队列。

2、Exchange类型fanout

这个类型忽略Routingkey,他为广播模式。

 

Note:消息从客户端发出,只要queue与exchange有绑定,那么他不管你的Routingkey是什么他都会将消息分发给所有与该exchang绑定的队列中。

3、Exchange类型topic

这个类型的路由规则如果你掌握啦,那是相当的好用,与灵活。他是根据RoutingKey的设置,来做匹配的,其中这里还有两个通配符为:

*,代表任意的一个词。例如topic.zlh.*,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

#,代表任意多个词。例如topic.#,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

 

Note:这个图看上去很乱,但是他是根据匹配符做匹配的,这里我建议你自己做下消息队列的具体操作。

具体操作如下

 public static void Producer(int value){            try{                var qName = "lhtest1";                var exchangeName = "fanoutchange1";                var exchangeType = "fanout";//topic、fanoutvar routingKey = "*";                var uri = new Uri("amqp://192.168.10.121:5672/");                var factory = new ConnectionFactory{UserName = "123",Password = "123",RequestedHeartbeat = 0,Endpoint = new AmqpTcpEndpoint(uri)};                using (var connection = factory.CreateConnection()){                    using (var channel = connection.CreateModel()){                        //设置交换器的类型                        channel.ExchangeDeclare(exchangeName, exchangeType);                        //声明一个队列,设置队列是否持久化,排他性,与自动删除channel.QueueDeclare(qName, true, false, false, null);                        //绑定消息队列,交换器,routingkey                        channel.QueueBind(qName, exchangeName, routingKey);                        var properties = channel.CreateBasicProperties();                        //队列持久化properties.Persistent = true;                        var m = new QMessage(DateTime.Now, value+"");                        var body = Encoding.UTF8.GetBytes(DoJson.ModelToJson<QMessage>(m));                        //发送信息                        channel.BasicPublish(exchangeName, routingKey, properties, body);}}}            catch (Exception ex){Console.WriteLine(ex.Message);}}

消息队列的消费与消息确认Ack

1、消息队列的消费

Note:如果一个消息队列中有大量消息等待操作时,我们可以用多个客户端来处理消息,这里的分发机制是采用负载均衡算法中的轮询。第一个消息给A,下一个消息给B,下下一个消息给A,下下下一个消息给B......以此类推。

2、为啦保证消息的安全性,保证此消息被正确处理后才能在服务端的消息队列中删除。那么rabbitmq提供啦ack应答机制,来实现这一功能。

ack应答有两种方式:1、自动应答,2、手动应答。具体实现如下。

 public static void Consumer(){            try{                var qName = "lhtest1";                var exchangeName = "fanoutchange1";                var exchangeType = "fanout";//topic、fanoutvar routingKey = "*";                var uri = new Uri("amqp://192.168.10.121:5672/");                var factory = new ConnectionFactory{UserName = "123",Password = "123",RequestedHeartbeat = 0,Endpoint = new AmqpTcpEndpoint(uri)};                using (var connection = factory.CreateConnection()){                    using (var channel = connection.CreateModel()){channel.ExchangeDeclare(exchangeName, exchangeType);channel.QueueDeclare(qName, true, false, false, null);channel.QueueBind(qName, exchangeName, routingKey);                        //定义这个队列的消费者QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);                        //false为手动应答,true为自动应答channel.BasicConsume(qName, false, consumer);                        while (true){BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();                           byte[] bytes = ea.Body;                            var messageStr = Encoding.UTF8.GetString(bytes);                            var message = DoJson.JsonToModel<QMessage>(messageStr);Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);                            //如果是自动应答,下下面这句代码不用写啦。if ((Convert.ToInt32(message.Title) % 2) == 1){channel.BasicAck(ea.DeliveryTag, false);}}}}}            catch (Exception ex){Console.WriteLine(ex.Message);}}


总结

RabbitMQ个人感觉比较简易,文章写的也可能比较简易,呵呵,见谅。如果您开发中用到啦RabbitMQ,或者开发中有什么疑惑,欢迎加入群161474816,232458226 ,我们一起讨论学习。

原文地址:http://www.cnblogs.com/knowledgesea/p/5296008.html


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329874.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot整合Redis要注意的那些

前言 昨天自己在重新学习SpringBoot整合Redis时&#xff0c;遇到了一个问题java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericObjectPoolConfig&#xff0c;错误很明显找不到需要的类。下面主要记录一下错误是怎么出线的&#xff0c;并且如何解决。 错…

Java之Socket与HTTP区别

转自&#xff1a; Java之Socket与HTTP区别 - 曹刚 - 博客园我们都知道TCP/IP协议共分四层&#xff1a;①链路层&#xff0c;有时也称作数据链路层或网络接口层&#xff0c;通常包括操作系统中的设备驱动程序和计算机中对应的网络接口卡。它们一起处理与电缆&#xff08;或其他…

人人都能掌握的Java服务端性能优化方案

转载自 人人都能掌握的Java服务端性能优化方案 作为一个Java后端开发&#xff0c;我们写出的大部分代码都决定着用户的使用体验。如果我们的代码性能不好&#xff0c;那么用户在访问我们的网站时就要浪费一些时间等待服务器的响应。这就可能导致用户投诉甚至用户的流失。 关于性…

python模板模式_python-模板方法模式

说明&#xff1a; 模板方法模式时行为模式中比较简单的设计模式之一。模板方法关注这样的一类行为&#xff1a;该类行为在执行过程中拥有大致相同的动作次序&#xff0c;只是动作在实现的具体细节上有所差异。例如&#xff1a;泡茶和泡咖啡&#xff0c;泡茶&#xff1a;把水煮沸…

nacos启动失败:org.springframework.boot.web.server.WebServerExceptio

准备环境 系统环境: windows nacos: 2.0.0-BETA 错误信息 org.springframework.context.ApplicationContextException: Unable to start web server; nested exception is org.springframework.boot.web.server.WebServerException: Unable to start embedded Tomcat 配置文件…

post使用form-data和x-www-form-urlencoded的本质区别

转自&#xff1a; post使用form-data和x-www-form-urlencoded的本质区别_null-CSDN博客一是数据包格式的区别&#xff0c;二是数据包中非ANSCII字符怎么编码&#xff0c;是百分号转码发送还是直接发送一、application/x-www-form-urlencoded1、它是post的默认格式&#xff0c;…

使用Nancy打造TaskManager2.0管理系统

上一篇开源任务管理平台TaskManager介绍发布后&#xff0c;有网友联系我看看能不能做个后台管理界面&#xff0c;方便管理系统中所有的任务。由于时间和技术问题1.0版本的时候&#xff0c;新增了一个3分钟读取配置文件动态修改任务的功能&#xff0c;不过总体来说还是不直观&am…

Java对象的序列化与反序列化

转载自 Java对象的序列化与反序列化 序列化与反序列化 序列化 (Serialization)是将对象的状态信息转换为可以存储或传输的形式的过程。一般将一个对象存储至一个储存媒介&#xff0c;例如档案或是记亿体缓冲等。在网络传输过程中&#xff0c;可以是字节或是XML等格式。而字节的…

网站能拿到其他网站的cookie_网站能给公司带来哪些好处?

事实上&#xff0c;许多企业仍在努力建立一个网站&#xff0c;因为如果它建立起来&#xff0c;它需要一笔钱和其他费用。但此时客户业务不太好&#xff0c;企业客户需要考虑很多问题&#xff0c;但创建网站的企业已经尝到了甜头。其实有些朋友可能会有一些偏差。例如&#xff0…

笨办法学习@ConditionalOnProperty 烧脑配置记录

前言 今天继续学习springboot时&#xff0c;一不小心就被ConditionalOnProperty注解的配置真假搞得我真的变得真真假假了。。&#xff08;此为真&#xff0c;彼为假&#xff0c;到底你是真还是你是假&#xff0c;晕了晕了。。。&#xff09; 本片主要记录一下注解的真假情况 …

javaI/O流小结

【README】 1.本文总结java IO读取或写入数据的方式和相关类说明&#xff1b; 2.java IO建立在流之上的。输入流读取数据&#xff0c;输出流写入数据&#xff1b; 3.过滤器流-filter stream&#xff0c;可以串连&#xff08;修饰&#xff09;到输入流和输出流上&#xff1b;…

送给微软中文.NET社区的一份礼物,.NET FM

自报家门 大家好&#xff0c;我是.NET FM。做为一档专业而轻松的播客节目&#xff0c;在今后的日子里&#xff0c;我将为你奉上有关.NET和微软公司其他技术的新鲜资讯&#xff08;偷偷讲下&#xff0c;还有各种八卦哦&#xff09;。 Lex Li的回忆 认识吕鹏同学真的是非常偶然…

你真的以为你了解Java的序列化了吗

转载自 你真的以为你了解Java的序列化了吗 上一篇文章《Java对象的序列化与反序列化》中&#xff0c;简单介绍了Java中对象的序列化和反序列化的一些基础知识。看文那篇文章后&#xff0c;有小伙伴留言说&#xff1a;我终于了解了Java的序列化了。我只想说&#xff1a;小伙子&a…

SpringBoot配置mybatis-mysql数据源

前言 学习SpringBoot整合mybatis mysql配置&#xff0c;首先需要了解什么是ORM(对象映射关系)框架&#xff0c;ORM&#xff08;Object Relational Mapping&#xff09;对象关系映射&#xff0c;是 一种为了解决面向对象与关系型数据库不匹配而出现的技术&#xff0c;使开发者…

编程猜单词游戏python_Python实现简单的猜单词小游戏

本文实例为大家分享了Python实现猜单词小游戏的具体代码&#xff0c;供大家参考&#xff0c;具体内容如下 思路 1、一个words列表里存放若干的单词&#xff0c;例如&#xff1a;["extends", "private", "static", "public"]2、在words…

java URL和URI

【README】 本文阐述了 URL&#xff0c; URI&#xff0c;以及对应的java类的api&#xff1b; 1.URI&#xff0c;统一资源标识符&#xff0c;标识互联网上的某个网络资源&#xff0c;标识方式如 名称&#xff0c;位置等&#xff1b;就像人的标识一样&#xff0c;可以通过身份证…

HoloLens开发手记-全息Hologram

全息 Hologram HoloLens使我们可以通过周边世界的光线和声音来创建全息场景和物体&#xff0c;使得它们像真实物体那样。全息场景能够响应你的凝视、手势和语音指令&#xff0c;同时还会和你周边世界的表面交互。借助全息场景&#xff0c;你可以在周边世界创建数码物体。 class…

全网把Map中的hash()分析的最透彻的文章,别无二家。

转载自 全网把Map中的hash()分析的最透彻的文章&#xff0c;别无二家。你知道HashMap中hash方法的具体实现吗&#xff1f; 你知道HashTable、ConcurrentHashMap中hash方法的实现以及原因吗&#xff1f; 你知道为什么要这么实现吗&#xff1f; 你知道为什么JDK 7和JDK 8中hash方…

python下面的代码_解析一下下面的python代码?

class Model(dict, metaclassModelMetaclass): # 初始化, 没啥好说的 def __init__(self, **kw): super(Model, self).__init__(**kw) # 如果取不到值, 报错, 这是一个魔术方法, 使用时直接getattr(obj, key) def __getattr__(self, key): try: return self[key] except KeyErr…

SpringBoot多数据源(主从数据源)配置

&#x1f3b6;前言 学习springboot配置多数据源&#xff0c;先回顾一下springboot配置单数据源的方式 SpringBoot配置mybatis-mysql数据源 &#x1f520;主从数据源搭建 项目依赖 本次记录多数据源配置主要是通过druid mybatis plus aop的形式实现的&#xff0c;mybatis …