日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

多线程并发如何高效实现生产者/消费者?

發(fā)布時(shí)間:2023/12/4 编程问答 61 豆豆
生活随笔 收集整理的這篇文章主要介紹了 多线程并发如何高效实现生产者/消费者? 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
【導(dǎo)讀】無(wú)需引入第三方消息隊(duì)列組件,我們?nèi)绾卫脙?nèi)置C#語(yǔ)法高效實(shí)現(xiàn)生產(chǎn)者/消費(fèi)者對(duì)數(shù)據(jù)進(jìn)行處理呢?

在.NET Core共享框架(Share Framework)引入了通道(Channel),也就是說(shuō)無(wú)需額外通過(guò)NuGet包安裝,若為.NET Framework則需通過(guò)NuGet安裝,前提是版本必須是4.6+(包含4.6),查詢網(wǎng)上資料少的可憐,估計(jì)也有部分童鞋都沒(méi)聽(tīng)說(shuō)這玩意,所以接下來(lái)將通過(guò)幾篇文章詳細(xì)介紹其使用和底層具體實(shí)現(xiàn)原理

生產(chǎn)者/消費(fèi)者概念

生產(chǎn)者/消費(fèi)者這一概念,相信我們大家都不陌生,在日常生活無(wú)處不在、隨處可見(jiàn),其本質(zhì)可用一句話概括:具有多個(gè)連續(xù)步驟的工作流程。比如美團(tuán)外賣(mài)、再比如工廠里面的流水作業(yè)線、又比如線下實(shí)體快餐店等等

整個(gè)過(guò)程如同一條鏈,在這個(gè)鏈中每個(gè)步驟必須被完全隔離執(zhí)行,生產(chǎn)者產(chǎn)生“東西”,然后對(duì)其交由下一步驟進(jìn)行處理,最終到達(dá)消費(fèi)者。

上述敘述為一切抽象,我們回到軟件領(lǐng)域,在軟件中每一塊都在對(duì)應(yīng)的線程中執(zhí)行,以確保數(shù)據(jù)能得到正確處理,當(dāng)然,這也就包括跨線程共享數(shù)據(jù)可能引起的并發(fā)問(wèn)題。

此前我們利用內(nèi)置BlockingCollection實(shí)現(xiàn)生產(chǎn)者/消費(fèi)者機(jī)制(詳見(jiàn):.NET/.NET Core實(shí)現(xiàn)簡(jiǎn)單消息隊(duì)列),但依然無(wú)法解決我們所面臨的兩個(gè)問(wèn)題:其一:阻塞問(wèn)題,其二:無(wú)任何基于Task的異步APi執(zhí)行異步操作

通過(guò)引入System.Threading.Channel庫(kù)則可以完美解決生產(chǎn)者/消費(fèi)者問(wèn)題,毫無(wú)疑問(wèn),線程安全是前提,性能測(cè)試有保證,異步提高吞吐量,配置選項(xiàng)夠靈活。目前來(lái)看,利用通道可能將是實(shí)現(xiàn)生產(chǎn)者/消費(fèi)者的最終手段

通道(Channel)概念

名為通道還是比較形象,如同管道一樣,說(shuō)到底就是線程安全的隊(duì)列,既然是隊(duì)列,那么勢(shì)必涉及邊界問(wèn)題,通道類(lèi)型分為有界通道和無(wú)界通道

有界通道(Bounded Channel):對(duì)傳入數(shù)據(jù)具有指定容量,這也就意味著,若生產(chǎn)者產(chǎn)生的數(shù)據(jù)一旦達(dá)到容量空間,將不得不等待消費(fèi)者執(zhí)行完為生產(chǎn)者推送數(shù)據(jù)騰出額外可用空間

無(wú)界通道:(Unbounded Channel):對(duì)傳入數(shù)據(jù)無(wú)上限,這也就意味著生產(chǎn)者可以持續(xù)不斷發(fā)布數(shù)據(jù),以此希望消費(fèi)者能跟上生產(chǎn)者的節(jié)奏

