Dubbo的异步调用
文章目錄
- dubbo異步調用
- 2.6版本中dubbo異步調用的實現
- 2.7版本dubbo 客戶端Consumer異步調用
- 使用CompletableFuture簽名的接口
- 1、調用遠程服務:
- 2、 使用RpcContext
- 2.7 版本 服務提供者Provider異步執行
- 1、定義CompletableFuture簽名的接口
- 2、使用AsyncContext
- springboot 項目集成異步調用
在微服務環境中,往往一個接口,是經過多個服務間的接口調用,最后封裝成一個接口中返回。如果每個等待每個接口串并執行結果,會比較耗時,此時我們就需要異步處理。
dubbo異步調用
dubbo的異步調用,基于NIO的非阻塞實現并行調用,客戶端不需要啟動多線程即可完成并行調用多個遠程服務,相對多線程開銷較小。
在userThread用戶線程請求網關服務,調用某個方法,執行ioThread,再去調用服務端server,在服務端返回數據之前,將Future對象,復制到RpcContext中,在通過getFuture獲取Future對象。
在服務端返回數據后,通知Future對象,通過Future的get方法,獲取返回結果。
2.6版本中dubbo異步調用的實現
在客戶端的consumer.xml 中配置如下:
<dubbo:reference id="fooService" interface="com.alibaba.dubbo.samples.async.api.FooService"><dubbo:method name="findFoo" async="true"/> </dubbo:reference><dubbo:reference id="barService" interface="com.alibaba.dubbo.samples.async.api.BarService"><dubbo:method name="findBar" async="true"/> </dubbo:reference>調用代碼。
// 此調用會立即返回null fooService.findFoo(fooId) // 拿到調用的Future引用, 當結果返回后,會被通知和設置到此Future。 Future<Foo> fooFuture = RpcContext.getContext().getFuture();// 此調用會立即返回null barService.findBar(barId) // 拿到調用的Future引用, 當結果返回后,會被通知和設置到此Future。 Future<Bar> fooFuture = RpcContext.getContext().getFuture();// 此時findFoo 和 findBar 的請求同時在執行,客戶端不需要啟動多線程來支持并行,而是借助NIO的非阻塞完成。// 如果foo 已返回,直接拿到返回值,否則當前線程wait住, 等待foo返回后,線程會被notify喚醒。 Foo foo = fooFuture.get(). // 同理等待bar返回。 Bar bar = barFuture.get(); // 如果foo需要5秒返回,bar需要6秒返回,實際只需要6秒,即可獲取到foo和bar,進行接下來的處理。一些特殊場景下,客戶端為了盡快調用返回值,可以設置是否等待消息發出:
- sent=“true” 等待消息發出,消息發送失敗將拋出異常;
- sent=“false” 不等待消息發出,將消息放入 IO 隊列,即刻返回。
默認為fase。配置方式如下:
如果你只是想異步,完全忽略返回值,可以配置 return="false",以減少 Future 對象的創建和管理成本:
<dubbo:method name="goodbye" async="true" return="false"/>此時,RpcContext.getContext().getFuture()將返回null。
2.7版本dubbo 客戶端Consumer異步調用
從v2.7.0開始,Dubbo的所有異步編程接口開始以CompletableFuture為基礎。
使用CompletableFuture簽名的接口
需要服務提供者事先定義CompletableFuture簽名的服務,具體參見服務端異步執行接口定義:
public interface AsyncService {CompletableFuture<String> sayHello(String name); }注意接口的返回類型是CompletableFuture<String>。
XML引用服務:
<dubbo:reference id="asyncService" timeout="10000" interface="com.alibaba.dubbo.samples.async.api.As1、調用遠程服務:
// 調用直接返回CompletableFuture CompletableFuture<String> future = asyncService.sayHello("async call request"); // 增加回調 future.whenComplete((v, t) -> {if (t != null) {t.printStackTrace();} else {System.out.println("Response: " + v);} }); // 早于結果輸出 System.out.println("Executed before response return.");### Springboot2、 使用RpcContext
在 consumer.xml 中配置:
<dubbo:reference id="asyncService" interface="org.apache.dubbo.samples.governance.api.AsyncService"><dubbo:method name="sayHello" async="true" /> </dubbo:reference>調用代碼:
// 此調用會立即返回null asyncService.sayHello("world"); // 拿到調用的Future引用,當結果返回后,會被通知和設置到此Future CompletableFuture<String> helloFuture = RpcContext.getContext().getCompletableFuture(); // 為Future添加回調 helloFuture.whenComplete((retValue, exception) -> {if (exception == null) {System.out.println(retValue);} else {exception.printStackTrace();} });或者,你也可以這樣做異步調用:
CompletableFuture<String> future = RpcContext.getContext().asyncCall(() -> {asyncService.sayHello("oneway call request1");} );future.get();2.7 版本 服務提供者Provider異步執行
Provier端異步執行將阻塞的業務從Dubbo內部線程池切換到業務自定義線程,避免Dubbo線程池過度占用,有助于避免不同服務間的互相影響。異步執行無益于節省資源或提升RPC響應性能,因為如果業務執行需要阻塞,則始終還是要有線程來負責執行。
注意:Provider端異步執行和Consumer端異步調用是相互獨立的,你可以任意正交組合兩端配置
- Consumer同步 - Provider同步
- Consumer異步 - Provider同步
- Consumer同步 - Provider異步
- Consumer異步 - Provider異步
1、定義CompletableFuture簽名的接口
服務接口定義:
public interface AsyncService {CompletableFuture<String> sayHello(String name); }服務實現:
public class AsyncServiceImpl implements AsyncService {@Overridepublic CompletableFuture<String> sayHello(String name) {RpcContext savedContext = RpcContext.getContext();// 建議為supplyAsync提供自定義線程池,避免使用JDK公用線程池return CompletableFuture.supplyAsync(() -> {System.out.println(savedContext.getAttachment("consumer-key1"));try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "async response from provider.";});} }通過return CompletableFuture.supplyAsync(),業務執行已從Dubbo線程切換到業務線程,避免了對Dubbo線程池的阻塞。
2、使用AsyncContext
Dubbo提供了一個類似Serverlet 3.0的異步接口AsyncContext,在沒有CompletableFuture簽名接口的情況下,也可以實現Provider端的異步執行。
服務接口定義:
public interface AsyncService {String sayHello(String name); }服務暴露,和普通服務完全一致:
<bean id="asyncService" class="org.apache.dubbo.samples.governance.impl.AsyncServiceImpl"/> <dubbo:service interface="org.apache.dubbo.samples.governance.api.AsyncService" ref="asyncService"/>服務實現:
public class AsyncServiceImpl implements AsyncService {public String sayHello(String name) {final AsyncContext asyncContext = RpcContext.startAsync();new Thread(() -> {// 如果要使用上下文,則必須要放在第一句執行asyncContext.signalContextSwitch();try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}// 寫回響應asyncContext.write("Hello " + name + ", response from provider.");}).start();return null;} }springboot 項目集成異步調用
1、啟動來添加注解@EnableAsync
@SpringBootApplication(scanBasePackages = {"com.stylefeng.guns"}) @EnableDubboConfiguration @EnableAsync public class GatewayApplication {public static void main(String[] args) {SpringApplication.run(GatewayApplication.class, args);} }2、 客戶端注解@Reference中添加屬性async = true
@Reference(interfaceClass = FilmAsyncServiceApi.class, validation = "1.0", async = true)private FilmAsyncServiceApi filmAsyncServiceApi;3、 接口調用
/*** 影片詳情接口* @return*/@RequestMapping(value = "films/{searchParam}", method = RequestMethod.GET)public ResponseVO films(@PathVariable String searchParam,int searchType) throws ExecutionException, InterruptedException {// 根據searchType,判斷查詢類型FilmDetailVO filmDetail = filmServiceApi.getFilmDetail(searchType, searchParam);// 查詢影片的詳細信息 -> Dubbo 的異步獲取// 獲取影片描述信息if (filmDetail == null ) {return ResponseVO.serviceFail("沒有可查詢的影片");} else if (filmDetail.getFilmId() == null || filmDetail.getFilmId().trim().length() == 0) {return ResponseVO.serviceFail("沒有可查詢的影片");}String filmId = filmDetail.getFilmId();filmServiceApi.getFilmDesc(filmId);// 拿到調用的Future引用,當結果返回后,會被通知和設置到此future。Future<FilmDescVO> filmDescVOFuture = RpcContext.getContext().getFuture();// 獲取圖片信息filmServiceApi.getImags(filmId);Future<ImgVO> imgVOFuture = RpcContext.getContext().getFuture();// 獲取導演信息filmServiceApi.getDectInfo(filmId);Future<ActorVO> actorVOFuture = RpcContext.getContext().getFuture();// 獲取演員信息filmServiceApi.getActors(filmId);Future<List<ActorVO>> actorsFuture = RpcContext.getContext().getFuture();FilmInfoVO filmInfoVO = new FilmInfoVO();BeanUtils.copyProperties(filmDetail, filmInfoVO);// 組織Actor屬性InfoRequestVO infoRequestVO = new InfoRequestVO();ActorRequestVO actorRequestVO = new ActorRequestVO();actorRequestVO.setActors(actorsFuture.get());actorRequestVO.setDirector(actorVOFuture.get());infoRequestVO.setActors(actorsFuture.get());infoRequestVO.setBiography(filmDescVOFuture.get().getBiography());infoRequestVO.setFilmId(filmId);infoRequestVO.setImgVO(imgVOFuture.get());filmInfoVO.setInfo04(infoRequestVO);return ResponseVO.success(IMG_PRE, filmInfoVO);}在每個服務調用后,都會拿到對應Future,拿到值之后,設置到Future中,通過get方法獲取。
總結
以上是生活随笔為你收集整理的Dubbo的异步调用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Group Normalization(
- 下一篇: C 异步调用