Dev Mentor - RabbitMq

Bus is to be used to inform or broadcast the mutated state and command that need to be processed by multiple services

 

  

Scenario 1 ProductService received rest post message to persist a product and publish the product persistence message as event to rabbitmq

"rabbitMq": {"namespace": "Matt-Product","retries": 3,"retryInterval": 2,"username": "guest","password": "guest","virtualHost": "/","port": 5672,"hostnames": ["10.89.24.148"],"requestTimeout": "00:00:10","publishConfirmTimeout": "00:00:01","recoveryInterval": "00:00:10","persistentDeliveryMode": true,"autoCloseConnection": true,"automaticRecovery": true,"topologyRecovery": true,"exchange": {"durable": true,"autoDelete": false,"type": "Topic"},"queue": {"autoDelete": false,"durable": true,"exclusive": false}},
RabbitMq config section

 

using Common.Handlers;
using Common.RabbitMQ;
using Common.Repo;
using ProductService.Commands;
using ProductService.Events;
using ProductService.Models;
using System.Linq;
using System.Threading.Tasks;namespace ProductService.CommandHandlers
{public class NewProductCommandHandler : ICommandHandler<NewProductCommand>{IBusPublisher _busPublisher;public NewProductCommandHandler(IBusPublisher busPublisher){_busPublisher = busPublisher;} public async Task  HandleAsync(NewProductCommand command, ICorrelationContext context){var enumerator = DataStore<Product>.GetInstance().GetRecords(i=>i.Name == command.Name) ;if (enumerator.Count() == 0 ){DataStore<Product>.GetInstance().AddRecord(new Product(command.Id, command.Name, command.Category, command.Price));//Send product created event bus msgawait _busPublisher.PublishAsync<ProductCreated>( new ProductCreated { Id = command.Id, Name = command.Name }, context);}else{//Send rejected bus message
            }}}
}
NewProductCommandHandler

 

 

Scenario 2 OrderService and OperationService subscribe to the event and proceed with their own handling logic

 

