kata_小规模流处理kata。 第2部分:RxJava 1.x / 2.x

kata

在第1部分:线程池中,我们设计并实现了相对简单的系统,用于实时处理事件。 确保您阅读了上一部分,因为它包含一些我们将重用的类。 以防万一这是要求:

一个系统每秒发送大约一千个事件。 每个Event至少具有两个属性:

  • clientId –我们期望一个客户端每秒最多可以处理几个事件
  • UUID –全球唯一

消耗一个事件大约需要10毫秒。 设计此类流的使用者:

  1. 允许实时处理事件
  2. 与一个客户端有关的事件应按顺序进行处理,即,您不能并行处理同一clientId事件
  3. 如果10秒钟内出现重复的UUID ,请将其删除。 假设10秒钟后不会出现重复

到目前为止,我们提出的是线程池和共享缓存的组合。 这次我们将使用RxJava实现解决方案。 首先,我没有透露EventStream的实现方式,仅提供了API:

interface EventStream {void consume(EventConsumer consumer);}

实际上,对于手动测试,我构建了一个简单的RxJava流,其行为与系统的要求类似:

@Slf4j
class EventStream {void consume(EventConsumer consumer) {observe().subscribe(consumer::consume,e -> log.error("Error emitting event", e));}Observable<Event> observe() {return Observable.interval(1, TimeUnit.MILLISECONDS).delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS)).map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID())).flatMap(this::occasionallyDuplicate, 100).observeOn(Schedulers.io());}private Observable<Event> occasionallyDuplicate(Event x) {final Observable<Event> event = Observable.just(x);if (Math.random() >= 0.01) {return event;}final Observable<Event> duplicated =event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);return event.concatWith(duplicated);}}

了解此模拟器的工作原理不是必不可少的,但很有趣。 首先,我们产生的源源不断的Long值( 012 ...)每毫秒使用(每秒千个事件) interval()操作。 然后,我们使用delay()运算符将每个事件延迟01_000微秒之间的随机时间。 这样,事件将在难以预测的时刻出现,而情况会更加现实。 最后,我们将每个Long值映射(使用ekhem, map()运算符) map()到一个随机Event其中clientId介于1_0001_100 (包含在内)之间。

最后一点很有趣。 我们想模拟偶尔的重复。 为此,我们将每个事件(使用flatMap() )映射到自身(在99%的情况下)。 但是,在1%的情况下,我们两次返回此事件,第二次发生在10毫秒至5秒后。 在实践中,该事件的重复实例将在其他数百个事件之后出现,这使流的行为逼真。

EventStream交互的方式有两种-通过consume()回调和通过observe()流。 我们可以利用Observable<Event>来快速建立功能与第1部分非常相似但更简单的处理管道。

缺少背压

利用RxJava的第一个幼稚方法很快就失败了:

EventStream es = new EventStream();
EventConsumer clientProjection = new ClientProjection(new ProjectionMetrics(new MetricRegistry()));es.observe().subscribe(clientProjection::consume,e -> log.error("Fatal error", e));

ClientProjectionProjectionMetrics等人来自第1部分 )。 我们几乎立即获得MissingBackpressureException ,这是预期的。 还记得我们的第一个解决方案是如何通过处理越来越多的延迟来滞后吗? RxJava尝试避免这种情况,并避免队列溢出。 由于使用者( ClientProjection )无法实时处理事件,因此抛出MissingBackpressureException 。 这是快速失败的行为。 最快的解决方案是像以前一样使用RxJava的功能将消耗转移到单独的线程池中:

EventStream es = new EventStream();
EventConsumer clientProjection = new FailOnConcurrentModification(new ClientProjection(new ProjectionMetrics(new MetricRegistry())));es.observe().flatMap(e -> clientProjection.consume(e, Schedulers.io())).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

EventConsumer接口具有一个辅助方法,该方法可以在提供的Scheduler上异步使用事件:

@FunctionalInterface
interface EventConsumer {Event consume(Event event);default Observable<Event> consume(Event event, Scheduler scheduler) {return Observable.fromCallable(() -> this.consume(event)).subscribeOn(scheduler);}}

