日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

携程基于Quasar协程的NIO实践

發(fā)布時(shí)間:2024/1/23 编程问答 56 豆豆
生活随笔 收集整理的這篇文章主要介紹了 携程基于Quasar协程的NIO实践 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

IO密集型系統(tǒng)在高并發(fā)場景下,會有大量線程處于阻塞狀態(tài),性能低下,JAVA上成熟的非阻塞IO(NIO)技術(shù)可解決該問題。目前Java項(xiàng)目對接NIO的方式主要依靠回調(diào),代碼復(fù)雜度高,降低了代碼可讀性與可維護(hù)性。近年來Golang、Kotlin等語言的協(xié)程(Coroutine)能達(dá)到高性能與可讀性的兼顧。

本文利用開源的Quasar框架提供的協(xié)程對系統(tǒng)進(jìn)行NIO改造,解決以下兩個(gè)問題:

1)提升單機(jī)任務(wù)的吞吐量,保證業(yè)務(wù)請求突增時(shí)系統(tǒng)的可伸縮性。

2)使用更輕量的協(xié)程同步等待IO,替代處理NIO常用的異步回調(diào)。

一、Java異步編程與非阻塞IO

本文改造的系統(tǒng)處理來自前臺的任務(wù),通過HTTP請求對端服務(wù),還通過RPC調(diào)用內(nèi)部服務(wù)。當(dāng)業(yè)務(wù)高峰時(shí),系統(tǒng)會遇到瞬時(shí)并發(fā)任務(wù)量數(shù)十倍激增的情況,系統(tǒng)的線程數(shù)量急劇增加造成性能下降。為此,不得不擴(kuò)容以保證業(yè)務(wù)高峰時(shí)期的性能。

??

? ? ? ? ? ? ? ? ? ? ? ? ?

基于epoll的NIO框架Netty在一些框架級別的應(yīng)用中已經(jīng)得到了廣泛使用,但在快速迭代的業(yè)務(wù)系統(tǒng)中的應(yīng)用依然有一定的局限性。NIO 消除了線程的同步阻塞,意味著只能異步處理IO的結(jié)果,這與業(yè)務(wù)開發(fā)者順序化的思維模式有一定差異。當(dāng)業(yè)務(wù)邏輯復(fù)雜以及出現(xiàn)多次遠(yuǎn)程調(diào)用的情況下,多級回調(diào)難以實(shí)現(xiàn)和維護(hù)。

1.1?Java中的異步工具

