技术停滞_检测和测试停滞的流– RxJava常见问题解答

技术停滞

假设您有一个流以不可预测的频率发布事件。 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件。 如果您的流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题。 静默时间过长(停顿)可以解释为网络问题。 因此,我们经常不时发送人造事件( ping ),以确保:

  • 客户还活着
  • 让客户知道我们还活着

举一个更具体的例子,假设我们有一个Flowable<String>流,它会产生一些事件。 如果没有事件超过一秒钟,我们应该发送一个占位符"PING"消息。 当静默时间更长时,应该每秒发出一个"PING"消息。 我们如何在RxJava中实现这样的要求? 最明显但不正确的解决方案是将原始流与ping合并:

Flowable<String> events = //...
Flowable<String> pings = Flowable.interval(1, SECONDS).map(x -> "PING");Flowable<String> eventsWithPings = events.mergeWith(pings);

mergeWith()运算符至关重要:它接受真正的events ,并将它们与恒定的ping流合并。 当然,当没有真实的事件出现时,将出现"PING"消息。 不幸的是,它们与原始流完全无关。 这意味着即使有很多正常事件,我们也会继续发送ping命令。 而且,当静默开始时,我们不会在一秒钟后精确发送"PING" 。 如果您对这种机制感到满意,则可以在此处停止阅读。

一种更复杂的方法需要发现持续超过1秒的静音。 我们可以使用timeout()运算符。 不幸的是,它会产生TimeoutException并从上游退订-行为过于激进。 我们只想收到某种通知。 事实证明,可以使用debounce()运算符。 通常,此操作员会推迟新事件的发出,以防万一有新事件出现,从而覆盖了旧事件。 所以,如果我说:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);

这意味着delayed流仅在1秒内跟随其他事件时才会发出事件。 如果events流使产生事件的速度足够快,从技术上讲, delayed可能永远不会发出任何东西。 我们将使用delayed流通过以下方式发现沉默:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.map(ev -> "PING");
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

请记住, mergeWith()和它的static merge()对应项之间没有区别。 所以我们到了某个地方。 如果流繁忙,则delayed流将永远不会收到任何事件,因此不会发送"PING"消息。 但是,当原始流不发送任何事件超过1秒时, delayed接收到最后看到的事件,将其忽略并转换为"PING" 。 聪明,但坏了。 此实现仅在发现停顿后才发送一个"PING" ,而不是每秒发送一次定期ping。 很容易修复! 除了将最后一次看到的事件转换为单个"PING"我们还可以将其转换为周期性ping序列:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING"));
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

您能看到缺陷在哪里吗? 每当原始流中出现一点沉默时,我们就会每秒发出一次ping 。 但是,一旦出现一些真正的事件,我们应该停止这样做。 我们没有。 上游的每个停顿都会导致新的无限ping流出现在最终的合并流中。 我们必须以某种方式告诉pings流,因为原始流发出了真正的事件,所以它应该停止发出ping 。 猜猜是什么,有takeUntil()运算符可以做到这一点!

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events));
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

花一点时间完全掌握上面的代码片段。 每当原始流上超过1秒没有任何React时, delayed流就会发出一个事件。 pings流发射的序列"PING"每秒从发射每个事件的事件delayed 。 但是,一旦事件出现在events流上,便会终止pings流。 您甚至可以将所有这些定义为单个表达式:

Flowable<String> events = //...
Flowable<String> eventsWithPings = events.mergeWith(events.debounce(1, SECONDS).flatMap(x1 -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events)));

可测性

好的,我们已经编写了所有这些内容,但是我们应该如何测试事件驱动代码的这个三层嵌套的Blob? 我们如何确保ping在正确的时间出现并在静默结束时停止? 如何模拟各种与时间相关的场景? RxJava具有许多杀手级功能,但是测试时间流逝可能是最大的功能。 首先,让我们的ping代码更具可测试性和通用性:

<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {return events.mergeWith(events.debounce(1, SECONDS, clock).flatMap(x1 -> Flowable.interval(0, 1, SECONDS, clock).map(e -> ping).takeUntil(events)));}

此实用程序方法采用任意的T流并添加ping ,以防该流在较长时间内不产生任何事件。 我们在测试中像这样使用它:

PublishProcessor<String> events = PublishProcessor.create();
TestScheduler clock = new TestScheduler();
Flowable<String> eventsWithPings = withPings(events, clock, "PING");

哦,男孩, PublishProcessorTestSchedulerPublishProcessor是一个有趣的类,它是一个亚型Flowable (所以我们可以使用它作为一个普通的流)。 另一方面,我们可以使用其onNext()方法强制发出事件:

events.onNext("A");

如果有人收听events流,他将立即收到"A"事件。 这clock是怎么回事? RxJava中以任何方式处理时间的每个运算符(例如debounce debounce()interval()timeout()window() )都可以采用可选的Scheduler参数。 它充当时间的外部来源。 特殊的TestScheduler是我们完全控制的人为的时间来源。 也就是说,只要我们不显式调用advanceTimeBy()时间就保持静止:

