C# 8中的Async Streams
關(guān)鍵要點(diǎn)
異步編程技術(shù)提供了一種提高程序響應(yīng)能力的方法。
Async/Await模式在C# 5中首次亮相,但只能返回單個標(biāo)量值。
C# 8添加了異步流(Async Streams),允許異步方法返回多個值,從而擴(kuò)展了其可用性。
異步流提供了一種用于表示異步數(shù)據(jù)源的絕佳方法。
異步流是Java和JavaScript中使用的反應(yīng)式編程模型的替代方案。
C# 5引入了Async/Await,用以提高用戶界面響應(yīng)能力和對Web資源的訪問能力。換句話說,異步方法用于執(zhí)行不阻塞線程并返回一個標(biāo)量結(jié)果的異步操作。
微軟多次嘗試簡化異步操作,因?yàn)锳sync/Await模式易于理解,所以在開發(fā)人員當(dāng)中獲得了良好的認(rèn)可。
現(xiàn)有異步方法的一個重要不足是它必須提供一個標(biāo)量返回結(jié)果(一個值)。比如這個方法async Task<int> DoAnythingAsync(),DoAnythingAsync的結(jié)果是一個整數(shù)(一個值)。
由于存在這個限制,你不能將這個功能與yield關(guān)鍵字一起使用,并且也不能將其與async IEnumerable<int>(返回異步枚舉)一起使用。
如果可以將Async/Await特性與yield操作符一起使用,我們就可以使用非常強(qiáng)大的編程模型(如異步數(shù)據(jù)拉取或基于拉取的枚舉,在F#中被稱為異步序列)。
C# 8中新提出的Async Streams去掉了標(biāo)量結(jié)果的限制,并允許異步方法返回多個結(jié)果。
這個變更將使異步模式變得更加靈活,這樣就可以按照延遲異步序列的方式從數(shù)據(jù)庫中獲取數(shù)據(jù),或者按照異步序列的方式下載數(shù)據(jù)(這些數(shù)據(jù)在可用時以塊的形式返回)。
例如:
foreach await (var streamChunck in asyncStreams){Console.WriteLine($“Received data count = {streamChunck.Count}”);}Reactive Extensions(Rx)是解決異步編程問題的另一種方法。Rx越來越受到開發(fā)人員的歡迎。很多其他編程語言(如Java和JavaScript)已經(jīng)實(shí)現(xiàn)了這種技術(shù)(RxJava、RxJS)。Rx基于推送式編程模型(Push Programming Model),也稱為反應(yīng)式編程。反應(yīng)式編程是事件驅(qū)動編程的一種類型,它處理的是數(shù)據(jù)而不是通知。
通常,在推送式編程模型中,你不需要控制Publisher。數(shù)據(jù)被異步推送到隊(duì)列中,消費(fèi)者在數(shù)據(jù)到達(dá)時消費(fèi)數(shù)據(jù)。與Rx不同,Async Streams可以按需被調(diào)用,并生成多個值,直到達(dá)到枚舉的末尾。
在本文中,我將對拉取模型和推送模型進(jìn)行比較,并演示每一種技術(shù)各自的適用場景。我將使用很多代碼示例向你展示整個概念和它們的優(yōu)點(diǎn),最后,我將討論Async Streams功能,并向你展示示例代碼。
拉取式編程模型與推送式編程模型
圖-1-拉取式編程模型與推送式編程模型
我使用的例子是著名的生產(chǎn)者和消費(fèi)者問題,但在我們的場景中,生產(chǎn)者不是生成食物,而是生成數(shù)據(jù),消費(fèi)者消費(fèi)的是生成的數(shù)據(jù),如圖-1所示。拉取模型很容易理解。消費(fèi)者詢問并拉取生產(chǎn)者的數(shù)據(jù)。另一種方法是使用推送模型。生產(chǎn)者將數(shù)據(jù)發(fā)布到隊(duì)列中,消費(fèi)者通過訂閱隊(duì)列來接收所需的數(shù)據(jù)。
拉取模型更合適“快生產(chǎn)者和慢消費(fèi)者”的場景,因?yàn)橄M(fèi)者可以從生產(chǎn)者那里拉取其所需的數(shù)據(jù),避免消費(fèi)者出現(xiàn)溢出。推送模型更適合“慢生產(chǎn)者和快消費(fèi)者”的場景,因?yàn)樯a(chǎn)者可以將數(shù)據(jù)推送給消費(fèi)者,避免消費(fèi)者不必要的等待時間。
Rx和Akka Streams(流式編程模型)使用了回壓技術(shù)(一種流量控制機(jī)制)。它使用拉取模型或推送模型來解決上面提到的生產(chǎn)者和消費(fèi)者問題。
在下面的示例中,我使用了一個慢消費(fèi)者從快生產(chǎn)者那里異步拉取數(shù)據(jù)序列。消費(fèi)者在處理完一個元素后,會向生產(chǎn)者請求下一個元素,依此類推,直到到達(dá)序列的末尾。
動機(jī)和背景
要了解我們?yōu)槭裁葱枰狝sync Streams,讓我們來看下面的代碼。
// 對參數(shù)(count)進(jìn)行循環(huán)相加操作 static int SumFromOneToCount(int count){ConsoleExt.WriteLine("SumFromOneToCount called!");var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;}return sum;}方法調(diào)用:
const int count = 5;ConsoleExt.WriteLine($"Starting the application with count: {count}!");ConsoleExt.WriteLine("Classic sum starting.");ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");ConsoleExt.WriteLine("Classic sum completed.");ConsoleExt.WriteLine("################################################");ConsoleExt.WriteLine(Environment.NewLine);輸出:
我們可以通過使用yield運(yùn)算符讓這個方法變成惰性的,如下所示。
static IEnumerable<int> SumFromOneToCountYield(int count){ConsoleExt.WriteLine("SumFromOneToCountYield called!");var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;yield return sum;}}調(diào)用方法:
const int count = 5;ConsoleExt.WriteLine("Sum with yield starting.");foreach (var i in SumFromOneToCountYield(count)){ConsoleExt.WriteLine($"Yield sum: {i}");}ConsoleExt.WriteLine("Sum with yield completed.");ConsoleExt.WriteLine("################################################");ConsoleExt.WriteLine(Environment.NewLine);輸出:
正如你在輸出窗口中看到的那樣,結(jié)果被分成幾個部分返回,而不是作為一個值返回。以上顯示的累積結(jié)果被稱為惰性枚舉。但是,仍然存在一個問題,即sum方法阻塞了代碼的執(zhí)行。如果你查看線程,可以看到所有東西都在主線程中運(yùn)行。
現(xiàn)在,讓我們將async應(yīng)用于第一個方法SumFromOneToCount上(沒有yield關(guān)鍵字)。
static async Task<int> SumFromOneToCountAsync(int count){ConsoleExt.WriteLine("SumFromOneToCountAsync called!");var result = await Task.Run(() =>{var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;}return sum;});return result;}調(diào)用方法:
const int count = 5;ConsoleExt.WriteLine("async example starting.");// 相加操作是異步進(jìn)行得!這樣還不夠,我們要求不僅是異步的,還必須是惰性的。var result = await SumFromOneToCountAsync(count);ConsoleExt.WriteLine("async Result: " + result);ConsoleExt.WriteLine("async completed.");ConsoleExt.WriteLine("################################################");ConsoleExt.WriteLine(Environment.NewLine);輸出:
我們可以看到計(jì)算過程是在另一個線程中運(yùn)行,但結(jié)果仍然是作為一個值返回!
想象一下,我們可以按照命令式風(fēng)格將惰性枚舉(yield return)與異步方法結(jié)合起來。這種組合稱為Async Streams。這是C# 8中新提出的功能。這個新功能為我們提供了一種很好的技術(shù)來解決拉取式編程模型問題,例如從網(wǎng)站下載數(shù)據(jù)或從文件或數(shù)據(jù)庫中讀取記錄。
讓我們嘗試使用當(dāng)前的C# 版本。我將async關(guān)鍵字添加到SumFromOneToCountYield方法中,如下所示。
圖-2 組合使用async關(guān)鍵字和yield發(fā)生錯誤
我們試著將async添加到SumFromOneToCountYield,但直接出現(xiàn)錯誤,如上所示!
讓我們試試別的吧。我們可以將IEnumerable放入任務(wù)中并刪除yield關(guān)鍵字,如下所示:
static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count){ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!");var collection = new Collection<int>();var result = await Task.Run(() =>{var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;collection.Add(sum);}return collection;});return result;}調(diào)用方法:
const int count = 5;ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");var scs = await SumFromOneToCountTaskIEnumerable(count);ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!");foreach (var sc in scs){// 這不是我們想要的,結(jié)果將作為塊返回!!!!ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");}ConsoleExt.WriteLine("################################################");ConsoleExt.WriteLine(Environment.NewLine);輸出:
可以看到,我們異步計(jì)算所有的內(nèi)容,但仍然存在一個問題。結(jié)果(所有結(jié)果都在集合中累積)作為一個塊返回,但這不是我們想要的惰性行為,我們的目標(biāo)是將惰性行為與異步計(jì)算風(fēng)格相結(jié)合。
為了實(shí)現(xiàn)所需的行為,你需要使用外部庫,如Ix(Rx的一部分),或者你必須使用新提出的C#特性Async Streams。
回到我們的代碼示例。我使用了一個外部庫來顯示異步行為。
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence){ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");await sequence.ForEachAsync(value =>{ConsoleExt.WriteLineAsync($"Consuming the value: {value}");// 模擬延遲!Task.Delay(TimeSpan.FromSeconds(1)).Wait();});}static IEnumerable<int> ProduceAsyncSumSeqeunc(int count){ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;// 模擬延遲!Task.Delay(TimeSpan.FromSeconds(0.5)).Wait();yield return sum;}}調(diào)用方法:
const int count = 5;ConsoleExt.WriteLine("Starting Async Streams Demo!");// 啟動一個新任務(wù),用于生成異步數(shù)據(jù)序列!IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable();ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");// 啟動另一個新任務(wù),用于消費(fèi)異步數(shù)據(jù)序列!var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));// 出于演示目的,等待任務(wù)完成!consumingTask.Wait();ConsoleExt.WriteLineAsync("Async Streams Demo Done!");輸出:
最后,我們實(shí)現(xiàn)了我們想要的行為!我們可以在枚舉上進(jìn)行異步迭代。
源代碼在這里。
客戶端/服務(wù)器端的異步拉取
我將使用一個更現(xiàn)實(shí)的例子來解釋這個概念。客戶端/服務(wù)器端架構(gòu)是演示這一功能優(yōu)勢的絕佳方法。
客戶端/服務(wù)器端同步調(diào)用
客戶端向服務(wù)器端發(fā)送請求,客戶端必須等待(客戶端被阻塞),直到服務(wù)器端做出響應(yīng),如圖-3所示。
圖-3 同步數(shù)據(jù)拉取,客戶端等待請求完成
異步數(shù)據(jù)拉取
客戶端發(fā)出數(shù)據(jù)請求然后繼續(xù)執(zhí)行其他操作。一旦有數(shù)據(jù)到達(dá),客戶端就繼續(xù)處理達(dá)到的數(shù)據(jù)。
圖-4 異步數(shù)據(jù)拉取,客戶端可以在請求數(shù)據(jù)時執(zhí)行其他操作
異步序列數(shù)據(jù)拉取
客戶端發(fā)出數(shù)據(jù)塊請求,然后繼續(xù)執(zhí)行其他操作。一旦數(shù)據(jù)塊到達(dá),客戶端就處理接收到的數(shù)據(jù)塊并詢問下一個數(shù)據(jù)塊,依此類推,直到達(dá)到最后一個數(shù)據(jù)塊為止。這正是Async Streams想法的來源。圖-5顯示了客戶端可以在收到任何數(shù)據(jù)時執(zhí)行其他操作或處理數(shù)據(jù)塊。
圖-5 異步序列數(shù)據(jù)拉取(Async Streams),客戶端未被阻塞!
Async Streams
與IEnumerable<T>和IEnumerator<T>類似,Async Streams提供了兩個新接口IAsyncEnumerable<T>和IAsyncEnumerator<T>,定義如下:
public interface IAsyncEnumerable<out T>{IAsyncEnumerator<T> GetAsyncEnumerator();}public interface IAsyncEnumerator<out T> : IAsyncDisposable ? ?{Task<bool> MoveNextAsync();T Current { get; }}// Async Streams Feature可以被異步銷毀public interface IAsyncDisposable{Task DiskposeAsync();}Jonathan Allen已經(jīng)在InfoQ網(wǎng)站上介紹過這個主題,我不想在這里再重復(fù)一遍,所以我建議你也閱讀一下他的文章。
關(guān)鍵在于Task<bool> MoveNextAsync()的返回值(從bool改為Task<bool>,bool IEnumerator.MoveNext())。這樣可以讓整個計(jì)算和迭代都保持異步。大多數(shù)情況下,這仍然是拉取模型,即使它是異步的。IAsyncDisposable接口可用于進(jìn)行異步清理。有關(guān)異步的更多信息,請點(diǎn)擊此處。
語法
最終語法應(yīng)如下所示:
foreach await (var dataChunk in asyncStreams){// 處理數(shù)據(jù)塊或做一些其他的事情!}如上所示,我們現(xiàn)在可以按順序計(jì)算多個值,而不只是計(jì)算單個值,同時還能夠等待其他異步操作結(jié)束。
重寫微軟的示例
我重寫了微軟的演示代碼,你可以從我的GitHub下載相關(guān)代碼。
這個例子背后的想法是創(chuàng)建一個大的MemoryStream(20000字節(jié)的數(shù)組),并按順序異步迭代集合中的元素或MemoryStream。每次迭代從數(shù)組中拉取8K字節(jié)。
在(1)處,我們創(chuàng)建了一個大字節(jié)數(shù)組并填充了一些虛擬值。在(2)處,我們定義了一個叫作checksum的變量。我們將使用checksum來確保計(jì)算的總和是正確的。數(shù)組和checksum位于內(nèi)存中,并通過一個元組返回,如(3)所示。
在(4)處,AsEnumarble(或者叫AsAsyncEnumarble)是一種擴(kuò)展方法,用于模擬由8KB塊組成的異步流( (6)處所示的BufferSize = 8000)。
通常,你不必繼承IAsyncEnumerable,但在上面的示例中,微軟這樣做是為了簡化演示,如(5)處所示。
(7)處是“foreach”,它從異步內(nèi)存流中拉取8KB的塊數(shù)據(jù)。當(dāng)消費(fèi)者(foreach代碼塊)準(zhǔn)備好接收更多數(shù)據(jù)時,拉取過程是順序進(jìn)行的,然后它從生產(chǎn)者(內(nèi)存流數(shù)組)中拉取更多的數(shù)據(jù)。最后,當(dāng)?shù)瓿珊?#xff0c;應(yīng)用程序?qū)ⅰ痗’的校驗(yàn)和與checksum進(jìn)行比較,如果它們匹配,就打印出“Checksums match!”,如(8)所示!
微軟演示的輸出窗口:
概要
我們已經(jīng)討論過Async Streams,它是一種出色的異步拉取技術(shù),可用于進(jìn)行生成多個值的異步計(jì)算。
Async Streams背后的編程概念是異步拉取模型。我們請求獲取序列的下一個元素,并最終得到答復(fù)。這與IObservable<T>的推送模型不同,后者生成與消費(fèi)者狀態(tài)無關(guān)的值。Async Streams提供了一種表示異步數(shù)據(jù)源的絕佳方法,例如,當(dāng)消費(fèi)者尚未準(zhǔn)備好處理更多數(shù)據(jù)時。示例包含了Web應(yīng)用程序或從數(shù)據(jù)庫中讀取記錄。
我已經(jīng)演示了如何生成異步枚舉數(shù)據(jù),并使用外部異步序列庫來消費(fèi)枚舉數(shù)據(jù)。我也演示了如何將這個功能用于從Web站點(diǎn)下載內(nèi)容。最后,我們看到了新的Async Streams語法和一個完整的示例,該示例是基于微軟的Build Demo Code(2018年5月7日至9日,西雅圖,華盛頓州)。
關(guān)于作者
Bassam Alugili?是STRATEC AG的高級軟件專家和數(shù)據(jù)庫專家。STRATEC是全球領(lǐng)先的全自動分析儀系統(tǒng)、實(shí)驗(yàn)室數(shù)據(jù)管理軟件和智能耗材的合作伙伴。
原文地址:http://www.infoq.com/cn/articles/Async-Streams
.NET社區(qū)新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com
總結(jié)
以上是生活随笔為你收集整理的C# 8中的Async Streams的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .Net思想篇:为何我们需要思想大洗礼?
- 下一篇: 【送书活动】C# 程序员的自我修养