Actor-ES框架:Ray-Handler-消息订阅器编写

消息订阅器:

Ray是基于Event Sourcing设计的ES/Actor框架,消息发布后需要订阅处理,订阅器主要有以下两类:

  • CoreHandler消息订阅器=RabbitSub+SubHandler

  • ToReadHandler消息订阅器=RabbitSub+SQLToReadHandler(ToReadHandler的子类)

    RabbitSub特性

RabbitSub特性是RabbitMQ消息队列订阅器。

RabbitSub特性有两个构造函数,常用的是这个:

public RabbitSubAttribute(string group, string exchange, string queue, int queueCount = 1)
  • group:通常用于分类。示例中,X-CoreHandler的group是Core,X-ToReadHandler是Read。

  • exchange:RabbitMQ中的exchange名称。

  • queue:RabbitMQ中的queue名称。

  • queueCount:消息队列数。用于消息的负载均衡。

示例:

[RabbitSub("Core", "Account", "account")]
public sealed class AccountCoreHandler : SubHandler<string, MessageInfo>
{……
}

RabbitSub可以单独使用,用于订阅消息。


CoreHandler消息订阅器

Ray中的ESActor通过RaiseEvent方法发布事件,传递消息。Ray默认使用RabbitMQ传递消息。ESActor发起事件后,CoreHandler订阅事件,以处理事件。

实现方式是:

  • 继承SubHandler。

  • 添加RabbitSub特性。 exchange名称、queue名称与ESGrain上RabbitPub特性的标识一致。

  • 添加构造函数(必须)。

    public AccountCoreHandler(IServiceProvider svProvider) : base(svProvider)
    {
    }
  • 事件被订阅后会流转到Tell方法中,data是要处理的事件。

    public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
    {switch (data){case AmountTransferEvent value: return Task.WhenAll(task, AmountAddEventHandler(value));default: return task;}
    }
    ToReadHandler消息订阅器
  1. SQLToReadHandler

ESActor发起事件后,X-ToReadHandler订阅事件,以处理事件。X-ToReadHandler继承自X-SQLToReadHandler,X-SQLToReadHandler继承自ToReadHandler。

示例图:

X-SQLToReadHandler需要使用者继承PartSubHandler,根据使用的关系型数据库自己实现。Ray默认提供了PostgreSQL的PSQLToReadHandler。如果使用的是MySQL、SQL Server等其他关系型数据库,请自定义实现。

X-SQLToReadHandler实现细节:
修改对应关系型数据库的Integrity Constraint Violation(违反完整性约束)的异常。
可以将实例中PSQLToReadHandler当做X-ToReadHandler模板,修改if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505"))即可。


说明:

当X-ToReadHandler订阅消息,消息有重放的场景,如果该消息已经得到处理,数据库中已经存在其处理后的结果,这是可能会报Integrity Constraint Violation(违反完整性约束)异常,默认不做处理,其他异常将其抛出,这是这段代码的作用。


示例模板:

