rxjava 循环发送事件_使用RxJava和SseEmitter进行服务器发送的事件

rxjava 循环发送事件

Spring Framework 4.2 GA即将发布,让我们看一下它提供的一些新功能。 引起我注意的一个事件是一个简单的新类SseEmitter ,它是对Spring MVC控制器中容易使用的发送事件的抽象。 SSE是一项技术,可让您在一个HTTP连接内沿一个方向将数据从服务器流式传输到浏览器。 听起来像是websocket可以做什么的子集。 但是,由于它是一个简单得多的协议,因此可以在不需要全双工的情况下使用,例如实时推动股价变化或显示长时间运行的进程。 这将是我们的例子。

假设我们有一个具有以下API的虚拟硬币矿工:

public interface CoinMiner {BigDecimal mine() {//...}
}

每次调用mine()我们都必须等待几秒钟,才能获得大约1个硬币的回报(平均)。 如果要挖掘多个硬币,则必须多次调用此方法:

@RestController
public class MiningController {//...@RequestMapping("/mine/{count}")void mine(@PathVariable int count) {IntStream.range(0, count).forEach(x -> coinMiner.mine());}}

这项工作,我们可以请求/mine/10mine()方法将执行10次。 到目前为止,一切都很好。 但是挖掘是一项占用大量CPU的任务,将计算分散到多个内核将是有益的。 此外,即使使用并行化,我们的API端点也相当慢,我们必须耐心等待直到所有工作完成而没有任何进度通知。 让我们首先修复并行性–但是,由于并行流无法控制底层线程池,因此我们来使用显式的ExecutorService

@Component
class CoinMiner {CompletableFuture<BigDecimal> mineAsync(ExecutorService executorService) {return CompletableFuture.supplyAsync(this::mine, executorService);}//...}

客户端代码必须显式提供ExecutorService (只是设计选择):

@RequestMapping("/mine/{count}")
void mine(@PathVariable int count) {final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList());futures.forEach(CompletableFuture::join);
}

首先多次调用mineAsync ,然后(作为第二阶段)等待所有期货完成并join ,这mineAsync重要。 很容易写:

IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).forEach(CompletableFuture::join);

但是,由于Java 8中流的惰性,该任务将按顺序执行! 如果您还不习惯流的懒惰,请始终从下至上阅读它们:我们要求join一些将来的内容,以便流上升并只调用一次mineAsync() (惰性!),并将其传递给join() 。 当join()完成时,它再次上升并要求另一个Future 。 通过使用collect()我们强制所有mineAsync()执行,开始所有异步计算。 稍后,我们等待每一个。

介绍

现在该变得更具React性了(我说过了)。 控制器可以返回SseEmitter的实例。 从处理程序方法return后,容器线程将被释放并可以处理更多即将到来的请求。 但是连接没有关闭,客户端一直在等待! 我们应该做的是保留对SseEmitter实例的引用,并在以后从另一个线程调用其send()complete方法。 例如,我们可以启动一个长时间运行的进程,并保持send()从任意线程进行进度。 完成该过程后,我们complete() SseEmitter ,最后关闭HTTP连接(至少从逻辑SseEmitter ,请记住Keep-alive )。 在下面的示例中,我们有一堆CompletableFuture ,当每个CompletableFuture完成时,我们只需将1发送给客户端( notifyProgress() )。 当所有期货都完成后,我们完成流( thenRun(sseEmitter::complete) ),关闭连接:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();final List<CompletableFuture<BigDecimal>> futures = mineAsync(count);futures.forEach(future ->future.thenRun(() -> notifyProgress(sseEmitter)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(sseEmitter::complete);return sseEmitter;
}private void notifyProgress(SseEmitter sseEmitter) {try {sseEmitter.send(1);} catch (IOException e) {throw new RuntimeException(e);}
}private List<CompletableFuture<BigDecimal>> mineAsync(@PathVariable int count) {return IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList());
}

调用此方法将产生以下响应(注意Content-Type ):

< HTTP/1.1 200 OK
< Content-Type: text/event-stream;charset=UTF-8
< Transfer-Encoding: chunked
< 
data:1data:1data:1data:1* Connection #0 to host localhost left intact

稍后我们将学习如何在客户端解释这种响应。 现在暂时让我们整理一下设计。

与引进RxJava

上面的代码有效,但是看起来很混乱。 我们实际上有一系列事件,每个事件都代表计算的进度。 计算最终完成,因此流也应发出信号结束。 听起来就像是Observable ! 我们从重构CoinMiner开始,以返回Observable<BigDecimal

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final ReplaySubject<BigDecimal> subject = ReplaySubject.create();final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());futures.forEach(future ->future.thenRun(() -> subject.onNext(BigDecimal.ONE)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(subject::onCompleted);return subject;
}

每当mineMany()返回的事件出现在Observable ,我们就mineMany()那么多硬币。 当所有期货都完成后,我们也完成了交易。 在实现方面,这看起来还没有改善,但是从控制器的角度来看,它有多干净:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).subscribe(value -> notifyProgress(sseEmitter),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter;
}

