corda_Corda服务的异步流调用

corda

如何使流程更快? 如果您与Corda合作已有一段时间,那么您很有可能已经考虑过这一点。 您可以通过以下几方面进行合理的调整以提高性能:事务大小,优化查询并减少整个Flow执行过程中所需的网络跃点数。 在某种程度上,还有另一种可能也使您着迷。 多线程。

更具体地说,从已经执行的流程异步启动流程/子流程。 这样做有可能极大地改善您的CorDapps性能。

如果您尝试此操作,则可能会遇到与我得到的类似的例外。 此外,到目前为止,Corda还不支持子流的线程化。 但是,仍然可以做到。 我们只需要对此保持聪明。 那就是Corda Services中多线程进入的地方。它们可以在Flow中调用,但不会妨碍Flow对其施加的严格规则,因为正在执行的Flow不会从服务中挂起或检查点。

在本文中,我将重点介绍从服务内部以多线程方式启动流程。 在Corda中还可以使用其他线程,但这是我想深入研究的有趣领域。 另一方面,从服务启动流也充满了一些陷阱。 这些需要考虑并遍历。 否则,您将有一天醒来,想知道为什么一切都没有明显的原因停止了。

幸运的是,我在这里为您提供帮助。 对我来说,嗯,我不得不直面这个问题。

对我来说幸运的是,R3能够提供帮助。

作为参考,我将在本文中使用Corda Enterprise 3.1 。 要从本文的内容中真正受益,您将需要使用Enterprise。 这是由于Enterprise支持多个异步执行的流。 开源目前不允许这样做。

我还建议您查看我以前的文章Corda Services 101,因为我们将在此基础上建立基础。

情境

让我们首先概述一下本文将要使用的场景。

  • 随着时间的推移,甲方向甲方发送一些消息。 每个消息来自一个流。
  • 甲方回应发送给他们的所有消息。 每个消息都来自单个Flow,但是它们希望在单个位置执行该过程。

可以快速组合一系列流程来满足此要求。 按顺序执行此操作应该证明绝对是零问题(在纠正了我们所有犯下的愚蠢错误之后)。

尽管这种情况对于需要性能的情况来说是一个很差的情况,但是它很容易理解,因此我们可以专注于异步运行。

慢速同步解决方案

在研究异步解决方案之前,快速浏览一下将要使用的代码将是有益的。 下面是ReplyToMessagesFlow的代码。 我不想遍历所有底层代码,而只想专注于与此帖子相关的代码:

@InitiatingFlow
@StartableByRPC
class ReplyToMessagesFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { reply(it) }}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == ourIdentity }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)@Suspendableprivate fun reply(message: StateAndRef) = subFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)}
}

如果您确实阅读过Corda Services 101,那么您可能已经认识到此类。 正如我之前提到的,为提出的问题组合解决方案非常容易。 从Vault检索MessageState ,然后启动子subFlowsubFlow进行回复。

这段代码将愉快地逐个传递消息。

那么,我们可以采用此代码并使其更快吗?

异步尝试失败

让我们尝试通过引入线程来使当前代码更快! 我们将使用CompletableFutures来做到这一点:

@InitiatingFlow
@StartableByRPC
class ReplyToMessagesBrokenAsyncFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { CompletableFuture.supplyAsync { reply(it) }.join() }}// everything else is the same as before
}

大多数代码与以前相同,因此已从示例中排除。

对代码的唯一更改是添加了CompletableFuture及其supplyAsync方法(来自Java)。 它尝试在单独的线程上开始为每个消息执行reply功能。

那么为什么将本节命名为“一次失败的尝试”? 我引用您执行以上代码时获得的堆栈跟踪:

java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: Required value was null.at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_172]
Caused by: java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271) ~[corda-node-3.1.jar:?]at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312) ~[corda-core-3.1.jar:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:57) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:46) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]

您将获得它,以及Corda正在打印的一长串检查点日志行。 此外,只是为了掩盖我的屁股,并向您证明这不是由于CompletableFuture的问题引起的,这是使用Executor线程池时出现的另一个错误:

Exception in thread "pool-29-thread-1" Exception in thread "pool-29-thread-2" java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

