在Asp.Net Core中集成Kafka

  在我们的业务中,我们通常需要在自己的业务子系统之间相互发送消息,一端去发送消息另一端去消费当前消息,这就涉及到使用消息队列MQ的一些内容,消息队列成熟的框架有多种,这里你可以读这篇文章来了解这些MQ的不同,这篇文章的主要目的是用来系统讲述如何在Asp.Net Core中使用Kafka,整篇文章将介绍如何写消息发送方代码、消费方代码、配套的工具的使用,希望读完这篇文章之后对整个消息的运行机制有一定的理解,在这里通过一张图来简要了解一下消息队列中的一些概念。


640?wx_fmt=png

图一 Kafka消息队列

  一 安装NUGET包

  在写代码之前首先要做的就是安装nuget包了,我们这里使用的是Confluent.Kafka 1.0.0-RC4版本,具体项目要根据具体的时间来确定引用包的版本,这些包可能更新比较快。

640?wx_fmt=png

图二 引用Kafka包依赖

  二 消息发送方(Producer)

  1 在项目中添加所有触发事件的接口 IIntegrationEvent,后面所有的触发事件都是继承自这个接口。


/// <summary>
/// 集成事件的接口定义
/// </summary>
public interface IIntegrationEvent {
string Key { get; set; }
}

  2 定义Kafka生产者


/// <summary>
/// Kafka 生产者的 Domain Service
/// </summary>
public class KafkaProducer : DomainService {
private readonly IConfiguration _config;
private readonly ILogger<KafkaProducer> _logger;
public KafkaProducer(IConfiguration config,
ILogger<KafkaProducer> logger) {
_config = config;
_logger = logger;
}
/// <summary>
/// 发送事件
/// </summary>
/// <param name="event"></param>
public void Produce(IIntegrationEvent @event) {
var topic = _config.GetValue<string>($"Kafka:Topics:{@event.GetType().Name}");
var producerConfig = new ProducerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
MessageTimeoutMs = _config.GetValue<int>("Kafka:MessageTimeoutMs")
};
var builder = new ProducerBuilder<string, string>(producerConfig);
using (var producer = builder.Build()) {
try {
var json = JsonConvert.SerializeObject(@event);
var dr = producer.ProduceAsync(topic, new Message<string, string> { Key = @event.Key, Value = json }).GetAwaiter().GetResult();
_logger.LogDebug("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
} catch (ProduceException<string, string> ex) {
_logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", topic, ex.Error.Reason);
}
}
}
}

  在这里我们的Producer根据业务的需要定义在领域服务中,这里面最关键的就是Produce方法了,该方法的参数是继承自IIntegrationEvent 接口的各种各样事件,在这个方法中,我们获取配置在appsetting.json中配置的各种Topic以及Kafka服务器的地址,具体的配置如下方截图所示。  

640?wx_fmt=png

图三 配置服务器地址以及各种Topic

  通过当前配置我们就知道我们的消息要发往何处,然后我们就可以创建一个producer来将我们的事件(实际上是定义的数据结构)序列化成Json,然后通过异步的方式发送出去,这里需要注意我们创建的Producer要放在一个using块中,这样在创建完成并发送消息之后就会释放当前生产者。这里如果发送失败会在当前日志中记录发送的值以及错误的原因从而便于进行调试。这里举出其中的一个事件RepairContractFinishedEvent为例来说明。


/// <summary>
/// 维修合同完成的事件
/// </summary>
public class RepairContractFinishedEvent : IIntegrationEvent {
public RepairContract RepairContract { get; set; }
//一个维修合同会对应多个调整单
public List<RepairContractAdjust> RepairContractAdjusts { get; set; }
public string Key { get; set; }
}

  这个里面RepairContract以及List集合都是我们定义的一种数据结构。

  最后我们来看看在具体的领域层中我们该如何触发此事件的,这里我们也定义了一个叫做IRepairContractEventManager接口的领域服务,并在里面定义了一个叫做Finished的接口,然后在RepairContractEventManager中实现该方法。


