flatmap_flatMap()与concatMap()与concatMapEager()– RxJava常见问题解答

flatmap

RxJava 2.x中共有三个无缝相似的运算符: flatMap()concatMap()concatMapEager() 。 它们都接受相同的参数-从原始流的单个项目到任意类型的(子)流的函数。 换句话说,如果您有Flowable<T>则可以为任意R类型提供从TFlowable<R>的函数。 应用任何这些运算符后,您最终得到Flowable<R> 。 那么它们有何不同?

样例项目

首先,让我们构建一个示例应用程序。 我们将使用Retrofit2 HTTP客户端包装,该包装具有RxJava2的内置插件。 我们的任务是利用GeoNames API来查找世界上任何城市的人口。 该界面如下所示:

public interface GeoNames {Flowable<Long> populationOf(String city);}

该接口的实现由Retrofit自动生成,向下滚动以查看胶粘源代码。 暂时仅假设我们有一个函数,该函数采用具有城市名称的String并异步返回具有该城市人口的单元素流。 还要假设我们有固定的城市要查找:

Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid"
);

我们的目标是获取每个城市的人口。

带有concatMap()的示例应用程序如下所示:

cities.concatMap(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));

在我们看到结果之前,让我们研究一下concatMap()在做什么。 对于每个上游事件( 城市 ),它都调用一个函数,该函数用(子)流替换该事件。 在我们的情况下,它是Long的一元流( Flowable<Long> )。 因此,与所有运算符进行比较之后,我们最终得到的是Long流( Flowable<Flowable<Long>> )流。 当我们分析操作员为展平此类嵌套流所做的操作时,就会出现真正的区别。

concatMap()将首先订阅第一concatMap()流( Flowable<Long>代表华沙的人口)。 订阅实际上是指进行物理HTTP调用。 仅当第一concatMap()流完成时(在我们的情况下发出单个Long并发出完成信号), concatMap()才会继续。 继续意味着订阅第二个子流并等待其完成。 最后一个子流完成时,结果流完成。 这导致了以下信息流:1702139,2138551,7556900和3255944。因此,恰好是华沙,巴黎,伦敦和马德里的人口。 输出顺序完全可以预测。 但是,它也是完全顺序的。 完全没有并发发生,只有在第一个HTTP结束时才进行第二个HTTP调用。 RxJava所增加的复杂性根本没有回报:

23:33:33.531 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
23:33:33.656 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (123ms)
23:33:33.674 | Rx-1 | Population: 1702139
23:33:33.676 | Rx-1 | --> GET .../searchJSON?q=Paris http/1.1
23:33:33.715 | Rx-1 | <-- 200 OK .../searchJSON?q=Paris (38ms)
23:33:33.715 | Rx-1 | Population: 2138551
23:33:33.716 | Rx-1 | --> GET .../searchJSON?q=London http/1.1
23:33:33.754 | Rx-1 | <-- 200 OK .../searchJSON?q=London (37ms)
23:33:33.754 | Rx-1 | Population: 7556900
23:33:33.755 | Rx-1 | --> GET .../searchJSON?q=Madrid http/1.1
23:33:33.795 | Rx-1 | <-- 200 OK .../searchJSON?q=Madrid (40ms)
23:33:33.796 | Rx-1 | Population: 3255944

如您所见,没有多线程发生,请求是顺序的,彼此等待。 从技术上讲,并非所有这些都必须在同一线程中发生,但是它们绝不会重叠并且可以利用并发性。 最大的好处是可以保证结果事件的顺序,一旦我们进入flatMap() ,这种顺序就不那么明显了……

flatMap()代码几乎完全相同:

cities.flatMap(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));

就像之前一样,我们从Long流开始( Flowable<Flowable<Long>> )。 但是, flatMap()运算符渴望一次订阅所有子流,而不是一个个地订阅每个子流。 这意味着我们看到在不同线程中同时启动多个HTTP请求:

