日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

natty的异步通信框架_OpenHub框架进行的异步通信

發(fā)布時間:2023/12/3 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 natty的异步通信框架_OpenHub框架进行的异步通信 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

natty的異步通信框架

在本系列的前一部分中,我們介紹了OpenHub框架 。 這部分顯示了框架最強大的功能之一- 異步消息傳遞模型 。

當(dāng)源系統(tǒng)無法等待目標(biāo)系統(tǒng)的響應(yīng)時,將使用系統(tǒng)之間的異步通信。 有以下幾個原因:

  • 源系統(tǒng)必須盡可能地響應(yīng) ,并且不受外部影響(通信緩慢,目標(biāo)系統(tǒng)不穩(wěn)定等)的影響
  • 在目標(biāo)系統(tǒng)中處理需要很多時間
  • 異步通信對性能和流量產(chǎn)生積極影響

異步方案

當(dāng)您決定異步通信時,您必須考慮可能的情況:

  • 目標(biāo)系統(tǒng)必須確認傳入消息已成功保存 ,并準(zhǔn)備進行進一步處理。 是否應(yīng)將異步過程的最終結(jié)果通知源系統(tǒng)
  • 如果異步處理失敗該怎么辦? 如果存在暫時性技術(shù)錯誤 (例如,與另一個系統(tǒng)的通信失敗), 請嘗試幾次,或者因為存在業(yè)務(wù)錯誤 (例如,輸入數(shù)據(jù)無效)而停止進一步處理
  • 在異步處理期間會調(diào)用其他系統(tǒng)- 如果對第一個系統(tǒng)的調(diào)用可以,但對第二個系統(tǒng)的調(diào)用失敗,該怎么辦? 異步處理必須是冪等的,并跳過第一個成功的呼叫,然后僅重試第二個呼叫。
  • 異步過程可能很棘手,因此最好將一個大進程(父)劃分為較小的(子)進程 。 如果處理了子進程,則父進程也將完成。
  • 有時,您必須保證傳入請求的順序 (請求的到達順序與發(fā)送的順序不相同)并按照確切的順序進行處理。
  • 它是異步處理,您需要對其進行監(jiān)視或在發(fā)生意外情況 (例如異步過程失敗) 時自動得到通知
  • 有時,您需要保存數(shù)據(jù)或異步過程的當(dāng)前狀態(tài)以嘗試成功完成之間,例如,第一次調(diào)用外部系統(tǒng)的結(jié)果就是第二次調(diào)用的輸入。

當(dāng)您開始考慮所有這些情況時,您會發(fā)現(xiàn)從頭開始實施它并不容易。 OpenHub框架內(nèi)置了對異步消息處理的支持。 它易于使用,但同時又強大又靈活。 并且也是可配置的,例如,該過程應(yīng)再次運行多少次? 在哪個時間間隔?

異步路由實現(xiàn)

使用OpenHub框架的路由實現(xiàn)有兩個子路由:

  • 一種用于處理傳入消息( RouteIn )
  • 一種用于異步過程實現(xiàn)( RouteOut )