public class RepairContractEventManager : DomainService, IRepairContractEventManager {
private readonly KafkaProducer _producer;
private readonly IRepository<RepairContract, Guid> _repairContractRepository;
private readonly IRepository<RepairContractAdjust, Guid> _repairContractAdjustRepository;
public RepairContractEventManager(KafkaProducer producer,
IRepository<RepairContract, Guid> repairContractRepository,
IRepository<RepairContractAdjust, Guid> repairContractAdjustRepository) {
_producer = producer;
_repairContractRepository = repairContractRepository;
_repairContractAdjustRepository = repairContractAdjustRepository;
}
public void Finished(Guid repairContractId) {
var repairContract = _repairContractRepository.GetAll()
.Include(c => c.RepairContractWorkItems).ThenInclude(w => w.Materials)
.SingleOrDefaultAsync(c => c.Id == repairContractId).GetAwaiter().GetResult();
var repairContractAdjusts = _repairContractAdjustRepository.GetAll()
.Include(a => a.WorkItems).ThenInclude(w => w.Materials)
.Where(a => a.RepairContractId == repairContractId).ToListAsync().GetAwaiter().GetResult();
var @event = new RepairContractFinishedEvent {
Key = repairContract?.Code,
RepairContract = repairContract,
RepairContractAdjusts = repairContractAdjusts
};
_producer.Produce(@event);
}
}

 

 这段代码就是组装RepairContractFinishedEvent的具体实现过程,然后调用我们之前创建的KafkaProducer对象然后将消息发送出去,这样在需要触发当前RepairContractFinishedEvent 的地方来注入IRepairContractEventManager接口,然后调对应的Finished方法,这样就完成了整个消息的发送的过程了。

  三 查看消息的发送

  在发送完消息后我们可以到Kafka 集群 Control Center中查找我们发送的所有消息。选择其中的一条消息,双击,然后选择INSPECT来查看发送的消息

640?wx_fmt=png

图四 Kafka Control Center中查看发送消息

  四 消息的接收方(Consumer)

  在正确创建消息的发送方后紧接着就是定义消息的接收方了,消息的接收方顾名思义就是消费刚才消息的一方,这里的步骤和发送类似,但是也有很大的不同,消息的消费方核心是一个后台服务,并且在单独的线程中监听来自发送方的消息,并进行消费,这里我们先定义一个叫做KafkaConsumerHostedService的基类,我们具体来看看代码。


/// <summary>
/// Kafka 消费者的后台服务基础类
/// </summary>
/// <typeparam name="T">事件类型</typeparam>
public abstract class KafkaConsumerHostedService<T> : BackgroundService where T : IIntegrationEvent {
protected readonly IServiceProvider _services;
protected readonly IConfiguration _config;
protected readonly ILogger<KafkaConsumerHostedService<T>> _logger;
public KafkaConsumerHostedService(IServiceProvider services, IConfiguration config, ILogger<KafkaConsumerHostedService<T>> logger) {
_services = services;
_config = config;
_logger = logger;
}
/// <summary>
/// 消费该事件,比如调用 Application Service 持久化数据等
/// </summary>
/// <param name="event">事件内容</param>
protected abstract void DoWork(T @event);
/// <summary>
/// 构造 Kafka 消费者实例,监听指定 Topic,获得最新的事件
/// </summary>
/// <param name="stoppingToken">终止标识</param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
await Task.Factory.StartNew(() => {
var topic = _config.GetValue<string>($"Kafka:Topics:{typeof(T).Name}");
var consumerConfig = new ConsumerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = _config.GetValue<string>("Application:Name"),
EnableAutoCommit = true,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
using (var consumer = builder.Build()) {
consumer.Subscribe(topic);
while (!stoppingToken.IsCancellationRequested) {
try {
var result = consumer.Consume(stoppingToken);
var @event = JsonConvert.DeserializeObject<T>(result.Value);
DoWork(@event);
//consumer.StoreOffset(result);
} catch (OperationCanceledException ex) {
consumer.Close();
_logger.LogDebug(ex, "Kafka 消费者结束,退出后台线程");
} catch (AbpValidationException ex) {
_logger.LogError(ex, $"Kafka {GetValidationErrorNarrative(ex)}");
} catch (ConsumeException ex) {
_logger.LogError(ex, "Kafka 消费者产生异常");
} catch (KafkaException ex) {
_logger.LogError(ex, "Kafka 产生异常");
} catch (ValidationException ex) {
_logger.LogError(ex, "Kafka 消息验证失败");
} catch (Exception ex) {
_logger.LogError(ex, "Kafka 捕获意外异常");
}
}
}
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
private string GetValidationErrorNarrative(AbpValidationException validationException) {
var detailBuilder = new StringBuilder();
detailBuilder.AppendLine("验证过程中检测到以下错误");
foreach (var validationResult in validationException.ValidationErrors) {
detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);
detailBuilder.AppendLine();
}
return detailBuilder.ToString();
}
}