通过在单独的Scheduler.io()使用flatMap()使用事件,可以异步调用每个使用。 这次事件几乎是实时处理的,但是存在更大的问题。 由于某种原因,我用FailOnConcurrentModification装饰了ClientProjection 。 事件彼此独立使用,因此可能会同时处理同一clientId两个事件。 不好。 幸运的是,在RxJava中解决此问题比使用普通线程要容易得多:

es.observe().groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

有点改变了。 首先,我们将事件按clientId分组。 这将单个Observable流拆分为流 。 每个名为byClient子流代表与同一clientId相关的所有事件。 现在,如果我们映射到此子流,我们可以确保与同一个clientId相关的事件不会同时处理。 外部流是惰性的,因此我们必须订阅它。 与其单独订阅每个事件,我们不每秒收集事件并进行计数。 这样,我们每秒就会收到一个Integer类型的单个事件,该事件表示每秒消耗的事件数。

使用全局状态的不纯,非惯常,容易出错,不安全的重复数据删除解决方案

现在我们必须删除重复的UUID 。 丢弃重复项的最简单但非常愚蠢的方法是利用全局状态。 我们可以通过在filter()运算符之外可用的缓存中查找重复项来简单地过滤掉重复项:

final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();es.observe().filter(e -> seenUuids.getIfPresent(e.getUuid()) == null).doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid())).subscribe(clientProjection::consume,e -> log.error("Fatal error", e));

如果要监视此机制的使用,只需添加指标:

Meter duplicates = metricRegistry.meter("duplicates");es.observe().filter(e -> {if (seenUuids.getIfPresent(e.getUuid()) != null) {duplicates.mark();return false;} else {return true;}})

从操作员内部访问全局状态,尤其是可变状态是非常危险的,并且破坏了RxJava的唯一目的–简化并发。 显然,我们使用了Guava的线程安全Cache ,但是在许多情况下,很容易错过从多个线程访问共享全局可变状态的地方。 如果您发现自己在运算符链之外对某些变量进行了变异,请非常小心。

RxJava 1.x中的自定义

RxJava 1.x有一个distinct()运算符,大概可以完成此工作:

es.observe().distinct(Event::getUuid).groupBy(Event::getClientId)

不幸的是, distinct()在内部将所有密钥( UUID distinct()存储在不断增长的HashSet 。 但是我们只关心最近10秒钟内的重复! 通过复制粘贴DistinctOperator的实现,我创建了DistinctEvent运算符,该运算符利用Guava的缓存仅存储了最后10秒钟的UUID值。 我故意在此运算符中对Event进行硬编码,而不是使其通用性更强,以使代码更易于理解:

class DistinctEvent implements Observable.Operator<Event, Event> {private final Duration duration;DistinctEvent(Duration duration) {this.duration = duration;}@Overridepublic Subscriber<? super Event> call(Subscriber<? super Event> child) {return new Subscriber<Event>(child) {final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder().expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS).<UUID, Boolean>build().asMap();@Overridepublic void onNext(Event event) {if (keyMemory.put(event.getUuid(), true) == null) {child.onNext(event);} else {request(1);}}@Overridepublic void onError(Throwable e) {child.onError(e);}@Overridepublic void onCompleted() {child.onCompleted();}};}
}

用法非常简单,整个实现(加上自定义运算符)如下:

es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

实际上,如果您跳过每秒的日志记录,它甚至可以更短:

es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));

该解决方案比以前的基于线程池和装饰器的解决方案要短得多。 唯一尴尬的部分是自定义运算符,它在存储太多历史UUID时避免内存泄漏。 幸运的是RxJava 2得以解救!

RxJava 2.x和更强大的内置

实际上,我是从提交公关RxJava具有更强大的执行这种紧密distinct()操作。 但是在我检查2.x分支之前,它是: distinct()允许提供自定义Collection ,而不是硬编码的HashSet 。 信不信由你,依赖倒置不仅涉及Spring框架或Java EE。 当库允许您提供其内部数据结构的自定义实现时,这也是DI。 首先,我创建一个辅助方法,该方法可以构建由Map<UUID, Boolean>支持,由Cache<UUID, Boolean>支持的Set<UUID> Cache<UUID, Boolean> 。 我们一定喜欢代表团!

private Set<UUID> recentUuids() {return Collections.newSetFromMap(CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).<UUID, Boolean>build().asMap());
}

