如何运用并行编程Parallel提升任务执行效率
本文來自小易,【DoTNET技術(shù)圈】公眾號已獲得轉(zhuǎn)載授權(quán)。
《.NET并發(fā)變成實戰(zhàn)》讀后感:并行編程Parallel
手打目錄:
一、前言
二、任務(wù)并行庫(TPL)的介紹
三、Parallel.Invoke的使用
四、Parallel.For的使用
五、Parallel.ForEach+Partitioner的使用
六、指定最大并行度MaxDegreeOfParallelism
七、退出循環(huán)以及捕捉異常
八、參考的資料
一、前言
背景:在物聯(lián)網(wǎng)場景下,由于數(shù)據(jù)吞吐量較大,常規(guī)的Task異步執(zhí)行存在明顯的性能瓶頸,后通過參考Riccardo Terrel(里卡爾多Dian·特雷爾)著,葉偉民老師翻譯的《.NET并發(fā)編程實戰(zhàn)》,使用了Parallel并行編程,以及分區(qū)器Partitioner,將兩者結(jié)合使用提高了設(shè)備數(shù)據(jù)綁定及數(shù)據(jù)更新速度,也做到了對CPU的性能比較極致使用。
萌新記錄,大佬多加斧正!
可跳過概念,直接抵達使用實例—>五、Parallel.ForEach+Partitioner的使用
l?并行編程的原理
在《.net并發(fā)編程實戰(zhàn)》(以下稱《實戰(zhàn)》)中這樣解釋并行編程——同時執(zhí)行多個任務(wù)。
從開發(fā)人員的角度看,當我們考慮這些問題是,“我的程序可以同時執(zhí)行多項操作嗎?”或“我的程序如何更快地解決一個問題”我們會想到并行。并行是指同時在不同的內(nèi)核上執(zhí)行多個任務(wù),以提高應(yīng)用程序的速度,這需要硬件支持(多核),且并行只能在多核設(shè)備中實現(xiàn),是提高程序性能和吞吐量的手段。
l?并行與并發(fā)編程簡單區(qū)分
1、?并發(fā)編程一次處理多個操作,不需要硬件支持(使用一個或多個內(nèi)核)。
2、?并行編程在多個CPU或多個內(nèi)核上同時執(zhí)行多個操作。所有并行程序都是并發(fā)的,同時運行的,但并非所有并發(fā)都是并行的。原因是并行只能在多核設(shè)備上實現(xiàn)。
3、?多任務(wù)同時執(zhí)行來自不同進程的多個線程。多任務(wù)并不一定意味著并行執(zhí)行,只有在使用多個CPU 或多個內(nèi)核時才能實現(xiàn)并行執(zhí)行。
?
l?為什么需要使用并行編程
《實戰(zhàn)》對不同程序CPU使用資源使用的程度做了一個對比:?
?
《實戰(zhàn)》中認為,在一臺多核計算機上運行一個沒有考慮到并發(fā)的應(yīng)用程序,就是在浪費計算機的生產(chǎn)力,因為應(yīng)用程序在順序處理過程中只能使用一部分可用的計算能力,在這種情況下任何CPU性能計數(shù)器會發(fā)現(xiàn)只有一個內(nèi)核運行得很快,可能為100%,而其他內(nèi)核未充分利用或空閑,在上圖的8內(nèi)核的計算機中,運行的非并行程序意味著資源的總體使用率可能不到15%。
l?使用并行編程的兩種方式
1、?任務(wù)并行庫(TPL),本文中只使用了這種方式
2、?并行LINQ(PLINQ)—》官方文檔直達:https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-programming/introduction-to-plinq
二、任務(wù)并行庫(TPL)的并行介紹
.Net Framework4 引入了新的Task Parallel Library(任務(wù)并行庫,TPL),它支持數(shù)據(jù)并行、任務(wù)并行和流水線。
當并行循環(huán)運行時,TPL會將數(shù)據(jù)源按照內(nèi)置的分區(qū)算法(或者你可以自定義一個分區(qū)算法)將數(shù)據(jù)劃分為多個不相交的子集,然后,從線程池中選擇線程并行地處理這些數(shù)據(jù)子集,每個線程只負責處理一個數(shù)據(jù)子集。在后臺,任務(wù)計劃程序?qū)⒏鶕?jù)系統(tǒng)資源和工作負荷來對任務(wù)進行分區(qū)。如有可能,計劃程序會在工作負荷變得不平衡的情況下在多個線程和處理器之間重新分配工作。
在對任何代碼(包括循環(huán))進行并行化時,一個重要的目標是利用盡可能多的處理器,但不要過度并行化到使行處理的開銷讓任何性能優(yōu)勢消耗殆盡的程度。比如:對于嵌套循環(huán),只會對外部循環(huán)進行并行化,原因是不會在內(nèi)部循環(huán)中執(zhí)行太多工作。少量工作和不良緩存影響的組合可能會導致嵌套并行循環(huán)的性能降低。
由于循環(huán)體是并行運行的,迭代范圍的分區(qū)是根據(jù)可用的邏輯內(nèi)核數(shù)、分區(qū)大小以及其他因素動態(tài)變化的,因此無法保證迭代的執(zhí)行順序。
? ? TPL引入了System.Threading.Tasks ,主類是Task,這個類表示一個異步的并發(fā)的操作,然而我們不一定要使用Task類的實例,可以使用Parallel靜態(tài)類。它提供了Parallel.Invoke,?Parallel.For,Parallel.Forecah?三個方法,以下分別介紹3個方法的簡單實例,每個方法都有多個重載,可自行查看源代碼
三、Parallel.Invoke的使用
????????
static void Main(){try{Parallel.Invoke(BasicAction,// Param #0 - 靜態(tài)方法() =>// Param #1 - lambda表達式{Console.WriteLine("干飯人干飯, Thread={0}", Thread.CurrentThread.ManagedThreadId);},delegate ()// Param #2 - 委托{Console.WriteLine("委托方法中, Thread={0}", Thread.CurrentThread.ManagedThreadId);});}// 在本例中不期望出現(xiàn)異常,但如果任務(wù)中仍然拋出異常,// 它將被包裝在AggregateException中,并傳播到主線程。catch (AggregateException e){Console.WriteLine("捕捉異常 \n{0}", e.InnerException.ToString());}}static void BasicAction(){Console.WriteLine("打工人打工, Thread={0}", Thread.CurrentThread.ManagedThreadId);}?
?
注解:
l?此方法可用于執(zhí)行可能并行執(zhí)行的一組操作。
l?不保證執(zhí)行操作的順序,或是否并行執(zhí)行操作。
l?此方法在每個提供的操作都已完成后才會返回,無論是由于正常終止還是異常終止而發(fā)生。
?
四、Parallel.For的使用
我們先用一個簡單的插入,來比較并行的for循環(huán)與串行for循環(huán)的速度。
?
這里因為Parallel.For在對處理器分配任務(wù)時候也有性能消耗,速度提升并不明顯。
?
接下來我們看一下Parallel.For的其中重載之一
?
??????????
var?list?=?new?List<int>()?{?10,?20,?30,?40?}; var options = new ParallelOptions(); var total = 0; var result = Parallel.For(0, list.Count, () =>{Console.WriteLine("------------ thead --------------");return 1;},(i, loop, j) =>{Console.WriteLine("------------ body --------------");Console.WriteLine("i=" + list[i] + " j=" + j);return list[i];},(b)?=>{Console.WriteLine("------------ tfoot --------------");Interlocked.Add(ref?total,?b);Console.WriteLine("total="?+?total); });Console.WriteLine("iscompleted:" + result.IsCompleted); Console.Read();注解:
l?因為并行任務(wù)當中不保證執(zhí)行順序,且多任務(wù)可能會同時嘗試更新total變量,所以這里使用了?Interlocked.Add執(zhí)行,來保證它是作為原子操作來執(zhí)行。
五、Parallel.ForEach+Partitioner的結(jié)合使用
Partitioner分區(qū)器:
首先我們來看看分區(qū)器源代碼,看他是如何對數(shù)據(jù)源進行分區(qū)的:
?
Partitioner.Create?若只指定的數(shù)據(jù)源的起始于結(jié)束的索引位置,創(chuàng)建分區(qū)則主要是根據(jù)邏輯內(nèi)核數(shù)(PlatformHelper.ProcessorCount)決定的。
?
?
?
大部分情況下,TPL在幕后使用的負載均衡機制都是非常高效的,比如我們不使用分區(qū)器,直接對數(shù)據(jù)源進行負載均衡的并行執(zhí)行,案例請看—>六、指定最大并行度。
當然我們也可以自定義分區(qū)大小,以下我們進入到實際的開發(fā)環(huán)境中,當前實驗電腦為6核12線程處理器
注解:
dataList —>實時數(shù)據(jù)的數(shù)據(jù)源
Index —>數(shù)據(jù)源總數(shù),此處假設(shè)1W條數(shù)據(jù)
rangesize—>區(qū)塊大小,由此可以計算 ?10000/12+1=834(+1是為了適應(yīng)可能除不盡的情況)
Partitioner.Create(0,Index,rangesize) —>分區(qū)器將數(shù)據(jù)源0-1W條數(shù)據(jù)分成了12個數(shù)據(jù)塊,每一塊為834條,當然最后一塊沒有834條數(shù)據(jù)
?
?
打上斷點可以看到range.Item2-range.Item1=834,已經(jīng)分好區(qū)塊了,然后就是并行處理業(yè)務(wù)代碼了。
這里貼上示例,粘貼可用:
????????????
int index = 10000; var rangesize = (int)(index / Environment.ProcessorCount) + 1; var?rangePartitioner?=?Partitioner.Create(1,?index,?rangesize); System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range =>{#region 業(yè)務(wù)代碼#endregion });六、指定最大并行度MaxDegreeOfParallelism
參考博客文章:Parallel.ForEach 之 MaxDegreeOfParallelism
https://www.cnblogs.com/QinQouShui/p/12134232.html
System.Threading.Tasks.Parallel.ForEach(list, new ParallelOptions() { MaxDegreeOfParallelism = 12 }, range =>{#region 業(yè)務(wù)代碼#endregion});此Parallel.ForEach并沒有使用分區(qū)器,而是用TPL進行負載均衡的并行。
該重載的源代碼為:
?
七、退出循環(huán)以及捕捉異常
和串行運行中的break不同,ParallelLoopState 提供了兩個方法用于停止Parallel.For 和 Parallel.ForEach的執(zhí)行。
?
public class ParallelLoopState {// 獲取循環(huán)的任何迭代是否已引發(fā)相應(yīng)迭代未處理的異常。public bool IsExceptional { get; }// 獲取循環(huán)的任何迭代是否已調(diào)用 ParallelLoopState.Stop()。public bool IsStopped { get; }// 獲取在Parallel循環(huán)中調(diào)用 ParallelLoopState.Break() 的最低循環(huán)迭代。public long? LowestBreakIteration { get; }// 獲取循環(huán)的當前迭代是否應(yīng)基于此迭代或其他迭代發(fā)出的請求退出。public bool ShouldExitCurrentIteration { get; }//通知Parallel循環(huán)當前迭代”之后”的其他迭代不需要運行。public void Break();//通知Parallel循環(huán)當前迭代“之外”的所有其他迭代不需要運行。public void Stop(); }?
l?Break:用于通知Parallel循環(huán)當前迭代“之后”的其他迭代不需要運行。例如,對于從 0 到 1000 并行迭代的 for 循環(huán),如果在第 100 次迭代調(diào)用 Break(),則低于 100 的所有迭代仍會運行(即使還未開始處理),并在退出循環(huán)之前處理完。從 101 到 1000 中還未開啟的迭代則會被放棄。對于已經(jīng)在執(zhí)行的長時間運行迭代,Break()將為已運行還未結(jié)束的迭代對應(yīng)ParallelLoopResult結(jié)構(gòu)的LowestBreakIteration屬性設(shè)置為調(diào)用Bread()迭代項的索引。
l?Stop:Stop() 用于通知Parallel循環(huán)當前迭代“之外”的所有其他迭代不需要運行,無論它們是位于當前迭代的上方還是下方。對于已經(jīng)在執(zhí)行的長時間運行迭代,可以檢查 IsStopped屬性,在觀測到是 true 時提前退出。Stop 通常在基于搜索的算法中使用,在找到一個結(jié)果之后就不需要執(zhí)行其他任何迭代。(比如在看視頻或漫畫時自動匹配響應(yīng)最快的服務(wù)器)
var loopresult = System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range => {#region 業(yè)務(wù)代碼loopState.Stop();#endregion });?
?
?
?當并行迭代中調(diào)用的委托拋出異常,這個異常沒有在委托中被捕獲到時,就會變成一組異常,新的System.AggregateException負責處理這一組異常。
try {System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range =>{#region 業(yè)務(wù)代碼#endregion}); } Catch(AggregateException ex) {foreach (var innerEx in ex.InnerExceptions){Console.WriteLine(innerEx.ToString());} }八、參考的資料
l?《.net并發(fā)編程實戰(zhàn)》
l?官方文檔《.NET 中的并行編程》https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-programming/
l?博客園《.Net并行編程高級教程--Parallel》https://www.cnblogs.com/stoneniqiu/p/4857021.html
l?博客園《8天玩轉(zhuǎn)并發(fā)》
https://www.cnblogs.com/huangxincheng/category/368987.html
l?《異步編程:.NET4.X 數(shù)據(jù)并行》
https://www.cnblogs.com/heyuquan/archive/2013/03/13/parallel-for-foreach-invoke.html
l?博客園《Parallel.ForEach 之 MaxDegreeOfParallelism》
https://www.cnblogs.com/QinQouShui/p/12134232.html
?end
?
輸入優(yōu)惠碼同樣也能享受到當當送的優(yōu)惠。
RCKZEV(長按復制)10元優(yōu)惠碼(滿99元可用)
7R99RD(長按復制)30元優(yōu)惠碼(滿199元可用)
使用渠道:當當小程序或當當APP
有效期:3月23日至3月25日
總結(jié)
以上是生活随笔為你收集整理的如何运用并行编程Parallel提升任务执行效率的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IOT必备之MQTT结构分析,不进来看看
- 下一篇: 通过 GitHub Actions 自动