正如我们已经发现的, flatMap()
不会保留原始流的顺序。 让我们使用上一篇文章的GeoNames API示例进行说明 :
public interface GeoNames {Flowable<Long> populationOf(String city);}
通过使用flatMap()
请求多个城市的人口,我们不能保证它们会按顺序到达:
Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid");cities.flatMap(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));
输出有些令人惊讶:
17:09:49.838 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:09:49.838 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:09:49.838 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:09:49.838 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:09:49.939 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (98ms)
17:09:49.939 | Rx-3 | <-- 200 OK .../searchJSON?q=London (98ms)
17:09:49.956 | Rx-3 | Population: 7556900
17:09:49.958 | Rx-3 | Population: 3255944
17:09:51.099 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (1258ms)
17:09:51.100 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (1259ms)
17:09:51.100 | Rx-2 | Population: 2138551
17:09:51.100 | Rx-2 | Population: 1702139
一段时间后,我们收到马德里和伦敦的回复,随后又被订户收到。 首先是7556900(伦敦人口)和3255944(马德里)。不久之后,巴黎和华沙也到达了。 一方面,很好的是,我们可以在每个人口到达时立即进行处理。 这使系统看起来响应更快。 但是我们失去了一些东西。 输入流是"Warsaw"
, "Paris"
, "London"
, "Madrid"
而结果流包含"London"
, "Madrid"
, "Paris"
, "Warsaw"
人口。 我们如何分辨哪个数字代表哪个城市?
显然,以下解决方案是完全错误的 ,但在实际的代码库中并非闻所未闻:
Flowable<Long> populations = cities.flatMap(geoNames::populationOf);
cities.zipWith(populations, Pair::of).subscribe(response -> log.info("Population: {}", response));
它编译,运行,甚至产生一些结果。 不幸的是,这些结果是完全错误的:
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)
17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)
17:20:03.975 | Rx-2 | Population: (Warsaw,2138551)
17:20:03.976 | Rx-2 | Population: (Paris,3255944)
17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)
17:20:03.988 | Rx-3 | Population: (London,7556900)
17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)
17:20:04.080 | Rx-1 | Population: (Madrid,1702139)
我们将城市与人口的随机排列组合在一起。 更糟的是,在尝试了十几次之后,我设法得到了错误的结果。 由于某种原因,该程序大多数时候都在我的机器上运行。 您可以想象的最糟糕的错误。
flatMap()
的问题在于它会释放原始请求。 想象一下一个异步系统,在该系统上您收到某种队列的响应,但不知道请求是什么。 一个明显的解决方案是以某种方式将某种关联ID甚至整个请求附加到响应中。 不幸的是, populationOf(String city)
不返回原始请求( city
),仅返回响应( population
)。 如果它返回诸如CityWithPopulation
值对象或Pair<String, Long>
类的东西, CityWithPopulation
容易得多。 现在,假设我们通过附加请求( city
)来增强原始方法:
Flowable<Pair<String, Long>> populationOfCity(String city) {Flowable<Long> population = geoNames.populationOf(city);return population.map(p -> Pair.of(city, p));
}
现在,我们可以将这种方法用于更多的城市:
cities.flatMap(this::populationOfCity).subscribe(response -> log.info("Population: {}", response));
…或避免使用额外的辅助方法:
cities.flatMap(city -> geoNames.populationOf(city).map(p -> Pair.of(city, p))).subscribe(response -> log.info("Population: {}", response));
这次的result
变量是Pair<String, Long>
但是建议您使用更具表现力的value对象。
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)
17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)
17:20:03.975 | Rx-2 | Population: (Paris,2138551)
17:20:03.976 | Rx-2 | Population: (Madrid,3255944)
17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)
17:20:03.988 | Rx-3 | Population: (London,7556900)
17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)
17:20:04.080 | Rx-1 | Population: (Warsaw,1702139)
我发现带有嵌套map()
flatMap()
添加了额外的上下文,这是处理乱序结果的最有效方法。 当然,它不是最易读的反应式代码,因此请确保将这种复杂性隐藏在某些外观之下。
更新
正如DávidKarnok在对本文的评论中指出的那样, flatMap()
内的map()
运算符是一种常见习语,以至于存在专门的flatMap()
重载。 除了标准转换函数(在我们的示例中为(String, Long) -> SomeType
String -> Flowable<Long>
)之外,它还采用了组合器双功能(例如(String, Long) -> SomeType
)。 此功能的目的是提供一种将输入项与通过转换生成的每个输出项组合在一起的转换。 这正是我们对嵌套map()
所做的工作(将人口增加了对应的城市名称),但要短得多:
Flowable<Pair<String, Long>> populations = cities.flatMap(city -> geoNames.populationOf(city), (city, pop) -> Pair.of(city, pop));
第二个lambda表达式( (city, pop) -> Pair.of(city, pop)
)是对populationOf()
产生的每个下游事件执行的。 如果极端,可以使用方法引用:
Flowable<Pair<String, Long>> populations = cities.flatMap(geoNames::populationOf, Pair::of);
花一点时间研究最后一个示例,一旦您掌握了它,它实际上就非常简单:
- 为每个
city
找到人口pop
- 对于每个人口,通过形成
Pair<String, Long>
将其与city
结合起来
PS:这是9年中的第200个帖子!
翻译自: https://www.javacodegeeks.com/2017/08/flatmap-order-events-rxjava-faq.html