小规模流处理kata。 第2部分:RxJava 1.x / 2.x

在第1部分:线程池中,我们设计并实现了相对简单的系统,用于实时处理事件。 确保您已阅读上一部分,因为它包含一些我们将重用的类。 以防万一这是要求:

一个系统每秒传送约一千个事件。 每个Event至少具有两个属性:

  • clientId –我们期望一个客户端每秒最多可以处理几个事件
  • UUID –全球唯一

消耗一个事件大约需要10毫秒。 设计此类流的使用者:

  1. 允许实时处理事件
  2. 与一个客户端有关的事件应按顺序进行处理,即,您不能并行处理同一clientId事件
  3. 如果10秒钟内出现重复的UUID ,请将其删除。 假设10秒钟后不会出现重复

到目前为止,我们提出的是线程池和共享缓存的组合。 这次我们将使用RxJava实现解决方案。 首先,我没有透露EventStream的实现方式,仅提供了API:

interface EventStream {void consume(EventConsumer consumer);}

实际上,对于手动测试,我构建了一个简单的RxJava流,其行为与需求类似,类似于系统:

@Slf4j
class EventStream {void consume(EventConsumer consumer) {observe().subscribe(consumer::consume,e -> log.error("Error emitting event", e));}Observable<Event> observe() {return Observable.interval(1, TimeUnit.MILLISECONDS).delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS)).map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID())).flatMap(this::occasionallyDuplicate, 100).observeOn(Schedulers.io());}private Observable<Event> occasionallyDuplicate(Event x) {final Observable<Event> event = Observable.just(x);if (Math.random() >= 0.01) {return event;}final Observable<Event> duplicated =event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);return event.concatWith(duplicated);}}

了解此模拟器的工作原理不是必不可少的,但很有趣。 首先,我们产生的源源不断的Long值( 012 ...)每毫秒使用(每秒千个事件) interval()操作。 然后,我们使用delay()运算符将每个事件延迟01_000微秒之间的随机时间。 这样,事件将在难以预测的时刻出现,而情况会更加现实。 最后,我们将每个Long值映射(使用ekhem, map()运算符) map()到一个随机Event ,该EventclientId1_0001_100之间(包括端1_100在内)。

最后一点很有趣。 我们想模拟偶尔的重复。 为此,我们将每个事件(使用flatMap() )映射到自身(在99%的情况下)。 但是,在1%的情况下,我们两次返回此事件,第二次发生在10毫秒至5秒后。 在实践中,该事件的重复实例将在其他数百个事件之后出现,这使流的行为逼真。

EventStream进行交互的方式有两种:通过consume()回调和通过observe()流。 我们可以利用Observable<Event>来快速建立功能与第1部分非常相似但更简单的处理管道。

缺少背压

利用RxJava的第一个幼稚方法很快就失败了:

EventStream es = new EventStream();
EventConsumer clientProjection = new ClientProjection(new ProjectionMetrics(new MetricRegistry()));es.observe().subscribe(clientProjection::consume,e -> log.error("Fatal error", e));

ClientProjectionProjectionMetrics等人来自第1部分 )。 我们几乎立即获得MissingBackpressureException ,这是预期的。 还记得我们的第一个解决方案是如何通过处理越来越多的延迟来滞后吗? RxJava尝试避免这种情况,并避免队列溢出。 由于使用者( ClientProjection )无法实时处理事件,因此抛出MissingBackpressureException 。 这是快速失败的行为。 最快的解决方案是像以前一样使用RxJava的功能将消耗转移到一个单独的线程池中:

EventStream es = new EventStream();
EventConsumer clientProjection = new FailOnConcurrentModification(new ClientProjection(new ProjectionMetrics(new MetricRegistry())));es.observe().flatMap(e -> clientProjection.consume(e, Schedulers.io())).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

EventConsumer接口具有一个辅助方法,该方法可以在提供的Scheduler上异步使用事件:

@FunctionalInterface
interface EventConsumer {Event consume(Event event);default Observable<Event> consume(Event event, Scheduler scheduler) {return Observable.fromCallable(() -> this.consume(event)).subscribeOn(scheduler);}}

通过在单独的Scheduler.io()使用flatMap()使用事件,可以异步调用每个使用。 这次事件几乎是实时处理的,但是存在更大的问题。 由于某种原因,我用FailOnConcurrentModification装饰了ClientProjection 。 事件彼此独立使用,因此可能会同时处理同一clientId两个事件。 不好。 幸运的是,在RxJava中解决此问题比使用普通线程要容易得多:

es.observe().groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

有点改变了。 首先,我们将事件按clientId分组。 这将单个Observable流拆分为流 。 每个名为byClient子流代表与同一clientId相关的所有事件。 现在,如果我们映射到此子流,我们可以确保与同一个clientId相关的事件不会同时发生。 外部流很懒,因此我们必须订阅它。 与其单独订阅每个事件,我们不每秒收集事件并进行计数。 这样,我们每秒就会收到一个Integer类型的单个事件,该事件表示每秒消耗的事件数。

使用全局状态的不纯,非惯常,容易出错,不安全的重复数据删除解决方案

现在我们必须删除重复的UUID 。 丢弃重复项的最简单但非常愚蠢的方法是利用全局状态。 我们可以通过在filter()运算符之外可用的缓存中查找重复项来简单地过滤掉重复项:

final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();es.observe().filter(e -> seenUuids.getIfPresent(e.getUuid()) == null).doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid())).subscribe(clientProjection::consume,e -> log.error("Fatal error", e));

如果要监视此机制的使用,只需添加指标:

Meter duplicates = metricRegistry.meter("duplicates");es.observe().filter(e -> {if (seenUuids.getIfPresent(e.getUuid()) != null) {duplicates.mark();return false;} else {return true;}})

从操作员内部访问全局状态,尤其是可变状态非常危险,并且破坏了RxJava的唯一目的-简化并发。 显然,我们使用了Guava的线程安全Cache ,但是在许多情况下,很容易错过从多个线程访问共享全局可变状态的地方。 如果您发现自己在运算符链之外修改了一些变量,请非常小心。

RxJava 1.x中的自定义

RxJava 1.x有一个distinct()运算符,大概可以完成此工作:

es.observe().distinct(Event::getUuid).groupBy(Event::getClientId)

不幸的是, distinct()在内部将所有密钥( UUID distinct()存储在不断增长的HashSet 。 但是我们只关心最近10秒钟内的重复! 通过复制粘贴DistinctOperator的实现,我创建了DistinctEvent运算符,该运算符利用Guava的缓存仅存储了最后10秒钟的UUID值。 我故意在此运算符中对Event进行了硬编码,而不是使其变得更通用以使代码更易于理解:

class DistinctEvent implements Observable.Operator<Event, Event> {private final Duration duration;DistinctEvent(Duration duration) {this.duration = duration;}@Overridepublic Subscriber<? super Event> call(Subscriber<? super Event> child) {return new Subscriber<Event>(child) {final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder().expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS).<UUID, Boolean>build().asMap();@Overridepublic void onNext(Event event) {if (keyMemory.put(event.getUuid(), true) == null) {child.onNext(event);} else {request(1);}}@Overridepublic void onError(Throwable e) {child.onError(e);}@Overridepublic void onCompleted() {child.onCompleted();}};}
}

用法非常简单,整个实现(加上自定义运算符)如下:

es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

实际上,如果您跳过每秒的日志记录,它甚至可以更短:

es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));

该解决方案比以前基于线程池和装饰器的解决方案要短得多。 唯一尴尬的部分是自定义运算符,该自定义运算符可在存储太多历史UUID时避免内存泄漏。 幸运的是RxJava 2得以解救!

RxJava 2.x和更强大的内置

实际上,我是从提交公关RxJava具有更强大的执行这种紧密distinct()操作。 但是在我检查2.x分支之前,它是: distinct()允许提供自定义Collection而不是硬编码的HashSet 。 信不信由你,依赖倒置不仅涉及Spring框架或Java EE。 当库允许您提供其内部数据结构的自定义实现时,这也是DI。 首先,我创建一个辅助方法,该方法可以构建由Map<UUID, Boolean>支持,由Cache<UUID, Boolean>支持的Set<UUID> Cache<UUID, Boolean> 。 我们一定喜欢代表团!

private Set<UUID> recentUuids() {return Collections.newSetFromMap(CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).<UUID, Boolean>build().asMap());
}

有了这种方法,我们可以使用以下表达式实现整个任务:

es.observe().distinct(Event::getUuid, this::recentUuids).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));

优雅,简洁,清晰! 它读起来几乎像一个问题:

  • 观察事件流
  • 仅考虑不同的UUID
  • 客户分组活动
  • 为每个客户消耗(依次)