public abstract class PSQLToReadHandler<K> : PartSubHandler<K, MessageInfo>
{public PSQLToReadHandler(IServiceProvider svProvider) : base(svProvider){ }public override Task Notice(byte[] data){return base.Notice(data).ContinueWith(t =>{if (t.Exception != null){//根据使用数据库,修改这个if判断if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505")){throw t.Exception;}}});}
}

  2. X-ToReadHandler
X-ToReadHandler订阅器主要用于订阅感兴趣的消息,将数据写入到数据库中。

实现方式是:

  • 实现SQLToReadHandler

  • ToReadHandler继承SQLToReadHandler(ToReadHandler的子类)

  • 添加RabbitSub特性。

  • 添加构造函数(必须),在构造函数中注册关注的事件。

    public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider)
    {Register<AmountAddEvent>();Register<AmountTransferEvent>();
    }

    代码如下所示:

[RabbitSub("Read", "Account", "account")]
public sealed class AccountToReadHandler : PSQLToReadHandler<string>
{public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider){Register<AmountAddEvent>();Register<AmountTransferEvent>();}
}
X-ToReadHandler消息订阅器与CoreHandler消息订阅器差异

X-ToReadHandler消息订阅器使用时,需要在构造函数中注册关心的事件,而X-CoreHandler中不需要,原因是事件在处理中需要反序列化,X-CoreHandler会对RabbitSub参数指定订阅的所有的消息反序列化,X-ToReadHandler在此基础上做了进一步的控制,在订阅的消息中只对Register的事件处理。这样做的原因:1.反序列化会消耗一定的性能,进一步控制有助于提高性能;2.Ray提供两种实现方式,为开发者扩展自定义源码提供借鉴。

总结:

  • CoreHandler消息订阅器=RabbitSub+SubHandler

  • ToReadHandler消息订阅器=RabbitSub+SQLToReadHandler(ToReadHandler的子类)

相关文章:


原文地址:http://www.cnblogs.com/CharlesZHENG/p/8425856.html


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322136.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nacos(十三)之naocs 1.4.1单机和集群部署配置

一、下载安装nacos Nacos 依赖 Java 环境来运行。如果您是从代码开始构建并运行Nacos&#xff0c;还需要为此配置 Maven环境&#xff0c;请确保是在以下版本环境中安装使用: 64 bit OS&#xff0c;支持 Linux/Unix/Mac/Windows&#xff0c;推荐选用 Linux/Unix/Mac。64 bit J…

P3807-[模板]卢卡斯定理

正题 题目链接:https://www.luogu.org/problem/P3807 题目大意 求Cnmm%pC_{nm}^m\% pCnmm​%p LucasLucasLucas定理 Cnm%pC⌊np⌋⌊mp⌋∗Cn%pm%pC_{n}^m\% pC_{\lfloor \frac{n}{p}\rfloor}^{\lfloor \frac{m}{p}\rfloor}*C_{n\%p}^{m\%p}Cnm​%pC⌊pn​⌋⌊pm​⌋​∗Cn%pm…

Actor-ES框架:Actor编写-ESGrain与ESRepGrain

ESGrain生命周期Ray中ESGrain继承自Grain扩展了Grain的生命周期。Grain的生命周期参加文档附录&#xff1a;1-Grain生命周期-译注.mdESGrain重写了Grain的OnActivateAsync方法。ESGrain的初始化过程如下&#xff1a;初始化ESGrain中的State调用ReadSnapshotAsync()读快照。如果…

SpringBoot @Async加在实现接口类的非接口方法上获取Bean异常

一、场景复现 报错日志 *************************** APPLICATION FAILED TO START ***************************Description:A component required a bean of type com.mk.service.TestService that could not be found.Action:Consider defining a bean of type com.mk.ser…

DotNetAnywhere:可供选择的 .NET 运行时

我最近在收听一个名为DotNetRock 的优质播客&#xff0c;其中有以Knockout.js而闻名的Steven Sanderson 正在讨论 " WebAssembly And Blazor "。也许你还没听过&#xff0c;Blazor 正试图凭借WebAssembly的魔力将 .NET 带入到浏览器中。如果您想了解更多信息&#xf…

SpringCloud Greenwich(二)注册中心之consul、Zuul和 gateway网关配置

本项目是搭建基于consul注册中心的springcloud&#xff0c;使用zuul网关和gateway网关 一、框架搭建 &#xff08;1&#xff09;项目结构 micro-service 服务提供者 zuul-gateway zuul网关 springcloud-gateway gateway网关 &#xff08;2&#xff09;环境 consul 1.9.0…

jzoj6274-[NOIP提高组模拟1]梦境【贪心,堆】

正题 题目大意 nnn个区间mmm个点&#xff0c;一个区间只能匹配一个点&#xff0c;求最大匹配数。 解题思路 我们可以先将点排序&#xff0c;和区间按照左端点排序。然后从左往右枚举点&#xff0c;遇到一个区间的左端点就加入这样我们就可以忽略区间的左端点了&#xff0c;然后…

Actor-ES框架:消息发布器与消息存储器

消息发布器&#xff1a;Ray是基于Event Sourcing设计的ES/Actor框架&#xff0c;ESGrain状态&#xff08;State&#xff09;的修改、ESGrain之间的通信默认使用RabbitMQ通信。消息的发布器主要是RabbitPubESGrain。RabbitPub特性RabbitPub特性是RabbitMQ消息发布器。RabbitSub特…

consul的安装搭建

一、下载consul consul官网下载地址&#xff1a;https://www.consul.io/downloads 旧版本下载 consul 1.9.3直接下载地址&#xff1a; consul_1.9.3_windows_amd64.zip consul_1.9.3_linux_amd64.zip 二、安装 将consul_1.9.3_xxx.zip解压的xxx/consul目录 &#xff08;1&…

使用xUnit为.net core程序进行单元测试

一. 导读为什么要编写自动化测试程序&#xff08;Automated Tests&#xff09;&#xff1f;可以频繁的进行测试可以在任何时间进行测试&#xff0c;也可以按计划定时进行&#xff0c;例如&#xff1a;可以在半夜进行自动测试。肯定比人工测试要快。可以更快速的发现错误。基本上…

jzoj6276-[Noip提高组模拟1]树【线段树,扫描线,倍增】

正题 题目大意 一棵树&#xff0c;若干个点对&#xff0c;求不包括任何一个点对的路径数量。 解题思路 我们考虑将不合法的方案在坐标系上表示。 我们先只考虑一个点对(x,y)(x,y)(x,y)&#xff0c;若xxx和yyy没有祖先关系&#xff0c;则不合法的路径一个点在xxx的子树中&…

SpringCloud Greenwich(三)注册中心之zookeeper、Zuul和 gateway网关配置

本项目是搭建基于zookeeper注册中心的springcloud&#xff0c;使用zuul网关和gateway网关 一、框架搭建 &#xff08;1&#xff09;项目结构 micro-service 服务提供者 zuul-gateway zuul网关 springcloud-gateway gateway网关 &#xff08;2&#xff09;环境 zookeeper…

Metrics, tracing 和 logging 的关系

译者注Peter Bourgon原作&#xff1a; Metrics, tracing, and logging译者&#xff1a;吴晟原作发表时间&#xff1a; 2017年2月21日这是在OpenTracing和分布式追踪领域内广受欢迎的一篇博客文章。在构建监控系统时&#xff0c;大家往往在这几个名词和方式之间纠结。 通过这篇文…

jzoj6275-[NOIP提高组模拟1]小L的数列【矩阵乘法,欧拉定理】

正题 题目大意 有递推式fi∏j1kfi−jbjf_{i}\prod_{j1}^kf_{i-j}^{b_j}fi​j1∏k​fi−jbj​​ 给出f1∼kf_{1\sim k}f1∼k​和b1∼kb_{1\sim k}b1∼k​ 求fnf_nfn​ 解题思路 首先这题的指数如此之大所以不能直接存&#xff0c;而指数又不能直接模所以我们要用欧拉定理 (b…

快速序列化组件MessagePack介绍

简介MessagePack for C&#xff03;&#xff08;MessagePack-CSharp&#xff09;是用于C&#xff03;的极速MessagePack序列化程序&#xff0c;比MsgPack-Cli快10倍&#xff0c;与其他所有C&#xff03;序列化程序相比&#xff0c;具有最好的性能。 MessagePack for C&#xff…

SpringCloud Greenwich(四)注册中心之eureka、Zuul和 gateway网关配置

本项目是搭建基于eureka注册中心的springcloud&#xff0c;使用zuul网关和gateway网关 一、框架搭建 &#xff08;1&#xff09;项目结构 eureka-server eureka注册中心 micro-service 服务提供者 zuul-gateway zuul网关 springcloud-gateway gateway网关 &#xff08;…

欢乐纪中A组赛【2019.8.7】

前言 在短暂的比赛时间中&#xff0c;我发现本菜鸡越是功于心计想ACACAC&#xff0c;越是拿不到分&#xff0c;所以。。。 我不写比赛了JOJO!JOJO!JOJO! 成绩 JJJ表示初中&#xff0c;HHH表示高中后面加的是几年级 RankRankRankPersonPersonPersonScoreScoreScoreAAABBBCCC14…

【ASP.NET Core】给路由规则命名有何用处

上一篇中老周给伙伴们介绍了自定义视图搜索路径的方法&#xff0c;本篇咱们扯一下有关 URL 路径规则的名称问题。在扯今天的话题之前&#xff0c;先补充点东东。在上一篇中设置视图搜索路径时用到三个有序参数&#xff1a;{2}{1}{0}&#xff0c;分别是 Area、Controller、Actio…

SpringCloud Greenwich(五)之nacos、dubbo、Zuul和 gateway集成

本项目是搭建基于nacos注册中心的springcloud&#xff0c;集成dubbo框架&#xff0c;使用zuul网关和gateway网关 一、框架搭建 &#xff08;1&#xff09;项目结构 micro-service 服务提供者 zuul-gateway zuul网关 springcloud-gateway gateway网关 class-provider dubo…

P4318,bzoj2440-完全平方数【二分答案,莫比乌斯函数,容斥】

正题 题目链接: https://www.luogu.org/problem/P4318 https://www.lydsy.com/JudgeOnline/problem.php?id2440 题目大意 完全平方数只对应任意一个的正整数满足d∣n,d2∤nd\mid n,d^2\nmid nd∣n,d2∤n(也就是nnn的质因数分解后都没有次数)。 求第kkk个完全平方数 解题思路…