00:10:04.919 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:10:04.919 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:10:04.919 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:10:04.919 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:10:05.449 | Rx-3 | <-- 200 OK .../searchJSON (529ms)
00:10:05.462 | Rx-3 | Population: 7556900
00:10:05.477 | Rx-1 | <-- 200 OK .../searchJSON (557ms)
00:10:05.478 | Rx-1 | Population: 1702139
00:10:05.751 | Rx-4 | <-- 200 OK .../searchJSON (831ms)
00:10:05.752 | Rx-4 | Population: 3255944
00:10:05.841 | Rx-2 | <-- 200 OK .../searchJSON (922ms)
00:10:05.843 | Rx-2 | Population: 2138551

当任何基础子流中的任何一个发出任何值时,它将立即向下游传递给订户。 这意味着我们现在可以在事件发生时即时处理事件。 请注意,结果流是乱序的。 我们收到的第一个事件是7556900,恰好是伦敦的人口,在第一流中排名第二。 与concatMap()相反, flatMap()无法保留顺序,因此以“随机”顺序发出值。 好吧,不是真正随机的,我们只是在值可用时立即接收它们。 在此特定执行中,首先是针对伦敦的HTTP响应,但绝对不能保证。 这导致一个有趣的问题。 我们有各种各样的人口价值流和最初的城市流。 但是,输出流可以是事件的任意排列,而且我们不知道哪个人口对应哪个城市。 我们将在后续文章中解决此问题。

concatMapEager()似乎两全其美:并发性和输出事件的有保证顺序:

cities.concatMapEager(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));

在了解了concatMap()flatMap()功能之后,了解concatMapEager()相当简单。 急切地让流concatMapEager()流( duh! )同时预订所有子流。 但是,此运算符可确保首先传播第一个子流的结果,即使它不是要完成的第一个子流也是如此。 一个示例将Swift揭示这意味着什么:

00:34:18.371 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:34:18.371 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:34:18.371 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:34:18.371 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:34:18.517 | Rx-3 | <-- 200 OK .../searchJSON?q=London (143ms)
00:34:18.563 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (189ms)
00:34:18.565 | Rx-1 | Population: 1702139
00:34:20.460 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (2086ms)
00:34:20.460 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (2086ms)
00:34:20.461 | Rx-2 | Population: 2138551
00:34:20.462 | Rx-2 | Population: 7556900
00:34:20.462 | Rx-2 | Population: 3255944

我们立即启动四个HTTP请求。 从日志输出中,我们可以清楚地看到伦敦的居民首先被返回。 但是,订户没有收到它,因为华沙尚未到来。 巧合的是,华沙排名第二,因此华沙人口可以在下游传递给订户。 不幸的是,伦敦人口必须等待更多,因为首先我们需要巴黎人口。 巴黎(紧随其后是马德里)完成后,所有剩余结果都将传递到下游。