到這里我們完全可得出一結(jié)論:因通道提供有界和無(wú)界選項(xiàng),所以內(nèi)置不可能利用并發(fā)隊(duì)列來(lái)實(shí)現(xiàn),一定是通過(guò)鏈表數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)隊(duì)列機(jī)制。

那么問(wèn)題來(lái)了,全部指定為無(wú)界通道豈不萬(wàn)事大吉,這個(gè)問(wèn)題想想就有問(wèn)題,雖說(shuō)無(wú)界通道為毫無(wú)上限,但計(jì)算機(jī)的系統(tǒng)內(nèi)存不是,無(wú)論是有界通道抑或是無(wú)界通道都會(huì)通過(guò)緩存區(qū)來(lái)存儲(chǔ)數(shù)據(jù)。所以選擇正確的通道類(lèi)型,取決于業(yè)務(wù)上下文。

那么問(wèn)題又來(lái)了,若創(chuàng)建有界通道,一旦達(dá)到容量限制,通道應(yīng)該如何處理呢?別擔(dān)心,這個(gè)事情則交由我們根據(jù)實(shí)際業(yè)務(wù)情況來(lái)處理,邊界通道容量滿模式(BoundedChannelFullMode)枚舉

?????Wait:?等待可用空間以完成寫(xiě)操作

?????DropNewest:?直接刪除并忽略通道中的最新數(shù)據(jù),以便為待寫(xiě)入數(shù)據(jù)騰出空間

?????DropOldest:?直接刪除并忽略通道中的最舊數(shù)據(jù),以便為待寫(xiě)入數(shù)據(jù)騰出空間

?????DropWrite:?直接刪除要寫(xiě)入的數(shù)據(jù)

我們通過(guò)如下簡(jiǎn)單3個(gè)步驟實(shí)現(xiàn)生產(chǎn)者/消費(fèi)者

創(chuàng)建通道類(lèi)型

//創(chuàng)建通道類(lèi)型 public?static?class?Channel {//有界通道(指定容量)public?static?Channel<T>?CreateBounded<T>(int?capacity);//有界通道(指定容量、配置通道滿模式選項(xiàng)、配置讀(是否單個(gè)讀取)、寫(xiě)(是否單個(gè)寫(xiě)入)、是否允許延續(xù)同步操作)public?static?Channel<T>?CreateBounded<T>(BoundedChannelOptions?options);//無(wú)界通道public?static?Channel<T>?CreateUnbounded<T>();//無(wú)界通道(配置讀(是否單個(gè)讀取)、寫(xiě)(是否單個(gè)寫(xiě)入)、是否允許延續(xù)同步操作)public?static?Channel<T>?CreateUnbounded<T>(UnboundedChannelOptions?options); }

創(chuàng)建生產(chǎn)者

//向通道寫(xiě)入數(shù)據(jù)(生產(chǎn)者) public?abstract?class?ChannelWriter<T> {??protected?ChannelWriter();??//標(biāo)識(shí)寫(xiě)入通道完成,不再有數(shù)據(jù)寫(xiě)入public?void?Complete(Exception?error?=?null);??//嘗試向通道寫(xiě)入數(shù)據(jù),若被寫(xiě)入則返回true,否則為falsepublic?abstract?bool?TryWrite(T?item);//異步返回通道是否有可寫(xiě)入空間public?abstract?ValueTask<bool>?WaitToWriteAsync(CancellationToken?cancellationToken?=?default);//異步寫(xiě)入數(shù)據(jù)到通道public?virtual?ValueTask?WriteAsync(T?item,?CancellationToken?cancellationToken?=?default); }

創(chuàng)建消費(fèi)者

