本文目标
-
理解AMQP模型中的核心概念:Connection, Channel, Producer, Consumer, Queue。
-
创建一个.NET项目并添加RabbitMQ客户端库。
-
使用C#编写代码发送一条消息("Hello World")。
-
使用C#编写代码接收并处理这条消息。
一、理论部分
1. AMQP 0-9-1 核心模型简介
在编写代码前,我们需要理解几个核心概念,它们构成了RabbitMQ一切功能的基础:
-
生产者 (Producer):发送消息的应用程序。
-
消费者 (Consumer):接收消息的应用程序。
-
队列 (Queue):一个类似于邮箱的存储结构,位于RabbitMQ内部,用于存储消息。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息。消息只能存储在队列中。
-
连接 (Connection):一个TCP连接,应用程序通过它与RabbitMQ服务器建立网络连接。创建连接的开销较大。
-
通道 (Channel):建立在连接之上的虚拟连接。几乎所有的操作都在通道中进行。使用通道的原因是为了避免频繁创建和销毁TCP连接带来的巨大开销。一个连接可以包含多个通道。
简单工作流:Producer
-> (Connection
-> Channel
) -> Queue
-> (Channel
-> Connection
) -> Consumer
2. RabbitMQ.Client
库
这是RabbitMQ官方提供的.NET客户端库,它实现了AMQP协议,是我们与RabbitMQ服务器交互的桥梁。我们将通过NuGet包管理器来安装它。
二、实操部分:创建"Hello World"
我们将创建两个控制台应用程序:一个生产者(Send)和一个消费者(Receive)。
第1步:创建项目并添加NuGet包
-
打开你的IDE(如Visual Studio或VS Code),创建一个新的解决方案(Solution)。
-
在该解决方案中,创建两个新的控制台应用程序项目,分别命名为
Send
和Receive
。 -
为两个项目添加
RabbitMQ.Client
NuGet包。-
Visual Studio:右键点击项目 -> "Manage NuGet Packages..." -> 浏览 -> 搜索
RabbitMQ.Client
-> 安装。 -
.NET CLI:
-
第2步:编写生产者(Send.cs)
将 Send
项目中的 Program.cs
替换为以下代码。请务必将 hostName
、userName
和 password
替换为你在第1章中设置的值(如果你按教程做,应该是 localhost
, myuser
, mypassword
)。
代码解析:
-
我们首先创建了一个连接工厂,并配置了连接到我们本地RabbitMQ服务器所需的参数。
-
然后,我们建立了连接和通道。这是与RabbitMQ交互的标准方式。
-
QueueDeclare
是幂等的——它只会在队列不存在时创建它。 -
默认交换机(
""
)是一个直连交换机(Direct Exchange),它会把消息路由到routingKey
完全匹配的队列中。所以这里routingKey: "hello"
意味着消息会被投递到名为hello
的队列。
第3步:运行生产者并查看管理后台
-
运行
Send
项目(在Visual Studio中按F5,或使用CLI命令dotnet run
)。 -
你会在控制台看到
[x] Sent Hello World!
。 -
现在,打开RabbitMQ管理后台 (http://localhost:15672),登录后点击 Queues 标签页。你应该能看到一个名为
hello
的队列,并且它下面有 1 条消息正准备被消费("Ready"状态)!
https://i.imgur.com/3Xk7ywI.png
第4步:编写消费者(Receive.cs)
现在我们来编写消费者程序,从 hello
队列中取出消息。
将 Receive
项目中的 Program.cs
替换为以下代码:
using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;// 同生产者一样,创建连接和通道 var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) {// 声明队列。这一步也是必需的,以防我们先启动消费者,而队列还不存在。channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);// 创建一个事件消费者(EventingBasicConsumer)对象,并关联到我们的通道var consumer = new EventingBasicConsumer(channel);// 当消费者收到消息时,触发这个事件consumer.Received += (model, ea) =>{// 消息体是字节数组,我们需要将其转换回字符串var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] Received {message}");};// 开始消费队列中的消息// 参数说明:// queue: "hello" - 要消费的队列名称// autoAck: true - 自动确认模式。如果为true,消息一旦被送达,RabbitMQ会立即将其从队列中标记为删除。// consumer: consumer - 我们上面定义的消费者对象channel.BasicConsume(queue: "hello",autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine(); // 保持程序运行,持续监听消息 }
代码解析:
-
前面的连接、通道、队列声明步骤与生产者完全一致,这确保了所需的队列存在。
-
我们创建了一个
EventingBasicConsumer
对象,并为它的Received
事件订阅了一个处理方法。每当有消息到达时,这个匿名方法就会被调用。 -
BasicConsume
方法启动消费过程。它将我们的消费者注册到指定的队列。 -
autoAck: true
表示自动确认模式。这意味着消费者一收到消息,RabbitMQ就认为它已成功处理并立即从队列中删除该消息。这是一种简单的模式,但如果消费者在处理消息过程中崩溃,消息就会丢失。我们将在下一章学习更可靠的手动确认模式。
第5步:运行消费者
-
运行
Receive
项目。 -
你会立刻在控制台看到
[x] Received Hello World!
。消费者程序取走了我们之前发送的消息并打印了出来。 -
再次查看管理后台的 Queues 页面,你会发现
hello
队列中的消息数又变回了 0。
尝试一下:
-
先运行
Receive
程序,让它保持运行并监听消息。 -
然后再运行
Send
程序多次。 -
观察
Receive
的控制台,它会实时地打印出每一条新收到的消息。
总结
我们已经成功实现了一个RabbitMQ应用程序。在本文中,我们:
-
创建了.NET项目并引入了
RabbitMQ.Client
库。 -
理解了AMQP模型中的
Connection
、Channel
、Queue
、Producer
和Consumer
等核心概念。 -
编写了生产者代码,成功向名为
hello
的队列发送了一条消息。 -
编写了消费者代码,成功从队列中取出了消息并进行处理。
-
使用了管理后台来验证消息的流动。
这是一个最简单的模型,生产者和消费者直接与队列打交道。在下一章,我们将引入一个更强大的概念——交换机(Exchange),并学习如何实现工作队列(Work Queue),让多个消费者共同处理任务,这将使我们的应用变得更加实用和强大。