请注意,即使人口充足,伦敦的人口也必须等待Hibernate,直到华沙和巴黎完成。 那么concatMapEager()是最好的并发运算符吗? 不完全的。 想象一下,我们有一个数千个城市的列表,每一个城市我们都获取一张1MB的图片。 使用concatMap()我们可以依次(即缓慢concatMap()下载图片。 使用flatMap()可以同时下载图片,并在图片到达时尽快进行处理。 现在, concatMapEager()呢? 在最坏的情况下,我们可以使用concatMapEager()缓存999张图片,因为来自第一个城市的图片恰好是最慢的。 即使我们已经拥有99.9%的结果,但由于我们执行严格的排序,因此我们无法对其进行处理。

使用哪个运算符?

flatMap()应该是您的首选武器。 它允许与流行为进行有效的并发。 但是要准备好接收乱序的结果。 仅当提供的转换速度如此之快,顺序处理不是问题时, concatMap()才能很好地工作。 concatMapEager()非常方便,但是要注意内存消耗。 同样在最坏的情况下,您可能最终会闲置,等待很少的响应。

附录:配置Retrofit2客户端

实际上,我们在本文中始终使用的GeoNames服务接口如下所示:

public interface GeoNames {@GET("/searchJSON")Single<SearchResult> search(@Query("q") String query,@Query("maxRows") int maxRows,@Query("style") String style,@Query("username") String username);default Flowable<Long> populationOf(String city) {return search(city, 1, "LONG", "s3cret").map(SearchResult::getGeonames).map(g -> g.get(0)).map(Geoname::getPopulation).toFlowable();}}

非默认方法的实现由Retrofit2自动生成。 请注意,为简单起见, populationOf()返回一个元素的Flowable<Long> 。 但是,要完全拥抱此API的本质,在现实世界中,其他实现将更为合理。 首先, SearchResult类返回结果的有序列表(省略getter / setter):

class SearchResult {private List<Geoname> geonames = new ArrayList<>();
}class Geoname {private double lat;private double lng;private Integer geonameId;private Long population;private String countryCode;private String name;
}

毕竟,世界上有许多华沙和伦敦 。 我们默默地假设列表将包含至少一个元素,而第一个是正确的匹配。 更合适的实现应返回所有匹配,甚至返回更好的Maybe<Long>类型以反映没有匹配项:

default Maybe<Long> populationOf(String city) {return search(city, 1, "LONG", "nurkiewicz").flattenAsFlowable(SearchResult::getGeonames).map(Geoname::getPopulation).firstElement();
}

粘合代码如下所示。 首先Jackson的设置,以便解析来自API的响应:

import com.fasterxml.jackson.databind.ObjectMapper;private ObjectMapper objectMapper() {return new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
}

FAIL_ON_UNKNOWN_PROPERTIES通常是您想要的。 否则,您必须映射JSON响应中的所有字段,并且当API生产者引入新的或向后兼容的字段时,代码将中断。 然后我们设置OkHttpClient ,由Retrofit在下面使用:

import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;private OkHttpClient client() {HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);return new OkHttpClient.Builder().addInterceptor(interceptor).build();
}

有时您可以跳过OkHttp客户端的配置,但是我们添加了日志记录拦截器。 默认情况下,OkHttp使用java.util.logging日志记录,因此为了使用体面的日志记录框架,我们必须在开始时就安装桥:

import org.slf4j.bridge.SLF4JBridgeHandler;static {SLF4JBridgeHandler.removeHandlersForRootLogger();SLF4JBridgeHandler.install();
}

最后进行自我改造:

import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.jackson.JacksonConverterFactory;GeoNames createClient() {return new Retrofit.Builder().client(client()).baseUrl("http://api.geonames.org").addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())).addConverterFactory(JacksonConverterFactory.create(objectMapper())).build().create(GeoNames.class);
}

调用createClient()将产生GeoNames接口的动态实现。 我们使用了以下依赖项:

compile 'io.reactivex.rxjava2:rxjava:2.0.6'compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.retrofit2:converter-jackson:2.0.1'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.0'compile 'ch.qos.logback:logback-classic:1.1.7'
compile 'org.slf4j:slf4j-api:1.7.21'
compile 'org.slf4j:jul-to-slf4j:1.7.21'

翻译自: https://www.javacodegeeks.com/2017/08/flatmap-vs-concatmap-vs-concatmapeager-rxjava-faq.html

flatmap

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/334509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 中的点_java————形参中的点点点 | 学步园

转自:http://zhidao.baidu.com/question/149668626.htmlmain方法的签名其实可以这样写:public static void main(String... args)//方法1它也可以运行.并且,如果同时还存在public static void main(String[] args)//方法2会报已经存在重复的方法的错误.由此可见,String... args…

sendkeys.send 始终输出英文._PLC的三种输出方式,你知道有哪些吗?

电工技术维修学习网&#xff1a;www.dgjswx.com关注电工技术维修学习网官方微信公众号《电工维修学习》收获更多电工经验知识和提升实战技能电工技术&#xff0c;电气知识&#xff0c;电工基础知识&#xff0c;电工入门知识&#xff0c;电工资料&#xff0c;电工软件&#xff0…