Java項(xiàng)目大多使用JDK8,除線程外可以獲得的異步的編程支持包括CompletableFuture,以及開源的RxJava、Vert.x等反應(yīng)式編程框架等。這些工具使用了基于響應(yīng)式編程的鏈?zhǔn)秸{(diào)用逐級傳遞事件,未從根本解決回調(diào)問題。

如下為將一段簡單的邏輯判斷使用CompletableFuture進(jìn)行異步改造后的對比。原始版本使用getA方法獲得第一步的請求結(jié)果,根據(jù)其相應(yīng)選擇使用getB1還是getB2獲取第二步的響應(yīng)作為結(jié)果。

HttpResponse?a?=?getA(); HttpResponse b ;if(a.getBody().equals("1")){ b=getB1();}else{ b=getB2();} String ans=b.getBody();

首先將三個(gè)獲取響應(yīng)的方法改為異步。此處假設(shè)getB1與getB2內(nèi)部已經(jīng)具有復(fù)雜邏輯,且不屬于同一領(lǐng)域,不適合合并為一個(gè)方法。

private CompletableFuture<HttpResponse> getA();private CompletableFuture<HttpResponse> getB1();private CompletableFuture<HttpResponse> getB2();

然后使用CompletableFuture的鏈?zhǔn)秸{(diào)用,將兩個(gè)步驟組合起來:

String ans = getA() .thenCompose(a -> { if (a.getBody().equals("1")) { return getB1(); } else { return getB2(); } }).get() .getBody();


使用CompletableFuture的鏈?zhǔn)交卣{(diào)后,代碼變得不友好。RxJava等框架同樣具有這個(gè)問題。這類反應(yīng)式的編程工具更適合于數(shù)據(jù)流的傳遞。對于if/else、switch/case,乃至while/for、break/continue這類過程控制語句,實(shí)現(xiàn)與維護(hù)的難度都很大。業(yè)務(wù)系統(tǒng)需要類似于線程的同步等待,同時(shí)具有低資源消耗的編碼工具,配合 NIO使用。當(dāng)時(shí)使用NIO時(shí),由于可以不占用線程,可以使用一種資源消耗更小的協(xié)程來等待。

1.2?協(xié)程

協(xié)程是一種進(jìn)程自身來調(diào)度任務(wù)的調(diào)度模式。協(xié)程與線程不同之處在于,線程由內(nèi)核調(diào)度,而協(xié)程的調(diào)度是進(jìn)程自身完成的。協(xié)程只是一種抽象,最終的執(zhí)行者是線程,每個(gè)線程只能同時(shí)執(zhí)行一個(gè)協(xié)程,但大量的協(xié)程可以只擁有少量幾個(gè)線程執(zhí)行者,協(xié)程的調(diào)度器負(fù)責(zé)決定當(dāng)前線程在執(zhí)行那個(gè)協(xié)程,其余協(xié)程處于休眠并被調(diào)度器保存在內(nèi)存中。

和線程類似,協(xié)程掛起時(shí)需要記錄棧信息,以及方法執(zhí)行的位置,這些信息會被協(xié)程調(diào)度器保存。協(xié)程從掛起到重新被執(zhí)行不需要執(zhí)行重量級的內(nèi)核調(diào)用,而是直接將狀態(tài)信息還原到執(zhí)行線程的棧,高并發(fā)場景下,協(xié)程極大地避免了切換線程的開銷。下圖展示了協(xié)程調(diào)度器內(nèi)部任務(wù)的流轉(zhuǎn)。

協(xié)程中調(diào)用的方法是可以掛起的。不同于線程的阻塞會使線程休眠,協(xié)程在等待異步任務(wù)的結(jié)果時(shí),會通知調(diào)度器將自己放入掛起隊(duì)列,釋放占用的線程以處理其他的協(xié)程。異步任務(wù)完畢后,通過回調(diào)將異步結(jié)果告知協(xié)程,并通知調(diào)度器將協(xié)程重新加入就緒隊(duì)列執(zhí)行。

1.3?Quasar任務(wù)調(diào)度原理

Quasar(https://github.com/puniverse/quasar)是一個(gè)開源的Java協(xié)程框架,通過利用Java instrument技術(shù)對字節(jié)碼進(jìn)行修改,使方法掛起前后可以保存和恢復(fù)JVM棧幀,方法內(nèi)部已執(zhí)行到的字節(jié)碼位置也通過增加狀態(tài)機(jī)的方式記錄,在下次恢復(fù)執(zhí)行可直接跳轉(zhuǎn)至最新位置。以如下方法為例,該方法分為兩步,第一步為initial初始化,第二部為通過NIO獲取網(wǎng)絡(luò)響應(yīng)。

public String instrumentDemo(){ initial(); String ans = getFromNIO(); return ans;}

Quasar會在initial前增加一個(gè)flag字段,表明當(dāng)前方法執(zhí)行的位置。第一次執(zhí)行方法時(shí),檢查到flag為0,修改flag為1并繼續(xù)往下執(zhí)行initial方法。執(zhí)行g(shù)etFromNIO方法前插入字節(jié)碼指令將棧幀中的數(shù)據(jù)全部保存在一個(gè)Quasar自定義的棧結(jié)構(gòu)中,在執(zhí)行g(shù)etFromNIO后,掛起協(xié)程,讓出線程資源。直至NIO異步完成后,協(xié)程調(diào)度器將第二次執(zhí)行該方法,檢測到flag為1,將會調(diào)用jump指令跳轉(zhuǎn)到returnans語句前,并將保存的棧結(jié)構(gòu)還原到當(dāng)前棧中,最后調(diào)用人return ans語句,方法執(zhí)行完畢。

二、系統(tǒng)異步IO改造

在項(xiàng)目中添加Quasar依賴后,可以使用Fiber類新建協(xié)程。建立的方法與線程類似。

new Fiber(()->{ //方法體}).start();

2.1?整合Netty與Quasar

系統(tǒng)使用的Http框架是基于Netty的async-http-client(https://github.com/AsyncHttpClient/async-http-client),該框架提供了異步回調(diào)和CompletableFuture兩種對響應(yīng)的異步處理方式。

CompletableFuture自JDK8推出,與之前的Future類最大的不同在于,提供了異步任務(wù)跨線程的通知和控制機(jī)制。即,任務(wù)的等待者可以在CompletableFuture注冊任務(wù)完成或異常時(shí)的回調(diào),而執(zhí)行者也可以通過它通知等待者。Quaasr框架對它也做了支持,提供了API用于在協(xié)程中等待CompletableFuture的結(jié)果。調(diào)用后,協(xié)程將掛起,直至future狀態(tài)為已完成。

AsyncCompletionStage.get(future)

通過CompletableFuture作為通知中介,我們可以將AsyncHttpClient與Quasar做整合,掛起協(xié)程等待IO結(jié)果。

//創(chuàng)建HttpClientAsyncHttpClient httpClient = Dsl.asyncHttpClient();//創(chuàng)建請求Request request = createRequest();//將網(wǎng)絡(luò)請求交給HttpClient執(zhí)行CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture();//通過Quasar掛起協(xié)程Response response = AsyncCompletionStage.get(future);//獲取網(wǎng)絡(luò)結(jié)果后,通過future傳遞response并喚醒協(xié)程重新執(zhí)行deal(response);


過程可由下圖表示。

Quasar框架AsyncCompletionStage.get內(nèi)部完成的工作相當(dāng)于,在HttpClient返回的future上注冊回調(diào),回調(diào)的內(nèi)容是“IO操作完成后通知調(diào)度器喚醒協(xié)程”,這樣將NIO異步回調(diào)全部操作封裝在協(xié)程調(diào)度器中,用戶代碼看起來是同步等待的形式,避免了自行實(shí)現(xiàn)回調(diào)處理帶來的繁瑣,解決了前文所述的回調(diào)地獄。

2.2?聲明掛起方法

Quasar需要織入字節(jié)碼接管掛起方法的調(diào)度,在項(xiàng)目主pom下添加quasar-maven-plugin插件,該插件將在編譯后的class文件中修改字節(jié)碼。

<plugin> <groupId>com.vlkan</groupId> <artifactId>quasar-maven-plugin</artifactId> <version>0.7.9</version> <executions> <execution> <goals> <goal>instrument</goal> </goals> </execution> </executions></plugin>

Quasar通過識別方法是否拋出了該框架定義的SuspendExecution異常決定是否修改字節(jié)碼。Quasar框架在AsyncCompletionStage.get方法上聲明了SuspendExceution異常,該異常是捕獲異常,但僅作為識別掛起方法的聲明,在運(yùn)行時(shí)不會實(shí)際拋出。使用者必須逐層拋出該異常直至新建協(xié)程的一層。當(dāng)方法內(nèi)部存在try/catch語句時(shí),也必須拋出該異常。

public void startFiber() throws ExecutionException, InterruptedException { Fiber<Void> fiber = new Fiber<Void>(() -> { //不用繼續(xù)拋出異常 Response response = waitNextLayer1(); deal(response); }).start();} private Response waitNextLayer1() throws SuspendExecution { return waitNextLayer2();} private Response waitNextLayer2() throws SuspendExecution { CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture(); try { // Quasar框架工具類拋出SuspendExecution return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}

2.3?異步RPC調(diào)用

目前主流的RPC框架都基于NIO實(shí)現(xiàn),支持異步回調(diào),有的RPC框架已經(jīng)直接提供了返回CompletableFuture或ListenableFuture(Guava工具類提供)的異步接口,通過使用ComplatableFuture,可以按前文類似的方法將Quasar與RPC框架結(jié)合起來。當(dāng)RPC框架沒有該返回類型時(shí),一般會提供如下類似的帶泛型的異步回調(diào)接口:

interface Callback<TResponse> { void callback(TResponse TResponse, Exception e);}

這種情況,可以使用者自己創(chuàng)建ComplatableFuture,在回調(diào)中設(shè)置其狀態(tài),并調(diào)用AsyncCompletionStage.get等待這個(gè)future。

CompletableFuture<Response> future=new CompletableFuture<>();//調(diào)用hello接口的異步APInew RpcClient().helloAsync(request, new Callback<Response>() { public void callback(Response response, Exception e) { if (e == null) future.complete(response); else future.completeExceptionally(e); }});//在此處調(diào)用Quasar的API,掛起直至RPC調(diào)用完成Response response = AsyncCompletionStage.get(future);

上述代碼依然具有異步回調(diào)不直觀的缺點(diǎn),通過JDK8的函數(shù)式接口可以實(shí)現(xiàn)一個(gè)通用的調(diào)用模板,將異步回調(diào)變?yōu)橥降却男问健?/p>

@FunctionalInterfaceprivate interface RpcAsyncCall<TRequest, TResponse> { void request(TRequest request, Callback<TResponse> callback);}public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution { CompletableFuture<TResponse> future = new CompletableFuture<>(); call.request(request, (response, e) -> { if (e == null) future.complete(response); else future.completeExceptionally(e); }); try { //使用Quasar等待Future結(jié)果 return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}


最后的調(diào)用可簡化一行代碼,該方法適用于所有該Rpc框架提供的異步接口。

Response response= waitRpc(new RpcClient()::helloAsync, request);

2.4?阻塞操作的處理

Quasar協(xié)程使用的時(shí)候有一定的限制,由于調(diào)度器線程池大小固定,在協(xié)程中不能阻塞線程,執(zhí)行線程將被占用。對于某些暫時(shí)只能依靠阻塞IO的調(diào)用,如數(shù)據(jù)庫,消息隊(duì)列等,無法使用協(xié)程等待其結(jié)果,當(dāng)這些阻塞操作量不大的情況下,可使用另一個(gè)可伸縮的線程池等待結(jié)果,避免對協(xié)程調(diào)度器的影響。

public void waitBlocking() throws SuspendExecution { //從DB獲取結(jié)果 String ans = waitBlocking(this::selectFromDB);} private ExecutorService threadPool = Executors.newCachedThreadPool(); private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution { CompletableFuture<T> future = new CompletableFuture<>(); threadPool.submit(() -> { T ans = supplier.get(); future.complete(ans); }); try { return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}

2.5?并發(fā)工具的使用

協(xié)程對并發(fā)鎖的使用有比較大的限制,需要使用者理解線程鎖與協(xié)程的調(diào)度機(jī)制。在synchronized同步塊的內(nèi)部,不能包含掛起協(xié)程的語句。當(dāng)持有鎖的協(xié)程掛起后會讓出線程資源,由于鎖的可重入性,另一個(gè)運(yùn)行在同一個(gè)線程上的協(xié)程再加鎖時(shí)同樣會成功。另一方面,協(xié)程掛起后恢復(fù)執(zhí)行時(shí),也可能會在另一個(gè)線程上運(yùn)行。出現(xiàn)兩個(gè)線程操作共享資源的異常。同時(shí)未持有鎖的線程釋放時(shí),會出現(xiàn)IllegalMonitorStateException異常。

但如果同步塊的內(nèi)部沒有掛起協(xié)程的語句,則線程鎖的機(jī)制仍然有效。線程的在執(zhí)行過程中可能切換,而協(xié)程的調(diào)度在每個(gè)執(zhí)行線程上是串行的,協(xié)程持有的鎖在不包含掛起操作時(shí),會在占用線程執(zhí)行完畢直到退出同步塊為止,不會發(fā)生鎖失效的情況。

JDK并發(fā)包中的工具可分為兩類,一類是Lock、Semaphore、CountDownLatch等具有線程可重入性的工具,不能在未釋放資源前使用掛起協(xié)程的操作,而另一類則是原子變量、并發(fā)容器等不會讓出線程的工具,仍可正常使用,但要注意高并發(fā)的情況下鎖的性能。此外,在使用并發(fā)工具的阻塞方法,如await時(shí),可能導(dǎo)致協(xié)程的執(zhí)行線程中發(fā)生阻塞。

三、總結(jié)

系統(tǒng)運(yùn)行在4核心的主機(jī)上,線程池構(gòu)成如下。

業(yè)務(wù)邏輯運(yùn)行在Quasar的協(xié)程調(diào)度線程池中,線程池大小為CPU核數(shù)。HTTP請求與RPC調(diào)用均通過內(nèi)部的NIO線程池管理。此外定義了一個(gè)core size為8的可伸縮的線程池用于少量消息隊(duì)列、DB等阻塞IO的操作。其余的線程是系統(tǒng)中引入的其他組件所新建的線程,正常情況下不會成為系統(tǒng)性能的瓶頸。

改造后,在業(yè)務(wù)高峰流量激增數(shù)十倍的情況下線程數(shù)量依然穩(wěn)定,而CPU利用率也從平均5%以下提升至10%-60%,在瞬時(shí)與高峰流量下能保持穩(wěn)定。集群CPU核數(shù)在保留一定的業(yè)務(wù)冗余以應(yīng)對業(yè)務(wù)高峰的情況下,縮減至1/5。

3.1?限制與風(fēng)險(xiǎn)

Quasar協(xié)程不是Java的語言標(biāo)準(zhǔn),沒有JVM層面的支持,使用時(shí)必須手動(dòng)拋出異常聲明每一個(gè)掛起方法,對代碼有一定的侵入性。使用不當(dāng)時(shí),可能出現(xiàn)異常。

代碼的try/catch時(shí)可能同時(shí)捕獲SuspendExecution異常,從而忘記標(biāo)記方法,此方法字節(jié)碼不會被修改,結(jié)合Quasar的原理不難看出,當(dāng)沒有織入字節(jié)碼時(shí),掛起方法恢復(fù)執(zhí)行,無法還原方法棧幀和執(zhí)行狀態(tài),將會出現(xiàn)語句被重復(fù)執(zhí)行、空指針等錯(cuò)誤。運(yùn)行時(shí)空指針、死循環(huán)的癥狀,排查的重點(diǎn)是是否漏加SuspendExecution標(biāo)記。

在新線程而不是新協(xié)程中使用掛起方法時(shí),會出現(xiàn)同樣的問題。Thread的構(gòu)造方法中傳入的是Runnable接口對象,其run方法沒有聲明SuspendExecution異常,run內(nèi)部的語句不會被織入字節(jié)碼,造成上述異常。

3.2?總結(jié)與展望

協(xié)程使得NIO能夠更好地應(yīng)用在Java中,比回調(diào)方法更易讀易維護(hù)。對系統(tǒng)的改造集中在底層通信封裝和對方法的標(biāo)記上,業(yè)務(wù)邏輯無需修改。雖然具有一定的代碼侵入性和理解成本,但這種學(xué)習(xí)成本能逐漸被代碼的可維護(hù)性優(yōu)勢抵消。

異步編程最佳的實(shí)現(xiàn)方式是:“Codes Like Sync,Works Like Async”,即以同步的方式編碼,達(dá)到異步的效果與性能,兼顧可維護(hù)性與可伸縮性。OpenJDK 在2018年創(chuàng)建了Loom 項(xiàng)目(Main - Main - OpenJDK Wiki),目標(biāo)是在JVM上實(shí)現(xiàn)輕量級的線程,并解除JVM線程與內(nèi)核線程的映射。相信會給Java生態(tài)帶來巨大的改變。

總結(jié)

以上是生活随笔為你收集整理的携程基于Quasar协程的NIO实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。