clock.advanceTimeBy(999, MILLISECONDS);

999毫秒不是巧合。 Ping在1秒钟后开始精确显示,因此在999毫秒后将不可见。 现在是时候揭示完整的测试用例了:

@Test
public void shouldAddPings() throws Exception {PublishProcessor<String> events = PublishProcessor.create();final TestScheduler clock = new TestScheduler();final Flowable<String> eventsWithPings = withPings(events, clock, "PING");final TestSubscriber<String> test = eventsWithPings.test();events.onNext("A");test.assertValues("A");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("B");test.assertValues("A", "B");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING");events.onNext("C");test.assertValues("A", "B", "PING", "C");clock.advanceTimeBy(1000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");events.onNext("D");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("E");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");clock.advanceTimeBy(3_000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING");
}

看起来像一堵墙,但这实际上是我们逻辑的完整测试方案。 它可以确保ping在1000毫秒后精确显示,当沉默很长时重复一次,而在真正事件出现时则重复很短。 但最重要的部分是:该测试是100%可预测的并且非常快。 没有Awaitility ,忙等待,轮询,间歇性测试失败和缓慢。 我们完全控制的人工时钟可确保所有这些组合流均按预期工作。

翻译自: https://www.javacodegeeks.com/2017/09/detecting-testing-stalled-streams-rxjava-faq.html

技术停滞

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/334474.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小波滤波器与其他滤波器的区别_滤波器国产 VS 国外

一、滤波器技术及产品类型详细分析几次技术应用的潮流引领了声波射频滤波器技术的发展&#xff0c;而SAW滤波器可以说是军用转民用的技术典范。早期国内的SAW滤波器仅用于国防如雷达、通信等方面需求&#xff1b;而该技术的第一次民用&#xff0c;即用于彩电产业&#xff0c;带…

java stopself_然后,即使我停止了服务,Context.startForegroundService()也没有调用Service.startForeground()...

所以我的应用程序有一些触发服务和通知的远程操作 . 在调用 startForegroundService 和服务尝试启动通知的时间之间&#xff0c;事情可能会发生变化&#xff0c;因此服务会再次检查事物的状态&#xff0c;然后决定要做什么 .因此&#xff0c;如果我的服务决定它不需要运行&…

Unix/Linux/BSD 它们之间的关系以及各自派系的介绍

文章目录一、Unix 是什么二、Unix 派系&#xff08;一&#xff09;BSD 分支1.BSD 是什么2.由 BSD 衍生的闭源 Unix 版本3.由 BSD 衍生的开源 Unix 版本&#xff08;二&#xff09;System V 分支1.System V 是什么2.System V 与 BSD 的版权纠纷&#xff0c;以及 UnixWare 的由来…

arraylist获取前多少位_Java 面试题 :百度前 200 页都在这里

作者&#xff1a;唐尤华&#xff0c;基本概念操作系统中 heap 和 stack 的区别什么是基于注解的切面实现什么是 对象/关系 映射集成模块什么是 Java 的反射机制什么是 ACIDBS与CS的联系与区别Cookie 和 Session的区别fail-fast 与 fail-safe 机制有什么区别get 和 post请求的区…

航空订票系统界面java_Java命令行界面(第21部分):航空公司2

航空订票系统界面java本系列文章的第21篇关于Java中解析命令行参数的文章的重点是Airline 2库。 Airline 2的GitHub项目页面描述了该库&#xff0c;“ Airline是一个Java库&#xff0c;提供了基于注释的框架来解析命令行界面。” 该页面进入状态&#xff1a;航空公司“同时支持…

js map遍历 修改对象里面的值_js中那些方法不改变原来的数组对象

