日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > asp.net >内容正文

asp.net

Pipelines - .NET中的新IO API指引(二)

發(fā)布時間:2023/12/4 asp.net 66 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Pipelines - .NET中的新IO API指引(二) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

原文:Pipelines - a guided tour of the new IO API in .NET, part 2

作者:marcgravell

在上一章,我們討論了以往的StreamAPI中存在的一些問題,并且介紹了Pipe,PipeWriter,PipeReader?等API,研究如何寫出一個Pipe?并且從中消費數(shù)據(jù),我們也討論了FlushAsync()?和ReadAsync()?是如何協(xié)同保證兩端的工作,從而解決“空”和“滿”的場景——在沒有數(shù)據(jù)時掛起reader,并在數(shù)據(jù)到來時恢復它;在寫入快過讀取(即pipe滿載)時掛起writer,并在reader追上后恢復它;并且我們也在線程模型的層面上探討了什么是“掛起”。

在這章,我們將會研究pipelines的內(nèi)存模型:數(shù)據(jù)實際上存在于哪里?我們也會開始著手研究如何在現(xiàn)實場景中使用pipeline以滿足真實需求。

內(nèi)存模型:我的數(shù)據(jù)在哪里?

在上一章,我們講了pipe如何管理所有的緩沖區(qū),允許writer通過?GetMemory()和GetSpan()請求緩沖區(qū),隨后通過ReadAsync()中的?.Buffer?將提交后的數(shù)據(jù)暴露給reader——reader取得的數(shù)據(jù)是一個?ReadOnlySequence<byte>,即全部數(shù)據(jù)其中的一些片段。

那么其中究竟發(fā)生了什么?

每一個Pipe實例都有一個引用指向MemoryPool<byte>——一個System.Memory中的新東西,顧名思義,它創(chuàng)建了一個內(nèi)存池。在創(chuàng)建Pipe的時候,你可以在選項中指定一個專門的?MemoryPool<byte>,但是在默認情況下(我猜也是大多數(shù)情況下)——應(yīng)該是使用一個應(yīng)用級別共享的 (MemoryPool<byte>.Shared) 內(nèi)存池。

MemoryPool<byte>?的概念是非常開放的。其默認的實現(xiàn)是簡單地使用ArrayPool<byte>.Shared(應(yīng)用級別的數(shù)組池),在需要的時候租借數(shù)組,并在使用完后歸還。這個?ArrayPool<T>?使用了?WeakReference來實現(xiàn),所以池化的數(shù)組在內(nèi)存有壓力時是可以回收的,但是,當你請求GetMemory(someSize)?或者?GetSpan(someSize)時,它并不是簡單地向內(nèi)存池請求“someSize”,相反,它在內(nèi)部追蹤了一個“片段(segment)”,一個新“片段”將是(默認情況下,可以通過配置改變)someSize和2048字節(jié)中的最大值,這樣在請求一個大小可觀的內(nèi)存時就意味著我們的系統(tǒng)不會充滿著許多小數(shù)組,而后者會對GC造成顯著碰撞。當你在writer中?Advance(bytesWritten),它:

  • 移動一個表達當前已使用多少片段的內(nèi)部計數(shù)器

  • 更新reader的“備讀(available to be read)”鏈的末端;如果我們剛剛對一個空片段的第一個字節(jié)進行了寫入,這意味著將會向鏈中增加一個新片段,否則,它意味著當前鏈的結(jié)尾標志被增加(后移)

這就是我們從?ReadAsync()中獲取到的“備讀”鏈;而當我們在reader中?AdvanceTo?——在整個片段都被消費后,pipe會將這些片段送回內(nèi)存池。在那里,它們可以被多次復用。并且作為上述兩點導致的直接結(jié)果,我們可以看到在大多數(shù)情況下(即使在writer中多次調(diào)用Advance?),我們最終會在reader中發(fā)現(xiàn)一個單獨的片段;而如果是在片段邊界處,或reader落后于writer,數(shù)據(jù)開始累積的情況下,會有多個片段。

