干货 | 携程基于Quasar协程的NIO实践
作者簡(jiǎn)介
Ryan,攜程Java開(kāi)發(fā)工程師,對(duì)高并發(fā)、網(wǎng)絡(luò)編程等領(lǐng)域有濃厚興趣。
IO密集型系統(tǒng)在高并發(fā)場(chǎng)景下,會(huì)有大量線程處于阻塞狀態(tài),性能低下,JAVA上成熟的非阻塞IO(NIO)技術(shù)可解決該問(wèn)題。目前Java項(xiàng)目對(duì)接NIO的方式主要依靠回調(diào),代碼復(fù)雜度高,降低了代碼可讀性與可維護(hù)性。近年來(lái)Golang、Kotlin等語(yǔ)言的協(xié)程(Coroutine)能達(dá)到高性能與可讀性的兼顧。
本文利用開(kāi)源的Quasar框架提供的協(xié)程對(duì)系統(tǒng)進(jìn)行NIO改造,解決以下兩個(gè)問(wèn)題:
1)提升單機(jī)任務(wù)的吞吐量,保證業(yè)務(wù)請(qǐng)求突增時(shí)系統(tǒng)的可伸縮性。
2)使用更輕量的協(xié)程同步等待IO,替代處理NIO常用的異步回調(diào)。
一、Java異步編程與非阻塞IO
本文改造的系統(tǒng)處理來(lái)自前臺(tái)的任務(wù),通過(guò)HTTP請(qǐng)求對(duì)端服務(wù),還通過(guò)RPC調(diào)用內(nèi)部服務(wù)。當(dāng)業(yè)務(wù)高峰時(shí),系統(tǒng)會(huì)遇到瞬時(shí)并發(fā)任務(wù)量數(shù)十倍激增的情況,系統(tǒng)的線程數(shù)量急劇增加造成性能下降。為此,不得不擴(kuò)容以保證業(yè)務(wù)高峰時(shí)期的性能。
??
? ? ? ? ? ? ? ? ? ? ? ? ?
基于epoll的NIO框架Netty在一些框架級(jí)別的應(yīng)用中已經(jīng)得到了廣泛使用,但在快速迭代的業(yè)務(wù)系統(tǒng)中的應(yīng)用依然有一定的局限性。NIO 消除了線程的同步阻塞,意味著只能異步處理IO的結(jié)果,這與業(yè)務(wù)開(kāi)發(fā)者順序化的思維模式有一定差異。當(dāng)業(yè)務(wù)邏輯復(fù)雜以及出現(xiàn)多次遠(yuǎn)程調(diào)用的情況下,多級(jí)回調(diào)難以實(shí)現(xiàn)和維護(hù)。
1.1?Java中的異步工具
Java項(xiàng)目大多使用JDK8,除線程外可以獲得的異步的編程支持包括CompletableFuture,以及開(kāi)源的RxJava、Vert.x等反應(yīng)式編程框架等。這些工具使用了基于響應(yīng)式編程的鏈?zhǔn)秸{(diào)用逐級(jí)傳遞事件,未從根本解決回調(diào)問(wèn)題。
如下為將一段簡(jiǎn)單的邏輯判斷使用CompletableFuture進(jìn)行異步改造后的對(duì)比。原始版本使用getA方法獲得第一步的請(qǐng)求結(jié)果,根據(jù)其相應(yīng)選擇使用getB1還是getB2獲取第二步的響應(yīng)作為結(jié)果。
HttpResponse?a?=?getA();HttpResponse b ; if(a.getBody().equals("1")){b=getB1(); } else{b=getB2(); }String ans=b.getBody();首先將三個(gè)獲取響應(yīng)的方法改為異步。此處假設(shè)getB1與getB2內(nèi)部已經(jīng)具有復(fù)雜邏輯,且不屬于同一領(lǐng)域,不適合合并為一個(gè)方法。
private CompletableFuture<HttpResponse> getA(); private CompletableFuture<HttpResponse> getB1(); private CompletableFuture<HttpResponse> getB2();然后使用CompletableFuture的鏈?zhǔn)秸{(diào)用,將兩個(gè)步驟組合起來(lái):
String ans = getA().thenCompose(a -> {if (a.getBody().equals("1")) {return getB1();} else {return getB2();}}).get().getBody();使用CompletableFuture的鏈?zhǔn)交卣{(diào)后,代碼變得不友好。RxJava等框架同樣具有這個(gè)問(wèn)題。這類反應(yīng)式的編程工具更適合于數(shù)據(jù)流的傳遞。對(duì)于if/else、switch/case,乃至while/for、break/continue這類過(guò)程控制語(yǔ)句,實(shí)現(xiàn)與維護(hù)的難度都很大。業(yè)務(wù)系統(tǒng)需要類似于線程的同步等待,同時(shí)具有低資源消耗的編碼工具,配合 NIO使用。當(dāng)時(shí)使用NIO時(shí),由于可以不占用線程,可以使用一種資源消耗更小的協(xié)程來(lái)等待。
1.2?協(xié)程
協(xié)程是一種進(jìn)程自身來(lái)調(diào)度任務(wù)的調(diào)度模式。協(xié)程與線程不同之處在于,線程由內(nèi)核調(diào)度,而協(xié)程的調(diào)度是進(jìn)程自身完成的。協(xié)程只是一種抽象,最終的執(zhí)行者是線程,每個(gè)線程只能同時(shí)執(zhí)行一個(gè)協(xié)程,但大量的協(xié)程可以只擁有少量幾個(gè)線程執(zhí)行者,協(xié)程的調(diào)度器負(fù)責(zé)決定當(dāng)前線程在執(zhí)行那個(gè)協(xié)程,其余協(xié)程處于休眠并被調(diào)度器保存在內(nèi)存中。
和線程類似,協(xié)程掛起時(shí)需要記錄棧信息,以及方法執(zhí)行的位置,這些信息會(huì)被協(xié)程調(diào)度器保存。協(xié)程從掛起到重新被執(zhí)行不需要執(zhí)行重量級(jí)的內(nèi)核調(diào)用,而是直接將狀態(tài)信息還原到執(zhí)行線程的棧,高并發(fā)場(chǎng)景下,協(xié)程極大地避免了切換線程的開(kāi)銷(xiāo)。下圖展示了協(xié)程調(diào)度器內(nèi)部任務(wù)的流轉(zhuǎn)。
協(xié)程中調(diào)用的方法是可以掛起的。不同于線程的阻塞會(huì)使線程休眠,協(xié)程在等待異步任務(wù)的結(jié)果時(shí),會(huì)通知調(diào)度器將自己放入掛起隊(duì)列,釋放占用的線程以處理其他的協(xié)程。異步任務(wù)完畢后,通過(guò)回調(diào)將異步結(jié)果告知協(xié)程,并通知調(diào)度器將協(xié)程重新加入就緒隊(duì)列執(zhí)行。
1.3?Quasar任務(wù)調(diào)度原理
Quasar(https://github.com/puniverse/quasar)是一個(gè)開(kāi)源的Java協(xié)程框架,通過(guò)利用Java instrument技術(shù)對(duì)字節(jié)碼進(jìn)行修改,使方法掛起前后可以保存和恢復(fù)JVM棧幀,方法內(nèi)部已執(zhí)行到的字節(jié)碼位置也通過(guò)增加狀態(tài)機(jī)的方式記錄,在下次恢復(fù)執(zhí)行可直接跳轉(zhuǎn)至最新位置。以如下方法為例,該方法分為兩步,第一步為initial初始化,第二部為通過(guò)NIO獲取網(wǎng)絡(luò)響應(yīng)。
public String instrumentDemo(){initial();String ans = getFromNIO();return ans; }Quasar會(huì)在initial前增加一個(gè)flag字段,表明當(dāng)前方法執(zhí)行的位置。第一次執(zhí)行方法時(shí),檢查到flag為0,修改flag為1并繼續(xù)往下執(zhí)行initial方法。執(zhí)行g(shù)etFromNIO方法前插入字節(jié)碼指令將棧幀中的數(shù)據(jù)全部保存在一個(gè)Quasar自定義的棧結(jié)構(gòu)中,在執(zhí)行g(shù)etFromNIO后,掛起協(xié)程,讓出線程資源。直至NIO異步完成后,協(xié)程調(diào)度器將第二次執(zhí)行該方法,檢測(cè)到flag為1,將會(huì)調(diào)用jump指令跳轉(zhuǎn)到returnans語(yǔ)句前,并將保存的棧結(jié)構(gòu)還原到當(dāng)前棧中,最后調(diào)用人return ans語(yǔ)句,方法執(zhí)行完畢。
二、系統(tǒng)異步IO改造
在項(xiàng)目中添加Quasar依賴后,可以使用Fiber類新建協(xié)程。建立的方法與線程類似。
new Fiber(()->{//方法體 }).start();2.1?整合Netty與Quasar
系統(tǒng)使用的Http框架是基于Netty的async-http-client(https://github.com/AsyncHttpClient/async-http-client),該框架提供了異步回調(diào)和CompletableFuture兩種對(duì)響應(yīng)的異步處理方式。
CompletableFuture自JDK8推出,與之前的Future類最大的不同在于,提供了異步任務(wù)跨線程的通知和控制機(jī)制。即,任務(wù)的等待者可以在CompletableFuture注冊(cè)任務(wù)完成或異常時(shí)的回調(diào),而執(zhí)行者也可以通過(guò)它通知等待者。Quaasr框架對(duì)它也做了支持,提供了API用于在協(xié)程中等待CompletableFuture的結(jié)果。調(diào)用后,協(xié)程將掛起,直至future狀態(tài)為已完成。
AsyncCompletionStage.get(future)通過(guò)CompletableFuture作為通知中介,我們可以將AsyncHttpClient與Quasar做整合,掛起協(xié)程等待IO結(jié)果。
//創(chuàng)建HttpClient AsyncHttpClient httpClient = Dsl.asyncHttpClient(); //創(chuàng)建請(qǐng)求 Request request = createRequest(); //將網(wǎng)絡(luò)請(qǐng)求交給HttpClient執(zhí)行 CompletableFuture<Response> future = httpClient.executeRequest(request) .toCompletableFuture(); //通過(guò)Quasar掛起協(xié)程 Response response = AsyncCompletionStage.get(future); //獲取網(wǎng)絡(luò)結(jié)果后,通過(guò)future傳遞response并喚醒協(xié)程重新執(zhí)行 deal(response);過(guò)程可由下圖表示。
?
Quasar框架AsyncCompletionStage.get內(nèi)部完成的工作相當(dāng)于,在HttpClient返回的future上注冊(cè)回調(diào),回調(diào)的內(nèi)容是“IO操作完成后通知調(diào)度器喚醒協(xié)程”,這樣將NIO異步回調(diào)全部操作封裝在協(xié)程調(diào)度器中,用戶代碼看起來(lái)是同步等待的形式,避免了自行實(shí)現(xiàn)回調(diào)處理帶來(lái)的繁瑣,解決了前文所述的回調(diào)地獄。
2.2?聲明掛起方法
Quasar需要織入字節(jié)碼接管掛起方法的調(diào)度,在項(xiàng)目主pom下添加quasar-maven-plugin插件,該插件將在編譯后的class文件中修改字節(jié)碼。
<plugin><groupId>com.vlkan</groupId><artifactId>quasar-maven-plugin</artifactId><version>0.7.9</version><executions><execution><goals><goal>instrument</goal></goals></execution></executions> </plugin>Quasar通過(guò)識(shí)別方法是否拋出了該框架定義的SuspendExecution異常決定是否修改字節(jié)碼。Quasar框架在AsyncCompletionStage.get方法上聲明了SuspendExceution異常,該異常是捕獲異常,但僅作為識(shí)別掛起方法的聲明,在運(yùn)行時(shí)不會(huì)實(shí)際拋出。使用者必須逐層拋出該異常直至新建協(xié)程的一層。當(dāng)方法內(nèi)部存在try/catch語(yǔ)句時(shí),也必須拋出該異常。
public void startFiber() throws ExecutionException, InterruptedException {Fiber<Void> fiber = new Fiber<Void>(() -> {//不用繼續(xù)拋出異常Response response = waitNextLayer1();deal(response);}).start(); }private Response waitNextLayer1() throws SuspendExecution {return waitNextLayer2(); }private Response waitNextLayer2() throws SuspendExecution {CompletableFuture<Response> future = httpClient.executeRequest(request) .toCompletableFuture();try {// Quasar框架工具類拋出SuspendExecutionreturn AsyncCompletionStage.get(future);} catch (Exception e) {return null;} }2.3?異步RPC調(diào)用
目前主流的RPC框架都基于NIO實(shí)現(xiàn),支持異步回調(diào),有的RPC框架已經(jīng)直接提供了返回CompletableFuture或ListenableFuture(Guava工具類提供)的異步接口,通過(guò)使用ComplatableFuture,可以按前文類似的方法將Quasar與RPC框架結(jié)合起來(lái)。當(dāng)RPC框架沒(méi)有該返回類型時(shí),一般會(huì)提供如下類似的帶泛型的異步回調(diào)接口:
interface Callback<TResponse> {void callback(TResponse TResponse, Exception e); }這種情況,可以使用者自己創(chuàng)建ComplatableFuture,在回調(diào)中設(shè)置其狀態(tài),并調(diào)用AsyncCompletionStage.get等待這個(gè)future。
CompletableFuture<Response> future=new CompletableFuture<>(); //調(diào)用hello接口的異步API new RpcClient().helloAsync(request, new Callback<Response>() {public void callback(Response response, Exception e) {if (e == null) future.complete(response);else future.completeExceptionally(e);} }); //在此處調(diào)用Quasar的API,掛起直至RPC調(diào)用完成 Response response = AsyncCompletionStage.get(future);上述代碼依然具有異步回調(diào)不直觀的缺點(diǎn),通過(guò)JDK8的函數(shù)式接口可以實(shí)現(xiàn)一個(gè)通用的調(diào)用模板,將異步回調(diào)變?yōu)橥降却男问健?/p> @FunctionalInterface private interface RpcAsyncCall<TRequest, TResponse> {void request(TRequest request, Callback<TResponse> callback); } public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution {CompletableFuture<TResponse> future = new CompletableFuture<>();call.request(request, (response, e) -> {if (e == null) future.complete(response);else future.completeExceptionally(e);});try {//使用Quasar等待Future結(jié)果return AsyncCompletionStage.get(future);} catch (Exception e) {return null;} }
最后的調(diào)用可簡(jiǎn)化一行代碼,該方法適用于所有該Rpc框架提供的異步接口。
Response response= waitRpc(new RpcClient()::helloAsync, request);2.4?阻塞操作的處理
Quasar協(xié)程使用的時(shí)候有一定的限制,由于調(diào)度器線程池大小固定,在協(xié)程中不能阻塞線程,執(zhí)行線程將被占用。對(duì)于某些暫時(shí)只能依靠阻塞IO的調(diào)用,如數(shù)據(jù)庫(kù),消息隊(duì)列等,無(wú)法使用協(xié)程等待其結(jié)果,當(dāng)這些阻塞操作量不大的情況下,可使用另一個(gè)可伸縮的線程池等待結(jié)果,避免對(duì)協(xié)程調(diào)度器的影響。
public void waitBlocking() throws SuspendExecution {//從DB獲取結(jié)果String ans = waitBlocking(this::selectFromDB); }private ExecutorService threadPool = Executors.newCachedThreadPool();private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution {CompletableFuture<T> future = new CompletableFuture<>();threadPool.submit(() -> {T ans = supplier.get();future.complete(ans);});try {return AsyncCompletionStage.get(future);} catch (Exception e) {return null;} }2.5?并發(fā)工具的使用
協(xié)程對(duì)并發(fā)鎖的使用有比較大的限制,需要使用者理解線程鎖與協(xié)程的調(diào)度機(jī)制。在synchronized同步塊的內(nèi)部,不能包含掛起協(xié)程的語(yǔ)句。當(dāng)持有鎖的協(xié)程掛起后會(huì)讓出線程資源,由于鎖的可重入性,另一個(gè)運(yùn)行在同一個(gè)線程上的協(xié)程再加鎖時(shí)同樣會(huì)成功。另一方面,協(xié)程掛起后恢復(fù)執(zhí)行時(shí),也可能會(huì)在另一個(gè)線程上運(yùn)行。出現(xiàn)兩個(gè)線程操作共享資源的異常。同時(shí)未持有鎖的線程釋放時(shí),會(huì)出現(xiàn)IllegalMonitorStateException異常。
但如果同步塊的內(nèi)部沒(méi)有掛起協(xié)程的語(yǔ)句,則線程鎖的機(jī)制仍然有效。線程的在執(zhí)行過(guò)程中可能切換,而協(xié)程的調(diào)度在每個(gè)執(zhí)行線程上是串行的,協(xié)程持有的鎖在不包含掛起操作時(shí),會(huì)在占用線程執(zhí)行完畢直到退出同步塊為止,不會(huì)發(fā)生鎖失效的情況。
JDK并發(fā)包中的工具可分為兩類,一類是Lock、Semaphore、CountDownLatch等具有線程可重入性的工具,不能在未釋放資源前使用掛起協(xié)程的操作,而另一類則是原子變量、并發(fā)容器等不會(huì)讓出線程的工具,仍可正常使用,但要注意高并發(fā)的情況下鎖的性能。此外,在使用并發(fā)工具的阻塞方法,如await時(shí),可能導(dǎo)致協(xié)程的執(zhí)行線程中發(fā)生阻塞。
三、總結(jié)
系統(tǒng)運(yùn)行在4核心的主機(jī)上,線程池構(gòu)成如下。
業(yè)務(wù)邏輯運(yùn)行在Quasar的協(xié)程調(diào)度線程池中,線程池大小為CPU核數(shù)。HTTP請(qǐng)求與RPC調(diào)用均通過(guò)內(nèi)部的NIO線程池管理。此外定義了一個(gè)core size為8的可伸縮的線程池用于少量消息隊(duì)列、DB等阻塞IO的操作。其余的線程是系統(tǒng)中引入的其他組件所新建的線程,正常情況下不會(huì)成為系統(tǒng)性能的瓶頸。
改造后,在業(yè)務(wù)高峰流量激增數(shù)十倍的情況下線程數(shù)量依然穩(wěn)定,而CPU利用率也從平均5%以下提升至10%-60%,在瞬時(shí)與高峰流量下能保持穩(wěn)定。集群CPU核數(shù)在保留一定的業(yè)務(wù)冗余以應(yīng)對(duì)業(yè)務(wù)高峰的情況下,縮減至1/5。
3.1?限制與風(fēng)險(xiǎn)
Quasar協(xié)程不是Java的語(yǔ)言標(biāo)準(zhǔn),沒(méi)有JVM層面的支持,使用時(shí)必須手動(dòng)拋出異常聲明每一個(gè)掛起方法,對(duì)代碼有一定的侵入性。使用不當(dāng)時(shí),可能出現(xiàn)異常。
代碼的try/catch時(shí)可能同時(shí)捕獲SuspendExecution異常,從而忘記標(biāo)記方法,此方法字節(jié)碼不會(huì)被修改,結(jié)合Quasar的原理不難看出,當(dāng)沒(méi)有織入字節(jié)碼時(shí),掛起方法恢復(fù)執(zhí)行,無(wú)法還原方法棧幀和執(zhí)行狀態(tài),將會(huì)出現(xiàn)語(yǔ)句被重復(fù)執(zhí)行、空指針等錯(cuò)誤。運(yùn)行時(shí)空指針、死循環(huán)的癥狀,排查的重點(diǎn)是是否漏加SuspendExecution標(biāo)記。
在新線程而不是新協(xié)程中使用掛起方法時(shí),會(huì)出現(xiàn)同樣的問(wèn)題。Thread的構(gòu)造方法中傳入的是Runnable接口對(duì)象,其run方法沒(méi)有聲明SuspendExecution異常,run內(nèi)部的語(yǔ)句不會(huì)被織入字節(jié)碼,造成上述異常。
3.2?總結(jié)與展望
協(xié)程使得NIO能夠更好地應(yīng)用在Java中,比回調(diào)方法更易讀易維護(hù)。對(duì)系統(tǒng)的改造集中在底層通信封裝和對(duì)方法的標(biāo)記上,業(yè)務(wù)邏輯無(wú)需修改。雖然具有一定的代碼侵入性和理解成本,但這種學(xué)習(xí)成本能逐漸被代碼的可維護(hù)性優(yōu)勢(shì)抵消。
異步編程最佳的實(shí)現(xiàn)方式是:“Codes Like Sync,Works Like Async”,即以同步的方式編碼,達(dá)到異步的效果與性能,兼顧可維護(hù)性與可伸縮性。OpenJDK 在2018年創(chuàng)建了Loom 項(xiàng)目(https://wiki.openjdk.java.net/display/loom),目標(biāo)是在JVM上實(shí)現(xiàn)輕量級(jí)的線程,并解除JVM線程與內(nèi)核線程的映射。相信會(huì)給Java生態(tài)帶來(lái)巨大的改變。
總結(jié)
以上是生活随笔為你收集整理的干货 | 携程基于Quasar协程的NIO实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Spring 自定义注解玩法大全,从入门
- 下一篇: 传说中的CAFEBABE到底在哪儿?