  这段代码中我们会创建一个consumer,这里我们会在一个While循环中去订阅特定Topic消息,这里的BootstrapServers是和发送方保持一致,并且也是在当前应用程序中的appsetting.json中进行配置的,而且这里的consumer.Consume方法是一个阻塞式方法,当发送方发送特定事件后,这里会接收到同样名称的Topic的消息,然后将接收到的Json数据进行反序列化,然后交由后面的DoWork方法进行处理。这里还是以之前生成者发送的RepairContractFinished事件为例,这里也需要定义一个RepairContractFinishedEventHandler来处理生产者发送的消息。


public class RepairContractFinishedEventHandler : KafkaConsumerHostedService<RepairContractFinishedEvent> {
public RepairContractFinishedEventHandler(IServiceProvider services,
IConfiguration config, ILogger<KafkaConsumerHostedService<RepairContractFinishedEvent>> logger)
: base(services, config, logger) {
}
/// <summary>
/// 调用 Application Service,新增或更新维修合同及关联实体
/// </summary>
/// <param name="event">待消费的事件</param>
protected override void DoWork(RepairContractFinishedEvent @event) {
using (var scope = _services.CreateScope()) {
var service = scope.ServiceProvider.GetRequiredService<IRepairContractAppService>();
service.AddOrUpdateRepairContract(@event.RepairContract, @event.RepairContractAdjusts);
}
}
}

  这里需要特别注意的是在这里我么也需要定义一个继承自IIntegrationEvent接口的事件,这里也是定义一种数据结构,并且这里的数据结构和生成者定义的要保持一致,否则消费方在反序列化的时候会丢失不能够匹配的信息。


public class RepairContractFinishedEvent : IIntegrationEvent {
public RepairContractDto RepairContract { get; set; }
public List<RepairContractAdjustDto> RepairContractAdjusts { get; set; }
public string Key { get; set; }
}

  另外在DoWork方法中我们也需要注意代码也需要用using包裹,从而在消费方消费完后释放掉当前的应用服务。最后需要注意的就是我们的每一个Handle都是一个后台服务,我们需要在Asp.Net Core的Startup的ConfigureServices进行配置,从而将当前的后台服务添加到Asp.Net Core依赖注入容器中。


/// <summary>
/// 注册集成事件的处理器
/// </summary>
/// <param name="services"></param>
private void AddIntegrationEventHandlers(IServiceCollection services) {
services.AddHostedService<RepairContractFinishedEventHandler>();
services.AddHostedService<ProductTransferDataEventHandler>();
services.AddHostedService<PartUpdateEventHandler>();
services.AddHostedService<VehicleSoldFinishedEventHandler>();
services.AddHostedService<AddOrUpdateDealerEventHandler>();
services.AddHostedService<AddOrUpdateProductCategoryEventHandler>();
services.AddHostedService<CustomerFinishedEventHandler>();
services.AddHostedService<VehicleSoldUpdateStatusEventHandler>();
services.AddHostedService<AddCustomerEventHandler>();
}

  最后我们也看看我们的appsetting.json的配置文件关于kafka的配置。


"Kafka": {
"BootstrapServers": "127.0.0.1:9092",
"MessageTimeoutMs": 5000,
"Topics": {
"RepairContractFinishedEvent": "repair-contract-finished",
"AddOrUpdateProductCategoryEvent": "add-update-product-category",
"AddOrUpdateDealerEvent": "add-update-dealer",
"ClaimApproveEvent": "claim-approve",
"ProductTransferDataEvent": "product-update",
"PartUpdateEvent": "part-update",
"VehicleSoldFinishedEvent": "vehiclesold-finished",
"CustomerFinishedEvent": "customer-update",
"VehicleInformationUpdateStatusEvent": "add-update-vehicle-info",
"AddCustomerEvent": "add-customer"
}
},