//從通道讀取數(shù)據(jù)(消費(fèi)者) public?abstract?class?ChannelReader<T> {protected?ChannelReader();public?virtual?Task?Completion?{?get;?}//異步讀取通道所有數(shù)據(jù)public?virtual?IAsyncEnumerable<T>?ReadAllAsync([EnumeratorCancellation]?CancellationToken?cancellationToken?=?default);//異步讀取通道每一項(xiàng)數(shù)據(jù)public?virtual?ValueTask<T>?ReadAsync(CancellationToken?cancellationToken?=?default);//嘗試向通道讀取數(shù)據(jù)public?abstract?bool?TryRead(out?T?item);//異步返回通道是否有可讀取數(shù)據(jù)public?abstract?ValueTask<bool>?WaitToReadAsync(CancellationToken?cancellationToken?=?default); }

有界通道(Channel)示例

一切已就緒,接下來(lái)我們通過(guò)示例重點(diǎn)演示有界通道,然后無(wú)界通道只不過(guò)是通道類(lèi)型不同,額外增加選項(xiàng)配置而已

首先我們創(chuàng)建消息數(shù)據(jù)類(lèi)

public?class?Message {public?Message(string?data){Data?=?data;}public?string?Data?{?get;?} }

然后為方便觀察生產(chǎn)者和消費(fèi)者數(shù)據(jù)打印情況,在控制臺(tái)中通過(guò)不同字體顏色來(lái)進(jìn)行區(qū)分,簡(jiǎn)單來(lái)個(gè)日志類(lèi)

public?static?class?Logger {private?static?readonly?object?obj?=?new?object();public?static?void?Log(string?text,?ConsoleColor?color?=?ConsoleColor.White){lock?(obj){Console.ForegroundColor?=?color;Console.WriteLine($"[{DateTime.Now:yyyy-MM-dd?hh:mm:ss.ff}]?-?{text}");}} }

接下來(lái)定義生產(chǎn)者發(fā)布數(shù)據(jù)

public?class?Producer {private?readonly?ChannelWriter<Message>?_writer;private?readonly?int?_msgId;public?Producer(ChannelWriter<Message>?writer,?int?msgId){_writer?=?writer;_msgId?=?msgId;}public?async?Task?PublishAsync(Message?message,?CancellationToken?cancellationToken?=?default){await?_writer.WriteAsync(message,?cancellationToken);Logger.Log($"生產(chǎn)者?{_msgId}?>?發(fā)布消息?【{message.Data}】",?ConsoleColor.Yellow);} }

消費(fèi)者接收數(shù)據(jù),為模擬演示,延遲50毫秒作為消息處理時(shí)間

public?class?Consumer {private?readonly?ChannelReader<Message>?_reader;private?readonly?int?_msgId;public?Consumer(ChannelReader<Message>?reader,?int?msgId){_reader?=?reader;_msgId?=?msgId;}public?async?Task?BeginConsumeAsync(CancellationToken?cancellationToken?=?default){Logger.Log($"消費(fèi)者?{_msgId}?>?等待處理消息",?ConsoleColor.Green);try{await?foreach?(var?message?in?_reader.ReadAllAsync(cancellationToken)){Logger.Log($"消費(fèi)者?({_msgId})>?接收消息:?【{message.Data}】",?ConsoleColor.Green);await?Task.Delay(50,?cancellationToken);}}catch?(Exception?ex){Logger.Log($"消費(fèi)者?{_msgId}?>?被強(qiáng)迫停止:{ex}",?ConsoleColor.Green);}Logger.Log($"消費(fèi)者?{_msgId}?>?完成處理消息",?ConsoleColor.Green);} }

然后定義啟動(dòng)初始化生產(chǎn)者和消費(fèi)者任務(wù)數(shù)量