调用coinMiner.mineMany()我们只需订阅事件。 事实证明ObservableSseEmitter方法匹配1:1。 这里发生的事情是不言自明的:启动异步计算,每当后台计算发出任何进度信号时,将其转发给客户端。 好的,让我们回到实现上。 由于我们将CompletableFutureObservable混合使用,因此看起来很混乱。 我已经描述了如何仅使用一个元素将CompletableFuture转换为Observable 。 这是一个概述,包括rx.Single从RxJava 1.0.13开始发现的rx.Single抽象(此处未使用):

public class Futures {public static <T> Observable<T> toObservable(CompletableFuture<T> future) {return Observable.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onNext(result);subscriber.onCompleted();}}));}public static <T> Single<T> toSingle(CompletableFuture<T> future) {return Single.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onSuccess(result);}}));}}

将这些实用程序运算符放在某个地方,我们可以改善实现并避免混合使用两个API:

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final List<Observable<BigDecimal>> observables = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());return Observable.merge(observables);
}Observable<BigDecimal> mineAsync(ExecutorService executorService) {final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::mine, executorService);return Futures.toObservable(future);
}

RxJava有一个内置的运算符,用于将多个Observable合并为一个,我们的每个基础Observable发出一个事件。

深入研究RxJava运算符

让我们使用RxJava的功能来稍微改善流式传输。

scan()

当前,每次我们开采一枚硬币时,我们都会send(1)客户端send(1)事件。 这意味着每个客户都必须跟踪其已经收到的硬币数量,以便计算总的计算数量。 如果服务器总是发送总金额而不是增量,那就太好了。 但是,我们不想更改实现。 事实证明,使用Observable.scan()运算符非常简单:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).scan(BigDecimal::add).subscribe(value -> notifyProgress(sseEmitter, value),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter;
}private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {try {sseEmitter.send(value);} catch (IOException e) {e.printStackTrace();}
}

scan()运算符接收上一个事件和当前事件,并将它们组合在一起。 通过应用BigDecimal::add我们只需将所有数字相加即可。 例如1、1 + 1,(1 + 1)+ 1,依此类推。 scan()类似于flatMap() ,但保留中间值。

sample()采样