  这里需要注意的是发送方和接收方必须保证Topic一致,并且配置的服务器名称端口保持一致,这样才能够保证消息的准确发送和接收。最后对于服务端,这里推荐一个VSCode的插件kafka,能够创建并发送消息,这样就方便我们来发送我们需要的数据了,这里同样需要我们先建立一个.kafka的文件,然后配置Kafka服务的地址和端口号。

640?wx_fmt=png

图五 利用VSCode Kafka插件发送消息

原文地址:https://www.cnblogs.com/seekdream/p/10757541.html

.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com 
640?wx_fmt=jpeg


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/316139.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生成函数(母函数)

参考文章&#xff1a; 生成函数(母函数)——目前最全的讲解 《小学生都能看懂的生成函数从入门到升天教程》《生成函数全家桶》 Acwing 进阶课程–生成函数 引入 任意给定一个无限长的序列a0,a1....an....a_{0},a_{1}....a_{n}....a0​,a1​....an​.... 定义函数g(x)a0x0a1x…

AtCoder Beginner Contest 234

A - Weird Function B - Longest Segment C - Happy New Year! D - Prefix K-th Max E - Arithmetic Number F - Reordering G - Divide a Sequence 写个函数 int f(int x){return x*x2*x3;} int main() { int t;scanf("%d", &t);cout<<f(f(f(t)t)f(f(t…

分享一个.NET平台开源免费跨平台的大数据分析框架.NET for Apache Spark

今天早上六点半左右微信群里就看到张队发的关于.NET Spark大数据的链接https://devblogs.microsoft.com/dotnet/introducing-net-for-apache-spark/ &#xff0c;正印证了“微软在不断通过.NET Core补齐各领域开发&#xff0c;真正实现一种语言的跨平台”这句话。那么我们今天就…

Codeforces Round #760 (Div. 3)

E. Singers’ Tour F. Reverse G. Trader Problem 推推式子就行了。 int a[N]; int main() {int t;scanf("%d", &t);while(t --){int n;LL sum 0;scanf("%d", &n);for(int i 1;i < n;i ) scanf("%d", ai), sum a[i];a[0] a[n]…

acwing3132. 食物(BZOJ3028)

acwing3132. 食物 题意&#xff1a; 你当然要帮他计算携带 N 件物品的方案数。 承德汉堡&#xff1a;偶数个。 可乐&#xff1a;0 个或 1 个。 鸡腿&#xff1a;0 个&#xff0c;1 个或 2 个。 蜜桃多&#xff1a;奇数个。 鸡块&#xff1a;4 的倍数个。 包子&#xff1a;0 个…

持续畅销20年的《C#高级编程》出第11版了!

TA是谁&#xff1f;Wrox精品红皮书&#xff0c;引领无数程序员进入程序开发殿堂&#xff0c;C#专家级指南&#xff0c;是经验丰富的程序员提高效率的更快捷方式&#xff0c;连续畅销20年&#xff0c;累计销量超30万册。TA出生名门&#xff1a;TA战绩辉煌&#xff1a;2019新的征…

cfF. Boring Queries

cfF. Boring Queries 题意&#xff1a; n个数组a[]&#xff0c;q个询问&#xff0c;每次询问区间[l,r]的lcm值 题目要求强制在线 1<n<1e5 1<a<2e5 1<q<1e5 题解&#xff1a; 添加链接描述 添加链接描述 添加链接描述 我们一般求lcm都是直接通过ab/gcd(a…

Educational Codeforces Round 119 (Rated for Div. 2)

D. Exact Change E. Replace the Numbers G. Subsequences Galore 因为1和2的数量最大值不是很多&#xff0c;多了的话可以用3代替&#xff0c;那么枚举1和2的数量然后二分3的数量 int a[110], n; bitset<10> bit; bool ch(int x) {for(int i 1;i < n;i ){int num …

.NET微服务体系结构中为什么使用Ocelot实现API网关

为什么要使用API网关而不是直接通信&#xff1f;在微服务架构中&#xff0c;客户端应用程序通常需要使用来自多个微服务的功能。如果直接执行该消费&#xff0c;则客户端需要处理多个微服务端点以进行呼叫。当应用程序发展并引入新的微服务或更新现有的微服务时会发生什么&…

P2000 拯救世界

P2000 拯救世界 题意&#xff1a; 为了拯救世界&#xff0c;小 a 和 uim 决定召唤出 kkksc03 大神和 lzn 大神。根据古籍记载&#xff0c;召唤出任何一位大神&#xff0c;都需要使用金木水火土五种五行神石来摆一个特定的大阵。而在古籍中&#xff0c;记载是这样的&#xff1…

Codeforces Round #762 (Div. 3)

E. MEX and Increments F. Let’s Play the Hat? G. Unusual Minesweeper H. Permutation and Queries 用个优先队列模拟。 map<int,int>ma; priority_queue<int> q;int main() {int t;scanf("%d", &t);while(t --){int n;scanf("%d", …

基于Jenkins Pipeline的ASP.NET Core持续集成实践

最近在公司实践持续集成&#xff0c;使用到了Jenkins的Pipeline来提高团队基于ASP.NET Core API服务的集成与部署&#xff0c;因此这里总结一下。一、关于持续集成与Jenkins Pipeline1.1 持续集成相关概念互联网软件的开发和发布&#xff0c;已经形成了一套标准流程&#xff0c…

踩不出足迹(牛客练习赛88 )

踩不出足迹(牛客练习赛88 ) 题意&#xff1a; 长度为n的数组a&#xff0c;每个数是一个k位二进制 定义一下操作&#xff1a; 令第一次得到的结果为 a1a_1a1​。你需要从第二个数开始&#xff0c;每次可以选择与上一次得到的结果异或或者同或起来。 问最大值是多少&#xff1f…

Codeforces Round #766 (Div. 2)

D. Not Adding E. Not Escaping F. Not Splitting 直接枚举就行了&#xff0c;原本还想的是素倍数&#xff0c;但是素倍数也不行。 bool dis[N];int main() {int n, x;scanf("%d", &n); int ans -n;while(n --)scanf("%d", &x), dis[x] 1;for(…

编程语言之父谈语言设计,龟叔大赞TypeScript

争论哪门编程语言孰优孰劣&#xff0c;长期以来都是程序员乐此不疲的“娱乐活动”。之所以说是娱乐活动&#xff0c;因为这些争论到最后往往只是各自在发泄情绪&#xff0c;再则就是&#xff0c;脱离使用场景去讨论所谓哪门语言更好并没意义。但如果让编程语言作者坐在一起讨论…

P2656 采蘑菇

P2656 采蘑菇 题意&#xff1a; 有n个点&#xff0c;m个单向边&#xff0c;每个边都有边权&#xff0c;如果经过这个边&#xff0c;可以获得其边权&#xff0c;而其边权会变成原来的p倍(0.1<p<0.8)&#xff0c;向下取整 从s点出发&#xff0c;问最多可以采到的蘑菇 题…

Codeforces Round #764 (Div. 3)

A. Plus One on the Subset B. Make AP C. Division by Two and Permutation D. Palindromes Coloring E. Masha-forgetful F. Interacdive Problem G. MinOr Tree 就是最小值逐步增加到最大值的过程。 int main() {int t;scanf("%d", &t);for(int _ 1;_ <…

你必须知道的 SmartSql

介绍SmartSql MyBatis Cache(Memory | Redis) R/W Splitting Dynamic Repository Diagnostics ......简洁、高效、高性能、扩展性、监控、渐进式开发&#xff01;她是如何工作的&#xff1f;SmartSql 借鉴了 MyBatis 的思想&#xff0c;使用 XML 来管理 SQL &#xff0c;并…

CF785D Anton and School - 2

CF785D Anton and School - 2 题意&#xff1a; 给定一个长度≤210^5由(和)组成的字符串&#xff0c;问有多少个子串&#xff08;可以不连续&#xff09;&#xff0c;前半部分是由(组成后半部分由)组成. 题解&#xff1a; 怎么括号匹配能出这么多题 如何才能不重不漏的选出…

OsharpNS轻量级.net core快速开发框架简明入门教程

OsharpNS官方资源项目地址&#xff1a;https://github.com/i66soft/osharp-ns20演示地址&#xff1a;https://www.osharp.org 直接使用QQ登录可以查看效果文档地址&#xff1a;https://docs.osharp.org 正在完善中....发布博客&#xff1a;https://www.cnblogs.com/guomingfeng…