只有使用默認池才能:

  • 我們不用在每次調(diào)用GetMemory()?/?GetSpan()時都要分配內(nèi)存

  • 我們不需要每次GetMemory()?/?GetSpan()都產(chǎn)生一個單獨的數(shù)組——通常我們只是獲得同樣的“片段”中的某個不同的范圍

  • 只使用少量的大緩沖數(shù)組

  • 它們不需要大量的類庫代碼,就可以自動回收

  • 當不再需要時,它們可以被GC回收

這也解釋了為什么可以在GetMemory()?/?GetSpan()中請求少量空間再在之后檢查其大小:我們可以訪問當前段的剩下未使用的部分。這意味著:一個大小為2048的片段,在之前的寫入中用掉了200字節(jié)——即使我們只請求5字節(jié),我們也可以看到我們還剩下1848字節(jié)可供使用,或者更多——記住:從ArrayPool.Shared?中獲取到的數(shù)組也是一個“至少這么大”的操作。

零復制緩沖區(qū)

在此還有需要注意的地方是,我們獲取數(shù)據(jù)緩沖的時候,沒有進行任何數(shù)據(jù)的復制。writer申請一個緩沖區(qū),然后第一次寫入數(shù)據(jù)到需要的位置。這就成了writer和reader之間的緩沖區(qū),無需復制數(shù)據(jù)。而如果reader當前無法處理完所有的數(shù)據(jù),它能夠通過顯示聲明其“未被消費”的方式將數(shù)據(jù)放回pipe。這樣無需為reader維護一個單獨的數(shù)據(jù)積壓處(backlog),而種情況這在使用Stream的協(xié)議處理代碼中是非常常見的。

正是這種功能間的組合使得pipeline代碼在內(nèi)存層面顯得非常友好。你可以用Stream做到所有的這些,但是卻需要大量令人痛苦的易出錯的代碼去實現(xiàn),甚至需要更多,如果你想做好的話——并且你幾乎必須去為每個場景單獨地實現(xiàn)它。Pipelines讓良好的內(nèi)存處理變?yōu)槟J的簡單的途徑——落入成功之中(譯注:即如自由落體一般實現(xiàn)成功的代碼)

更多奇特的內(nèi)存池

你并不受限于使用我們之前討論的內(nèi)存池;你可以實現(xiàn)你自己的自定義內(nèi)存池!默認內(nèi)存池的優(yōu)點在于它很簡單。尤其是在我們是否100%完美地返回每個片段并不重要的情況下——如果我們以某種方式丟棄某個pipe,最壞的情況會是GC將在某個時刻回收掉被丟棄的片段。它們不會回到池中,但那沒關(guān)系。

但是,你可以做很多有趣的東西。想象一下,比如一個?MemoryPool<byte>承載巨量的內(nèi)存——通過一些非常大的數(shù)組得到的托管內(nèi)存,或是通過?Marshal.AllocHGlobal?獲得的非托管內(nèi)存(注意?Memory?和?Span?并不受限于數(shù)組——它們需要的不過是某種連續(xù)內(nèi)存),按需使用這些巨大的內(nèi)存塊。這有很大的潛在場景,但是它會使片段的可靠回收變得更加重要。大多數(shù)系統(tǒng)不應(yīng)該這么做,但是提供這樣的靈活性是好的。

在真實系統(tǒng)中有用的pipes

我們在第一部分中用的例子,是一個讀寫均在同一代碼的單獨Pipe。很明顯這不是個真實場景(除非我們是在試圖模擬一個"echo"服務(wù)器),所以我們在更真實的場景中可以做什么呢?首先,我們需要把我們的pipelines連接到什么東西上。我們通常并不想單獨地使用pipe,相反,我們希望可以有一個結(jié)合一個普遍的系統(tǒng)或API使用的pipe。所以,來讓我們開始看看接下來會是什么樣子吧。

在這里,我們需要注意:發(fā)布于.NET Core 2.1的pipelines不包括任何終端實現(xiàn)。這意味著:?Pipe?雖然存在,但是在框架內(nèi)沒有提供任何的與現(xiàn)有系統(tǒng)的實際連接——就像提供了抽象的?Stream?基類,卻沒有?FileStream,,NetworkStream等。是的,這聽起來讓人感到失望,但是這只是因為時間所限,不要慌!現(xiàn)在在進行一些關(guān)于它們應(yīng)該以哪種優(yōu)先級實現(xiàn)的“活躍的”討論。并且現(xiàn)在有一些社區(qū)貢獻來補足那些最為明顯的缺陷。