有了这种方法,我们可以使用以下表达式实现整个任务:

es.observe().distinct(Event::getUuid, this::recentUuids).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));

优雅,简洁,清晰! 它看起来像是一个问题:

  • 观察事件流
  • 仅考虑不同的UUID
  • 客户分组活动
  • 为每个客户消耗(顺序)

希望您喜欢所有这些解决方案,并发现它们对您的日常工作很有用。

也可以看看:

  • 小规模流处理kata。 第1部分:线程池
  • 小规模流处理kata。 第2部分:RxJava 1.x / 2.x

翻译自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-2-rxjava-1-x2-x.html

kata

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/335401.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Oracle数据库的Sequence(序列)

序列 Sequence( 掌握 ) 序列(Sequence)是一种用来生成唯一数字值的数据库对象。序列的值由Oracle程序按递增或递减顺序自动生成 , 通常用来自动生成表的主键值 , 是一种高效率获得唯一键 值的途径。

高斯背景建模 matlab,高斯背景建模整理 – 要饭的

OpenCV 中高斯背景建模相关论文BackgroundSubtractorMOG:Paper : An Improved Adaptive Background Mixture Model for Real-time Tracking with Shadow DetectionWebsite : http://personal.ee.surrey.ac.uk/Personal/R.Bowden/publications/avbs01/avbs01.pdf创新点 &#x…

apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)

apache ignite在本文中&#xff0c; “使用Apache Ignite进行高性能内存计算”一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处理。 本文的一部分摘自 书 。 术语“复杂事件处理”或CEP没有广泛或高度接受的定义。 Wikipedia的以下引用可以简要描述什么是复…

python 生成器_提高你的Python: 解释‘yield’和‘Generators(生成器)’

例子&#xff1a;有趣的素数 假设你的老板让你写一个函数&#xff0c;输入参数是一个int的list&#xff0c;返回一个可以迭代的包含素数 记住&#xff0c;迭代器&#xff08;Iterable&#xff09; 只是对象每次返回特定成员的一种能力。 你肯定认为"这很简单"&#x…

数据库的存储过程

存储过程 Procedure 过程( Procedure )是一种 PL/SQL 存储程序单元 , 主要用于在数据库中完成特定的操作或者任务 , 如果在程序中经常需要执行某个操作 , 可以基于这些操作建立一个过程 ,用于简化客户端的开发和维护 , 以及提高执行性能。 在实际项目开发中&#xff0c;因为业…

tpm php,TPM系列

近来一直在整理资料&#xff0c;刚好看到有比较详细的介绍&#xff0c;就发扬一下“拿来主义”吧&#xff1a;)顺便鄙视一下某安全网站转载我的博客很乱&#xff1a;(1、安装环境准备1.1内核Linux内核2.6.12版本及以上&#xff0c;提供了对tpm芯片的支持&#xff0c;下载地址&a…

zookeeper 日志查看_每天使用的注册中心zookeeper,流量暴涨怎么办?

通过本文能学习什么&#xff1f;初步了解zookeeper监控如何运用tcpdump Wireshark抓包分析Dubbo在zookeeper上节点设计如何查看zookeeper节点快照背景zookeeper作为dubbo的注册中心&#xff0c;承载着服务的基础信息(方法名&#xff0c;分组&#xff0c;版本等)&#xff0c;服…

javaone_JavaOne 2016 Essentials:您不应该错过的7个事件和会话

javaone参加JavaOne吗&#xff1f; 确保您不会错过这些活动 又到了每年的这个时候。 旧金山一年一度的Java假期即将来临&#xff0c;全市各地举行了超过400场会议和活动。 随着所有这些丰富的内容和新体验同时发生&#xff0c;很容易就无法跟踪正在发生的事情。 在这篇文章中…

Oracle的package(包)

包 Package 包是一种比较特殊的 PL/SQL 程序 , 它并不是一个 PL/SQL 存储程序块 , 而是用于将相关的存储过程和函数组织起来 , 组成 PL/SQL 存储程序组。 包由两个独立部分组成:包头和包体。

python中可用于布尔测试的,如何在Python中使用布尔值?

Python中的布尔值是整数的子类。布尔构造函数-bool。bool类继承自int类。issubclass(bool,int) // will return Trueisinstance(True,bool) , isinstance(False,bool) //they both TrueTrue和False是单例对象。它们将在应用程序的整个生命周期内保持相同的内存地址。当您键入Tr…

