1. 引言
事件总线这个概念对你来说可能很陌生,但提到观察者(发布-订阅)模式,你也许就很熟悉。事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。
从上图可知,核心就4个角色:
事件(事件源+事件处理)
事件发布者
事件订阅者
事件总线
实现事件总线的关键是:
事件总线维护一个事件源与事件处理的映射字典;
通过单例模式,确保事件总线的唯一入口;
利用反射完成事件源与事件处理的初始化绑定;
提供统一的事件注册、取消注册和触发接口。
以上源于我在事件总线知多少(1)中对于EventBus的分析和简单总结。基于以上的简单认知,我们来梳理下eShopOnContainers中EventBus的实现机制。
2. 高屋建瓴--看类图
我们直接以上帝视角,来看下其实现机制,上类图。
我们知道事件的本质是:事件源+事件处理。 针对事件源,其定义了 Handle方法用于响应事件。不同之处在于方法参数的类型: 第一个接受的是一个强类型的 dynamic。 为什么要单独提供一个事件源为 dynamic可以简化事件源的构建,更趋于灵活。
有了事件源和事件处理,接下来就是事件的注册和订阅了。为了方便进行订阅管理,系统提供了额外的一层抽象 InMemoryEventBusSubscriptionsManager就是使用内存进行存储事件源和事件处理的映射字典。 从类图中看 SubscriptionInfo,其主要用于表示事件订阅方的订阅类型和事件处理的类型。
我们来近距离看下
//InMemoryEventBusSubscriptionsManager.cs//定义的事件名称和事件订阅的字典映射(1:N)private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;//保存所有的事件处理类型private readonly List<Type> _eventTypes;//定义事件移除后事件public event EventHandler<string> OnEventRemoved;//构造函数初始化public InMemoryEventBusSubscriptionsManager(){_handlers = new Dictionary<string, List<SubscriptionInfo>>();_eventTypes = new List<Type>();}//添加动态类型事件订阅(需要手动指定事件名称)public void AddDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler{DoAddSubscription(typeof(TH), eventName, isDynamic: true);}//添加强类型事件订阅(事件名称为事件源类型)public void AddSubscription<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>{var eventName = GetEventKey<T>();DoAddSubscription(typeof(TH), eventName, isDynamic: false);if (!_eventTypes.Contains(typeof(T))){_eventTypes.Add(typeof(T));}}//移除动态类型事件订阅public void RemoveDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler{var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);DoRemoveHandler(eventName, handlerToRemove);}//移除强类型事件订阅public void RemoveSubscription<T, TH>()where TH : IIntegrationEventHandler<T>where T : IntegrationEvent{var handlerToRemove = FindSubscriptionToRemove<T, TH>();var eventName = GetEventKey<T>();DoRemoveHandler(eventName, handlerToRemove);}
添加了这么一层抽象,即符合了单一职责原则,又完成了代码重用。 IEventBusSubscriptionsManager的依赖,即可完成订阅管理。 你这里可能会好奇,为什么要暴露一个 EventBusRabbitMQ源码亲密接触。
3.3.1. 构造函数定义
IRabbitMQPersistentConnection以便连接到对应的Broke。
使用空对象模式注入
OnEventRemoved事件,取消队列的绑定。(这也就回答了上面遗留的问题)
3.3.2. 事件订阅的逻辑:
public void Publish(IntegrationEvent @event){if (!_persistentConnection.IsConnected){_persistentConnection.TryConnect();}var policy = RetryPolicy.Handle<BrokerUnreachableException>().Or<SocketException>().WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>{_logger.LogWarning(ex.ToString());});using (var channel = _persistentConnection.CreateModel()){var eventName = @event.GetType().Name;channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");var message = JsonConvert.SerializeObject(@event);var body = Encoding.UTF8.GetBytes(message);policy.Execute(() =>{var properties = channel.CreateBasicProperties();properties.DeliveryMode = 2; // persistentchannel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory:true, basicProperties: properties, body: body);});}}
这里面有以下几个知识点:
使用Polly,以2的阶乘的时间间隔进行重试。(第一次2s后,第二次4s后,第三次8s后...重试)
使用direct全匹配、单播形式的路由机制进行消息分发
消息主体是格式化的json字符串
指定
mandatory:true告知服务器当根据指定的routingKey和消息找不到对应的队列时,直接返回消息给生产者。
3.3.4. 然后看看事件消息的监听
Received事件委托处理消息接收事件调用
以上代码主要包括以下知识点:
4. EventBus的集成和使用
以上介绍了EventBus的实现要点,那各个微服务是如何集成呢?
1. 注册
2. 注册单例模式的 services.AddSingleton<IEventBusSubscriptionsManager,InMemoryEventBusSubscriptionsManager>();
3. 注册单例模式的
完成了以上集成,就可以在代码中使用事件总线进行事件的发布和订阅。
4. 发布事件
若要发布事件,需要根据是否需要事件源(参数传递)来决定是否需要申明相应的集成事件,需要则继承自 IEventBus的实例的
IIntegrationEventHandler或 IEventBus的实例调用
TestEvent事件,B服务订阅该事件,同样需要在B服务复制定义一个 <code class="prettyprint code-in-text prettyprinted" style="box-sizing: border-box;background: rgb(243, 241, 241);color: rgb(88, 88, 88);line-height: 18px;font-family: consolas, menlo, courier, monospace, " initial="" microsoft="" !important;"="" 0px="">TestEvent。 这也是微服务的一个通病,重复代码。5. 最后
通过一步一步的源码梳理,我们发现eShopOnContainers中事件总线的总体实现思路与引言部分的介绍十分契合。所以对于事件总线,不要觉得高深,明确参与的几个角色以及基本的实现步骤,那么不管是基于RabbitMQ实现也好还是基于Azure Service Bus也好,万变不离其宗!
//定义事件处理public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>{public async Task Handle(ProductPriceChangedIntegrationEvent @event){//do something}}//事件源的声明public class ProductPriceChangedIntegrationEvent : IntegrationEvent{public int ProductId { get; private set; }public decimal NewPrice { get; private set; }public decimal OldPrice { get; private set; }public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice){ProductId = productId;NewPrice = newPrice;OldPrice = oldPrice;}}services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>{var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();var retryCount = 5;if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])){retryCount = int.Parse(Configuration["EventBusRetryCount"]);}return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);});services.AddSingleton<IRabbitMQPersistentConnection>(sp =>{var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();//...return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);});
Json字符串的反序列化
利用依赖注入容器解析集成事件(Integration Event)和事件处理(Event Handler)类型
反射调用具体的事件处理方法
private async Task ProcessEvent(string eventName, string message){if (_subsManager.HasSubscriptionsForEvent(eventName)){using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)){var subscriptions = _subsManager.GetHandlersForEvent(eventName);foreach (var subscription in subscriptions){if (subscription.IsDynamic){var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;dynamic eventData = JObject.Parse(message);await handler.Handle(eventData);}else{var eventType = _subsManager.GetEventTypeByName(eventName);var integrationEvent = JsonConvert.DeserializeObject(message, eventType);var handler = scope.ResolveOptional(subscription.HandlerType);var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });}}}}}