//啟動(dòng)指定數(shù)量的消費(fèi)者 private?static?Task[]?StartConsumers(Channel<Message>?channel,?int?consumersCount,?CancellationToken?cancellationToken) {var?consumerTasks?=?Enumerable.Range(1,?consumersCount).Select(i?=>?new?Consumer(channel.Reader,?i).BeginConsumeAsync(cancellationToken)).ToArray();return?consumerTasks; }//啟動(dòng)指定數(shù)量的生產(chǎn)者 private?static?async?Task?ProduceAsync(Channel<Message>?channel,int?messagesCount,int?producersCount,CancellationTokenSource?tokenSource) {var?producers?=?Enumerable.Range(1,?producersCount).Select(i?=>?new?Producer(channel.Writer,?i)).ToArray();int?index?=?0;var?tasks?=?Enumerable.Range(1,?messagesCount).Select(i?=>{index?=?++index?%?producersCount;var?producer?=?producers[index];var?msg?=?new?Message($"{i}");return?producer.PublishAsync(msg,?tokenSource.Token);}).ToArray();await?Task.WhenAll(tasks);Logger.Log("生產(chǎn)者發(fā)布消息完成,結(jié)束寫(xiě)入");channel.Writer.Complete();Logger.Log("等待消費(fèi)者處理");await?channel.Reader.Completion;Logger.Log("消費(fèi)者正在處理"); }

最后一步則是創(chuàng)建通道類(lèi)型(有界通道),啟動(dòng)生產(chǎn)者和消費(fèi)者線程任務(wù)并運(yùn)行

private?static?async?Task?Run(int?maxMessagesToBuffer,?int?messagesToSend,?int?producersCount,?int?consumersCount) {Logger.Log("***?開(kāi)始執(zhí)行?***");Logger.Log($"生產(chǎn)者數(shù)量?#:?{producersCount},?容量大小:?{maxMessagesToBuffer},?消息數(shù)量:?{messagesToSend},?消費(fèi)者數(shù)量?#:?{consumersCount}");var?channel?=?Channel.CreateBounded<Message>(maxMessagesToBuffer);var?tokenSource?=?new?CancellationTokenSource();var?cancellationToken?=?tokenSource.Token;var?tasks?=?new?List<Task>(StartConsumers(channel,?consumersCount,?cancellationToken)){ProduceAsync(channel,?messagesToSend,?producersCount,?tokenSource)};await?Task.WhenAll(tasks);Logger.Log("***?執(zhí)行完成?***"); }

接下來(lái)我們?cè)谥鞣椒ㄖ姓{(diào)用上述Run方法,指定有界通道容量為100,消費(fèi)數(shù)量為10,生產(chǎn)者和消費(fèi)者數(shù)量各為1,如下:

static?async?Task?Main(string[]?args) {await?Run(100,?10,?1,?1);Console.ReadLine(); }

根據(jù)業(yè)務(wù)上下文我們可指定有界通道滿模式以及其他對(duì)應(yīng)參數(shù)

var?channel?=?Channel.CreateBounded<Message>(new?BoundedChannelOptions(maxMessagesToBuffer) {FullMode?=?BoundedChannelFullMode.Wait,SingleReader?=?true,SingleWriter?=?true,AllowSynchronousContinuations?=?false });

關(guān)于無(wú)界通道沒(méi)啥太多要講解的地方,配置選項(xiàng)如下:

var?channel?=?Channel.CreateUnbounded<Message>(new?UnboundedChannelOptions() {SingleReader?=?true,SingleWriter?=?true,AllowSynchronousContinuations?=?false });

相比阻塞模型,通道提供異步支持以及靈活配置,更適合在實(shí)際業(yè)務(wù)場(chǎng)景中使用

關(guān)于通道大概就講解這么多,后續(xù)我們將分析通道實(shí)現(xiàn)原理,更詳細(xì)介紹請(qǐng)參看如下外鏈

An?Introduction?to?System.Threading.Channels

https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/

總結(jié)

以上是生活随笔為你收集整理的多线程并发如何高效实现生产者/消费者?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。