告别异步代码
Quasar是一個(gè)向JVM添加真正的輕量級(jí)線程(纖維)的庫。 它們非常便宜且非常快-實(shí)際上,光纖的行為就像Erlang進(jìn)程或Go goroutines-并允許您編寫簡(jiǎn)單的阻塞代碼,同時(shí)享受與復(fù)雜異步代碼相同的性能優(yōu)勢(shì)。
在本文中,我們將學(xué)習(xí)如何將任何基于回調(diào)的異步API轉(zhuǎn)換為漂亮的(光纖)阻塞API。 它適用于希望將自己的或第三方庫與Quasar光纖集成的用戶。 如果您僅將Quasar光纖與通道或演員一起使用,或者利用Comsat項(xiàng)目中已經(jīng)提供的許多集成功能,則不需要了解這些知識(shí)(下面提供的代碼是應(yīng)用程序開發(fā)人員從未看到的代碼)。 但是,即使您不這樣做,您也可能會(huì)發(fā)現(xiàn)這篇文章對(duì)理解Quasar如何發(fā)揮其魔力很有幫助。
為什么要異步?
首先,許多庫提供異步API的原因是OS可以處理的正在運(yùn)行的1個(gè)線程的數(shù)量遠(yuǎn)遠(yuǎn)少于OS可以維護(hù)的開放TCP連接的數(shù)量。 也就是說,您的計(jì)算機(jī)所支持的并發(fā)性比線程所提供的要高得多,因此,庫以及使用它們的開發(fā)人員會(huì)放棄線程,將其作為用于軟件并發(fā)性單元2的抽象。 異步API不會(huì)阻塞線程,并可能導(dǎo)致顯著的性能提升(通常在吞吐量和服務(wù)器容量方面,而在延遲方面卻沒有那么多)。
但是,使用異步API也會(huì)創(chuàng)建正確獲得“回調(diào)地獄”名稱的代碼。 在缺乏多核處理的環(huán)境(例如Javascript)中,回調(diào)地獄已經(jīng)很糟糕了。 在那些需要關(guān)注內(nèi)存可見性和同步性的環(huán)境(例如JVM)中,情況可能會(huì)更糟。
編寫在光纖上運(yùn)行的阻塞代碼可為您提供與異步代碼相同的優(yōu)點(diǎn),而沒有缺點(diǎn):您使用了不錯(cuò)的阻塞API(甚至可以繼續(xù)使用現(xiàn)有的API),但是卻獲得了非阻塞代碼的所有性能優(yōu)勢(shì)。
可以肯定的是,異步API還有一個(gè)優(yōu)勢(shì):它們使您可以同時(shí)分派多個(gè)IO操作(例如HTTP請(qǐng)求)。 因?yàn)檫@些操作通常需要很長時(shí)間才能完成,而且通常是獨(dú)立的,所以我們可以同時(shí)等待其中的幾個(gè)完成。 但是,Java期貨也可以使用此有用的功能,而無需回調(diào)。 稍后,我們將看到如何制作纖維增強(qiáng)期貨。
光纖異步
許多現(xiàn)代的Java IO /數(shù)據(jù)庫庫/驅(qū)動(dòng)程序都提供兩種API:一種是同步(線程)阻塞的API,另一種是基于回調(diào)的異步API(對(duì)于NIO,JAX-RS客戶端,Apache HTTP客戶端以及更多的API來說都是如此。 )。 同步API更好。
Quasar有一個(gè)編程工具,可以將任何基于回調(diào)的異步API轉(zhuǎn)換為一個(gè)很好的阻止光纖的API: FiberAsync 。 本質(zhì)上, FiberASync所做的是阻止當(dāng)前光纖,安裝異步回調(diào),并在觸發(fā)該回調(diào)時(shí),它將再次喚醒光纖,并返回操作結(jié)果(如果失敗,則引發(fā)異常)。
為了了解如何使用FiberAsync ,我們將看一個(gè)API示例: FooClient 。 FooClient是一種現(xiàn)代的IO API,因此有兩種形式,一種是同步的,線程阻止一種,另一種是異步的。 他們來了:
interface FooClient {String op(String arg) throws FooException, InterruptedException; }interface AsyncFooClient {Future<String> asyncOp(String arg, FooCompletion<String> callback); }interface FooCompletion<T> {void success(T result);void failure(FooException exception); }請(qǐng)注意異步操作(如許多現(xiàn)代庫中的情況)如何都需要回調(diào)并返回前途。 現(xiàn)在,讓我們忽略未來。 我們稍后再講。
FooClient比AsyncFooClient更好,更簡(jiǎn)單,但是它阻塞了線程并大大降低了吞吐量。 我們想要?jiǎng)?chuàng)建一個(gè)FooClient接口的實(shí)現(xiàn),該接口可以在光纖中運(yùn)行并阻塞光纖,因此我們獲得了簡(jiǎn)單的代碼和高吞吐量。 為此,我們將在AsyncFooClient使用AsyncFooClient ,但將其轉(zhuǎn)換為阻止光纖的FooClient 。 這是我們需要的所有代碼(我們將進(jìn)一步對(duì)其進(jìn)行簡(jiǎn)化):
public class FiberFooClient implements FooClient {private final AsyncFooClient asyncClient;public FiberFooClient(AsyncFooClient asyncClient) {this.asyncClient = asyncClient;}@Override@SuspendableString op(final String arg) throws FooException, InterruptedException {try {return new FiberAsync<String, FooException>() {@Overrideprotected void requestAsync() {asyncClient.asyncOp(arg, new FooCompletion<String>() {public void success(String result) {FiberAsync.this.asyncCompleted(result);}public void failure(FooException exception) {FiberAsync.this.asyncFailed(exception);}});}}.run();} catch(SuspendExecution e) {throw new AssertionError(e);}} }現(xiàn)在,這是怎么回事? 我們正在實(shí)施的FooClient接口,但我們正在做op纖維粘連,而不是線程阻塞。 我們需要告訴Quasar我們的方法是光纖阻塞(或“可掛起”),因此我們使用@Suspendable對(duì)其進(jìn)行@Suspendable 。
然后,我們將FiberAsync子類FiberAsync并實(shí)現(xiàn)requestAsync方法( FiberAsync接受的兩個(gè)通用類型參數(shù)是返回類型和操作可能拋出的已檢查異常的類型(如果有);對(duì)于沒有已檢查異常的情況,第二個(gè)通用參數(shù)應(yīng)為RuntimeException )。 requestAsync負(fù)責(zé)啟動(dòng)異步操作并注冊(cè)回調(diào)。 然后,回調(diào)需要調(diào)用asyncCompleted (如果操作成功)并將其傳遞給我們希望返回的結(jié)果,或者asyncFailed (如果操作失敗)并將失敗原因的異常傳遞給它。
最后,我們調(diào)用FiberAsync.run() 。 這將阻止當(dāng)前的光纖,并調(diào)用requestAsync來安裝回調(diào)。 纖維將保持阻塞,直到回調(diào)被觸發(fā),它會(huì)釋放出FiberAsync通過調(diào)用或者asyncCompleted或asyncFailed 。 run方法還具有一個(gè)帶超時(shí)參數(shù)的版本,如果我們想對(duì)阻塞操作進(jìn)行時(shí)間限制(通常是個(gè)好主意),該方法很有用。
需要解釋的另一件事是try/catch塊。 有兩種方法來聲明可@Suspendable的方法:用@Suspendable對(duì)其進(jìn)行注釋,或者聲明它以引發(fā)已檢查的異常SuspendExecution 。 FiberAsync的run方法使用了后者,因此為了編譯代碼,我們需要捕獲SuspendExecution ,但是由于這不是真正的異常,因此我們永遠(yuǎn)無法真正捕獲它(嗯,至少在Quasar運(yùn)行正常的情況下,至少不是這樣) –因此為AssertionError 。
完成后,您可以在任何光纖中使用op ,如下所示:
new Fiber<Void>(() ->{// ...String res = client.op();// ... }).start();順便說一句,所有的要短很多與脈沖星 (類星體的Clojure的API),其中異步操作:
(async-op arg #(println "result:" %))使用Pulsar的await宏將其轉(zhuǎn)換為以下同步的光纖阻塞代碼:
(println "result:" (await (async-op arg)))簡(jiǎn)化和批量生產(chǎn)
通常,像FooClient這樣的接口將具有許多方法,并且通常, AsyncFooClient大多數(shù)方法將采用相同類型的回調(diào)( FooCompletion )。 如果是這種情況,我們可以將我們已經(jīng)看到的許多代碼封裝到FiberAsync的命名子類中:
abstract class FooAsync<T> extends FiberAsync<T, FooException> implements FooCompletion<T> {@Overridepublic void success(T result) {asyncCompleted(result);}@Overridepublic void failure(FooException exception) {asyncFailed(exception);}@Override@Suspendablepublic T run() throws FooException, InterruptedException {try {return super.run();} catch (SuspendExecution e) {throw new AssertionError();}}@Override@Suspendablepublic T run(long timeout, TimeUnit unit) throws FooException, InterruptedException, TimeoutException {try {return super.run(timeout, unit);} catch (SuspendExecution e) {throw new AssertionError();}} }請(qǐng)注意,我們?nèi)绾问笷iberAsync直接實(shí)現(xiàn)FooCompletion回調(diào)–不是必需的,但這是一個(gè)有用的模式。 現(xiàn)在,我們的光纖阻塞op方法要簡(jiǎn)單得多,并且該接口中的其他操作也可以輕松實(shí)現(xiàn):
@Override @Suspendable public String op(final String arg) throws FooException, InterruptedException {return new FooAsync<String>() {protected void requestAsync() {asyncClient.asyncOp(arg, this);}}.run(); }有時(shí),我們可能希望在常規(guī)線程而不是光纖上調(diào)用op方法。 默認(rèn)情況下,如果在線程上調(diào)用FiberAsync.run()會(huì)引發(fā)異常。 為了解決這個(gè)問題,我們要做的就是實(shí)現(xiàn)另一個(gè)FiberAsync方法requestSync ,如果在光纖上調(diào)用run ,它將調(diào)用原始的同步API。 我們的最終代碼如下所示(我們假設(shè)FiberFooClass具有類型為FooClient的syncClient字段):
@Override @Suspendable public String op(final String arg) throws FooException, InterruptedException {return new FooAsync<String>() {protected void requestAsync() {asyncClient.asyncOp(arg, this);}public String requestSync() {return syncClient.op(arg);}}.run(); }就是這樣!
期貨
期貨是一種方便的方法,它允許我們?cè)诘却歇?dú)立的長時(shí)間IO操作完成時(shí)同時(shí)開始它們。 我們希望我們的纖維能夠阻擋期貨。 許多Java庫通過其異步操作返回期貨,因此用戶可以在完全異步,基于回調(diào)的用法和采用期貨的“半同步”用法之間進(jìn)行選擇。 我們的AsyncFooClient接口就是這樣工作的。
這是我們實(shí)現(xiàn)AsyncFooClient版本的AsyncFooClient ,該版本返回阻止光纖的期貨:
import co.paralleluniverse.strands.SettableFuture;public class FiberFooAsyncClient implements FooClient {private final AsyncFooClient asyncClient;public FiberFooClient(AsyncFooClient asyncClient) {this.asyncClient = asyncClient;}@Overridepublic Future<String> asyncOp(String arg, FooCompletion<String> callback) {final SettableFuture<T> future = new SettableFuture<>();asyncClient.asyncOp(arg, callbackFuture(future, callback))return future;}private static <T> FooCompletion<T> callbackFuture(final SettableFuture<T> future, final FooCompletion<T> callback) {return new FooCompletion<T>() {@Overridepublic void success(T result) {future.set(result);callback.completed(result);}@Overridepublic void failure(Exception ex) {future.setException(ex);callback.failed(ex);}@Overridepublic void cancelled() {future.cancel(true);callback.cancelled();}};} }如果返回, co.paralleluniverse.strands.SettableFuture返回co.paralleluniverse.strands.SettableFuture ,如果我們?cè)诠饫w或普通線程(即任何類型的絞線上 )上對(duì)其進(jìn)行阻塞,則同樣可以很好地工作。
JDK 8的CompletableFuture和Guava的ListenableFuture
返回到CompletionStage (或?qū)崿F(xiàn)它的CompletableFuture )的API(在JDK 8中添加到Java中)可以通過預(yù)先構(gòu)建的FiberAsync更加輕松地進(jìn)行光纖阻塞。 例如,
CompletableFuture<String> asyncOp(String arg);通過以下方式變成光纖阻塞呼叫:
String res = AsyncCompletionStage.get(asyncOp(arg));返回Google Guava的方法類似地轉(zhuǎn)換為光纖阻塞同步,因此:
ListenableFuture<String> asyncOp(String arg);通過以下方式變成光纖阻塞:
String res = AsyncListenableFuture.get(asyncOp(arg));期貨的替代品
盡管期貨有用且熟悉,但我們實(shí)際上并不需要使用纖維時(shí)返回它們的特殊API。 產(chǎn)生的纖維是如此便宜( Fiber類實(shí)現(xiàn)了Future ,因此纖維本身可以代替“手工”的期貨。 這是一個(gè)例子:
void work() {Fiber<String> f1 = new Fiber<>(() -> fiberFooClient.op("first operation"));Fiber<String> f2 = new Fiber<>(() -> fiberFooClient.op("second operation"));String res1 = f1.get();String res2 = f2.get(); }因此,即使我們使用的API不提供,光纖也可以為我們提供期貨。
如果沒有異步API怎么辦?
有時(shí)我們很不幸地遇到一個(gè)僅提供同步線程阻塞API的庫。 JDBC是此類API的主要示例。 雖然Quasar不能提高使用此類庫的吞吐量,但是使API光纖兼容仍然值得(實(shí)際上非??常容易)。 為什么? 因?yàn)檎{(diào)用同步服務(wù)的光纖也可能做其他事情。 實(shí)際上,它們可能很少調(diào)用該服務(wù)(僅當(dāng)發(fā)生高速緩存未命中時(shí),才考慮從RDBMS讀取數(shù)據(jù)的光纖)。
實(shí)現(xiàn)此目的的方法是通過在專用線程池中執(zhí)行實(shí)際的調(diào)用,然后通過FiberAsync封裝該假的異步API,將阻塞API轉(zhuǎn)變?yōu)楫惒紸PI。 這個(gè)過程是如此機(jī)械, FiberAsync有一些靜態(tài)方法可以為我們處理所有事情。 因此,假設(shè)我們的服務(wù)僅公開了阻塞的FooClient API。 要使其成為光纖阻塞,我們要做的是:
public class SadFiberFooClient implements FooClient {private final FooClient client;private static final ExecutorService FOO_EXECUTOR = Executors.newCachedThreadPool();public FiberFooClient(FooClient client) {this.client = client;}@Override@SuspendableString op(final String arg) throws FooException, InterruptedException {try {return FiberAsync.runBlocking(FOO_EXECUTOR, () -> client.op());} catch(SuspendExecution e) {throw new AssertionError(e);}} }FooClient此實(shí)現(xiàn)可以安全地用于線程和光纖。 實(shí)際上,當(dāng)在普通線程上調(diào)用該方法時(shí),該方法將不會(huì)費(fèi)心將操作分派到所提供的線程池,而是在當(dāng)前線程上執(zhí)行該操作–就像我們使用原始FooClient實(shí)現(xiàn)那樣。
結(jié)論
此處顯示的技術(shù)FiberAsync和cpstrands.SettableFuture正是構(gòu)成Comsat項(xiàng)目的集成模塊的工作方式。 Comsat包括Servlet,JAX-RS(服務(wù)器和客戶端),JDBC,JDBI,jOOQ,MongoDB,Retrofit和Dropwizard的集成。
重要的是要了解如何-創(chuàng)建簡(jiǎn)單且高性能的光纖阻塞API-我們確實(shí)重新實(shí)現(xiàn)了API 接口 ,但沒有實(shí)現(xiàn)其內(nèi)部工作:仍然僅通過其異步API使用原始庫代碼,其丑陋之處在于現(xiàn)在對(duì)圖書館用戶隱藏了。
額外信用:單子怎么樣?
除了纖程外,還有其他方法可以處理回調(diào)地獄。 JVM世界中最著名的機(jī)制是Scala的可組合期貨,RxJava的可觀察對(duì)象以及JDK 8的CompletionStage / CompletableFuture 。 這些都是單子和單子組成的例子。 Monad可以工作,有些人喜歡使用它們,但是我認(rèn)為對(duì)于大多數(shù)編程語言來說,它們是錯(cuò)誤的方法。
您會(huì)看到,單子是從基于lambda演算的編程語言中借用的。 Lambda演算是一種理論計(jì)算模型,與Turing機(jī)器完全不同,但完全類似。 但是與圖靈機(jī)模型不同,lambda微積分計(jì)算沒有步驟,動(dòng)作或狀態(tài)的概念。 這些計(jì)算沒有做任何事情; 他們只是。 那么,Monads是Haskell等基于LC的語言將動(dòng)作,狀態(tài),時(shí)間等描述為純計(jì)算的一種方式。 它們是LC語言告訴計(jì)算機(jī)“先執(zhí)行然后再執(zhí)行”的一種方法。
問題是,命令式語言已經(jīng)有了“先做然后再做”的抽象,而這種抽象就是線程。 不僅如此,而且是必須的語言通常有一個(gè)非常簡(jiǎn)單的符號(hào)“這樣做,然后做”:聲明此后跟該語句。 命令式語言甚至考慮采用這種外來概念的唯一原因是因?yàn)?#xff08;通過OS內(nèi)核)線程的實(shí)現(xiàn)不令人滿意。 但是,與其采用一個(gè)陌生,陌生的概念(一個(gè)需要完全不同類型的API的概念),不如采用一個(gè)相似但細(xì)微不同的抽象,最好是修復(fù)(線程)的實(shí)現(xiàn)。 光纖保留抽象并修復(fù)實(shí)現(xiàn)。
Java和Scala等語言中的monad的另一個(gè)問題是,這些語言不僅勢(shì)在必行,而且還允許不受限制的共享狀態(tài)突變和副作用-Haskell卻沒有。 無限制的共享狀態(tài)突變和“線程”單核的結(jié)合可能是災(zāi)難性的。 在純FP語言中-由于副作用是受控的-計(jì)算單位(即功能)也是并發(fā)單位:您可以安全地同時(shí)執(zhí)行任何一對(duì)功能。 當(dāng)您不受限制的副作用時(shí),情況并非如此。 函數(shù)執(zhí)行的順序,是否可以同時(shí)執(zhí)行兩個(gè)函數(shù)以及一個(gè)函數(shù)是否以及何時(shí)可以觀察到另一個(gè)函數(shù)執(zhí)行的共享狀態(tài)突變都是非常重要的問題。 結(jié)果,作為“線程” monad的一部分運(yùn)行的函數(shù)要么必須是純函數(shù)(沒有任何副作用),要么必須非常小心如何執(zhí)行這些副作用。 這正是我們要避免的事情。 因此,盡管單子組合確實(shí)比回調(diào)地獄生成了更好的代碼,但它們不能解決異步代碼引入的任何并發(fā)問題。
聚苯乙烯
上一節(jié)不應(yīng)理解為像Haskell這樣的純“ FP”語言的認(rèn)可,因?yàn)槲覍?shí)際上認(rèn)為它們帶來了太多其他問題。 我相信(不久的將來)是命令式語言3 ,它將允許共享狀態(tài)變異但具有一些事務(wù)語義。 我相信那些未來的語言將主要從Clojure和Erlang等語言中獲得靈感。
翻譯自: https://www.javacodegeeks.com/2015/04/farewell-to-asynchronous-code.html
總結(jié)
- 上一篇: Quasar和Akka –比较
- 下一篇: 魔术二传手反模式