可能是因为我们的后端服务产生了太多的进度更新,我们无法使用。 我们不想给客户端增加不相关的更新并饱和带宽。 每秒最多发送两次更新听起来很合理。 幸运的是,RxJava也有一个内置的运算符:

Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs.scan(BigDecimal::add).sample(500, TimeUnit.MILLISECONDS).subscribe(//...);

sample()将定期查看底层流,并仅发出最新的项,并丢弃中间项。 幸运的是,我们使用scan()即时聚合了项目,因此我们不会丢失任何更新。

window() –恒定的发射间隔

不过有一个陷阱。 如果在选定的500毫秒内没有新内容出现, sample()将不会两次发出相同的项目。 很好,但是请记住我们正在通过TCP / IP连接推送这些更新。 最好定期将更新发送给客户端,即使在此期间什么也没发生–只是为了保持连接的正常运行,就像ping 。 可能有多种方法可以实现此要求,例如,涉及timeout()运算符。 我选择使用window()运算符每500毫秒对所有事件进行分组:

Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs.window(500, TimeUnit.MILLISECONDS).flatMap(window -> window.reduce(BigDecimal.ZERO, BigDecimal::add)).scan(BigDecimal::add).subscribe(//...);

这是一个棘手的问题。 首先,我们将所有进度更新分组在500毫秒的窗口中。 然后,我们使用reduce来计算在此时间段内开采的硬币的总数(类似于scan() )。 如果在此期间未开采任何硬币,我们只需返回ZERO 。 最后,我们使用scan()汇总每个窗口的小计。 我们不再需要sample()因为window()确保每500毫秒发出一个事件。

客户端

JavaScript中有很多SSE用法示例,因此为您提供一种调用我们的控制器的快速解决方案:

var source = new EventSource("/mine/10");
source.onmessage = function (event) {console.info(event);
};

我相信SseEmitter是Spring MVC的一项重大改进,它将使我们能够编写更健壮和更快的Web应用程序,需要即时的单向更新。

翻译自: https://www.javacodegeeks.com/2015/08/server-sent-events-with-rxjava-and-sseemitter.html

rxjava 循环发送事件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/337108.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文搞懂 | Linux 内核的 4 大 IO 调度算法

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删Linux 内核包含4个IO调度器&#xff1a;Noop IO schedulerAnticipatory IO schedulerDeadline IO scheduler CFQ IO scheduler。anticipatory, 预…

众神进入瓦尔哈拉_一时冲动:“通往瓦尔哈拉之路的冒险”

众神进入瓦尔哈拉通过所有有关Java 9和Project Jigsaw的讨论&#xff0c;我们不应忽视Java的另一重大变化。 希望在第10版或第11版中&#xff0c; Valhalla项目能够实现并介绍价值类型和专业化。 那么&#xff0c;这是什么一回事&#xff0c;项目进展如何&#xff0c;面临什么…

当电子工程师十余年,感慨万千!

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删当电子工程师也一些年头了&#xff0c;不算有出息&#xff0c;环顾四周&#xff0c;也没有看见几个有出息的。回顾工程师生涯&#xff0c;感慨万…

canva画图 图片居中裁剪_css实现不定宽高的图片img居中裁剪_类似微信朋友圈图片效果...

需求如下&#xff1a;前端需要显示矩形的缩略图&#xff0c;接口返回的图片尺寸大小不一&#xff0c;宽高不相等&#xff0c;需要前端来处理并显示成正方形&#xff0c;类似微信朋友圈图片的效果&#xff0c;等比例正方形显示在列表中&#xff0c;让图片根据宽高来自适应显示在…

哈希策略_优化哈希策略的简介

哈希策略总览 用于哈希键的策略可以直接影响哈希集合&#xff08;例如HashMap或HashSet&#xff09;的性能。 内置的哈希函数被设计为通用的&#xff0c;并且可以在各种用例中很好地工作。 我们可以做得更好&#xff0c;特别是如果您对用例有一个很好的了解吗&#xff1f; 测…

面试大全 | C语言高级部分总结,2.6万字长文

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删一、内存大话题1.0、内存就是程序的立足之地&#xff0c;体现内存重要性。1.1、内存理解&#xff1a;内存物理看是有很多个 Bank&#xff08;就是…

c#设计12星座速配软件_C#设计模式(12)——组合模式

阅读目录1.组合模式在软件开发中我们经常会遇到处理部分与整体的情况&#xff0c;如我们经常见到的树形菜单&#xff0c;一个菜单项的子节点可以指向具体的内容&#xff0c;也可以是子菜单。类似的情况还有文件夹&#xff0c;文件夹的下级可以是文件夹也可以是文件。举一个例子…

hibernate与jpa_将JPA Hibernate与OptaPlanner集成

hibernate与jpa我们一直在改进OptaPlanner与JEE其余部分的集成&#xff0c;因此更容易构建可以正常工作的最终用户应用程序。 让我们看一下改进的JPA Hibernate集成。 基础 JPA Hibernate和OptaPlanner都可以在POJO&#xff08;普通的旧Java对象&#xff09;上工作&#xff0c…

程序如何运行,编译、链接、装入?

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删一、地址概念和程序如何运行在多道程序环境下&#xff0c;要使程序运行&#xff0c;必须先为之创建进程。而创建进程的第一件事&#xff0c;便是…

python举两种字符串引号的例子_python里的单引号和双引号的有什么作用

在Python当中表达字符串既可以使用单引号&#xff0c;也可以使用双引号&#xff0c;那两者有什么区别吗&#xff1f;python单引号和双引号的区别简单来说&#xff0c;在Python中使用单引号或双引号是没有区别的&#xff0c;都可以用来表示一个字符串。但是这两种通用的表达方式…

枚举对象注释_如何以及何时使用枚举和注释

枚举对象注释本文是我们名为“ 高级Java ”的学院课程的一部分。 本课程旨在帮助您最有效地使用Java。 它讨论了高级主题&#xff0c;包括对象创建&#xff0c;并发&#xff0c;序列化&#xff0c;反射等。 它将指导您完成Java掌握的旅程&#xff01; 在这里查看 &#xff01;…

background 互联网图片_cssbackground-image和layer-background-image的区别

layer-background-image语法&#xff1a;layer-background-image : none | url (url)参数&#xff1a;none :  无背景图url :  使用绝对或相对地址指定背景图像说明&#xff1a;设置或检索对象整个区域的背景图像。示例&#xff1a;code {position: absolute;top: 100px; lef…

纪事本 乱码_纪事日记–可自定义的数据存储

纪事本 乱码总览 使任何数据结构或算法尽可能快的方法是使代码完全执行您想要的操作&#xff0c;而无需执行其他操作。 建立一个可以做任何人想做的每件事的数据存储的问题是&#xff0c;它做得特别不好。 自定义数据存储在性能方面可以实现什么&#xff1f; 您可以支持&#…

datavideo切换台说明书_【新品发布】datavideo SE-650 高清四通道切换台

还在为音乐演唱会的拍摄而烦恼吗&#xff1f;或者为体育比赛的多机位发愁&#xff1f;或者为微课、优课、慕课制作而焦头烂额&#xff1f;大部分用户对多机位的恐惧来源于相关产品令人发指的复杂和专业性&#xff0c;面对满眼的键盘会有无从下手之感&#xff0c;很多学校和工作…

NSA:建议从 C/C++ 切换到内存安全语言

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删美国国家安全局&#xff08;NSA&#xff09;发布了一份指南&#xff0c;旨在帮助软件开发商和运营商预防和缓解软件内存安全问题。其鼓励组织将编…

探索cqrs和事件源_编写基于事件的CQRS读取模型

探索cqrs和事件源关于事件源和CQRS的讨论似乎通常集中在CQRS上下文中的整体系统架构或领域驱动设计的各种形式。 但是&#xff0c;尽管也有一些有趣的考虑&#xff0c;但读取模型经常被忽略。 在本文中&#xff0c;我们将介绍通过使用事件流来填充视图模型的示例实现。 总览 …

shmmax单位_kernel.shmmax ,kernel.shmmni 和kernel.shmall

kernel.shmmax 2147483648//该参数定义了共享内存段的最大尺寸(以字节为单位)。其值应>sag_max_size初始化参数的大小,否则SAG由多个内存段构成,效率降低,还要不小于物理内存的一半,默认情况下在32位x86系统中,OracleSGA最大不能超过1.7GB.缺省为32M&#xff0c;对于oracle…

C# 11正式发布

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删C# 11 现已发布。公告称&#xff0c;“随着每个版本的发布&#xff0c;社区的参与度越来越高&#xff0c;贡献了从建议、见解和错误报告到整个功…

java 自定义运算符_Java中集合的自定义运算符

java 自定义运算符总览 操作员重载有多种语言可用。 Java对String类型的运算符的支持对运算符的重载非常有限。 我们可以利用其他语言支持运算符的不同方式&#xff0c;但是我们可以在Java中实现一个使用Java已经使用的约定的实现。 获取&#xff0c;设置和放置操作 集合的运…

marker主题 ros_(五)ROS主题理解

参考网址&#xff1a;1&#xff0c;小海龟例子(1) 在新的终端打开roscore$ roscore&#xff0d;&#xff0d;&#xff0d;如果出错&#xff0c;请确定关闭所有ROS命令或者路径&#xff0c;重试。(2) 在新的终端打开运行小海龟界面$ rosrun turtlesimturtlesim_node得到结果&…