希望您在这一点上相信我。 如果不是,请参考我一开始所说的内容。 Corda当前不支持从正在执行的流程异步启动新流程。 我相信他们正在努力。 但是,截至目前。 不要使用此解决方案。

可行的异步解决方案

我们已经看到,在Flow内部执行线程是行不通的。 为了继续追求性能,我们现在来看一下Corda服务中的线程。 这并不奇怪,因为标题和开头的段落已经讨论了这一点……

抛开讽刺的评论。 委派服务将需要对原始解决方案进行一些重做,但是大部分代码将保持不变。 大部分内容将被复制并粘贴到另一个类中。 从流中获取代码并将其放入服务中。

以下是新的MessageService ,其中包含原始ReplyToMessagesFlow的代码,但进行了一些更改和添加了线程代码:

@CordaService
class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {private companion object {val executor: Executor = Executors.newFixedThreadPool(8)!!}fun replyAll() {messages().map {executor.execute {reply(it)}}}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == serviceHub.myInfo.legalIdentities.first() }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)private fun reply(message: StateAndRef) =serviceHub.startFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)}
}

如您所见,大多数代码与ReplyToMessagesFlow中的代码相同。

我要强调的第一点是使用Executor线程池。 我之所以没有在这里使用CompletableFutures ,是因为稍后我们将对其进行研究。

那么,这一切如何运作? replyAll函数在新的系统线程上对从Vault检索到的每条消息执行reply 。 这个新线程又调用startFlow 。 触发将新的流程放入“流程工作器”队列中。 这是所有乐趣发生的地方,一切开始变得混乱。

Flow Worker队列负责执行Flow的执行顺序,并随着Flow的添加和完成而填充并为空。 该队列对于协调节点内流的执行至关重要。 当涉及到多线程Flows本身时,它也是痛苦的根源。

下图显示了队列的简化视​​图:

流进入队列并在处理后离开

我为什么要谈论这个队列? 好吧,我们需要格外小心,不要将无法完成的流程填满队列。

怎么会这样 通过在正在执行的流程中启动流程,然后流程等待其完成。 直到队列的线程池中的所有线程都遇到这种情况,这才不会引起问题。 一旦发生,它将使队列陷入僵局。 没有流程可以完成,因为它们都依赖于许多排队的流程来完成。

流留在队列中,等待它们调用的流完成

这种情况最有可能发生在多次触发相同流量的高吞吐量系统上。 现在,队列中充满了等待其他流完成的机会。

这不是很好,使事情变得有点困难。 但是,只要我们意识到这一点,我们就可以适应它。

这也是Executor线程池而不是CompletableFuture的原因。 通过启动新的流程而不等待其完成,可以避免死锁。 这也是该解决方案的缺点。 没有新Flow的结果,其功能将极为有限。

话虽如此,如果您的用例适合上面显示的结构,那么我绝对建议您使用此解决方案。

在下一节中,我将讨论使用CompletableFuture

CompletableFutures的危险解决方案

这很危险的原因很简单。 僵局。 我建议不要使用此解决方案。 除非您的节点有权访问足够的线程,否则要减少用无法完成的线程填充队列的机会。 另一方面,这是一个更为理想的解决方案,因为您可以等待启动的流程的结果并对其进行处理。 这使解决方案更加有用。

以下是带有CompletableFuturesMessageService外观:

@CordaService
class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {fun replyAll(): List =messages().map { reply(it).returnValue.toCompletableFuture().join() }// everything else is the same as before
}

replyAll函数外,代码replyAll 。 返回的CordaFuture提供的toCompletableFuture函数,调用join以等待所有期货的结果并返回总体结果。

如前所述,该解决方案可能导致死锁。 但是,对于您的情况,也许并非如此。 由您决定发生这种情况的可能性。 如果不利于您,最好走开。 选择坚持使用同步或异步解决方案,类似于我在上一节中详细介绍的解决方案。

我真的需要这样做吗?

现在,是的,我相信你会的。

展望未来,我怀疑您是否需要依靠我在本文中提出的解决方案。

我相信Corda正在努力消除从Flow内部启动Flow时甚至不必考虑线程的需求。 取而代之的是,您可以简单地调用subFlow并带有一个选项以使其异步运行。 这将使我们能够保留原始的同步解决方案,但可以选择使每个subFlow在单独的线程上运行。

将各部分结合在一起

