flowable背压 取消_使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答...

flowable背压 取消

RxJava缺少创建无限自然数流的工厂。 这样的流很有用,例如,当您想通过压缩两个事件的顺序来为可能的无限事件流分配唯一的序列号时:

Flowable<Long> naturalNumbers = //???Flowable<Event> someInfiniteEventStream = //...
Flowable<Pair<Long, Event>> sequenced = Flowable.zip(naturalNumbers,someInfiniteEventStream,Pair::of
);

实现naturalNumbers令人惊讶地复杂。 在RxJava 1.x中,您可以短暂地放弃不遵守反压的Observable

import rx.Observable;  //RxJava 1.xObservable<Long> naturalNumbers = Observable.create(subscriber -> {long state = 0;//poor solution :-(while (!subscriber.isUnsubscribed()) {subscriber.onNext(state++);}
});

这样的流没有背压是什么意思? 好吧,基本上,流可以轻松地以CPU内核允许的速度生成事件(不断增加的state变量),每秒数百万。 但是,当使用者无法如此Swift地使用事件时,未处理事件的积压开始出现:

naturalNumbers
//      .observeOn(Schedulers.io()).subscribe(x -> {//slooow, 1 millisecond});

上面的程序(带有observeOn()运算符的注释掉)可以正常运行,因为它具有意外的反压。 默认情况下,所有内容在RxJava中都是单线程的,因此生产者和使用者在同一个线程中工作。 实际上,调用subscriber.onNext()会阻止,因此while循环会自动对其进行限制。 但是,尝试取消注释observeOn() ,灾难会在几毫秒后发生。 订阅回调在设计上是单线程的。 对于每个元素,它至少需要1毫秒,因此该流每秒可以处理不超过1000个事件。 我们有些幸运。 RxJava快速发现这种灾难性状况,并因MissingBackpressureException而快速失败

我们最大的错误是生产事件,而没有考虑消费者的速度。 顺便说一下,这是响应流背后的核心思想:不允许生产者发出比消费者请求更多的事件。 在RxJava 1.x中,即使实现最简单的流(从头开始考虑背压)也不是一件容易的事。 RxJava 2.x带来了几个方便的运算符,这些运算符建立在以前版本的经验基础之上。 首先RxJava 2.x时不允许你实现Flowable (背压-aware)的相同的方式,你可以与Observable 。 创建Flowable会使消费者使消息过载是不可能的:

Flowable<Long> naturalNumbers = Flowable.create(subscriber -> {long state = 0;while (!subscriber.isCancelled()) {subscriber.onNext(state++);}
}, BackpressureStrategy.DROP);

您是否发现了这个额外的DROP参数? 在解释之前,让我们看一下使用慢速用户订阅时的输出:

0
1
2
3
//...continuous numbers...
126
127
101811682
//...where did my 100M events go?!?
101811683
101811684
101811685
//...continuous numbers...
101811776
//...17M events disappeared again...
101811777
//...

你的旅费可能会改变。 怎么了? observeOn()运算符在调度程序(线程池)之间切换。 从未决事件队列中合并的线程池。 该队列是有限的,容量为128个元素。 知道此限制的observeOn()运算符仅从上游请求128个元素(我们的自定义Flowable )。 此时,它使我们的订户可以处理事件,每毫秒1次。 因此,大约100毫秒后, observeOn()发现其内部队列几乎为空,并要求更多。 会得到128、129、130…吗? 没有! 我们的Flowable在这0.1秒内产生了疯狂的事件,并且(令人惊讶地)在该时间段内产生了超过1亿个数字。 他们去哪了 好吧, observeOn()并没有要求它们,因此DROP策略(强制性参数)只是丢弃了不需要的事件。

BackpressureStrategy

听起来不对,还有其他策略吗? 是的,很多:

  • BackpressureStrategy.BUFFER :如果上游产生太多事件,则会将它们缓冲在无界队列中。 没有任何事件丢失,但是您的整个应用程序很可能会丢失。 如果幸运的话, OutOfMemoryError将拯救您。 我被困在5秒以上的长时间GC暂停中。
  • BackpressureStrategy.ERROR :如果发现事件的过度产生,将抛出MissingBackpressureException 。 这是一个理智(且安全)的策略。
  • BackpressureStrategy.LATEST :类似于DROP ,但是记住上次删除的事件。 以防万一需要更多数据,但我们只是删除了所有内容–至少具有最后看到的价值。
  • BackpressureStrategy.MISSING :没有安全措施,请加以处理。 下游运算符之一(如observeOn() )最有可能抛出MissingBackpressureException
  • BackpressureStrategy.DROP :删除未请求的事件。