一、map方法 [javascript] view plain copy function fuzzyPlural(single) {var result = single.replace(/o/g, e); //replace也不会修改原来数组的值,这里打印[foot,goose,moose,kangaroo] //alert(single); //下面为新的数组添加了一个元素,但是我们看到在map的时候没有对…

QPW 公告表(tf_notice)

公告表 CREATE TABLE tf_notice (notice_id int(11) NOT NULL AUTO_INCREMENT COMMENT 公告ID,title varchar(300) DEFAULT NULL COMMENT 标题,content varchar(2000) DEFAULT NULL COMMENT 内容,company_id bigint(20) DEFAULT NULL COMMENT 企业ID, # 删掉appraise_id bigin…

python爬虫数据_python爬取数据分析

一.python爬虫使用的模块 1.import requests 2.from bs4 import BeautifulSoup 3.pandas 数据分析高级接口模块 二. 爬取数据在第一个请求中时, 使用BeautifulSoupimport requests # 引用requests库 from bs4 import BeautifulSoup # 引用BeautifulSoup库 res_movies requests…

java 迁移数据_Java 9迁移指南:七个最常见的挑战

java 迁移数据我敢肯定&#xff0c;您已经听说过更新到Java 9并不是一件容易的事&#xff0c;甚至可能是不兼容的更新&#xff0c;而且对于大型代码库而言&#xff0c;迁移毫无意义。 这样做之后&#xff0c;我迁移了一个相当大的旧代码库&#xff0c;我可以告诉你&#xff0c;…

markdown java 代码高亮_MarkdownPad2使用代码高亮插件

MarkdownPad 2有插入代码块的功能&#xff0c;但样式却不尽人意&#xff0c;但又不想换个编辑器&#xff0c;找了挺多相关资料&#xff0c;最后在MarkdownPad 2集成prettify高亮插件。如下相关资料&#xff1a;你可以下载后引用&#xff0c;也可以直接引用bootcdn。具体步骤&am…

QPW 点评阅读日志表(tf_appraise_read_log)

点评阅读日志表 CREATE TABLE tf_appraise_read_log (read_log_id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 日志ID,appraise_id bigint(20) DEFAULT NULL COMMENT 点评ID,user_id bigint(20) DEFAULT NULL COMMENT 用户ID,duration int(11) DEFAULT NULL COMMENT 阅读时长…

需求调研 现有系统梳理_对速度的需求,访问现有数据的速度提高了1000倍

需求调研 现有系统梳理了解如何通过使用标准Java 8流和Speedment的In-JVM-Memory加速器将分析数据库应用程序加速1000倍。 Web和移动应用程序有时会很慢&#xff0c;因为后备数据库很慢和/或与数据库的连接施加了延迟。 现代的UI和交互式应用程序需要快速后端&#xff0c;并且…

order by 影响效率么_提升开发效率N倍的20+命令行神器

图 by&#xff1a;石头青海湖关于作者&#xff1a;程序猿石头(ID: tangleithu)&#xff0c;现任阿里巴巴技术专家&#xff0c;清华学渣&#xff0c;前大疆后端 Leader。以每篇文章都让人有收获为目的&#xff0c;欢迎关注&#xff0c;交流和指导&#xff01;背景本文主要来源于…

超级酒吧女生Java下载_超真实分享:一个人去酒吧的经验+注意事项

原标题&#xff1a;超真实分享&#xff1a;一个人去酒吧的经验&#xff0b;注意事项今天想跟大家分享我自己一个很奇妙也很特别的经验&#xff0c;就是关于我自己一个人去酒吧喝酒的故事&#xff5e;对这个故事有好奇心的女孩欢迎跟我一起聊聊&#xff5e;我的个性很独立而且蛮…

QPW 点评点赞日志表(tf_appraise_praise)

点评点赞日志表 CREATE TABLE tf_appraise_praise (praise_id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 点赞ID,user_id bigint(20) DEFAULT NULL COMMENT 用户ID,appraise_id bigint(20) DEFAULT NULL COMMENT 点评ID,status smallint(6) DEFAULT NULL COMMENT 状态, # 这…

java parse_Java命令行界面(第9部分):parse-cmd

java parseparse-cmd库由单个类ParseCmd.java组成&#xff0c;该类是“用于在Java应用程序中定义和解析命令行参数的Java类。” 该库托管在Google Code存档上 &#xff0c;因此可以随时关闭 。 该JAR似乎也可以在GitHub上使用 。 这篇文章介绍了如何使用parse-cmd 0.0.93处理Ja…

mysql 聚簇索引和非聚簇索引_MySQL学习之——索引

转自&#xff1a;CSDNMySQL是目前绝大多数互联网公司使用的关系型数据库&#xff0c;它性能出色、资源丰富、成本低廉&#xff0c;是快速搭建互联网应用的首选关系型数据库。但是&#xff0c;俗话说&#xff0c;“好马配好鞍”&#xff0c;仅仅会使用MySQL是不够的&#xff0c;…

QPW 邀请日志表(tf_invite_log)

邀请日志表 被邀请者注册成功后&#xff0c;往这张表插入一条记录&#xff0c;记录在何时谁邀请了谁。 CREATE TABLE tf_invite_log (invite_id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 邀请ID,user_id bigint(20) DEFAULT NULL COMMENT 用户ID, # 邀请用户IDinvited_us…

java登录界面命令_Java命令行界面(第10部分):picocli

java登录界面命令picocli主页面将picocli描述为“强大的微小命令行界面”&#xff0c;“ picocli”是一个单文件Java框架&#xff0c;用于解析命令行参数并生成精美&#xff0c;易于定制的用法帮助消息。 有颜色。” 这篇文章简要概述了如何使用Picocli 0.9.7处理Java代码中的命…

navicat运行sql文件慢_SQL进阶之路——入门

一、初步了解SQL数据库&#xff1a;用来存放数据关系数据库&#xff1a;1.含有多张表 2.各表之间有关系关系数据库中表的样式&#xff1a;a.每个表1个表名 b.每个表中包含记录列名的列和记录数据的行 c.利用主键用来标识数据的唯一性关系数据库中每个表之间如何建立联系&#x…