希望您喜欢所有这些解决方案,并发现它们对您的日常工作很有用。

也可以看看:

  • 小规模流处理kata。 第1部分:线程池
  • 小规模流处理kata。 第2部分:RxJava 1.x / 2.x

翻译自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-2-rxjava-1-x2-x.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/351480.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 计算信度,11.5.2 评分者信度实例分析

11.5.2 评分者信度实例分析【例11.5】 某校学生举办歌唱大赛&#xff0c;请5位评委老师为最终进入决赛的6名同学评定等级&#xff0c;结果如表11-10所示&#xff0c;请计算其信度。表11-10 歌唱大赛评定等级结果学生教师A1A2A3A4A5A6A344335B434343C233253D445344E324333解&a…

JAVA 类加载 随记

视频: https://www.bilibili.com/video/av29502877 1 JVM把Class加载到内存当中&#xff0c;检验解析初始化。默认使用的懒加载方式。 以下步骤是并行执行 可以把加载源看成JAVA配置文件。 解析&#xff1a; 符号引用&#xff1a; 类名&#xff0c;基本类型 直接引用&#xff…

Linux IO实时监控iostat命令详解

简介 iostat主要用于监控系统设备的IO负载情况&#xff0c;iostat首次运行时显示自系统启动开始的各项统计信息&#xff0c;之后运行iostat将显示自上次运行该命令以后的统计信息。用户可以通过指定统计的次数和时间来获得所需的统计信息。 语法 iostat [ -c ] [ -d ] [ -h ] […

打破冷漠僵局文章_研究僵局–第2部分

打破冷漠僵局文章调查死锁时最重要的要求之一就是要研究死锁。 在我的上一个博客中&#xff0c;我写了一些名为DeadlockDemo代码&#xff0c;该代码使用一堆线程在一系列银行账户之间转移随机数&#xff0c;然后陷入僵局。 该博客运行该代码以演示获取线程转储的几种方法。 线…

七牛云php20m文件上传不了,七牛云存储 - 用php上传图片,我在本地测试,用php 接口,不成功...

七牛反馈显示信息说明已经成功了&#xff0c;但是在七牛空间显示的文件却不完整请问这是为什么啊&#xff1f;&#xff01;&#xff01;测试文件如下&#xff0c;本地localhost测试&#xff1a;require_once("qiniu/io.php");require_once("qiniu/rs.php")…

洛谷 P2463 [SDOI2008]Sandy的卡片 解题报告

P2463 [SDOI2008]Sandy的卡片 题意 给\(n(\le 1000)\)串&#xff0c;定义两个串相等为“长度相同&#xff0c;且一个串每个数加某个数与另一个串完全相同”&#xff0c;求所有串的最长公共子串&#xff0c;每个串长\(\le 101\)&#xff0c;值域\(\in [0,1864]\) 先差分一下&…

linux和windows的进程的虚拟地址空间

昨晚看到了深夜&#xff0c;终于对进程的虚拟地址空间有了个大致的了解&#xff0c;很激动&#xff0c;也很欣慰。回头想来&#xff0c;一个程序员&#xff0c;真的应该知道这些知识&#xff0c;否则还真不太称职。 首先告诉大家&#xff0c;我后面提到的这些知识在《windows核…

安装Red Hat Container Development Kit 2.2版本

当应用程序开发人员或架构师负责探索容器化应用程序提供的可能性时&#xff0c;没有比Red Hat容器开发套件&#xff08;CDK&#xff09;容易的了。 Red Hat CDK具有本地OSX&#xff0c;Linux或Windows环境所需的所有Cloud工具&#xff0c;并且已预先配置了一些容器供您浏览。 …

php_sapi常量,PHP常量PHP_SAPI与函数php_sapi_name()简介,PHP运行环境检测

php_sapi_name() 是用来检测PHP运行环境的函数。该函数返回一个描述PHP与WEB服务器接口的小写字符串。例如&#xff1a;aolserver,apache,apache2filter,apache2handler,caudium,cgi(until PHP 5.3),cgi-fcgi,cli,continuity,embed,isapi,litespeed,milter,nsapi,phttpd,pi3web…

怎么学习正则表达式?(正则的使用心得)