总之,在Corda Enterprise 3中,可以在正在执行的Flow中异步启动新的Flow。 根据您的用例,这可以提供良好的性能优势。 有缺点。 您不能等待异步流的结果,而不会用死锁的威胁来威胁您的节点。 节点的基础队列无法处理它所处的情况。因此,您需要注意如何将线程引入Flow调用。 值得庆幸的是,随着Corda的发展,您甚至根本不必担心自己这样做。 它甚至可能像添加布尔函数参数一样简单。 那是梦想!

这篇文章中使用的代码可以在我的GitHub上找到 。

如果您发现此帖子有帮助,可以在Twitter上@LankyDanDev关注我,以了解我的新帖子。

翻译自: https://www.javacodegeeks.com/2018/09/asynchronous-flow-invocations-corda-services.html

corda

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/332934.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Powerline专用字体安装_PowerFonts字体库安装_Meslo LG字体安装

文章目录单独下载某个 Meslo LG 字体通过命令方式安装全部 Meslo LG 字体zsh 如果使用主题 agnoster&#xff0c;而该主题依赖于 Powerline&#xff0c;而 Powerline 的正常显示有依赖于字体 Meslo LG&#xff0c;要不然会出现乱码的情况&#xff0c;如下所示&#xff1a;单…

delphi7 中文注释字体_使用nerd-font/font-patcher为字体添加字体图标

Nerd-fonts常用来在终端下显示各种图标&#xff0c;这个项目的github repo下提供了许多Nerd Font字体&#xff0c;图标使用效果如图。由于目前Windows Terminal还不支持设置第二字体&#xff0c;要想使用nerd font就必须替换掉原有字体。Nerd font的github页面下提供了许多常见…

Mybatis框架(待完善)

框架 框架&#xff08;Framework&#xff09;是整个或部分系统的可重用设计&#xff0c;表现为一组抽象构件及构件实例间交互的方法&#xff1b;另一种定义认为&#xff0c;框架是可被应用开发者定制的应用骨架。前者是从应用方面而后者是从目的方面给出的定义。 简而言之&a…

MacOS安装Powerline

文章目录安装卸载查看配置&#xff08;可以忽略这部分&#xff09;Powerline 是一款比较酷炫的状态栏工具&#xff0c;可以美化终端和 vim 界面&#xff0c;由 python 开发&#xff0c;目前仅支持 python2.X&#xff0c;由于 python2 和 python3 互不兼容&#xff0c;安装前要了…

充电器用着用着就松了怎么办_手机充电口松动怎么办?

如果手机充不进电&#xff0c;首先要检查充电器是否完好&#xff0c;如果充电器完好才考虑手机充电接口松动的可能性。手机充电口松动时&#xff0c;先换一个充电器试试&#xff0c;好多原装充电器接口挤压或踩压难免变薄&#xff0c;而手机接口长时间充电时抻曳也难免变大&…

jakarta ee_适用于Java EE / Jakarta EE开发人员的Micronaut

jakarta ee城镇中有一个名为Micronaut的新微服务框架。 在本文中&#xff0c;我将从Java EE / Jakarta EE的角度讨论如何冒险使用Micronaut框架。 我是Java EE开发人员&#xff0c;因此使用诸如Eclipse MicroProfile之类的解决方案开发微服务更接近我的专业知识&#xff0c;但是…

mysql(待完善)

下载 https://www.mysql.com/cn/downloads/ 下载步骤 1.MYSQL Comunity(GPL) Downloads 2.MySql Community Server 3.根据自己电脑配置下载 4.直接下载 5.注意安装的密码加密方式 选择 使用旧版密码加密 一定要选择Use Legacy Password Encryption。&#xff08;备注&#x…

配置 Powerline 到 Vim

只需要在 vim 的配置文件 .vimrc 里面加上以下语句&#xff0c;再重启一下终端就可以啦&#xff5e; set rtp/usr/local/lib/python2.7/site-packages/powerline/bindings/bash/powerline.shset nocompatible set t_Co256let g:minBufExplForceSyntaxEnable 1 python from po…

c++判断二叉树是否为二叉搜索树_原创 | 好端端的数据结构,为什么叫它SB树呢?...