一旦我們處于那些場景,我們可能會問:“將pipelines連接到另一個數(shù)據(jù)后端需要什么?”

也許將一個pipe連接到一個?Stream會是一個不錯的開頭。我知道你在想:“但是Marc,你在上一章你不遺余力地再說?Stream?有多么糟糕!”。我沒有改變我的看法,它不一定是完美的——對于那些特定場景的Stream實現(xiàn)(比如NetworkStream或FileStream)我們可以有一個專門的基于pipelines的終端直接與那些服務(wù)以最小的中轉(zhuǎn)代價進行通訊;但是這是一個有用的起步:

  • 它使我們可以立即訪問到巨量的API——任何可以通過Stream暴露數(shù)據(jù),或任何通過封裝的streams作為中間層的API(加密、壓縮等)

  • 它將所有老舊的StreamAPI隱藏在一個明確清晰的表層下

  • 它帶來了幾乎所有我們之前提到過的優(yōu)點

所以,讓我們開始吧!我們首先要思考的是:這里的方向是什么?就像剛才提到的一樣,Stream是模糊不清的——可能只讀,只寫,或可讀可寫。來假設(shè)我們想解決的是最通常的問題:一個可讀可寫表現(xiàn)為雙工行為的stream——這可以讓我們訪問如sockets(通過NetworkStream)之類的東西。這意味著我們實際上將會需要兩個pipe——一個用來輸入,一個用來輸出。Pipelines通過明確地聲明IDuplexPipe接口來幫助我們指明道路。這是一個非常簡單的接口,數(shù)據(jù)傳輸給IDuplexPipe就像傳輸給兩個pipe的端點一樣——一個標記為"in",一個標記為"out":

interface IDuplexPipe{PipeReader Input { get; }PipeWriter Output { get; }}

我們接下來想要做的是創(chuàng)建一個類來實現(xiàn)?IDuplexPipe,不過其內(nèi)部使用了兩個Pipe實例:

  • 一個Pipe會是輸出緩沖區(qū)(從消費者的角度來看),它將會在調(diào)用者寫入Output時被填充——并且我們將會用一個循環(huán)來消費這個Pipe并且將數(shù)據(jù)推入底層Stream(被用來寫入網(wǎng)絡(luò),或者其它任何stream可以寫入的)

  • 一個Pipe將會是輸入緩沖區(qū)(從消費者的角度來看),我們將有一個循環(huán)來從底層Stream讀取數(shù)據(jù),并將其推入Pipe,它將會在調(diào)用者從Input中讀取時排出

這個方法可以立即解決普遍影響著那些使用Stream的人一大堆的問題:

  • 我們現(xiàn)在有了input/output緩沖區(qū),用于從讀/寫調(diào)用中解耦stream訪問,而不用添加BufferedStream或是其它類似的防止數(shù)據(jù)碎片的功能(對于寫入代碼來說),并且這將會使我們在處理數(shù)據(jù)時很方便去接收更多數(shù)據(jù)(特別是對于讀取代碼來說,這樣我們不用在請求更多數(shù)據(jù)時保持暫停)

  • 如果調(diào)用代碼的寫入,快過stream的Write可以處理的程度,背壓特性將會展現(xiàn)出來,對調(diào)用代碼進行節(jié)流,這樣我們不會被充滿未發(fā)送數(shù)據(jù)的巨大緩沖區(qū)所終結(jié)

  • 如果stream的Read超過了消費這些數(shù)據(jù)的調(diào)用代碼,背壓特性也會在這里出場,對我們的stream讀取循環(huán)進行節(jié)流,這樣我們不會被充滿未處理數(shù)據(jù)的巨大緩沖區(qū)所終結(jié)

  • 讀取和寫入代碼都會受益于我們之前所討論的內(nèi)存池的所有優(yōu)點

  • 調(diào)用代碼從來不用擔心數(shù)據(jù)的后備存儲(未完成幀)等——pipe去解決它

