本章目标
- 
理解RabbitMQ RPC模式的工作原理和适用场景。 
- 
掌握回调队列(Callback Queue)和关联ID(Correlation Id)的使用。 
- 
实现基于RabbitMQ的异步RPC调用。 
- 
学习RPC模式下的错误处理和超时机制。 
- 
构建完整的微服务间同步通信解决方案。 
一、理论部分
1. RPC模式简介
RPC(Remote Procedure Call)模式允许客户端应用程序调用远程服务器上的方法,就像调用本地方法一样。在RabbitMQ中,RPC是通过消息队列实现的异步RPC。
与传统HTTP RPC的区别:
- 
HTTP RPC:同步,直接连接,需要服务端在线 
- 
消息队列RPC:异步,通过消息代理,支持解耦和负载均衡 
2. RabbitMQ RPC核心组件
- 
请求队列(Request Queue):客户端发送请求的队列 
- 
回复队列(Reply Queue):服务器返回响应的队列 
- 
关联ID(Correlation Id):匹配请求和响应的唯一标识 
- 
消息属性:使用 IBasicProperties.ReplyTo和IBasicProperties.CorrelationId
3. RPC工作流程
Client端: 1. 生成唯一CorrelationId 2. 创建临时回复队列 3. 发送请求到请求队列,设置ReplyTo和CorrelationId 4. 监听回复队列,等待匹配的CorrelationIdServer端: 1. 监听请求队列 2. 处理请求 3. 将响应发送到请求中的ReplyTo队列 4. 设置相同的CorrelationIdClient端: 5. 收到响应,根据CorrelationId匹配请求 6. 处理响应
4. 适用场景
- 
需要同步响应的异步操作 
- 
微服务间的同步通信 
- 
计算密集型任务的分布式处理 
- 
需要负载均衡的同步调用 
二、实操部分:构建分布式计算服务
我们将创建一个分布式斐波那契数列计算服务,演示完整的RPC模式实现。
第1步:创建项目结构
# 创建解决方案
dotnet new sln -n RpcSystem# 创建项目
dotnet new webapi -n RpcClient.API
dotnet new classlib -n RpcClient.Core
dotnet new classlib -n RpcServer.Service
dotnet new classlib -n RpcShared# 添加到解决方案
dotnet sln add RpcClient.API/RpcClient.API.csproj
dotnet sln add RpcClient.Core/RpcClient.Core.csproj
dotnet sln add RpcServer.Service/RpcServer.Service.csproj
dotnet sln add RpcShared/RpcShared.csproj# 添加项目引用
dotnet add RpcClient.API reference RpcClient.Core
dotnet add RpcClient.API reference RpcShared
dotnet add RpcClient.Core reference RpcShared
dotnet add RpcServer.Service reference RpcShared# 添加NuGet包
cd RpcClient.API
dotnet add package RabbitMQ.Clientcd ../RpcClient.Core
dotnet add package RabbitMQ.Clientcd ../RpcServer.Service
dotnet add package RabbitMQ.Client
第2步:定义共享模型(RpcShared)
Models/RpcRequest.cs
using System.Text.Json.Serialization;namespace RpcShared.Models
{public class RpcRequest{[JsonPropertyName("requestId")]public string RequestId { get; set; } = Guid.NewGuid().ToString();[JsonPropertyName("method")]public string Method { get; set; } = string.Empty;[JsonPropertyName("parameters")]public Dictionary<string, object> Parameters { get; set; } = new();[JsonPropertyName("timestamp")]public DateTime Timestamp { get; set; } = DateTime.UtcNow;public RpcRequest WithParameter(string key, object value){Parameters[key] = value;return this;}public T? GetParameter<T>(string key){if (Parameters.TryGetValue(key, out var value)){try{return (T)Convert.ChangeType(value, typeof(T));}catch{return default;}}return default;}}
}
Models/RpcResponse.cs
using System.Text.Json.Serialization;namespace RpcShared.Models
{public class RpcResponse{[JsonPropertyName("requestId")]public string RequestId { get; set; } = string.Empty;[JsonPropertyName("success")]public bool Success { get; set; }[JsonPropertyName("data")]public object? Data { get; set; }[JsonPropertyName("error")]public string? Error { get; set; }[JsonPropertyName("timestamp")]public DateTime Timestamp { get; set; } = DateTime.UtcNow;[JsonPropertyName("processingTimeMs")]public long ProcessingTimeMs { get; set; }public static RpcResponse SuccessResponse(string requestId, object data, long processingTimeMs = 0){return new RpcResponse{RequestId = requestId,Success = true,Data = data,ProcessingTimeMs = processingTimeMs};}public static RpcResponse ErrorResponse(string requestId, string error, long processingTimeMs = 0){return new RpcResponse{RequestId = requestId,Success = false,Error = error,ProcessingTimeMs = processingTimeMs};}public T? GetData<T>(){if (Data is JsonElement jsonElement){return jsonElement.Deserialize<T>();}return Data is T typedData ? typedData : default;}}
}
Messages/FibonacciRequest.cs
namespace RpcShared.Messages
{public class FibonacciRequest{public int Number { get; set; }public bool UseOptimizedAlgorithm { get; set; } = true;}public class FibonacciResponse{public long Result { get; set; }public long CalculationTimeMs { get; set; }public int InputNumber { get; set; }}
}
第3步:RPC客户端核心库(RpcClient.Core)
Services/IRpcClient.cs
using RpcShared.Models;namespace RpcClient.Core.Services
{public interface IRpcClient : IDisposable{Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout);Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout) where TResponse : class;}
}
Services/RpcClient.cs
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RpcShared.Models;namespace RpcClient.Core.Services
{public class RpcClient : IRpcClient{private readonly IConnection _connection;private readonly IModel _channel;private readonly ILogger<RpcClient> _logger;private readonly string _replyQueueName;private readonly ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>> _pendingRequests;private readonly AsyncEventingBasicConsumer _consumer;private bool _disposed = false;public RpcClient(IConnectionFactory connectionFactory,ILogger<RpcClient> logger){_logger = logger;_pendingRequests = new ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>>();// 建立连接和通道_connection = connectionFactory.CreateConnection();_channel = _connection.CreateModel();// 声明临时回复队列(排他性,连接关闭时自动删除)_replyQueueName = _channel.QueueDeclare(queue: "",durable: false,exclusive: true,autoDelete: true,arguments: null).QueueName;// 创建消费者监听回复队列_consumer = new AsyncEventingBasicConsumer(_channel);_consumer.Received += OnResponseReceived;// 开始消费回复队列_channel.BasicConsume(queue: _replyQueueName,autoAck: false,consumer: _consumer);_logger.LogInformation("RPC Client initialized with reply queue: {ReplyQueue}", _replyQueueName);}public async Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout){if (_disposed)throw new ObjectDisposedException(nameof(RpcClient));var tcs = new TaskCompletionSource<RpcResponse>();var cancellationTokenSource = new CancellationTokenSource(timeout);// 注册超时取消cancellationTokenSource.Token.Register(() =>{if (_pendingRequests.TryRemove(request.RequestId, out var removedTcs)){removedTcs.TrySetException