日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

corda_Corda服务的异步流调用

發布時間:2023/12/3 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 corda_Corda服务的异步流调用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

corda

如何使流程更快? 如果您與Corda合作已有一段時間,那么您很有可能已經考慮過這一點。 您可以通過以下幾方面進行合理的調整以提高性能:事務大小,優化查詢并減少整個Flow執行過程中所需的網絡躍點數。 在某種程度上,還有另一種可能也使您著迷。 多線程。

更具體地說,從已經執行的流程異步啟動流程/子流程。 這樣做有可能極大地改善您的CorDapps性能。

如果您嘗試此操作,則可能會遇到與我得到的類似的例外。 此外,到目前為止,Corda還不支持子流的線程化。 但是,仍然可以做到。 我們只需要對此保持聰明。 那就是Corda Services中多線程進入的地方。它們可以在Flow中調用,但不會妨礙Flow對其施加的嚴格規則,因為正在執行的Flow不會從服務中掛起或檢查點。

在本文中,我將重點介紹從服務內部以多線程方式啟動流程。 在Corda中還可以使用其他線程,但這是我想深入研究的有趣領域。 另一方面,從服務啟動流也充滿了一些陷阱。 這些需要考慮并遍歷。 否則,您將有一天醒來,想知道為什么一切都沒有明顯的原因停止了。

幸運的是,我在這里為您提供幫助。 對我來說,嗯,我不得不直面這個問題。

對我來說幸運的是,R3能夠提供幫助。

作為參考,我將在本文中使用Corda Enterprise 3.1 。 要從本文的內容中真正受益,您將需要使用Enterprise。 這是由于Enterprise支持多個異步執行的流。 開源目前不允許這樣做。

我還建議您查看我以前的文章Corda Services 101,因為我們將在此基礎上建立基礎。

情境

讓我們首先概述一下本文將要使用的場景。

  • 隨著時間的推移,甲方向甲方發送一些消息。 每個消息來自一個流。
  • 甲方回應發送給他們的所有消息。 每個消息都來自單個Flow,但是它們希望在單個位置執行該過程。

可以快速組合一系列流程來滿足此要求。 按順序執行此操作應該證明絕對是零問題(在糾正了我們所有犯下的愚蠢錯誤之后)。

盡管這種情況對于需要性能的情況來說是一個很差的情況,但是它很容易理解,因此我們可以專注于異步運行。

慢速同步解決方案

在研究異步解決方案之前,快速瀏覽一下將要使用的代碼將是有益的。 下面是ReplyToMessagesFlow的代碼。 我不想遍歷所有底層代碼,而只想專注于與此帖子相關的代碼:

@InitiatingFlow @StartableByRPC class ReplyToMessagesFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { reply(it) }}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == ourIdentity }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)@Suspendableprivate fun reply(message: StateAndRef) = subFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)} }

如果您確實閱讀過Corda Services 101,那么您可能已經認識到此類。 正如我之前提到的,為提出的問題組合解決方案非常容易。 從Vault檢索MessageState ,然后啟動子subFlow以subFlow進行回復。

這段代碼將愉快地逐個傳遞消息。

那么,我們可以采用此代碼并使其更快嗎?

異步嘗試失敗

讓我們嘗試通過引入線程來使當前代碼更快! 我們將使用CompletableFutures來做到這一點:

@InitiatingFlow @StartableByRPC class ReplyToMessagesBrokenAsyncFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { CompletableFuture.supplyAsync { reply(it) }.join() }}// everything else is the same as before }

大多數代碼與以前相同,因此已從示例中排除。

對代碼的唯一更改是添加了CompletableFuture及其supplyAsync方法(來自Java)。 它嘗試在單獨的線程上開始為每個消息執行reply功能。

那么為什么將本節命名為“一次失敗的嘗試”? 我引用您執行以上代碼時獲得的堆棧跟蹤:

java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: Required value was null.at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_172] Caused by: java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271) ~[corda-node-3.1.jar:?]at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312) ~[corda-core-3.1.jar:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:57) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:46) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]

您將獲得它,以及Corda正在打印的一長串檢查點日志行。 此外,只是為了掩蓋我的屁股,并向您證明這不是由于CompletableFuture的問題引起的,這是使用Executor線程池時出現的另一個錯誤:

Exception in thread "pool-29-thread-1" Exception in thread "pool-29-thread-2" java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748) java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

希望您在這一點上相信我。 如果不是,請參考我一開始所說的內容。 Corda當前不支持從正在執行的流程異步啟動新流程。 我相信他們正在努力。 但是,截至目前。 不要使用此解決方案。

可行的異步解決方案

我們已經看到,在Flow內部執行線程是行不通的。 為了繼續追求性能,我們現在來看一下Corda服務中的線程。 這并不奇怪,因為標題和開頭的段落已經討論了這一點……

拋開諷刺的評論。 委派服務將需要對原始解決方案進行一些重做,但是大部分代碼將保持不變。 大部分內容將被復制并粘貼到另一個類中。 從流中獲取代碼并將其放入服務中。

以下是新的MessageService ,其中包含原始ReplyToMessagesFlow的代碼,但進行了一些更改和添加了線程代碼:

@CordaService class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {private companion object {val executor: Executor = Executors.newFixedThreadPool(8)!!}fun replyAll() {messages().map {executor.execute {reply(it)}}}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == serviceHub.myInfo.legalIdentities.first() }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)private fun reply(message: StateAndRef) =serviceHub.startFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)} }

如您所見,大多數代碼與ReplyToMessagesFlow中的代碼相同。

我要強調的第一點是使用Executor線程池。 我之所以沒有在這里使用CompletableFutures ,是因為稍后我們將對其進行研究。

那么,這一切如何運作? replyAll函數在新的系統線程上對從Vault檢索到的每條消息執行reply 。 這個新線程又調用startFlow 。 觸發將新的流程放入“流程工作器”隊列中。 這是所有樂趣發生的地方,一切開始變得混亂。

Flow Worker隊列負責執行Flow的執行順序,并隨著Flow的添加和完成而填充并為空。 該隊列對于協調節點內流的執行至關重要。 當涉及到多線程Flows本身時,它也是痛苦的根源。

下圖顯示了隊列的簡化視??圖:

流進入隊列并在處理后離開

我為什么要談論這個隊列? 好吧,我們需要格外小心,不要將無法完成的流程填滿隊列。

怎么會這樣 通過在正在執行的流程中啟動流程,然后流程等待其完成。 直到隊列的線程池中的所有線程都遇到這種情況,這才不會引起問題。 一旦發生,它將使隊列陷入僵局。 沒有流程可以完成,因為它們都依賴于許多排隊的流程來完成。

流留在隊列中,等待它們調用的流完成

這種情況最有可能發生在多次觸發相同流量的高吞吐量系統上。 現在,隊列中充滿了等待其他流完成的機會。

這不是很好,使事情變得有點困難。 但是,只要我們意識到這一點,我們就可以適應它。

這也是Executor線程池而不是CompletableFuture的原因。 通過啟動新的流程而不等待其完成,可以避免死鎖。 這也是該解決方案的缺點。 沒有新Flow的結果,其功能將極為有限。

話雖如此,如果您的用例適合上面顯示的結構,那么我絕對建議您使用此解決方案。

在下一節中,我將討論使用CompletableFuture 。

CompletableFutures的危險解決方案

這很危險的原因很簡單。 僵局。 我建議不要使用此解決方案。 除非您的節點有權訪問足夠的線程,否則要減少用無法完成的線程填充隊列的機會。 另一方面,這是一個更為理想的解決方案,因為您可以等待啟動的流程的結果并對其進行處理。 這使解決方案更加有用。

以下是帶有CompletableFutures的MessageService外觀:

@CordaService class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {fun replyAll(): List =messages().map { reply(it).returnValue.toCompletableFuture().join() }// everything else is the same as before }

除replyAll函數外,代碼replyAll 。 返回的CordaFuture提供的toCompletableFuture函數,調用join以等待所有期貨的結果并返回總體結果。

如前所述,該解決方案可能導致死鎖。 但是,對于您的情況,也許并非如此。 由您決定發生這種情況的可能性。 如果不利于您,最好走開。 選擇堅持使用同步或異步解決方案,類似于我在上一節中詳細介紹的解決方案。

我真的需要這樣做嗎?

現在,是的,我相信你會的。

展望未來,我懷疑您是否需要依靠我在本文中提出的解決方案。

我相信Corda正在努力消除從Flow內部啟動Flow時甚至不必考慮線程的需求。 取而代之的是,您可以簡單地調用subFlow并帶有一個選項以使其異步運行。 這將使我們能夠保留原始的同步解決方案,但可以選擇使每個subFlow在單獨的線程上運行。

將各部分結合在一起

總之,在Corda Enterprise 3中,可以在正在執行的Flow中異步啟動新的Flow。 根據您的用例,這可以提供良好的性能優勢。 有缺點。 您不能等待異步流的結果,而不會用死鎖的威脅來威脅您的節點。 節點的基礎隊列無法處理它所處的情況。因此,您需要注意如何將線程引入Flow調用。 值得慶幸的是,隨著Corda的發展,您甚至根本不必擔心自己這樣做。 它甚至可能像添加布爾函數參數一樣簡單。 那是夢想!

這篇文章中使用的代碼可以在我的GitHub上找到 。

如果您發現此帖子有幫助,可以在Twitter上@LankyDanDev關注我,以了解我的新帖子。

翻譯自: https://www.javacodegeeks.com/2018/09/asynchronous-flow-invocations-corda-services.html

corda

總結

以上是生活随笔為你收集整理的corda_Corda服务的异步流调用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 波多野结衣精品视频 | 日本理论片中文字幕 | 亚洲综合图片网 | 97av视频| av第一福利大全导航 | 国产精品久久精品 | 日韩亚洲欧美在线观看 | 在线观看视频你懂得 | 欧美在线中文字幕 | 久草色在线 | 丰满岳乱妇一区二区三区 | 性欧美大战久久久久久久 | 欧亚av在线 | 蜜臀久久99精品久久久画质超高清 | 永久免费看成人av的动态图 | 尤物视频网站在线观看 | 亚洲性色视频 | 精品国精品国产 | 日本少妇xxx | 黄色小视频免费看 | 少妇精品久久久久久久久久 | 亚洲精品二三区 | 亚洲视频一区二区在线观看 | 日本美女交配 | 亚洲欧洲中文 | 亚洲欧洲免费视频 | 四虎影库永久在线 | yy4138理论片动漫理论片 | 欧美日韩少妇精品 | 亚洲一区二区三区欧美 | 婷婷精品进入 | 成人91视频 | 91文字幕巨乱亚洲香蕉 | 人成午夜| 成年人免费网站在线观看 | 国产xxx | 色哟哟在线播放 | 久久久精品国产sm调教 | 国产精品sm调教免费专区 | 天海翼一区二区 | 香蕉私人影院 | 欧美性做爰大片免费 | 欧美日韩亚洲不卡 | 国产白浆一区二区 | 久草视频福利在线 | www.五月.com| 9191久久| 日本黄网站 | 337p日本欧洲亚洲大胆精筑 | 成人精品免费 | 国产高清视频网站 | 九九看片 | 午夜少妇久久久久久久久 | 色牛影院 | 亚洲香蕉一区 | 污网站在线免费看 | 黄色视屏网站 | 亚洲视频在线播放 | 国产乱码精品1区2区3区 | 亚洲精品五月天 | 国产精品三级在线观看无码 | 产乳奶汁h文1v1 | 无码人妻久久一区二区三区 | 国产最新地址 | 九九热免费 | 在线观看国产视频 | 亚洲区第一页 | 国产成人综合一区二区三区 | 国产日韩欧美中文字幕 | 中文字幕不卡视频 | a天堂在线视频 | 中文字幕av专区 | 日本成人午夜视频 | 日日骚视频 | 超碰98在线观看 | 日韩免费观看av | 日韩精品在线观看网站 | 欧洲精品码一区二区三区免费看 | 欧美日韩免费网站 | 国产粉嫩呻吟一区二区三区 | 日韩免费观看视频 | 在线免费看mv的网站入口 | 国产伦精品一区二区三区妓女下载 | 九九久久综合 | 你懂的在线播放 | 中文乱码人妻一区二区三区视频 | 天天曰夜夜操 | 波多野结衣欧美 | 久久av影视 | 成人高清视频免费观看 | 日本高清有码 | 一本色道av| 久色视频在线播放 | 91国内揄拍国内精品对白 | 蜜桃久久久aaaa成人网一区 | 激情久 | 午夜黄色影院 | 伊人久色 | www.黄色网址 |