python比较两张图片是否一样_opencv_判断两张图片是否相同

python金融风控评分卡模型和数据分析微专业课&#xff08;博主亲自录制视频&#xff09;&#xff1a;http://dwz.date/b9vv本文为原创,转载请注明&#xff0c;作者 231469242qq.com OpenCV介绍 OpenCV是一个基于BSD许可&#xff08;开源&#xff09;发行的跨平台计算机视觉库&a…

kata_小规模流处理kata。 第1部分:线程池

kata我再次为我的公司在GeeCON 2016上举办了编程竞赛。 这次分配需要设计并根据以下要求选择实施系统&#xff1a; 一个系统每秒发送大约一千个事件。 每个Event至少具有两个属性&#xff1a; clientId –我们期望一个客户端每秒最多可以处理几个事件 UUID –全球唯一 消耗…

Oracle数据库的trigger(触发器)

触发器 Trigger PL/SQL 程序中的触发器的结构类似于函数和过程 , 与函数和过程不同 , 触发器是在事件发生时隐式地运行的。相当于Java语言中的事件监听器

hpux oracle9,oracle 9.2.0.8在HP-UX 11.31 下的安装步骤和注意事项

在hp_unix ia64位操作系统安装oracle 数据库步骤1. 操作系统安装好后打补丁和调参数&#xff1a;(1) 打上patch(PHKL_38038)&#xff0c;安装oracle9i必须打此补丁(2) 配置好网卡IP&#xff0c;可用xmanager调用系统图形化界面(3) 按方案调整内核参数2. 数据库安装(1) 命令行创…

bat 存储过程返回值_为什么不推荐使用存储过程?

之所以有这个题目&#xff0c;我既不是故意吸引眼球&#xff0c;也不想在本文对存储过程进行教科书般论述。最近项目中遇到的存储过程问题&#xff0c;让我想起了去年在武汉出差时一位同事的发问&#xff1a;我觉得存储过程挺好用的&#xff0c;为什么你不建议用&#xff1f;当…

aws lambda使用_使用Lambda,Api Gateway和CloudFormation在AWS云上使用Java

aws lambda使用在上一篇文章中&#xff0c;我们实现了基于Java的aws lambda函数&#xff0c;并使用CloudFront进行了部署。 由于我们已经设置了lambda函数&#xff0c;因此我们将使用AWS API Gateway将其与http端点集成。 Amazon API Gateway是一项完全托管的服务&#xff0c;…

数据库的序列

序列的特性:产生连续的不同的数字值用来作为数据表的主键。 序列是数据库中的独立对象表可以用序列产生的值作为主键 , 也可以不用序列可以为一个或多个表产生主键 , 也可以不用 建议:一个序列为一个表产生主键序列这种对象在 Oracle、db2 等数据库中有 , 在 mysql、sql serve…

java mic波形识别_会议季Mic Drop:您不应该错过的13场Java演讲

java mic波形识别您的老板没有派您参加真正的会议吗&#xff1f; 我们为您准备了最好的讲座 九月份的一些重大事件闻名于世&#xff1a;秋季的第一天&#xff0c;甚至全国熏肉日。 这也是召开会议最忙的月份之一&#xff0c;一些大型Java事件涵盖了平台的新的重要更新。 在以…

oracle utl inaddr,oracle11g之ACL拙见

错误样例(使用UTL_HTTP发送http请求时&#xff0c;报出如下错误)&#xff1a; 原因&#xff1a; 1、Oracle允许使用几个PL/SQL API(UTL_TCP&#xff0c; UTL_SMTP&#xff0c; UTL_MAIL&#xff0c; UTL_HTTP和 UTL_INADDR)访问外部网络服务&#xff0c;这些API都使用TCP协议。…

Mac下载JDK/安装JDK/卸载JDK

文章目录下载JDK安装JDK配置环境变量卸载JDK下载JDK 访问这个地址&#xff1a;https://www.oracle.com/java/technologies/downloads/#java18 进入下载JDK的界面后&#xff0c;下拉界面到下图所示的位置&#xff1a; 或者访问这个地址&#xff1a;https://www.oracle.com/cn/…