那么它看起來會是什么樣?

基本上,我們需要做的就是這樣:

class StreamDuplexPipe : IDuplexPipe{Stream _stream;Pipe _readPipe, _writePipe;public PipeReader Input => _readPipe.Reader;public PipeWriter Output => _writePipe.Writer;// ... more here}

注意我們有兩個不同的pipe;調(diào)用者獲取每個pipe的一個端點——然后我們的代碼將會操作每個pipe的另一個端點。

對pipe進行泵送(Pumping)

那么我們與stream交互的代碼是什么樣的呢?像之前說過的那樣,我們需要兩個方法。首先——很簡單——一個循環(huán),從_stream中讀取數(shù)據(jù)并且將其推入_readPipe,然后被調(diào)用代碼所消費;這個方法的核心類似這樣:

while (true){// note we'll usually get *much* more than we ask forvar buffer = _readPipe.Writer.GetMemory(1);int bytes = await _stream.ReadAsync(buffer);_readPipe.Writer.Advance(bytes);if (bytes == 0) break; // source EOFvar flush = await _readPipe.Writer.FlushAsync();if (flush.IsCompleted || flush.IsCanceled) break;}

這個循環(huán)向pipie請求一個緩沖區(qū),然后用?netcoreapp2.1?中Stream.ReadAsync?的新重載接收一個?Memory<byte>?來填充緩沖區(qū)——我們一會兒討論如果你現(xiàn)在沒有一個能接收?Memory<byte>的API該怎么辦。當讀取完成后,它使用Advance向pipe提交這個數(shù)量的字節(jié),然后它在pipe上調(diào)用?FlushAsync()?來(如果需要的話)喚醒reader,或者在背壓減輕時暫停寫循環(huán)。注意我們還需要檢查Pipe的?FlushAsync()的結(jié)果——它可以告訴我們pipe的消費者已經(jīng)告知其已經(jīng)讀取完了所有想要的數(shù)據(jù)(Iscompleted),或者pipe本身被關(guān)閉(IsCanceled)。

注意在這兩種情況下,我們都希望確保在此循環(huán)退出時告訴管道,這樣我們就不會最終在沒有數(shù)據(jù)到來時永遠在調(diào)用端等待下去,有時會發(fā)生意外,有時在調(diào)用?_stream.ReadAsync?(或其它方法)可能會有異常拋出,所以最好是利用try/finally:

Exception error = null;try{// our loop from the previous sample}catch(Exception ex) { error = ex; }finally { _readPipe.Writer.Complete(error); }

如果你愿意的話,你可以使用兩個?Complete?——一個在try末尾(成功時),一個在catch中(失敗時)。

我們需要的第二個方法會比較復雜。我們需要一個循環(huán)來從_writePipe中消費數(shù)據(jù),然后將其推入_stream。核心代碼會像這樣:

while (true){var read = await _writePipe.Reader.ReadAsync();var buffer = read.Buffer;if (buffer.IsCanceled) break;if (buffer.IsEmpty && read.IsCompleted) break;// write everything we got to the streamforeach (var segment in buffer){await _stream.WriteAsync(segment);}_writePipe.AdvanceTo(buffer.End);await _stream.FlushAsync(); ? ?}

這會等待一些數(shù)據(jù)(可能在多個緩沖區(qū)里),然后進行一些退出判斷檢查;像之前一樣,我們可以在IsCanceled時放棄,但是下一個檢查會比較微妙:我們不希望只因為producer表示它們已經(jīng)寫入了所有想要的數(shù)據(jù)(Iscompleted)就停止寫入,不然我們也許會丟失它們末尾幾段數(shù)據(jù)——我們需要繼續(xù)直到我們已經(jīng)寫入了它們所有的數(shù)據(jù),直到buffer.IsEmpty。這是個簡化后的例子,因為我們一直寫入所有數(shù)據(jù)——我們之后會看到更復雜的例子。一旦我們有了數(shù)據(jù),我們按順序?qū)⒚總€非連續(xù)緩沖區(qū)寫入stream中——因為Stream一次只能寫入一個緩沖區(qū)(同樣,我使用的是netcoreapp2.1中的重載,接受ReadOnlyMemory<byte>參數(shù),但是我們不限于此)。一旦它寫完了緩沖區(qū),它告訴pipe我們已經(jīng)消費完了所有數(shù)據(jù),然后刷新(flush)底層的Stream。

在“真實”代碼中,我們也許希望更積極地優(yōu)化從而減少刷新底層stream,直到我們知道再也不會有可讀取的數(shù)據(jù),那么也許在_writePipe.Reader.ReadAsync()之外我們可以使用_writePipe.Reader.TryRead(...)。這個方法的工作方式類似于ReadAsync(),但是會保證同步返回——這可以用來測試“在我忙的時候writer是否附加了什么?”。但是上面的內(nèi)容已經(jīng)講述了這一點。

另外,像之前一樣,我們也許需要添加一個?try/finally,這樣在我們退出時總是會調(diào)用_writePipe.Reader.Complete()。

我們可以使用?PipeScheduler?來啟動這兩個泵(pumps),這會確保它們在預期環(huán)境中運行,然后我們的循環(huán)開始泵送數(shù)據(jù)。我們要添加一些格外的內(nèi)容(我們可能需要一種機制來?Close()/Dispose()?底層stream等)——但是像你所看到的,將?IDuplexPipe?連接到?jīng)]有pipeline設(shè)計的源并不需要是一項艱巨的任務(wù)。

這是我之前做的...

我已經(jīng)將上面的內(nèi)容簡化了一些(說真的,不是太多),以便讓它適合討論,但是你可能仍然不應(yīng)該從這里復制粘貼代碼來嘗試讓它工作。我并沒有聲稱它們是適用于所有情況的完美解決方案,但是作為StackExchange.Redis 2.0版工作的一部分,我們實現(xiàn)了一系列pipelines的綁定放在nuget上——毫無創(chuàng)意地命名為?Pipelines.Sockets.Unofficial(nuget,github(github.com/mgravell/Pip),它包括了:

  • 將雙工的Stream轉(zhuǎn)換為?IDuplexPipe?(就像上面說的)

  • 將只讀Stream轉(zhuǎn)換為PipeReader

  • 將只寫Stream轉(zhuǎn)換為PipeWriter

  • 將?IDuplexPipe?轉(zhuǎn)換為雙工的Stream

  • 將PipeReader轉(zhuǎn)換為只讀Stream

  • 將PipeWriter轉(zhuǎn)換為只寫Stream

  • 將Socket直接轉(zhuǎn)換成IDuplexPipe(不經(jīng)過NetworkStream)

前六個在?StreamConnection的靜態(tài)方法中,最后一個在SocketConnection里。

StackExchange.Redis?牽涉著大量Socket工作,所以我們對如何將pipeline連接到socket上非常感興趣,對于沒有TLS的redis連接,我們可以直接將我們的Socket連接到pipeline:

  • Socket???SocketConnection

對于需要TLS的redis連接(比如云redis提供商),我們可以這樣連接:

  • Socket???NetworkStream???SslStream???StreamConnection

所有這兩種配置都是一個Socket在其中一端,一個IDuplexPipe在另一端,它開始展示我們?nèi)绾螌ipeline作為更復雜系統(tǒng)的一部分。也許更重要的是,它為我們在未來實施改變提供了空間。將來有可能的例子:

