.Net Core中利用TPL(任务并行库)构建Pipeline处理Dataflow
在學習的過程中,看一些一線的技術文檔很吃力,而且考慮到國內那些技術牛人英語都不差的,要向他們看齊,所以每天下班都在瘋狂地背單詞,博客有些日子沒有更新了,見諒見諒
什么是TPL?
Task Parallel Library (TPL), 在.NET Framework 4微軟推出TPL,并把TPL作為編寫多線程和并行代碼的首選方式,但是,在國內,到目前為止好像用的人并不多。(TPL)是System.Threading和System.Threading.Tasks命名空間中的一組公共類型和API 。TPL的目的是通過簡化向應用程序添加并行性和并發性的過程來提高開發人員的工作效率,TPL動態地擴展并發度,以最有效地使用所有可用的處理器。通過使用TPL,您可以最大限度地提高代碼的性能,讓我們專注于程序本身而不用去關注負責的多線程管理。
出自: https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/task-parallel-library-tpl
為什么使用TPL?
在上面介紹了什么是TPL,可能大家還是云里霧里,不知道TPL的好處到底是什么。
我在youtube上找到了一個優秀的視頻,講述的是TPL和Thread的區別,我覺得對比一下,TPL的優勢很快就能體現出來,如果大家能打開的話建議大家一定要看看。
地址是:https://www.youtube.com/watch?v=No7QqSc5cl8
現如今,我們的電腦的CPU怎么也是2核以上,下面假設我的電腦是四核的,我們來做一個實驗。
使用Thread
代碼中,如果使用Thread來處理任務,如果不做特出的處理,只是thread.Start(),監測電腦的核心的使用情況是下面這樣的。
每一條線代表CPU某個核心的使用情況,明顯,隨著代碼Run起來,其實只有某一個核心的使用率迅速提升,其他核心并無明顯波動,為什么會這樣呢?
?
原來,默認情況下,操作系統并不會調用所有的核心來處理任務,即使我們使用多線程,其實也是在一個核心里面運行這些Thread,而且Thread之間涉及到線程同步等問題,其實,效率也不會明顯提高。
使用TPL
在代碼中,引入了TPL來處理相同的任務,再次監視各個核心的使用情況,效果就變得截然不同,如下。
可以看到各個核心的使用情況都同時有了明顯的提高。
說明使用TPL后,不再是使用CPU的某個核心來處理任務了,而是TPL自動把任務分攤給每個核心來處理,處理效率可想而知,理論上會有明顯提升的(為什么說理論上?和使用多線程一樣,各個核心之間的同步管理也是要占用一定的效率的,所以對于并不復雜的任務,使用TPL可能適得其反)。
實驗結果出自https://www.youtube.com/watch?v=No7QqSc5cl8
看了這個實驗講解,是不是理解了上面所說的這句。
TPL的目的是通過簡化向應用程序添加并行性和并發性的過程來提高開發人員的工作效率,TPL動態地擴展并發度,以最有效地使用所有可用的處理器。
?
所以說,使用TPL 來處理多線程任務可以讓你不必吧把精力放在如何提高多線程處理效率上,因為這一切,TPL 能自動地幫你完成。
TPL Dataflow?
TPL處理Dataflow是TPL強大功能中的一種,它提供一套完整的數據流組件,這些數據流組件統稱為TPL Dataflow Library,那么,在什么場景下適合使用TPL Dataflow Library呢?
官方舉的一個 栗子 再恰當不過:
例如,通過TPL Dataflow提供的功能來轉換圖像,執行光線校正或防紅眼,可以創建管道數據流組件,管道中的每個功能可以并行執行,并且TPL能自動控制圖像流在不同線程之間的同步,不再需要Thread 中的Lock。
TPL數據流庫由Block組成,Block是緩沖和處理數據的單元,TPL定義了三種最基礎的Block。
source blocks(System.Threading.Tasks.Dataflow.ISourceBlock <TOutput>),源塊充當數據源并且可以從中讀取。
target blocks(System.Threading.Tasks.Dataflow.ITargetBlock <TInput>),目標塊充當數據接收器并可以寫入。
propagator blocks(System.Threading.Tasks.Dataflow.IPropagatorBlock <TInput,TOutput>),傳播器塊充當源塊和目標塊,并且可以被讀取和寫入。它繼承自ISourceBlock <TOutput>和ITargetBlock <TInput>。
?
還有其他一些個性化的Block,但其實他們都是對這三種Block進行一些擴充,可以結合下面的代碼來理解這三種Block.
Code Show
1.source block 和 target block 合并成propagator block.
?
private IPropagatorBlock<string, Dictionary<int, string>> Process1(){var bufferBlock = new BufferBlock<Dictionary<int, string>>();var actionBlock = new ActionBlock<string>(x =>{Console.WriteLine($"Process1 處理中:{x}");Thread.Sleep(5000);var dic = new Dictionary<int, string> { { 0, x } };dic.Add(1, "Process1");bufferBlock.Post(dic);}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});actionBlock.Completion.ContinueWith(_ =>{Console.WriteLine($"Process1 Complete,State{_.Status}");bufferBlock.Complete();});return DataflowBlock.Encapsulate(actionBlock, bufferBlock);}?
可以看到,我定義了BufferBlock和ActionBlock,它們分別繼承于ISourceBlock 和 ITargetBlock ,所以說,他們其實就是源塊和目標塊,在new actionBlock()中傳入了一個Action<String>,該Action就是該Block所執行的任務。 最后,DataflowBlock.Encapsulate(actionBlock, bufferBlock)把源塊和目標塊合并成了一個傳遞塊。
2.TransformBlock
private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2(){var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic =>{Console.WriteLine($"Process2 處理中:{dic.First().Value}");Thread.Sleep(5000);dic.Add(2, "Process2");return dic;}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});block.Completion.ContinueWith(_ =>{Console.WriteLine($"Process2 Complete,State{_.Status}");});return block;}TransfromBlock繼承了IPropagatorBlock,所以它本身就是一個傳遞塊,所以它除了要處理出入數據,還要返回數據,所以給new TransformBlock()中傳入的是Func<TInput, TOutput>而不是Action<TInput>.
?
3.TargetBlock來收尾
private ITargetBlock<Dictionary<int, string>> Process3(){var actionBlock = new ActionBlock<Dictionary<int, string>>(dic =>{Console.WriteLine($"Process3 處理中:{dic.First().Value}");Thread.Sleep(5000);dic.Add(3, "Process3");Console.WriteLine("Dic中的內容如下:");foreach (var item in dic){Console.Write($"{item.Key}:{item.Value}||");}Console.WriteLine();}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});return actionBlock;}TargetBlock只能寫入并處理數據,不能讀取,所以TargetBlock適合作為Pipeline的最后一個Block。
?
4.控制每個Block的并行度
在在構造TargetBlock(包括其子類)的時候,可以傳入ExecutionDataflowBlockOptions參數,ExecutionDataflowBlockOptions對象里面有一個MaxDegreeOfParallelism屬性,通過改制,可以控制該Block的同時處理任務的數量(可以理解成線程數)。
new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism}?
5.構建Pipeline,連接Block
public Task Builder(){_startBlock = Process1();var process2Block = Process2();var process3Block = Process3();_startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true });process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true });process3Block.Completion.ContinueWith(_ =>{Console.WriteLine($"Process3 Complete,State{_.Status}");Console.WriteLine("所有任務處理完成");});return process3Block.Completion;}通過
ISourceBlock<TOutput>.LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOption)
方法,可以把Block連接起來,即構建Pipeline,當DataflowLinkOptions對象的PropagateCompletion屬性為true時,SorceBlock任務處理完成是,會把TargetBlock也標記為完成。
?
Block被標記為Complete 后,無法傳入新的數據了,即不能再處理新的任務了。
?
6.Pipeline的運行
public void Process(string[] inputs){if (inputs == null)return;foreach (var input in inputs){_startBlock.Post(input);}_startBlock.Complete();}Pipeline構建好后,我們只需要給第一個Block傳入數據,該數據就會在管道內流動起來了,所有數據傳入完成后,調用Block的Complete方法,把該Block標記為完成,就不可以再往里面Post數據了。
?
完整代碼如下:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;namespace Tpl.Dataflow {public class Pipeline{IPropagatorBlock<string, Dictionary<int, string>> _startBlock;private int _maxDegreeOfParallelism;public Pipeline(int maxDegreeOfParallelism){_maxDegreeOfParallelism = maxDegreeOfParallelism;}public void Process(string[] inputs){if (inputs == null)return;foreach (var input in inputs){_startBlock.Post(input);}_startBlock.Complete();}public Task Builder(){_startBlock = Process1();var process2Block = Process2();var process3Block = Process3();_startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true });process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true });process3Block.Completion.ContinueWith(_ =>{Console.WriteLine($"Process3 Complete,State{_.Status}");Console.WriteLine("所有任務處理完成");});return process3Block.Completion;}private IPropagatorBlock<string, Dictionary<int, string>> Process1(){var bufferBlock = new BufferBlock<Dictionary<int, string>>();var actionBlock = new ActionBlock<string>(x =>{Console.WriteLine($"Process1 處理中:{x}");Thread.Sleep(5000);var dic = new Dictionary<int, string> { { 0, x } };dic.Add(1, "Process1");bufferBlock.Post(dic);}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});actionBlock.Completion.ContinueWith(_ =>{Console.WriteLine($"Process1 Complete,State{_.Status}");bufferBlock.Complete();});return DataflowBlock.Encapsulate(actionBlock, bufferBlock);}private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2(){var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic =>{Console.WriteLine($"Process2 處理中:{dic.First().Value}");Thread.Sleep(5000);dic.Add(2, "Process2");return dic;}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});block.Completion.ContinueWith(_ =>{Console.WriteLine($"Process2 Complete,State{_.Status}");});return block;}private ITargetBlock<Dictionary<int, string>> Process3(){var actionBlock = new ActionBlock<Dictionary<int, string>>(dic =>{Console.WriteLine($"Process3 處理中:{dic.First().Value}");Thread.Sleep(5000);dic.Add(3, "Process3");Console.WriteLine("Dic中的內容如下:");foreach (var item in dic){Console.Write($"{item.Key}:{item.Value}||");}Console.WriteLine();}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});return actionBlock;}} }?
Main方法如下:
static void Main(string[] args){Console.WriteLine("請輸入管道并發數:");if (int.TryParse(Console.ReadLine(), out int max)){var pipeline = new Pipeline(max);var task = pipeline.Builder();pipeline.Process(new[] { "碼", "農", "阿", "宇" });task.Wait();Console.ReadKey();}}?
測試運行如圖:
我來解釋一下,為什么是這么運行的,因為把管道的并行度設置為2,所以每個Block可以同時處理兩個任務,所以,如果給管道傳入四個字符 ,每個字符作為一個任務,假設傳入? “碼農阿宇”四個任務,會時這樣的一個過程…..
?
該項目Github地址: https://github.com/liuzhenyulive/Tpl-Dataflow-Demo
參考文獻:https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library
碼字不易,如果對您有用,歡迎推薦和關注,謝謝!
總結
以上是生活随笔為你收集整理的.Net Core中利用TPL(任务并行库)构建Pipeline处理Dataflow的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: BZOJ1563:[NOI2009]诗人
- 下一篇: mvc 过滤器