顺便说一句,当您将Observable变为Flowable还必须提供BackpressureStrategy 。 RxJava必须知道如何限制过量产生的Observable 。 好的,那么简单的序列自然数流的正确实现是什么?

认识

create()generate()之间的区别在于责任。 假设Flowable.create()会在不考虑背压的情况下完整地生成流。 它只是在需要时才产生事件。 另一方面,仅允许Flowable.generate()一次生成一个事件(或完成流)。 背压机制透明地计算出当前需要多少个事件。 generate()调用适当的次数,例如,在observeOn()情况下, observeOn() 128次。

由于此运算符一次生成一个事件,因此通常需要某种状态来确定上次出现的时间1 。 这就是generate()含义:(im)可变状态的持有者和一个基于该状态生成下一个事件的函数:

Flowable<Long> naturalNumbers =Flowable.generate(() -> 0L, (state, emitter) -> {emitter.onNext(state);return state + 1;});

generate()的第一个参数是初始状态(工厂),在本例中为0L 。 现在,每当订户或任何下游运营商请求一些事件时,都会调用lambda表达式。 它的责任是根据提供的状态以某种方式最多调用一次onNext() (最多发出一个事件)。 首次调用lambda时, state等于初始值0L 。 但是,我们可以修改状态并返回其新值。 在此示例中,我们增加了long以便后续lambda表达式的调用接收到state = 1L 。 显然,这种情况不断发生,产生连续的自然数。

这样的编程模型显然比while循环难。 它还从根本上改变了实现事件源的方式。 与其在任何时候都想推送事件,不如只是被动地等待请求。 下游运营商和订户正在从您的流中提取数据。 这种转变可在管道的所有级别上产生背压。

generate()有一些风格。 首先,如果您的状态是可变对象,则可以使用不需要返回新状态值的重载版本。 尽管功能较少,但可变状态往往会产生较少的垃圾。 这假设您的状态不断变化,并且每次都传递相同的状态对象实例。 例如,您可以轻松地将Iterator (也是基于pull的!)变成具有反压奇观的流:

Iterator<Integer> iter = //...Flowable<String> strings = Flowable.generate(() -> iter, (iterator, emitter) -> {if (iterator.hasNext()) {emitter.onNext(iterator.next().toString());} else {emitter.onComplete();}
});

请注意,流的类型( <String> )不必与状态类型( Iterator<Integer> )相同。 当然,如果您有Java Collection并想将其转换为流,则不必先创建迭代器。 使用Flowable.fromIterable()足够了。 甚至更简单的generate()版本都假定您根本没有任何状态。 例如随机数流:

Flowable<Double> randoms = Flowable.generate(emitter -> emitter.onNext(Math.random()));

但老实说,您可能最终将需要一个Random实例:

Flowable.generate(Random::new, (random, emitter) -> {emitter.onNext(random.nextBoolean());
});

摘要

如您所见,RxJava 1.x中的Observable.create()和Flowable.create Flowable.create()有一些缺点。 如果您真的在乎大量并发系统的可伸缩性和运行状况(否则您将不会阅读本文!),则必须了解背压。 如果您真的需要从头开始创建流,而不是使用from*()系列方法或执行繁重工作的各种库,请熟悉generate() 。 本质上,您必须学习如何将某些类型的数据源建模为奇特的迭代器。 可能会有更多文章解释如何实现更多现实生活流。

这类似于无状态HTTP协议,该协议在服务器上使用称为会话*的小块状态来跟踪过去的请求。

翻译自: https://www.javacodegeeks.com/2017/08/generating-backpressure-aware-streams-flowable-generate-rxjava-faq.html

flowable背压 取消

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/334582.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

abstract类_【Java】类的结构 - Nemo