/*** Route definition for asynchronous operation "translate" via web services.*/ @CamelConfiguration(value = AsyncTranslateWsRoute.ROUTE_BEAN) public class AsyncTranslateWsRoute extends AbstractBasicRoute {static final String ROUTE_BEAN = "asyncTranslateWsRouteBean";private static final String OPERATION_NAME = "asyncTranslateWs";static final String ROUTE_ID_ASYNC_IN = getInRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);static final String ROUTE_ID_ASYNC_OUT = getOutRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);static final String URI_ASYNC_OUT = "direct:" + ROUTE_ID_ASYNC_OUT;@Overrideprotected void doConfigure() throws Exception {// asyncTranslate - input asynch messagecreateAsyncRouteIn();// asyncTranslate - process delivery (=asynchronous execution)createAsyncRouteOut();}/*** Route for asynchronous <strong>asyncTranslate</strong> input operation.* <p/>* Prerequisite: none* <p/>* Output: {@link AsyncTranslateResponse}*/private void createAsyncRouteIn() {Namespaces ns = new Namespaces("h", TranslateWebServiceConfig.TRANSLATE_SERVICE_NS);// note: mandatory parameters are set already in XSD, this validation is extraXPathValidator validator = new XPathValidator("/h:asyncTranslateRequest", ns, "h:inputText");AsynchRouteBuilder.newInstance(ServiceEnum.TRANSLATE, OPERATION_NAME,getInWsUri(new QName(TranslateWebServiceConfig.TRANSLATE_SERVICE_NS, "asyncTranslateRequest")),new AsynchResponseProcessor() {@Overrideprotected Object setCallbackResponse(CallbackResponse callbackResponse) {AsyncTranslateResponse res = new AsyncTranslateResponse();res.setConfirmAsyncTranslate(callbackResponse);return res;}}, jaxb(AsyncTranslateResponse.class)).withValidator(validator).build(this);}/*** Route for <strong>asyncTranslate</strong> operation - process delivery (=asynchronous execution).* Only input text is logged in this case.* <p/>* Prerequisite: none*/private void createAsyncRouteOut() {final String URI_LOG_INPUT_PARAMS = "direct:logInputParams";from(URI_ASYNC_OUT).routeId(ROUTE_ID_ASYNC_OUT)// xml -> AsyncTranslateRequest.unmarshal(jaxb(AsyncTranslateRequest.class)).to("extcall:message:" + URI_LOG_INPUT_PARAMS);from(URI_LOG_INPUT_PARAMS).validate(body().isInstanceOf(AsyncTranslateRequest.class)).log(LoggingLevel.DEBUG, "Asynchronous execution - input text '${body.inputText}' (lang: ${body.inputLang})");} }

RouteIn使用AsynchRouteBuilder通過以下功能輕松配置:

  • 定義哪個傳入的Web Service請求應(yīng)開始此路由
  • 定義源系統(tǒng)的確認響應(yīng)。 輸入路由成功執(zhí)行后,將返回對源系統(tǒng)的同步響應(yīng)。
  • 定義驗證器,該驗證器檢查傳入請求中是否存在元素inputText

RouteOut定義了異步過程本身。 在這種情況下,僅記錄輸入請求( AsyncTranslateRequest )。

就這樣。 周圍的一切都由OpenHub框架實現(xiàn)。

外部通話

您的路線實施通常會調(diào)用外部系統(tǒng)或其他路線。 如果實現(xiàn)異步過程,則必須遵守冪等規(guī)則-可以多次調(diào)用過程的每個部分,并且必須確保所有調(diào)用的行為相同。 有時外部系統(tǒng)/路由本身是冪等的,然后您可以根據(jù)需要多次調(diào)用它。 如果沒有,那么您必須在實現(xiàn)中對其進行控制。 因此,我們將Camel組件設(shè)為extcall 。

上例中的組件excall確保即使整個異步過程運行了多次,使用URI_LOG_INPUT_PARAMS路由也會被精確地調(diào)用一次。

外部通話說明

描述:

  • 在異步消息處理期間調(diào)用了兩個外部系統(tǒng)
  • 在處理過程中我們可以返回兩個extcall的停靠點
    • 如果在對外部系統(tǒng)1的第一次請求之前發(fā)生錯誤,則下一個處理嘗試將從頭開始,與新消息到達相同

漏斗和節(jié)流組件

其他強大的組件是funnel和throttling 。

Funnel組件用于過濾特定集成點處的并發(fā)消息。 這種過濾可確保在該位置僅處理一個特定類型的消息或同時具有特定信息的消息,即使是以保證的順序(可選選項)進行處理。 對于與外部系統(tǒng)進行通信很有用,該外部系統(tǒng)一次只能接受一個特定實體(例如,訂購系統(tǒng)中的一個特定客戶)的一條輸入消息。

第二個組件throttling允許您確保特定端點不會過載,或者我們不會超出任何外部服務(wù)的商定SLA。 Throttling組件也可以用于同步消息。

所有組件都支持群集。

實施細節(jié)

OpenHub需要保存的所有內(nèi)容都保存在數(shù)據(jù)庫中–類型沒有限制。 無需調(diào)整JMS / MQ系統(tǒng)即可支持異步消息傳遞。 然后,您可以使用自己喜歡的任何工具進行日常工作- 數(shù)據(jù)模型簡單,清晰并且文檔齊全。 數(shù)據(jù)庫工具比JMS / MQ系統(tǒng)更多。

有時我們聽說在這種情況下使用數(shù)據(jù)庫是一種反模式,從性能的角度來看,在某些情況下它可能是瓶頸。 這取決于實際項目中的集成用例,但在處理數(shù)十萬個并發(fā)請求的實際項目中,我們?nèi)匀粵]有嚴格的性能限制。 我們準(zhǔn)備添加JMS / MQ實現(xiàn),但是到目前為止還不需要。

不必僅通過傳入請求啟動異步過程–您還可以使用調(diào)度作業(yè)在需要的任何時候啟動路由,然后將其留給OpenHub框架。

所有示例都可以在GitHub的參考實現(xiàn)中找到–參見https://github.com/OpenWiseSolutions/openhub-ri

翻譯自: https://www.javacodegeeks.com/2017/10/asynchronous-communication-made-openhub-framework.html

natty的異步通信框架

總結(jié)

以上是生活随笔為你收集整理的natty的异步通信框架_OpenHub框架进行的异步通信的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。