以前使用正则是不会自己去学或者去了解怎么实现正则的语法的&#xff0c;感觉好难懂的样子&#xff0c;于是拖到现在 其实正则不是我想象中的那么难&#xff0c;至少入门还是比较简单。我刚学习了一些比较简单的&#xff0c;谈谈我对正则的理解以及使用心得&#xff0c;希望能帮…

java+解析占位符,如何告诉Spring使用Java映射来解析属性占位符?

Spring提供了一个MapPropertySource,您可以在ApplicationContext的环境中注册(您需要一个大多数ApplicationContext实现提供的ConfigurableEnvironment).解析器(按顺序)使用这些已注册的PropertySource值来查找占位符名称的值.这是一个完整的例子&#xff1a;ConfigurationComp…

UOJ207 共价大爷游长沙

考虑到路径是有向的&#xff0c;不是很好维护。 如果路径无向的话&#xff0c;可以直接转化为链加和查询操作。 既然有向的话&#xff0c;不妨考虑一波hash。 对于一组询问x,y&#xff0c;可以把树划分为两颗子树。 合法显然需要满足 x子树的起点的hashy子树的终点的hash x子树…

mysql select null 0,查询值中为NULL,在MySQL中产生0.00

我有一个动态编写的查询(通过Joomla的OO PHP)将一些值插入MySQL数据库.用户填写的表单上有一个字段用于金额,如果它们留空,我希望进入系统的值为NULL.我已经将错误日志中的查询写出来了;这是查询的样子&#xff1a;INSERT INTO arrc_Voucher(VoucherNbr,securityCode,sequentia…

oracle adf_Oracle ADF移动世界! 你好!

oracle adf您好&#xff0c;ADF Mobile&#xff0c;世界&#xff01; 您可能已经知道... ADF Mobile在这里&#xff01; 以下是一些链接&#xff0c;这些链接会让您有宾至如归的感觉。 ADF Mobile主页&#xff1a; http://www.oracle.com/technetwork/developer-tools/adf/o…

线段树||BZOJ1593: [Usaco2008 Feb]Hotel 旅馆||Luogu P2894 [USACO08FEB]酒店Hotel

题面&#xff1a;P2894 [USACO08FEB]酒店Hotel 题解&#xff1a;和基础的线段树操作差别不是很大&#xff0c;就是在传统的线段树基础上多维护一段区间最长的合法前驱&#xff08;h_&#xff09;&#xff0c;最长合法后驱&#xff08;t_&#xff09;&#xff0c;一段中最长的合…

Linux内存管理详解

前一段时间看了《深入理解Linux内核》对其中的内存管理部分花了不少时间&#xff0c;但是还是有很多问题不是很清楚&#xff0c;最近又花了一些时间复习了一下&#xff0c;在这里记录下自己的理解和对Linux中内存管理的一些看法和认识。 我比较喜欢搞清楚一个技术本身的发展历程…

JavaOne 2016 Essentials:7个您不容错过的事件和会话

参加JavaOne吗&#xff1f; 确保您不会错过这些活动 又到了每年的这个时候。 旧金山一年一度的Java假期即将来临&#xff0c;全市发生了超过400场会议和活动。 由于所有这些内容和新体验都一次发生&#xff0c;因此很容易就无法跟踪正在发生的事情。 在这篇文章中&#xff0c…

任务18:控制反转

控制反转 实现你的依赖&#xff0c;采用什么依赖&#xff0c;不由你自己决定&#xff0c;这个控制交给IOC容器。 这里所有的实现都不由你自己决定&#xff0c;我们只需要传给你就可以了。谁来传呢&#xff1f;容器来传给他 内存的Repository&#xff0c;这里实现的比较简单。 这…

程序的重定位问题(程序装入)

在多道程序环境下&#xff0c;要使程序运行&#xff0c;必须先为程序创建进程。而创建进程的第一件事就是&#xff1a;将程序和数据装入内存。如何将一个用户源程序变成可在内存中执行的程序&#xff0c;通常都要进过几个步骤&#xff1a;1.编译&#xff1a;由compiler将源程序…

matlab频响优化,MATLAB中关于频响图函数最优化的程序问题

我是一名大四的学生,现在正在做毕业设计,因MATLAB从未学过,也是边学边做,我需要求出IGv函数的频率响应图,因IGv的表达式很复杂,这里我没列出,在下面的程序中有的,以下是我的程序,请高手帮我看看程序有什么问题哈,万分感谢!%%igmax is global maximumIgmin1000000000;%%exmperim…