using Common.Messages;
using Common.Handlers;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RawRabbit;
using System;
using RawRabbit.Common;
using System.Threading.Tasks;
using System.Reflection;
using Polly;
using RawRabbit.Enrichers.MessageContext;
using Common.Exceptions;namespace Common.RabbitMQ
{public class BusSubscriber : IBusSubscriber{private readonly ILogger _logger;private readonly IBusClient _busClient;private readonly IServiceProvider _serviceProvider;private readonly string _defaultNamespace;private readonly int _retries;private readonly int _retryInterval;public BusSubscriber(IApplicationBuilder app){_logger = app.ApplicationServices.GetService<ILogger<BusSubscriber>>();_serviceProvider = app.ApplicationServices.GetService<IServiceProvider>();_busClient = _serviceProvider.GetService<IBusClient>();var options = _serviceProvider.GetService<RabbitMqOptions>();_defaultNamespace = options.Namespace;_retries = options.Retries >= 0 ? options.Retries : 3;_retryInterval = options.RetryInterval > 0 ? options.RetryInterval : 2;}public IBusSubscriber SubscribeCommand<TCommand>(string @namespace = null, string queueName = null,Func<TCommand, Exception, IRejectedEvent> onError = null)where TCommand : ICommand{_busClient.SubscribeAsync<TCommand, CorrelationContext>(async (command, correlationContext) =>{var commandHandler = _serviceProvider.GetService<ICommandHandler<TCommand>>();return await TryHandleAsync(command, correlationContext,() => commandHandler.HandleAsync(command, correlationContext), onError);},ctx => ctx.UseSubscribeConfiguration(cfg =>cfg.FromDeclaredQueue(q => q.WithName(GetQueueName<TCommand>(@namespace, queueName)))));return this;}public IBusSubscriber SubscribeEvent<TEvent>(string @namespace = null, string queueName = null,Func<TEvent, Exception, IRejectedEvent> onError = null)where TEvent : IEvent{_busClient.SubscribeAsync<TEvent, CorrelationContext>(async (@event, correlationContext) =>{var eventHandler = _serviceProvider.GetService<IEventHandler<TEvent>>();return await TryHandleAsync(@event, correlationContext,() => eventHandler.HandleAsync(@event, correlationContext), onError);},ctx => ctx.UseSubscribeConfiguration(cfg =>cfg.FromDeclaredQueue(q => q.WithName(GetQueueName<TEvent>(@namespace, queueName)))));return this;}// Internal retry for services that subscribe to the multiple events of the same types.// It does not interfere with the routing keys and wildcards (see TryHandleWithRequeuingAsync() below).private async Task<Acknowledgement> TryHandleAsync<TMessage>(TMessage message,CorrelationContext correlationContext,Func<Task> handle, Func<TMessage, Exception, IRejectedEvent> onError = null){var currentRetry = 0;var retryPolicy = Policy.Handle<Exception>().WaitAndRetryAsync(_retries, i => TimeSpan.FromSeconds(_retryInterval));var messageName = message.GetType().Name;return await retryPolicy.ExecuteAsync<Acknowledgement>(async ()=>{try{var retryMessage = currentRetry == 0? string.Empty: $"Retry: {currentRetry}'.";_logger.LogInformation($"Handling a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}'. {retryMessage}");await handle();_logger.LogInformation($"Handled a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}'. {retryMessage}");return new Ack();}//catch (CustomizedException<IEventHandler<IEvent>> exception)//{//    System.Diagnostics.Debug.WriteLine(exception.Message);//    return new Ack();//}catch (Exception exception){currentRetry++;_logger.LogError(exception, exception.Message);if (exception.GetType().FullName.Contains("CustomizedException") && onError != null){var rejectedEvent = onError(message, exception);await _busClient.PublishAsync(rejectedEvent, ctx => ctx.UseMessageContext(correlationContext));_logger.LogInformation($"Published a rejected event: '{rejectedEvent.GetType().Name}' " +$"for the message: '{messageName}' with correlation id: '{correlationContext.Id}'.");return new Ack();}throw new Exception($"Unable to handle a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}', " +$"retry {currentRetry - 1}/{_retries}...");}});}// RabbitMQ retry that will publish a message to the retry queue.// Keep in mind that it might get processed by the other services using the same routing key and wildcards.private async Task<Acknowledgement> TryHandleWithRequeuingAsync<TMessage>(TMessage message,CorrelationContext correlationContext,Func<Task> handle, Func<TMessage, Exception, IRejectedEvent> onError = null){var messageName = message.GetType().Name;var retryMessage = correlationContext.Retries == 0? string.Empty: $"Retry: {correlationContext.Retries}'.";_logger.LogInformation($"Handling a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}'. {retryMessage}");try{await handle();_logger.LogInformation($"Handled a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}'. {retryMessage}");return new Ack();}catch (Exception exception){_logger.LogError(exception, exception.Message);if (exception is Exception dShopException && onError != null){var rejectedEvent = onError(message, dShopException);await _busClient.PublishAsync(rejectedEvent, ctx => ctx.UseMessageContext(correlationContext));_logger.LogInformation($"Published a rejected event: '{rejectedEvent.GetType().Name}' " +$"for the message: '{messageName}' with correlation id: '{correlationContext.Id}'.");return new Ack();}if (correlationContext.Retries >= _retries){await _busClient.PublishAsync(RejectedEvent.For(messageName),ctx => ctx.UseMessageContext(correlationContext));throw new Exception($"Unable to handle a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}' " +$"after {correlationContext.Retries} retries.", exception);}_logger.LogInformation($"Unable to handle a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}', " +$"retry {correlationContext.Retries}/{_retries}...");return Retry.In(TimeSpan.FromSeconds(_retryInterval));}}private string GetQueueName<T>(string @namespace = null, string name = null){@namespace = string.IsNullOrWhiteSpace(@namespace)? (string.IsNullOrWhiteSpace(_defaultNamespace) ? string.Empty : _defaultNamespace): @namespace;var separatedNamespace = string.IsNullOrWhiteSpace(@namespace) ? string.Empty : $"{@namespace}.";return (string.IsNullOrWhiteSpace(name)? $"{Assembly.GetEntryAssembly().GetName().Name}/{separatedNamespace}{typeof(T).Name.Underscore()}": $"{name}/{separatedNamespace}{typeof(T).Name.Underscore()}").ToLowerInvariant();}}
}
BusSubscriber

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1174481.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PyTorch 自动微分:超越 `backward()` 的动态图深度探索

PyTorch 自动微分&#xff1a;超越 backward() 的动态图深度探索 引言&#xff1a;自动微分的范式之争 在深度学习的工程实践中&#xff0c;自动微分&#xff08;Automatic Differentiation, AD&#xff09;已成为模型训练的基石。与符号微分和数值微分不同&#xff0c;自动微分…

计算机毕业设计 java 疫情物资管理系统 Java 疫情物资智能管理与调配平台 基于 Spring Boot 的疫情物资申请捐赠系统

计算机毕业设计 java 疫情物资管理系统 v5rne9&#xff08;配套有源码 程序 mysql 数据库 论文&#xff09;本套源码可以先看具体功能演示视频领取&#xff0c;文末有联 xi 可分享在疫情防控常态化背景下&#xff0c;疫情物资的高效管理、精准调配与供需对接成为关键需求&#…

震惊!浙江这家AI科技公司,竟是光景泽创!

浙江光景泽创科技&#xff1a;AI 企服领域的创新引领者在当今数字化浪潮汹涌的时代&#xff0c;AI 技术在企业服务领域的应用正成为行业发展的关键驱动力。然而&#xff0c;企业在引入 AI 服务时&#xff0c;往往面临着诸多挑战。从行业实操反馈来看&#xff0c;许多企业在 AI …

Dev Mentor - Seq Serilog

Dev Mentor - Seq Serilog {"app": {"name": "order-service"},"elk": {"enabled": false,"url": "http://10.89.24.148:9200","indexFo…

基于深度学习的棉花分类检测系统(YOLOv8+YOLO数据集+UI界面+Python项目+模型)

一、项目介绍 摘要 本项目基于YOLOv8深度学习目标检测算法&#xff0c;开发了一套高效、精准的棉花品种智能分类检测系统。该系统能够自动识别并分类四种主要棉花品种&#xff1a;亚洲棉&#xff08;G. arboreum&#xff09;、海岛棉&#xff08;G. barbadense&#xff09;、…

基于深度学习的手势识别检测系统(YOLOv8+YOLO数据集+UI界面+Python项目+模型)

一、项目介绍 摘要 本项目基于先进的YOLOv8深度学习算法&#xff0c;开发了一套高效精准的实时手势识别检测系统。系统能够准确识别10种常见手势&#xff0c;包括字母手势&#xff08;A、D、I、L、V、W、Y&#xff09;、数字手势&#xff08;5、7&#xff09;以及特殊手势&am…

Dev Mentor - Distributed tracing

Dev Mentor - Distributed tracingOpenTrace take a tracer instance (e.g. Jaeger) to post the metrics via UDP to the remote Jaeger instance for display OpenTrace then can be acquired in DI manner and get …

基于深度学习的火焰烟雾检测系统(YOLOv10+YOLO数据集+UI界面+Python项目+模型)

一、项目介绍 项目背景 火焰与烟雾的检测在很多领域中都至关重要&#xff0c;特别是在火灾监控、工业安全、环境保护等领域。准确、实时地识别火焰和烟雾的存在&#xff0c;不仅可以有效减少灾害发生的损失&#xff0c;还能够为相关部门提供及时的预警信息。因此&#xff0c;…

VIRTUALIZATION - Dev Mentor - Kubernates (Continue)

VIRTUALIZATION - Dev Mentor - Kubernates (Continue) kubectl apply -f /home/Asdf1234/pod.ymlkubectl get podskubectl port-forward nanoservice-pod 5000:5000kubectl describe pod nanoservice-pod kubectl de…

VIRTUALIZATION - Dev Mentor - Docker

VIRTUALIZATION - Dev Mentor - Docker Remove all inactive dockers : docker container prune -f Copy files in inactive docker to host server: docker container cp 5f9c2f1893c4:/app/migrator/ ./ VS.NET doc…

无需专业技能!AI小程序一句话高效改图出片

拍照总遇尴尬&#xff1f;自拍眼镜泛绿光、风景照路人乱入&#xff0c;修图又难又费钱&#xff1f;别慌&#xff01;安利小程序AI生图&#xff0c;小白也能一键精准修图&#xff0c;轻松拯救废片。实战演示&#xff1a;两大拍照痛点&#xff0c;一键解决▶场景一&#xff1a;人…

【性能测试】9_JMeter _JMeter录制脚本(了解)

文章目录一、录制脚本原理二、应用场景三、操作步骤四、jmeter问题不能联网五、过滤规则和Cookie管理器一、录制脚本原理 Jmeter在客户端和服务器之间做代理。收到所有的请求和响应数据后&#xff0c;Jmeter再进行逆向解析的动作&#xff0c;将数据报文转化为脚本。 二、应用…

【性能测试】8_JMeter _JMeter跨线程组关联

文章目录一、跨线程组关联1.1 说明1.2 实现原理二、Jmeter属性的配置方法三、场景四、操作方法一、跨线程组关联 当有依赖关系的两个请求&#xff08;一个请求的入参是另一个请求返回的数据&#xff09; &#xff0c; 放入到不同的线程组中时&#xff0c; 就不能使用提取器保存…

高低温交变湿热试验箱品牌都有哪些值得看?

在环境可靠性试验设备领域&#xff0c;高低温交变湿热试验箱是评估产品耐候性与稳定性的关键设备。面对市场上众多的品牌&#xff0c;如何选择一款性能卓越、质量可靠的设备成为许多企业的关注焦点。小编将为您梳理几个值得重点考察的实力品牌&#xff0c;助您做出明智决策。一…

2026中国GEO服务商权威测评:聚焦区域深耕,领跑AI搜索商业新纪元 - 野榜数据排行

报告摘要 2026年,生成式AI全面重构流量逻辑,传统SEO效能持续衰减,生成式引擎优化(GEO)已成为企业抢占AI生态认知卡位的核心战略。本报告基于技术创新、商业转化、服务交付、本土合规四大核心维度,对国内主流GEO服…

【AI应用开发工程师】-你有没有被 AI 的“幻觉输出”气到过?

&#x1f4da; 目录 #mermaid-svg-7hnpYjYDu1AjdrLw{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}@keyframes edge-animation-frame{from{stroke-dashoffset:0;}}@keyframes dash{to{stroke-dashoffset:0;}}#mermaid-svg-7hnpYj…

实用指南:C++11(二)

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

强烈安利10个AI论文工具,专科生轻松搞定论文写作!

强烈安利10个AI论文工具&#xff0c;专科生轻松搞定论文写作&#xff01; AI工具如何让论文写作不再难 对于专科生来说&#xff0c;论文写作一直是一个令人头疼的难题。从选题到开题&#xff0c;从撰写到降重&#xff0c;每一个环节都可能成为阻碍。而如今&#xff0c;随着AI技…

GNSS位移监测在单北斗变形监测一体机中的应用与发展分析

GNSS位移监测技术在单北斗变形监测一体机中的应用日益广泛&#xff0c;涵盖了从基础设施维护到自然灾害预警的多个领域。单北斗系统通过高精度定位&#xff0c;实时获取重要的位移数据&#xff0c;帮助工程师及时判断结构安全情况。特别是在桥梁和地质灾害监测方面&#xff0c;…