怎样做个人网站宜昌哪里有做网站的
news/
2025/9/29 15:02:24/
文章来源:
怎样做个人网站,宜昌哪里有做网站的,店商网站设计,定制跟模板网站有什么不一样这是我关于使用Spring Boot和Project Reactor有效处理SQS消息的博客文章的后续文章 我在第一部分中列出了一些方法上的差距。 1.处理SQS客户端调用中的失败 2.该方法一次只能处理来自SQS的一条消息#xff0c;如何并行化 3.它不处理错误#xff0c;管道中的任何错误都会中… 这是我关于使用Spring Boot和Project Reactor有效处理SQS消息的博客文章的后续文章 我在第一部分中列出了一些方法上的差距。 1.处理SQS客户端调用中的失败 2.该方法一次只能处理来自SQS的一条消息如何并行化 3.它不处理错误管道中的任何错误都会中断整个过程并停止从队列中读取更新的消息。 概括 回顾一下上一篇文章演示了如何使用出色的Project Reactor创建管道来处理来自AWS SQS队列的消息 该练习的最终结果是一个管道如下所示 有了这个管道让我现在讨论如何弥合差距 处理SQS客户端故障 此功能生成从SQS读取的消息流。 Flux.generate { sink: SynchronousSinkListMessage - val receiveMessageRequest: ReceiveMessageRequest ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: ListMessage sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) 现在考虑上述“ sqsClient”存在连接问题的情况 Flux的行为是在发生错误的情况下终止了流。 当然只要服务正在运行这对于服务于处理消息的服务就不会起作用。 解决方法是在出现错误的情况下仅重试处理流程。 Flux.generate { sink: SynchronousSinkListMessage - val receiveMessageRequest: ReceiveMessageRequest ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: ListMessage sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) .retry() 如果出现任何错误这将导致Flux重新建立消息流。 并行处理消息 Project Reactor提供了几种并行化处理管道的方式。 我第一次尝试并行处理是在处理链中添加“ subscribeOn”方法。 Flux.generate { sink: SynchronousSinkListMessage - val receiveMessageRequest: ReceiveMessageRequest ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: ListMessage sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) .retry() .subscribeOn(Schedulers.newElastic( sub )) 但是这不是“ subscribeOn”的工作方式。 当我向该管道发送一些消息时输出如下 2020 - 04 - 07 20 : 52 : 53.241 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.434 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.493 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.538 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.609 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.700 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 上面的“ sub-3”是处理消息的线程的名称看起来所有消息都在“ sub-3”线程上进行处理而没有其他线程在处理 subscriptionOn只是通过从此调度程序池中借用“线程”来更改执行上下文而不使用池本身中的所有线程。 那么如何使处理并行化呢 这个StackOverflow答案提供了我在这里使用的一种非常好的方法本质上是使用 flatMap运算符然后在“ flatMap”运算符内添加“ subscribeOn”运算符。 该运算符急切地订阅其内部发布者然后将结果展平其诀窍是可以为内部订阅者提供他们自己的调度程序并且对于每个订阅最终将使用调度程序池中的线程。 这些并发订阅者的数量可以使用传递给flatMap运算符的“并发”参数来控制。 Flux.generate { sink: SynchronousSinkListMessage - val receiveMessageRequest: ReceiveMessageRequest ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: ListMessage sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) .retry() .flatMap({ (message: String, deleteHandle: () - Unit) - task(message) .then(Mono.fromSupplier { Try.of { deleteHandle() } }) .then() .subscribeOn(taskScheduler) }, concurrency) 处理多个消息时的输出如下所示– 2020 - 04 - 08 21 : 03 : 24.582 INFO 17541 --- [ taskHandler- 4 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.815 INFO 17541 --- [ taskHandler- 4 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 5 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 6 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 7 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.817 INFO 17541 --- [ taskHandler- 8 ] sample.msg.MessageListenerRunner : Processed Message hello 现在查看日志中除了线程名taskHandler- *之外还有更多 处理下游错误 我以前使用“重试”运算符进行的修复之一是关于使用sqsClient连接处理上游错误。 但是有可能在管道中处理消息并且任何步骤引发错误时整个管道都会失败。 因此重要的是要防止每一步失败。 我一直致力于确保错误不会传播的一种巧妙方法是使用出色的vavr库及其“尝试”类型 。 尝试类型具有两个结果–一个成功成功或一个异常失败。 这使其余的管道可以按可衡量的方式对上一步的结果进行操作 .flatMap({ (message: String, deleteHandle: () - Unit) - task(message) .then(Mono.fromSupplier { Try.of { deleteHandle() } }) .doOnNext { t - t.onFailure { e - LOGGER.error(e.message, e) } } .then() .subscribeOn(taskScheduler) }, concurrency) 上面的代码段演示了一种方法在该方法中我知道负责删除消息的“ deleteHandle”会引发异常Try捕获了此异常如果有错误记录了异常则该异常不会缩短消息流。 结论 我最初的想法是因为我已经采取了一种被动的方式来处理消息所以我将在我的sqs消息处理管道中获得巨大的推动但是我的学习是就像其他所有事情一样需要对基于Project的反应堆进行仔细的了解和调整流以有效地处理消息。 我敢肯定还有更多课程可供我学习我将像我一样记录下来。 整个示例可在我的github存储库中找到 -https//github.com/bijukunjummen/boot-with-sns-sqs 翻译自: https://www.javacodegeeks.com/2020/04/processing-sqs-messages-using-spring-boot-and-project-reactor-part-2.html
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/921918.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!