Dubbo 3.0 预览版解读,6到飞起~
Dubbo 自 2011 年 10 月 27 日開源后,已被許多非阿里系的公司使用,其中既有當當網、網易考拉等互聯網公司,也不乏中國人壽、青島海爾等大型傳統企業。
自去年 12 月開始,Dubbo 3.0?便已正式進入開發階段《重大利好,Dubbo 3.0要來了》,并備受社區和廣大 Dubbo 用戶的關注,本文將為您詳細解讀?3.0 預覽版的新特性和新功能。
下面先解答一下兩個有意思的與 Dubbo 相關的疑問。
-
為什么 Dubbo 一開源就是 2.0 版本?之前是否存在 1.0 版本?
筆者曾做過 Dubbo 協議的適配兼容,Dubbo 確實存在過 1.x 版本,而且從協議設計和模型設計上都與 2.0 的開源版本協議是完全不一樣的。下圖是關于 Dubbo 的發展路徑:
-
阿里內部正在使用 Dubbo 開源版本嗎?
是的,非常確定,當前開源版本的 Dubbo 在阿里巴巴被廣泛使用,而阿里的電商核心部門是用的 HSF2.2 版本,這個版本是兼容了 Dubbo 使用方式和 Remoting 協議。當然,我們現在正在做 HSF2.2 的升級,直接依賴開源版本的 Dubbo 來做內核的統一。所以,Dubbo 是得到大規模線上系統驗證的分布式服務框架,這一點毋容置疑。
Dubbo 3.0 預覽版的要點
Dubbo 3.0 在設計和功能上的新增支持和改進,主要是以下四方面:
-
Dubbo 內核之 Filter 鏈的異步化
這里要指出的是,3.0 中規劃的異步去阻塞和 2.7 中提供的異步是兩個層面的特性。2.7 中的異步是建立在傳統 RPC 中 request – response 會話模型上的,而 3.0 中的異步將會從通訊協議層面由下向上構建,關注的是跨進程、全鏈路的異步問題。通過底層協議開始支持 streaming 方式,不單單可以支持多種會話模型,還可以在協議層面開始支持反壓、限流等特性,使得整個分布式體系更具有彈性。綜上所述,2.7 關注的異步更局限在點對點的異步(一個 consumer 調用一個 provider),3.0 關注的異步化,寬度上則關注整個調用鏈上的異步,高度上則向上又可以包裝成 Rx 的編程模型。有趣的是,Spring 5.0 發布了對 Flux 的支持,隨后開始解決跨進程的異步問題。
-
功能方面是 reactive(響應式)支持
最近幾年,?reactive programming這個詞語的熱度迅速提升,Wikipedia 上的 reactive programming 解釋是 reactive programming is a programming paradigm oriented around data flows and the propagation of change. Dubbo3.0會實現Reactive Stream 的 rx 接口,從而能讓用戶享受到RP帶來的響應性提升,甚至面向 RP 的架構升級。當然,我們希望 reactive 不單單能夠帶來事件(event)驅動的應用集成方式的升級,也希望在 Load Balance(選擇最優的服務節點),fault tolerance(限流降級時最好做到自適應)等方面發揮其積極價值。
-
云原生/ ServiceMesh 方向的探索
我們定下的策略是進入 Envoy 社區來實現 Dubbo 融入 mesh 的理念思想,目前 Dubbo 協議已經被 Envoy 支持。當然,Dubbo Mesh 離真正可用還有很長一段距離,其在選址、負載均衡和服務治理方面的工作需要繼續在數據面建設,另外,控制面板的建設在社區也沒有提上日程。
-
融合并支持阿里內部
Dubbo 3.0 定下了內外融合的策略,也就是說 3.0 的核心最終會在阿里巴巴的生產系統中部署,相信通過大流量、大規模的考驗,Dubbo 用戶可以獲得一個性能、穩定、服務治理實踐各方面俱佳的核心,用戶在生產系統中采用 3.0 也會更加放心。這一點也是 Dubbo 3.0 最重要的使命。
Filter 鏈的異步化設計
Dubbo 最強大的一處設計是其在 Filter 鏈上的抽象設計,通過其擴展機制的開放性支持,用戶可以對 Dubbo 做功能增強,并允許各個擴展點被定制來是否保留。
Dubbo?的?Filter?定義如下:
按照“調用一個遠程服務的方法就像調用本地的方法一樣”這種說法,這個直接返回 Result 響應的方式是非常好的,用起來是簡單直接,問題是時代變換到了需要關注體驗,需要走 Reactive 響應式的時代,也回到基本點:invoke一個 invocation 需要經過網絡在不同的進程處理,天然就是異步的過程,也就是發送請求(invocation)與接收響應(Result)本身是兩個不同的事件,是需要兩個過程方法來在 Filter 鏈處理。那么如何改造這個關鍵的 SPI 呢?有兩種方案:
第一種,把 invoke 的返回值改成 CompletableFuture, 好處是一目了然,Result 不在建議同步獲取了;但基礎接口的簽名一改會導致代碼改造量巨大,同時也會讓原有的 SPI 擴展不在支持。
第二種,Result 接口直接繼承 CompletationStage,是代表了響應的異步計算。這樣能進避免第一種的劣勢。所以,3.0.0 Preview 版本對內部調用鏈路實現做了一次重構:基于 CompletableFuture 實現了框架內部的全異步調用,而在外圍編程上,同時支持同步、異步調用模式。
值得注意的是,此次重構僅限于框架內部實現,對使用方沒有任何影響即接口上保持完全兼容。要了解 Dubbo 異步 API 如何使用,請參考《如何基于 Dubbo 實現全異步的調用鏈》(地址:http://dubbo.apache.org/zh-cn/blog/dubbo-new-async.html),這篇文章將著重對實現思路和原理做一些簡單介紹。此次重構的要點有:
-
框架內部采用全異步調用模型,僅在外圍做同步、異步適配;
-
內置Filter鏈支持異步回調;
基本工作流程
首先我們來看一個通用的跨網絡異步調用的線程模型:
通信框架異步發送請求消息,請求消息發送成功后,返回代表業務結果的 CompletableFuture 給業務線程。之后對于 Future 的處理,根據調用類型會有所區別:
對于同步請求(如上圖體現的場景),業務線程會調用 future.get 同步阻塞等待結果,當收到網絡層返回的業務結果后,future.get 返回并最終將結果傳遞給調用發起方。
對于異步請求,業務線程不會調用 future.get,而是將 future 保存在調用上下文或者直接返回給調用者,同時會為 future 注冊回調監聽器,以便當真正的業務結果從通信層返回時監聽器可以對結果做進一步的處理。
接下來具體看一下一次異步 Dubbo RPC 請求的調用流程:
消費方面向 Proxy 代理編程,發出調用請求,請求經過 Filter 鏈向下傳遞。
Invoker.invoke() 將請求異步轉發給網絡層,并收到代表返回結果的 Future。
Future 被包裝到 Result,轉而由 Result 代表這次遠程調用的結果(由于 Result 的異步屬性,此時它可能并不包含真正的返回值)。
Result 繼續沿著調用鏈返回,在經過每個 Filter 時,Filter 可選擇注冊 Listener 監聽器,以便在業務結果返回時執行結果預處理。
最終 Proxy 調用 result.recreate() 將結果返回給消費者:
-
如果方法是 CompletableFuture 簽名,則返回 Future;
-
如果方法是普通同步簽名,則返回對象默認值,Future 可通過 RpcContext 拿到;
? 6.?調用方在拿到代表異步業務結果的 Future 后,可選擇注冊回調監聽器,以監聽真正的業務結果返回。
同步調用和異步調用基本上是一致的,并且也是走的回調模式,只是在鏈路返回之前做了一次阻塞 get 調用,以確保在收到實際結果時再返回。Filter 在注冊 Listener 時由于 Future 已處于 complete 狀態,因此會同時觸發回調 onResponse()/onError()。
關于流程圖中提到的 Result,Result 在 Dubbo 的一次 RPC 調用中代表返回結果,在 3.0 中 Result 自身增加了代表狀態的接口,類似 Future 現在 Result 可以代表一次未完成的調用。
要讓 Result 具備代表異步返回結果的能力,有兩中方式來實現:
1. Result is a Future,在 Java 8 中更合理的方式是繼承 CompletionStage 接口。
public interface Result extends CompletionStage {}2. 讓 Result 實例持有 Future 實例,與 1 的區別即是設計中選用“繼承”還是“組合”。
public class AsyncRpcResult implements Result {private CompletableFuture<RpcResult> resultFuture;}同時,為了讓 Result 更直觀的體現其異步結果的特性,也為了方便面向 Result 接口編程,我們可以考慮為Result增加一些異步接口:
public interface Result extends Serializable {Result thenApplyWithContext(Function<Result, Result> fn);<U> CompletableFuture<U> thenApply(Function<Result,???extends U> fn);Result get() throws InterruptedException, ExecutionException;}Filter SPI
Filter 是 Dubbo 預置的攔截器擴展 SPI,用來做請求的預處理、結果的后處理,框架本身內置了一些攔截器實現,而從用戶層面,我相信這個 SPI 也應該是被擴展最多的一個。在 3.0 版本中,Filter 回歸單一職責的設計模式,將回調接口單獨提取到 Listener 中。
@SPI public interface Filter {Result invoke(Invoker<?>?invoker,?Invocation invocation) throws RpcException;interface Listener {void onResponse(Result result, Invoker<?>?invoker,?Invocation invocation);void onError(Throwable t, Invoker<?>?invoker,?Invocation invocation);} }以上是 Filter 的 SPI 定義,Filter 的核心定義中只有一個 invoke() 方法用來傳遞調用請求。
同時,增加了一個新的回調接口 Listener,每個 Filter 實現可以定義自己的 Listenr 回調器,從而實現對返回結果的異步監聽,參考以下是為 MonitorFilter 增加的 Listener 回調實現:
class MonitorListener implements Listener {@Overridepublic void onResponse(Result result, Invoker<?>?invoker,?Invocation invocation) {if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(),Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);getConcurrent(invoker, invocation).decrementAndGet(); // count down}}@Overridepublic void onError(Throwable t, Invoker<?>?invoker,?Invocation invocation) {if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);getConcurrent(invoker, invocation).decrementAndGet(); // count down}} }泛化調用異步接口支持
為了更直觀的做異步調用,泛化接口新增了?
CompletableFuture<Object>$invokeAsync(Stringmethod,String[]parameterTypes,Object[]args)接口:
public interface GenericService {/*** Generic invocation** @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is* required, e.g. findPerson(java.lang.String)* @param parameterTypes Parameter types* @param args Arguments* @return invocation return value* @throws GenericException potential exception thrown from the invocation*/Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {Object object = $invoke(method, parameterTypes, args);if (object instanceof CompletableFuture) {return (CompletableFuture<Object>) object;}return CompletableFuture.completedFuture(object);} }這樣,當我們想做異步調用時,就可以直接這樣使用:
CompletableFuture<Object> genericService.$invokeAsync(method, parameterTypes, args);更具體用例請參見《泛化調用示例》
https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-generic/dubbo-samples-generic-call
異步與性能
組要注意的是,框架內部的異步實現本身并不能提高單次調用的性能,相反,由于線程切換和回調邏輯的存在,異步反而可能會導致單次調用性能的下降,但是異步帶來的優勢是能減少對資源的占用,提升整個系統的并發程度和吞吐量,這點對于 RPC 這種需要處理網絡延遲的場景非常適用。更多關于異步化設計的好處,請參考其他異步化原理介紹相關文章。
響應式編程支持
響應式編程讓開發者更方便地編寫高性能的異步代碼,很可惜,在之前很長一段時間里,dubbo 并不支持響應式編程,簡單來說,dubbo 不支持在 rpc 調用時使用 Mono/Flux 這種流對象(reative-stream 里流的概念),給用戶使用帶來了不便。(關于響應式編程更詳細的信息請參見這里:http://reactivex.io/)。
RSocket 是一個開源的支持 reactive-stream 語義的網絡通信協議,他將 reative 語義的復雜邏輯封裝起來了,使得上層可以方便實現網絡程序。(RSocket詳細資料請參見這里:http://rsocket.io/)。
dubbo 在 3.0.0-SNAPSHOT ?版本里基于 RSocket 對響應式編程進行了簡單的支持,用戶可以在請求參數和返回值里使用 Mono 和 Flux 類型的對象。下面我們給出使用范例,(范例源碼可以在這里獲取:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket)。
首先定義接口如下:
public interface DemoService {Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2);Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2); }然后實現該 demo 接口:
public class DemoServiceImpl implements DemoService {@Overridepublic Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2) {return m1.zipWith(m2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) {return s+" "+s2;}});}@Overridepublic Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2) {return f1.zipWith(f2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) {return s+" "+s2;}});} }然后配置并啟動服務端,注意協議名字填寫 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsdhttp://dubbo.apache.org/schema/dubbo?http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!--?provider's?application?name,?used?for?tracing?dependency?relationship?--> <dubbo:application name="demo-provider"/> <!--?use?registry?center?to?export?service?--> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <!--?use?dubbo?protocol?to?export?service?on?port?20880?--> <dubbo:protocol name="rsocket" port="20890"/> <!--?service?implementation,?as?same?as?regular?local?bean?--> <bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/> <!--?declare?the?service?interface?to?be?exported?--> <dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/> </beans> public class RsocketProvider {public static void main(String[] args) throws Exception {new EmbeddedZooKeeper(2181, false).start();ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-provider.xml"});context.start();System.in.read(); // press any key to exit} }然后配置并啟動消費者消費者如下, 注意協議名填寫 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsdhttp://dubbo.apache.org/schema/dubbo?http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!--?consumer's?application?name,?used?for?tracing?dependency?relationship?(not?a?matching?criterion), don't set it same as provider --> <dubbo:application name="demo-consumer"/> <!--?use?registry?center?to?discover?service?--> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <!--?generate?proxy?for?the?remote?service,?then?demoService?can?be?used?in?the?same?way?as?the local regular interface --> <dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.basic.api.DemoService"/> </beans> public class RsocketConsumer {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-consumer.xml"});context.start();DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxywhile (true) {try {Mono<String> monoResult = demoService.requestMonoWithMonoArg(Mono.just("A"), Mono.just("B"));monoResult.doOnNext(new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println(s);}}).block();Flux<String> fluxResult = demoService.requestFluxWithFluxArg(Flux.just("A","B","C"), Flux.just("1","2","3"));fluxResult.doOnNext(new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println(s);}}).blockLast();} catch (Throwable throwable) {throwable.printStackTrace();}}} }可以看到配置上除了協議名使用 rsocket 以外其他并沒有特殊之處。
實現原理
以前用戶并不能在參數或者返回值里使用 Mono/Flux 這種流對象(reative-stream 里的流的概念)。因為流對象自帶異步屬性,當業務把流對象作為參數或者返回值傳遞給框架之后,框架并不能將流對象正確的進行序列化。
dubbo 基于 RSocket 實現了 reative 支持。RSocket 將 reative 語義的復雜邏輯封裝起來了,給上層提供了簡潔的抽象如下:
/** * Fire and Forget interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} that completes when the passed {@code payload} is successfully * handled, otherwise errors. */ Mono<Void> fireAndForget(Payload payload); /** * Request-Response interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing at most a single {@code Payload} representing the * response. */ Mono<Payload> requestResponse(Payload payload); /** * Request-Stream interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. */ Flux<Payload> requestStream(Payload payload); /** * Request-Channel interaction model of {@code RSocket}. * * @param payloads Stream of request payloads. * @return Stream of response payloads. */ Flux<Payload> requestChannel(Publisher<Payload> payloads);我們只需要在此基礎上添加我們的 rpc 邏輯即可。
-
從客戶端視角看,框架建立連接之后,只需要將請求信息編碼到 Payload 里,然后通過 requestStream 方法即可向服務端發起請求。
-
從服務端視角看,rsocket 收到請求之后,會調用我們實現的 requestStream 方法,我們從 Payload 里解碼得到請求信息之后,調用業務方法,然后拿到 Flux 類型的返回值即可。
-
需要注意的是業務返回值一般是 Flux,而 RSocket 要求的是 Flux,所以我們需要通過 map operator 攔截業務數據,將 BizDO 編碼為 Payload 才可以遞交給我 RSocket。而 RSocket 會負責數據的傳輸和 reative 語義的實現。
經過上面的分析,我們知道了 Dubbo 如何基于 RSocket 實現了響應式編程的支持。有了響應式編程支持,業務可以更加方便的實現異步邏輯。
小結
當前 Dubbo 3.0 將提供具備當代特性(如響應性編程)的相關支持,同時汲取阿里內部 HSF 的設計長處來實現兩者的融合,當前預覽版的很多地方還在探討中,希望大家能夠積極反饋,我們都會虛心學習并參考。
Dubbo 3.0 sample @GitHub:
https://github.com/apache/incubator-dubbo-samples/tree/3.x
覃柳杰(花名:未宇) Github ID: qinliujie,阿里巴巴中間件開發,Dubbo 開源項目 PMC,參與 HSF2.2和 Dubbo3.0 的設計和開發。
呂仁琦(花名:空冥) Github ID: jefflv,阿里巴巴中間件開發,Dubbo 開源項目 commiter,參與了內部 HSF2.0 的設計和開發。
劉軍(花名:陸龜) 阿里巴巴中間件高級開發工程師,Apache Dubbo (Incubating)PPMC,深度參與 Dubbo 項目開發,主要貢獻者之一。
謝育能(花名:思邪)阿里巴巴中間件開發,Dubbo 3.0 開源項目的響應式模塊的負責人,參與了內部 HSF2.2 的設計和開發。
總結
以上是生活随笔為你收集整理的Dubbo 3.0 预览版解读,6到飞起~的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 欠阿里云一分钱,会是什么样的后果。。。
- 下一篇: 你的接口能承受高并发吗?