类与对象类中主要包括五种结构&#xff0c;下面进行对这五种结构进行详细的介绍。1. 面向对象与面向过程面向过程&#xff1a;强调的是功能行为&#xff0c;以函数为最小单位&#xff0c;考虑怎么做。面向对象&#xff1a;强调具备了功能的对象&#xff0c;以类/对象为最小单位…

二元函数可微与可导的关系_多元函数中可微与可导的直观区别是什么?

在多元的情况下&#xff0c;可微可导的关系要比在一元情况下复杂&#xff0c;但是只是要复杂一些&#xff0c;如果我们从一元开始去理解&#xff0c;你会发现并不困难。这篇文章主要阐述以下三个概念&#xff1a;偏微分偏导数全微分全导数这里暂时不讲&#xff0c;看名字好像和…

java 错误输入异常_在Java中进行输入验证期间用错误通知替换异常

java 错误输入异常在我以前的文章中&#xff0c;我写了一篇关于输入验证设计的文章 &#xff0c;该设计取代了难以维护和测试的 if-else块。 但是&#xff0c;正如某些读者指出的那样&#xff0c;它有一个缺点–如果输入数据有多个验证错误&#xff0c;则用户将不得不多次提交请…

Linux 命令之 mkdir 命令-创建目录

文章目录介绍语法格式常用选项参考示例介绍 mkdir 命令是“make directories”的缩写&#xff0c;用来创建目录。如果在目录名的前面没有加任何路径名&#xff0c;则在当前目录下创建目录&#xff1b;如果给出了一个已经存在的路径&#xff0c;将会在该目录下创建一个指定的目…

uniapp app蓝牙打印_给编程器加装蓝牙串口模块,用手机APP操作打印信息进控制台...

公众号回复【编程器】可下载蓝牙串口APP专用蓝牙串口模块购买&#xff1a;https://item.taobao.com/item.htm?id619731291566编程器加装蓝牙模块后&#xff0c;可以方便的使用手机查看打印信息&#xff0c;进入控制台执行各种串口命令。本款蓝牙串口APP具有保存打印信息、分享…

Linux 命令之 rm -- 删除文件和目录

文章目录一、命令介绍二、语法格式三、参考选项四、参考示例&#xff08;一&#xff09;删除当前目录下有内容的子目录&#xff08;二&#xff09;删除当前目录下多个含有内容的子目录&#xff08;三&#xff09;删除当前目录下以特定关键字开头的文件&#xff08;四&#xff0…

mysql报表占容量_MariaDB(MySQL)修改表结构报表空间满

今天数据库表修改表结构&#xff0c;需要添加一列&#xff1a;ALTER TABLE xxxx_learn ADD COLUMN learn_stage_code VARCHAR(32) NULL DEFAULT 99 COMMENT 学段 AFTER qualified_rate;结果一直报错&#xff1a;ERROR 1114 (HY000) at line 303: The table xxxx_learn is full刚…

对话框 函数_通过函数式编程实现动态对话框处理程序

对话框 函数在我以前的文章中&#xff0c;我提到了一个常见的用例&#xff0c;当我们需要以编程方式检查当前事务是否脏了&#xff0c;并在做某件事之前通知用户有关该事务的信息。 就像“您尚未保存的更改将丢失&#xff0c;您要继续吗&#xff1f;”。 假设我们需要在应用程…

Linux 命令之 yum -- 基于 RPM 的软件包管理器

文章目录一、命令介绍二、选项参数选项参数三、配置文件四、参考示例&#xff08;一&#xff09;安装、升级和删除包安装指定的软件包强制重新安装本地安装指定软件包本地更新指定软件包安装 yum 服务器中的所有可安装的软件安装程序组&#xff08;软件组&#xff09;安装 yum …

adb 最大连接_手机触屏失效的抢救办法,以及如何利用adb实现PC与手机交互

手机进水或者摔坏屏幕导致触屏失效的时候&#xff0c;一般情况下&#xff0c;要么选择换屏&#xff0c;要么选择把手机扔掉。但其实如果有一根OTG线&#xff0c;手机中的资料还可以安全备份出来&#xff0c;或者还可以再利用起来&#xff0c;发挥一下余热做点别的事。像上图一样…

proxy跨域不生效_前端开发:深入使用proxy代理解决跨域问题