Linux 命令之 echo -- 输出指定的字符串或者变量的值

文章目录一、命令介绍二、命令选项三、命令示例&#xff08;一&#xff09;输出变量的值&#xff08;二&#xff09;合并上下单元格内容&#xff08;三&#xff09;查看一行第一栏&#xff08;四&#xff09;查看一行的第一和第三栏&#xff08;五&#xff09;结合输出重定向符…

java 死循环排查_java应用死循环排查方法或查找程序消耗资源的线程方法(面试)...

今天遇到一个面试&#xff0c;怎么在一堆线程中查找一个死循环&#xff1f;如果遇到线上应用cpu飙升&#xff0c;并出现OutOfMemery怎么办&#xff1f;首先线上应用的jvm配置要养成良好的习惯&#xff0c;增加一下配置则可以在jvm发生 oom的时候自动dump日志了 -XX:HeapDumpOn…

jw摄像_Java命令行界面(第17部分):jw-options

jw摄像JavaWorld的文章“ 用Java处理命令行参数”&#xff1a; Matthias Laux博士关闭的案例介绍了一个简单的基于Java的库&#xff0c;用于处理命令行参数 &#xff0c;我在本文中将其称为jw-options 。 被引用的文章提供了有关为何在构造Options类时做出某些设计决策的背景信…

pixel和毫米怎么换算_压力传感器相关压力单位换算

压力传感器的应用已经很普遍了&#xff0c;压力传感器各个单间之间应该怎么换算呢&#xff0c;这个问题困扰着很多的客户&#xff0c;今天呢我们就来说一下它们之间的换算。在实际的工程应用中&#xff0c;压强单位常被当作压力单位。比较常见的压力单位包括&#xff1a;bar、K…

Linux 命令之 lsof -- 列出当前系统已打开的文件列表

文章目录命令介绍常用选项字段说明文件类型文件描述符文件状态模式锁模式参考示例&#xff08;一&#xff09;查看打开指定文件的所有进程&#xff08;二&#xff09;列出由某个 PID 对应的进程打开的所有文件&#xff08;三&#xff09;查看指定名称的进程所打开的文件列表&am…

mongodb json_在MongoDB和Spring Batch中将XML转换为JSON和原始使用

mongodb json总览 为什么将XML转换为JSON以在MongoDB中原始使用&#xff1f; 由于MongoDB使用JSON文档存储记录&#xff0c;就像表和行将记录存储在关系数据库中一样&#xff0c;我们自然需要将XML转换为JSON。 某些应用程序可能需要存储原始&#xff08;未修改的&#xff09;…

java串口发送16进制数据_MFC串口通信发送16进制数据的方法

本文实例为大家分享了MFC串口通信发送16进制数据的具体代码&#xff0c;供大家参考&#xff0c;具体内容如下MFC串口通信会使用m_mscomm控件。发送数据一般是在edit control 里输入自己想发送的内容&#xff0c;然后点击send button。如果直接发送字符串内容&#xff0c;通过下…

Linux 命令之 du -- 显示每个文件和目录的磁盘使用空间/所占用的磁盘空间大小/所使用的磁盘空间大小/查看文件和目录的大小

文章目录命令介绍常用选项参考示例&#xff08;一&#xff09;显示当前目录下的所有子目录和文件所占空间&#xff08;二&#xff09;查看指定文件所占空间的大小&#xff08;三&#xff09;查看指定目录的所占空间&#xff08;四&#xff09;查看多个文件所占空间&#xff08;…

vue 侦听器侦听对象属性_Spring中的异步和事务性事件侦听器

vue 侦听器侦听对象属性内置的事件发布功能从Spring的早期版本开始存在&#xff0c;并且对于处理同一应用程序上下文中Spring组件之间的基本通信仍然有用。 通常&#xff0c;应用程序可以生成应用程序事件&#xff08;可以是任意对象&#xff09;并监听它们。 整个机制非常简单…