  • Tim Seaward一直在折騰Leto,它提供了不需要?SslStream?,直接用IDuplexPipe實現(xiàn)TLS的能力(并且不需要stream逆變器)

  • 在 Tim Seaward,David Fowler 和Ben Adams之間,有一系列直接實現(xiàn)pipelines而不用托管sockets的實驗性/正在進行的網(wǎng)絡(luò)層工作,包括"libuv","RIO"(Registerd IO),和最近的"magma"——它將整個TCP棧推入用戶代碼從而減少系統(tǒng)調(diào)用。

看這個空間如何發(fā)展將會非常有趣!

但是我當前的API不會使用?Span?或者?Memory!

當在寫將數(shù)據(jù)從pipe中泵送到其它系統(tǒng)(比如一個Socket)時,很有可能你會遇到不接收?Span或者?Memory的API。不要慌,這沒有大礙,你依然可以有很多種變通方案使其變得更……傳統(tǒng)。

在你有一個?Memory?或者?ReadOnlyMemory時,第一個技巧是MemoryMarshal.TryGetArray(...)。它接收一個memory并且嘗試獲取一個ArraySegment?,它用一個T[]vector和一個int偏移/計數(shù)對描述相同的數(shù)據(jù)。顯然,這只有在這塊內(nèi)存是基于一個vector時才能用,而情況并非總是如此,所以這可能會在異種的內(nèi)存池上失敗。我們第二個解決辦法時MemoryMarshal.GetReference(...),它接受一個span然后返回一個原始數(shù)據(jù)起點的引用(實際上是一個“托管指針”,又叫做?ref T)。一旦我們有了一個?ref T,我們可以用unsafe語法來獲得一個這個數(shù)據(jù)的非托管指針,在這種情況下會有用:

Span<byte> span = ...fixed(byte* ptr = &MemoryMarshal.GetReference(span)){// ...}

即使span的長度是零,你依然可以這么做,其會返回一個第0項將會存在的位置,而且甚至在使用defaultspan即根本沒有實際后備內(nèi)存的時候,也可以這么使用。后面這個有一點需要注意,因為ref T通常不被認為會是null,但是在這里它卻是了。實際上,只要你不去嘗試對這種空引用進行解引用,不會有什么問題。如果你使用fixed將其轉(zhuǎn)換為一個非托管指針,你會得到一個空(零)指針,這相對來說更合理(并且在一些P/Invoke場景中會有用),MemoryMarshal?本質(zhì)上是unsafe?代碼的同義詞,即使你調(diào)用的那段代碼并沒有使用unsafe?關(guān)鍵字。使用它是完全有效的,但是如果不恰當?shù)厥褂盟?#xff0c;它可能會坑到你——所以小心就是了。

Pipe的應(yīng)用端代碼是什么樣的?

OK,我們有了IDuplexPipe,并且我們也看到了如何將兩個pipe的“業(yè)務(wù)端”連接到你選擇的后端數(shù)據(jù)服務(wù)。現(xiàn)在,我們在應(yīng)用代碼中如何使用它?

按照我們上一章的例子,我們將從?IDuplexPipe.Output?中把PipeWriter傳遞給我們的出站代碼,從?IDuplexPipe.Input?中把?PipeReader?傳遞給我們的入站代碼。

出站代碼相當簡單,并且通常是需要直接從基于Stream的代碼移植成基于PipeWriter的代碼。關(guān)鍵的區(qū)別還是那樣,即你不再手動控制緩沖區(qū)。下面是一個典型的實現(xiàn):

ValueTask<bool> Write(SomeMessageType message, PipeWriter writer){// (this may be multiple GetSpan/Advance calls, or a loop,// depending on what makes sense for the message/protocol)var span = writer.GetSpan(...);// TODO: ... actually write the messageint bytesWritten = ... // from writingwriter.Advance(bytesWritten);return FlushAsync(writer);}private static async ValueTask<bool> FlushAsync(PipeWriter writer){// apply back-pressure etcvar flush = await writer.FlushAsync();// tell the calling code whether any more messages// should be writtenreturn !(flush.IsCanceled || flush.IsCompleted);}

Write?的第一部分是我們的業(yè)務(wù)代碼,我們需要把數(shù)據(jù)從writer寫入到緩沖區(qū);通常這會多次調(diào)用?GetSpan(...)?和?Advance()。當我們寫完了數(shù)據(jù),我們可以flush它從而保證啟動泵送并且應(yīng)用背壓控制。對于那些非常大的消息體,我們也可以在中間點flush,但是對于大多數(shù)場景:一個消息flush一次足夠了。

如果你好奇為什么我將FlushAsync?分割到不同的代碼中:那是因為我想await?FlushAsync的結(jié)果來檢查退出條件,所以它需要在一個async?方法里,在這里最有效率的訪問內(nèi)存方式是通過?Span<byte>?API,Span<byte>?是一個?ref struct?類型,因此我們不能在異步方法中將 Span<byte> 作為局部變量使用。一個實用的辦法是簡單地分割代碼,這樣一個方法做?Span<byte>?工作,一個方法做async方面的工作。

發(fā)散一下:異步代碼、同步熱路徑和異步機制開銷

async?/?await?中引入的機制(譯注:指ValueTask,machinery應(yīng)該是和async狀態(tài)機關(guān)聯(lián)的詞,但是我并不知道怎么翻譯合適,只好翻譯成機制了)非常棒,但是它仍然會是一個會產(chǎn)生驚人棧開銷的工作——你可以從?sharplab.io?中看到——看看OurCode.FlushAsync?中生成的機制——和整個?struct <FlushAsync>d__0。現(xiàn)在,這些代碼并不是很糟糕——它非常努力地嘗試在同步路徑上避免內(nèi)存分配——但是沒有必要。

這里有兩種方法可以顯著地改善它;一個是壓根不去?await?,通常如果?await?是在方法中地最后一行并且我們不需要去處理結(jié)果:不去?await?——只要去除async然后return這個task——完成或者未完成。在這里我們沒辦法這樣做,因為我們需要去檢查返回的狀態(tài),但是我們可以通過檢查這個task是否已經(jīng)完成來對成功的結(jié)果進行優(yōu)化(通過?.IsCompletedSuccessfully?——如果它已經(jīng)結(jié)束但是有錯誤,我們?nèi)匀恍枰褂胊wait來讓異常可以正確表現(xiàn)出來)。如果它是成功完成的,我們可以請求到.Result。所以我們也可以將FlushAsync?寫成這樣:

private static ValueTask<bool> Flush(PipeWriter writer){bool GetResult(FlushResult flush)// tell the calling code whether any more messages// should be written=> !(flush.IsCanceled || flush.IsCompleted);async ValueTask<bool> Awaited(ValueTask<FlushResult> incomplete)=> GetResult(await incomplete);// apply back-pressure etcvar flushTask = writer.FlushAsync();return flushTask.IsCompletedSuccessfully? new ValueTask<bool>(GetResult(flushTask.Result)): Awaited(flushTask);}

這在大多數(shù)情況(同步完成)下完全避免了async/await?機制——如我們再次在?sharplab.io中看到的一樣。我要強調(diào):如果代碼是經(jīng)常(或僅僅)進行真正的異步行為時,這樣做是完全沒有必要的;它只對于那些結(jié)果通常(或僅僅)會同步地產(chǎn)生時才有幫助。

(譯注:對于ValueTask的"hot path"場景的使用,這里有個視頻講過一些,以及其它一些.NET中新的優(yōu)化性能的方法:?Adam Sitnik - State of the .NET Performance)

那么Reader呢?

就像我們多次看到的一樣,reader總是稍微復雜一些——我們無從得知一個單獨的“讀”操作是否會準確包含一個入站消息,我們也許需要開啟循環(huán)直到我們獲取到了所有所需的數(shù)據(jù),并且我們也許需要推回一些多余的數(shù)據(jù)。因此,讓我們假設(shè)我們想要消費某種單一的消息:

async ValueTask<SomeMessageType> GetNextMessage(PipeReader reader,CancellationToken cancellationToken = default){while (true){var read = await reader.ReadAsync(cancellationToken);if (read.IsCanceled) ThrowCanceled();// can we find a complete frame?var buffer = read.Buffer;if (TryParseFrame(buffer,out SomeMessageType nextMessage,out SequencePosition consumedTo)){reader.AdvanceTo(consumedTo);return nextMessage;}reader.AdvanceTo(buffer.Start, buffer.End);if (read.IsCompleted) ThrowEOF(); ? ? ? ?}}

這里我們從pipe中獲取了一些數(shù)據(jù),進行退出檢查(比如取消)。然后我們嘗試辨識一個消息,這是什么意思取決于你具體的代碼——它可以是:

  • 從緩沖區(qū)中尋找某些特定的值,比如一個ASCII行尾,然后把所有到這里的數(shù)據(jù)當作一個消息(丟棄行尾)

  • 解析一個定義良好的二進制幀頭,獲取其內(nèi)容長度,通過檢查獲取這樣長度的數(shù)據(jù)然后處理

  • 或者其它你需要的!

如果我們能夠辨識到一個消息,我們可以告訴pipe令其丟棄我們已經(jīng)消費過的數(shù)據(jù)——通過?AdvanceTo(consumedTo),在這里使用我們自己的幀解析代碼告訴我們消費了多少。如果我們沒能辨識出一個消息,我們要做的第一件事就是告訴pipe我們什么也沒消費,盡管我們嘗試讀取了所有數(shù)據(jù)——通過?reader.AdvanceTo(buffer.Start, buffer.End)。在這里會有兩種可能:

  • 我們還沒有獲得足夠的數(shù)據(jù)

  • pipe已經(jīng)死亡,我們再也不會獲得足夠的數(shù)據(jù)

我們通過read.IsCompleted?檢查了這些,在第二種情況時報告錯誤;否則我們繼續(xù)循環(huán),等待更多數(shù)據(jù)。那么剩下的,就是我們的幀解析——我們已經(jīng)把復雜的IO管理降低成了簡單的操作;比如,如果我們的消息是以行標記分隔:

private static bool TryParseFrame(ReadOnlySequence<byte> buffer,out SomeMessageType nextMessage,out SequencePosition consumedTo){// find the end-of-line markervar eol = buffer.PositionOf((byte)'\n');if (eol == null){nextMessage = default;consumedTo = default;return false;}// read past the line-endingconsumedTo = buffer.GetPosition(1, eol.Value);// consume the datavar payload = buffer.Slice(0, eol.Value);nextMessage = ReadSomeMessageType(payload);return true;}

這里PositionOf?嘗試獲取第一個行標記的位置。如果一個也找不到,我們就放棄,否則我們將consumedTo?設(shè)為”行標記+1“(即我們會消費行標記),然后我們分割我們的緩沖區(qū)來創(chuàng)建一個子集,表示不包括行標記的內(nèi)容,這樣我們就可以解析了。最終,我們報告成功,并且慶祝我們可以簡單地解析Linux風格的行尾。

這里的重點是什么?

用這些和大多數(shù)最簡單最簡樸的Stream版本(沒有任何nice的特性)非常相似的最少量的代碼,我們的應(yīng)用現(xiàn)在有了一個reader和writer,利用廣泛的能力確保高效和有效的處理。你可以用Stream來做所有的這些事,但是這樣真的、真的很難去做好做可靠。通過將所有的這些特性集成進框架,許多代碼都可以受益于這一單獨的實現(xiàn)。并且它也給了那些直接在pipeline API上開發(fā)并且對自定義pipeline端點和修飾感興趣的人更多的未來空間。

總結(jié)

在這節(jié),我們研究了pipeline使用的內(nèi)存模型和其如何幫助我們避免分配內(nèi)存,然后我們研究了怎樣才可以將pipeline與現(xiàn)有的API和系統(tǒng)(如Stream)進行交互——并且我們介紹了?Pipelines.Sockets.Unofficial?這樣的可用的工具庫。我們研究了在不支持 span/memory 代碼的API上集成它們的可用選項,最終我們展示了和pipeline交互的真正的調(diào)用代碼是什么樣子的(并且簡單地介紹了如何優(yōu)化那些通常是同步的async代碼)——展示了我們的應(yīng)用代碼會是什么樣子。在最后一部分,我們將會研究如何在開發(fā)現(xiàn)實中的庫,比如StackExchange.Redis,將我們學到的這些知識點聯(lián)系起來——討論我們在代碼里需要解決哪些復雜點,而pipeline又如何將它們變得簡單。

相關(guān)文章:

  • System.IO.Pipelines: .NET高性能IO

  • Pipelines - .NET中的新IO API指引(一)

原文地址:?https://zhuanlan.zhihu.com/p/39969692


.NET社區(qū)新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com

總結(jié)

以上是生活随笔為你收集整理的Pipelines - .NET中的新IO API指引(二)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。