在前端领域里面&#xff0c;跨域指的是浏览器允许向服务器发送跨域请求&#xff0c;进而克服Ajax只能同源使用的局限性限制。同源策略是一种约定&#xff0c;而且是浏览器中最基本也是最核心的安全功能&#xff0c;若缺少了该策略&#xff0c;浏览器非常容易被***&#xff1b;同…

java 静态缓存示例_Java 9 JShell示例:集合静态工厂方法

java 静态缓存示例这篇文章继续从My My Java 9 Features博客文章中探索Java9功能。 在这里&#xff0c;我们在List&#xff0c;Set和Map接口中试验Java9 Collections静态工厂方法。 集合静态工厂方法 Java9使用其新的静态工厂方法使创建不可变列表变得更加容易 有12种Set.of和…

mysql在计算机管理中的路径怎么修改_称重软件中的数据修改怎么知晓?

称重软件称重软件应客户需求&#xff0c;数据允许修改&#xff0c;但不允许删除只能作废。如果数据已修改&#xff0c;该如何知晓该数据是修改过的呢&#xff0c;这就用到了标记。用户修改数据时为保证数据的可追溯性&#xff0c;同样在数据安全方面也有相应的要求&#xff0c;…

pip安装mysql模块_使用pip安装mysql模块for python

我正在尝试使用pip安装mysql模块for python&#xff0c;但遇到了一个错误&#xff1a;mysqlclient.lib(typelib.obj) : error LNK2001: unresolved external symbol __iob_funcmysqlclient.lib(viosslfactories.obj) : error LNK2001: unresolved external symbol __iob_funcmy…

Linux 命令之 apt-get -- APT 软件包管理工具

文章目录 一、命令介绍二、语法格式三、相关文件及目录四、常用命令(一)下载、安装、升级和删除软件包(二)查询和检验软件包(三)执行其它功能五、常用选项(一)安装、升级和删除软件包(二)查询和检验软件包(三)执行其它功能六、参考示例(一)下载、安装、升级和删除…

spring health_为什么Spring的Health会再次向下,向下,向上,向上,向上和向下?...

spring health为什么 我们新JavaScript客户端应用程序会定期调用Grails后端的/health端点&#xff0c;以确定离线状态。 事情开始变得“​​有趣”。 我们免费获得该端点&#xff0c;因为Grails基于Spring Boot&#xff0c;而Spring Boot带有一个名为Spring Boot Actuator的子…

使用JDBC连接数据库(MySQL)的源代码

文章目录JDBC 访问数据库的步骤使用 JDBC 访问数据库的演示代码使用 PreparedStatement 对象查询插入更新删除使用 Statement 对象查询删除JDBC 访问数据库的步骤 将 jdbc 驱劢程序相关的 jar 包 copy 到 WEB-INF/lib 下在 servlet 代码当中&#xff0c;使用 jdbc 访问数据库&…

popupwindow 不抢夺焦点_央视专访“上个厕所就要3000块”的亲历者, 被“坑”的不愉快经历...

资讯 聚焦 活动 宣传 推广 品牌 热文 找小编合作加个人微信2871001801百度百科&#xff1a;宁河于雍正九年(1731年)从宝坻县分出,据《河北省县名考原》称:“蓟运河纵贯县境,时多水患,故县以宁河名”!当然还有另外别的解释!民国三年(1914年)属直隶省津海道,民国十七年(1928年…

gitlab10.x迁移_1.x到2.x的迁移:可观察与可观察:RxJava FAQ

gitlab10.x迁移标题不是错误。 rx.Observable 1.x的io.reactivex.Observable与2.x的io.reactivex.Observable完全不同。 盲目升级rx依赖关系并重命名项目中的所有导入将进行编译&#xff08;稍作更改&#xff09;&#xff0c;但不能保证相同的行为。 在项目的早期&#xff0c; …

ubuntu安装与配置mysql_ubuntu下mysql的安装与配置

安装MySQL sudo apt-get install mysql-server 这个应该很简单了&#xff0c;而且我觉得大家在安装方面也没什么太大题目&#xff0c;所以也就未几说了&#xff0c;下面我们来讲讲配置。 配置MySQL 留意&#xff0c;在Ubuntu下MySQL缺省是只安装MySQLsudo apt-get install mysq…