mac tomcat java_Mac下配置Java开发环境(JDK1.8)和Tomcat服务器

平时做PHP,装的有nginx,mysql这儿就不多说了&#xff0c;可以看前面的相关文章&#xff0c;用的brew配置的&#xff0c;超简单。 今天介绍一下Java相关的配置 Java官网下载&#xff1a;http://www.oracle.com/technetwork/java/javase/downloads/index.html 我这儿下载的是Java…

python集合和字典的区别_Python中的字典和集合

导语&#xff1a;本文章记录了本人在学习Python基础之数据结构篇的重点知识及个人心得&#xff0c;打算入门Python的朋友们可以来一起学习并交流。 本文重点&#xff1a; 1、掌握常见的字典创建&#xff0c;查询&#xff0c;判别方法&#xff1b; 2、了解字典中的defaultdict、…

Linux 命令之 df -- 显示磁盘空间使用情况

文章目录命令介绍常用选项大小格式参考示例&#xff08;一&#xff09;查看系统磁盘设备的使用情况&#xff08;二&#xff09;显示指定文件所在分区的磁盘使用情况&#xff08;三&#xff09;显示文件类型为ext4的磁盘使用情况命令介绍 df 命令的英文全称即“Disk Free”&…

apache ignite_Apache Ignite本机持久性,简要概述

apache ignite通过将数据的工作集放入系统内存中&#xff0c;内存中方法可以达到极高的速度。 当所有数据都保存在内存中后&#xff0c;就不再需要处理使用传统旋转磁盘引起的问题。 例如&#xff0c;这意味着无需维护数据的其他缓存副本并管理它们之间的同步。 但是这种方法还…

java编程九九乘法表_如何用JAVA语言编写一个九九乘法表

教一下学了JAVA語言的同学们撰写一个99玖玖乘决表方法进行设计构思&#xff1a;假如把99玖玖乘决报表中如“1*11”算式全部当作一个一字的笔画体得话&#xff0c;99玖玖乘决表可当作一个倾斜角二五长方形形&#xff0c;进行倾斜角二五长方形形可用两个for循环系统软件嵌套循环来…

python tkinter 安装_如何为Python安装tkinter?

如果你和我一样&#xff0c;由于你在I.S中的好朋友而在你的网络上没有根目录特权&#xff0c;而且你是在本地安装中工作的&#xff0c;你可能会在上面的方法上遇到一些问题。 我在谷歌上花了很长时间--但最终&#xff0c;这很容易。 要在Linux上本地安装(我将其安装到我的主目录…

Linux 命令之 more -- 显示文本文件内容/显示文件内容/查看文件内容

文章目录一、命令介绍二、常用选项三、命令内部操作快捷键四、参考示例&#xff08;一&#xff09;显示文件file的内容&#xff0c;显示之前先清屏&#xff0c;附已显示的百分比&#xff08;二&#xff09;显示文件file的内容&#xff0c;每10行显示一次&#xff0c;而且在显示…

java jigsaw_Java 9,Jigsaw,JPMS和模块:个人探索

java jigsawJava 9由于Jigsaw项目而延迟了很多次&#xff0c;您可能会听到很多关于模块&#xff0c;模块化和其他内容的信息&#xff0c;那么&#xff0c;这到底是什么呢&#xff1f; 什么是模块化&#xff0c;模块化平台是什么意思&#xff1f; Java平台模块系统&#xff08;J…

java conf_JAVA 解析、编辑nginx.conf详解

最近工程开发遇到一个需求&#xff1a;用Java去解析并编辑nginx.conf解析nginx.conf过程可以参考该项目的README.md下面举个列子说明一下该如何编辑nginx.conf。定义一个pojoimportcom.alibaba.fastjson.JSONArray;importcom.google.common.base.Strings;importlombok.Data;Dat…