点击上方蓝字&#xff0c;关注并星标&#xff0c;和我一起学技术。大家好&#xff0c;今天给大家介绍一个很厉害的数据结构&#xff0c;它的名字就很厉害&#xff0c;叫SB树&#xff0c;业内大佬往往叫做傻叉树。这个真不是我框你们&#xff0c;而是它的英文缩写就叫SBT。SBT其…

ide 日志 乱码_IDE日志分析方法pt。 1个

ide 日志 乱码介绍 我认为大多数软件工程师都了解日志的重要性。 它们已成为软件开发的一部分。 如果无法解决问题&#xff0c;我们尝试在日志中查找原因。 对于一些简单的情况&#xff0c;当错误阻止应用程序打开窗口时&#xff0c;这可能就足够了。 您可以在日志中找到问题&a…

.bash_profile vs .bashrc

请参阅&#xff1a;https://joshstaiger.org/archives/2005/07/bash_profile_vs.html

生成ssh证书(windows)

ssh -keygen -t rsa 生成ssh证书 /home/work/.ssh authorized_keys 客户端建立私钥和公钥 在客户端终端运行命令 ssh-keygen -t rsa https://www.cnblogs.com/ggjucheng/archive/2012/08/19/2646346.html https://blog.csdn.net/qq_36667170/article/details/79094257

日志间隔_在日志中搜索时间间隔

日志间隔介绍 这篇文章与我有关日志分析的迷你系列文章间接相关。 最好阅读两个主要部分&#xff0c;以更好地理解我在说什么。 第1 部分 &#xff0c; 第2部分 。 这篇文章描述了我在实现IDE方法时遇到的一个重要问题。 任务描述 当某人使用日志时&#xff0c;通常只需要调查…

如果在iTerm2中复制命令特别卡,就跟慢动作似的,怎么办?

如果在 iTerm2 中复制命令特别卡&#xff0c;就跟慢动作似的。你可以编辑 ~/.zshrc&#xff1a; vim ~/.zshrc增加如下内容&#xff1a; pasteinit() {OLD_SELF_INSERT${${(s.:.)widgets[self-insert]}[2,3]}zle -N self-insert url-quote-magic # I wonder if youd need .ur…

lambda表达式的使用

package com.asx.application.common.utils;import org.junit.Test;import java.util.Comparator; import java.util.function.Consumer;/*** lambda表达式的使用* 1.举例:(o1,o2) -> Integer.compare(o1,o2) ;* 2.格式* ->&#xff1b;lambda操作符 或 箭头操作符* ->…

centos桥接模式怎么联网_今日回收 | 互联网+废品回收模式是怎么兴起的呢?

随着社会的不断发展和进步&#xff0c;废品回收已不再是传统和低效的行业&#xff0c;而是我国现如今整合资源的重要手段。而该行业之所以能够有如此成就&#xff0c;只因其中98%的企业结合了互联网&#xff0c;成功实现了企业转型。据统计&#xff0c;我国目前废品回收的相关企…

文档 修订 非修订区别_修订和不变性

文档 修订 非修订区别这是一个简短的帖子。 我不确定如何启动它。 这是审阅一些现有代码时“为什么我没有想到这一刻”之一。 由于存在NDA&#xff0c;我无法共享实际代码。 它与处理修订有关。 我能与之联系最紧密的是WordPress&#xff08;WP&#xff09;如何处理博客文章和修…

终端界面如何改成彩色的

很多朋友说自己的终端一直是黑白的&#xff0c;如何改成彩色的呢&#xff1f;在用户目录的 .profile 里加上这两行即可&#xff1a; export CLICOLOR 1 export LSCOLORSgxfxcxdxbxegedabagacad

深度解析Java可变参数类型以及与数组的区别

可变参数类型&#xff1a;variable argument type 1.可变参数是兼容数组类参数的&#xff0c;但是数组类参数却无法兼容可变参数 //说明&#xff1a;可变参数可以兼容数组参数 public class TestVarArgus {public static void dealArray(int... intArray) {for (int i : intA…

ios nslog 例子_iOS Block实例

iOS之Block详解&#xff1a;Block详解ViewController.h(ARC)#import interface ViewController : UIViewController// 属性声明的block都是全局的__NSGlobalBlock__property (nonatomic, copy) void (^copyBlock)();property (nonatomic, weak) void (^weakBlock)();endViewCon…