【转】1.5异步编程:.NET4.X 数据并行
任務并行庫?(TPL)?是?.NET Framework 4的?System.Threading?和?System.Threading.Tasks?命名空間中的一組公共類型和?API。TPL的目的在于簡化向應用程序中添加并行性和并發(fā)性的過程,從而提高開發(fā)人員的工作效率。TPL會動態(tài)地按比例調節(jié)并發(fā)程度,以便最有效地使用所有可用的處理器。此外,TPL還處理工作分區(qū)、ThreadPool?上的線程調度、取消支持、狀態(tài)管理以及其他低級別的細節(jié)操作。通過使用TPL,您可以在將精力集中于程序要完成的工作,同時最大程度地提高代碼的性能。
從?.NET Framework 4?開始,TPL?是編寫多線程代碼和并行代碼的首選方法。但是,并不是所有代碼都適合并行化;例如,如果某個循環(huán)在每次迭代時只執(zhí)行少量工作,或它在很多次迭代時都不運行,那么并行化的開銷可能導致代碼運行更慢。此外,像任何多線程代碼一樣,并行化會增加程序執(zhí)行的復雜性(容易產生bug)。盡管?TPL?簡化了多線程方案,但我們建議您對線程處理概念(例如,鎖、死鎖和爭用條件《異步編程:同步基元對象(上)》)進行基本的了解,以便能夠有效地使用?TPL。
?
示例:異步編程:.NET4.X數據并行.rar
傳送門:異步編程系列目錄……
?
本主題分兩部分講解:
《異步編程:.NET4.X任務并行》
《異步編程:.NET4.X?數據并行》,本節(jié)所述內容:
?
并發(fā)與并行
???????首先,我們要理解并發(fā)與并行的區(qū)別:
???????在理解和設計支持多核操作時,我們可以借用甘特圖來幫助我們更清晰地知道多任務的運行情況,常用的甘特圖軟件有:GanttProject?、翰文橫道圖編制系統(tǒng)、Microsoft Office Project。
???????并發(fā):一個處理器在“同一時段(時間間隔)”處理多個任務,各個任務之間快速交替執(zhí)行。如圖:
???????
并行:多個處理器或者多核的處理器“同一時刻(時間點)”處理多個不同的任務。并行是真正的細粒度上的同時進行,既同一時間點上同時發(fā)生著多個并發(fā)。并行一定是并發(fā),而并發(fā)不一定是并行。如圖:
??
?
?
數據并行
數據并行是指對源集合或數組中的元素同時(即并行)執(zhí)行相同操作的情況。
?
先稍微了解下Action和Func委托,此兩委托由微軟提供;Action是一個沒有返回參數的委托,Func是一個有返回值的委托。
?
??????????????并行循環(huán)?????????????
當并行循環(huán)運行時,TPL會將數據源按照內置的分區(qū)算法(或者你可以自定義一個分區(qū)算法)將數據劃分為多個不相交的子集,然后,從線程池中選擇線程并行地處理這些數據子集,每個線程只負責處理一個數據子集。在后臺,任務計劃程序將根據系統(tǒng)資源和工作負荷來對任務進行分區(qū)。如有可能,計劃程序會在工作負荷變得不平衡的情況下在多個線程和處理器之間重新分配工作。
在對任何代碼(包括循環(huán))進行并行化時,一個重要的目標是利用盡可能多的處理器,但不要過度并行化到使行處理的開銷讓任何性能優(yōu)勢消耗殆盡的程度。比如:對于嵌套循環(huán),只會對外部循環(huán)進行并行化,原因是不會在內部循環(huán)中執(zhí)行太多工作。少量工作和不良緩存影響的組合可能會導致嵌套并行循環(huán)的性能降低。
由于循環(huán)體是并行運行的,迭代范圍的分區(qū)是根據可用的邏輯內核數、分區(qū)大小以及其他因素動態(tài)變化的,因此無法保證迭代的執(zhí)行順序。
?
1.????????Parallel.For
為固定數目的獨立For循環(huán)迭代提供了負載均衡的潛在并行執(zhí)行。Parallel內部通過RangeManger對象實現負載均衡。
負載均衡的執(zhí)行會嘗試將工作分發(fā)在不同的任務中,這樣所有的任務在大部分時間內部可以保持繁忙。負載均衡總是試圖減少任務的閑置時間。
| 1 2 3 4 5 6 7 8 9 | public?static?ParallelLoopResult For(int?fromInclusive, int?toExclusive ????, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body); ????// 執(zhí)行具有線程本地數據的 for 循環(huán),泛型類型參數TLocal為本地線程數據類型。 public?static?ParallelLoopResult For<TLocal>(int?fromInclusive, int?toExclusive ????, ParallelOptions parallelOptions ????, Func<TLocal> localInit ????, Func<int, ParallelLoopState, TLocal, TLocal> body ????, Action<TLocal> localFinally ????); |
參數:
1)????????返回ParallelLoopResult結構
System.Threading.Tasks.ParallelLoopResult是結構體,當所有線程均已完成時,For?將返回?ParallelLoopResult?對象。若你手動停止或中斷循環(huán)迭代時,此返回值特別有用,因為?ParallelLoopResult?存儲諸如完成運行的最后一個迭代等信息。
| 1 2 3 4 5 6 7 | ????public?struct?ParallelLoopResult ????{ ????????// 獲取該循環(huán)是否已運行完成。 ????????public?bool?IsCompleted { get; } ????????// 獲取從中調用 ParallelLoopState.Break() 的最低迭代的索引。 ????????public?long? LowestBreakIteration { get; } } |
l??如果?IsCompleted?返回?true,該循環(huán)的所有迭代均已執(zhí)行,并且該循環(huán)沒有收到提前結束的請求.
l??如果?IsCompleted?返回?false:
???????????????????????????????????????i.??????????????LowestBreakIteration?返回?null,則為調用?ParallelLoopState.Stop()?提前結束循環(huán)。
?????????????????????????????????????ii.??????????????LowestBreakIteration?返回非?null?整數值,則為調用?ParallelLoopState.Break()?提前結束循環(huán)。
2)????????迭代范圍
對于迭代范圍(fromInclusive<= x <toExclusive)中的每個值調用一次body委托。如果?fromInclusive?大于或等于?toExclusive,則該方法立即返回,而無需執(zhí)行任何迭代。
3)????????ParallelOptions類型
ParallelOptions實例存儲用于配置?Parallel?類的方法的操作的選項。
| 1 2 3 4 5 6 7 8 9 10 | public?class?ParallelOptions { ????public?ParallelOptions(); ????// 獲取或設置與此 ParallelOptions 實例關聯(lián)的 CancellationToken。 ????public?CancellationToken CancellationToken { get; set; } ????// 獲取或設置此 ParallelOptions 實例所允許的最大并行度。 ????public?int?MaxDegreeOfParallelism { get; set; } ????// 獲取或設置與此 ParallelOptions 實例關聯(lián)的 TaskScheduler。 ????public?TaskScheduler TaskScheduler { get; set; } } |
a)????????提供一個無參數的構造函數,此構造函數使用默認值初始化實例。MaxDegreeOfParallelism?初始化為?-1,表示并行量沒有上限設置;CancellationToken?初始化為CancellationToken.None不可取消的標記;TaskScheduler?初始化為默認計劃程序?(TaskScheduler.Default)。(CancellationToken取消示例請看章節(jié)”協(xié)作式取消”)
b)????????指定最大并行度
有時候,你并不希望在并行循環(huán)中使用所有的內核,因為你對剩余的內核有特定的需求和更好的使用計劃。
通常指定Environment.ProcessorCount,或者是根據此值計算出來的值(eg:Environment.ProcessorCount-1)。默認情況下,如果沒有指定最大并行度,TPL就會允許通過啟發(fā)式算法提高或降低線程的數目,通常這樣會高于ProcessorCount,因為這樣可以更好地支持CPU和I/O混合型的工作負荷。
4)????????ParallelLoopState類型
可用來使?Tasks.Parallel?循環(huán)的迭代與其他迭代交互,并為?Parallel?類的循環(huán)提供提前退出循環(huán)的功能。此類的實例不要自行創(chuàng)建,它由?Parallel?類創(chuàng)建并提供給每個循環(huán)項,并且只應該在提供此實例的“循環(huán)內部”使用。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | public?class?ParallelLoopState { ????// 獲取循環(huán)的任何迭代是否已引發(fā)相應迭代未處理的異常。 ????public?bool?IsExceptional { get; } ????// 獲取循環(huán)的任何迭代是否已調用 ParallelLoopState.Stop()。 ????public?bool?IsStopped { get; } ????// 獲取在Parallel循環(huán)中調用 ParallelLoopState.Break() 的最低循環(huán)迭代。 ????public?long? LowestBreakIteration { get; } ????// 獲取循環(huán)的當前迭代是否應基于此迭代或其他迭代發(fā)出的請求退出。 ????public?bool?ShouldExitCurrentIteration { get; } ? ????//通知Parallel循環(huán)當前迭代”之后”的其他迭代不需要運行。 ????public?void?Break(); ????//通知Parallel循環(huán)當前迭代“之外”的所有其他迭代不需要運行。 ????public?void?Stop(); } |
a)????????Break()
Break()用于通知Parallel循環(huán)當前迭代“之后”的其他迭代不需要運行。例如,對于從?0?到?1000?并行迭代的?for?循環(huán),如果在第?100?次迭代調用?Break(),則低于?100?的所有迭代仍會運行(即使還未開始處理),并在退出循環(huán)之前處理完。從?101?到?1000?中還未開啟的迭代則會被放棄。
對于已經在執(zhí)行的長時間運行迭代,Break()將為已運行還未結束的迭代對應ParallelLoopResult結構的LowestBreakIteration屬性設置為調用Bread()迭代項的索引。
b)????????Stop()
Stop()?用于通知Parallel循環(huán)當前迭代“之外”的所有其他迭代不需要運行,無論它們是位于當前迭代的上方還是下方。
對于已經在執(zhí)行的長時間運行迭代,可以檢查?IsStopped屬性,在觀測到是?true?時提前退出。
Stop?通常在基于搜索的算法中使用,在找到一個結果之后就不需要執(zhí)行其他任何迭代。(比如在看視頻或漫畫時自動匹配響應最快的服務器)
c)????????ShouldExitCurrentIteration?屬性
當循環(huán)的迭代調用?Break?或?Stop時,或一個迭代引發(fā)異常,或取消循環(huán)時,Parallel?類將主動嘗試禁止開始執(zhí)行循環(huán)的其他迭代。但是,可能有無法阻止其他迭代啟動的情況。也可能是長時間運行的迭代已經開始執(zhí)行的情況。在此類情況下,迭代可以通過顯式檢查?ShouldExitCurrentIteration?屬性,在該屬性返回?true?時停止執(zhí)行。
5)????????委托函數:localInit,body,localFinally(委托中注意并行訪問問題)
a)????????localInit???????用于返回每個線程的本地數據的初始狀態(tài)的委托。
b)????????body?????????????將為每個迭代調用一次的委托。
c)????????localFinally???用于對每個線程的本地狀態(tài)執(zhí)行一個最終操作的委托。
對于參與循環(huán)執(zhí)行的每個線程調用一次?localInit?委托(每個分區(qū)一個線程),并返回每個線程的初始本地狀態(tài)。這些初始狀態(tài)傳遞到每個線程上的第一個?body?調用。然后,該線程的每個后續(xù)body調用返回可能修改過的狀態(tài)值,并傳遞給下一個body調用。最后,每個線程上最后body調用的返回值傳遞給?localFinally?委托。每個線程調用一次?localFinally?委托,以對每個線程的本地狀態(tài)執(zhí)行最終操作。
Parallel.For中三個委托執(zhí)行流程如下:
????????????????????????i.??????????????分區(qū)依據:Parallel.For也會為集合進行分區(qū),分區(qū)算法由FCL內部RangeManger對象提供,以提供負載平衡。
??????????????????????ii.??????????????RangeManger根據最大并發(fā)度將集合源拆分為多個小集合,再并行訪問其對應的RangeWorker的FindNewWork()?返回當前分區(qū)中是否還有迭代元素bool值。(FindNewWork()實現為無鎖(Interlocked)循環(huán)結構)
????????????????????iii.??????????????三個委托之間的變量值傳遞由內部聲明的局部變量支持。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | // 整體思路:依據內置RangeManger算法分區(qū),再由多個線程“并行”執(zhí)行下面委托 // 第一步: Action action =()=> ????{ ????????try ????????{ ????????????localInit(); ? ????????????Label_00FF: ????????????body(); ????????????if(RangeWorker.FindNewWork()) ????????????{ ????????????????Goto Lable_00FF; ????????????} ????????} ????????catch(){} ????????finaly ????????{ ????????????localFinally(); ????????} ????} // 第二步:再將action傳遞給Task的內部派生類ParallelForReplicatingTask, // 根據最大并發(fā)級別(ParallelOptions.MaxDegreeOfParallelism)進行并行調用 |
6)????????示例:(ParallelTest.cs,帶本地變量的Parallel.For)
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | public?static?void?Parallel_For_Local_Test() { ????int[] nums = Enumerable.Range(0, 1000000).ToArray<int>(); ????long?total = 0; ????ParallelLoopResult result = Parallel.For<long>(0, nums.Length, ?????????() => { return?0; }, ?????????(j, loop, subtotal) => ?????????{ ?????????????// 延長任務時間,更方便觀察下面得出的結論 ?????????????Thread.SpinWait(200); ?????????????Console.WriteLine("當前線程ID為:{0},j為{1},subtotal為:{2}。" ?????????????????, Thread.CurrentThread.ManagedThreadId, j.ToString(), subtotal.ToString()); ?????????????if?(j == 23) ?????????????????loop.Break(); ?????????????if?(j > loop.LowestBreakIteration) ?????????????{ ?????????????????Thread.Sleep(4000); ?????????????????Console.WriteLine("j為{0},等待4s種,用于判斷已開啟且大于阻斷迭代是否會運行完。", j.ToString()); ?????????????} ?????????????Console.WriteLine("j為{0},LowestBreakIteration為:{1}", j.ToString(), loop.LowestBreakIteration); ?????????????subtotal += nums[j]; ?????????????return?subtotal; ?????????}, ?????????(finalResult) => Interlocked.Add(ref?total, finalResult) ????); ????Console.WriteLine("total值為:{0}", total.ToString()); ????if?(result.IsCompleted) ????????Console.WriteLine("循環(huán)執(zhí)行完畢"); ????else ????????Console.WriteLine("{0}" ????????????, result.LowestBreakIteration.HasValue ? "調用了Break()阻斷循環(huán)."?: "調用了Stop()終止循環(huán)."); } |
?????????運行截圖:
分析一下:
a)????????泛型類型參數TLocal為本地線程數據類型,本示例設置為long。
b)????????三個委托的參數解析body(j, loop, subtotal):首先初始委托l(wèi)ocalInit中返回了0,所以body委托中參數subtotal的初始值即為0,body委托的參數j對應的是當前迭代索引,參數loop為當前迭代狀態(tài)ParallelLoopState對象;localFinally委托參數為body委托的返回值。
c)????????三個委托三個階段中都可能并行運行,因此您必須同步對任何共享變量的訪問,如示例中在finally委托中使用了System.Threading.Interlocked對象。
d)????????在索引為23的迭代中調用Break()后:
????????????????????????i.??????????????索引小于23的所有迭代仍會運行(即使還未開始處理),并在退出循環(huán)之前處理完。
??????????????????????ii.??????????????索引大于?23?的迭代若還未開啟則會被放棄;若已處于運行中則會在退出循環(huán)之前處理完。
e)????????對于調用Break()之后,在任何循環(huán)迭代中訪問LowestBreakIteration屬性都會返回調用Break()的迭代對應的索引。
?
2.????????Parallel.Foreach
為給定數目的獨立ForEach循環(huán)迭代提供了負載均衡的潛在并行執(zhí)行。這個方法還支持自定義分區(qū)程序(抽象類Partitioner<TSource>),讓你可以完全掌控數據分發(fā)。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | // 對 System.Collections.IEnumerable 執(zhí)行foreach 操作。 public?static?ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source ????, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body); // 對 System.Collections.IEnumerable 執(zhí)行具有 64 位索引的 foreach 操作。 public?static?ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source ????, ParallelOptions parallelOptions, Action<TSource,???? ParallelLoopState, long> body); // 對 System.Collections.IEnumerable 執(zhí)行具有線程本地數據的 foreach 操作。 public?static?ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source ????, ParallelOptions parallelOptions, Func<TLocal> localInit ????, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally); // 對 System.Collections.IEnumerable 執(zhí)行具有線程本地數據和 64 位索引的 foreach 操作。 public?static?ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source ????, ParallelOptions parallelOptions, Func<TLocal> localInit ????, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally); |
1)????????如果打算要在?ForEach?方法中使用分區(qū)程序,你必須支持動態(tài)數量的分區(qū),即:
a)????????在Partitioner<TSource>的派生類中重寫?GetDynamicPartitions()?方法和?SupportsDynamicPartitions屬性
b)????????在OrderablePartitioner<TSource>派生類中重寫GetOrderableDynamicPartitions()?方法和SupportsDynamicPartitions?屬性。
分區(qū)程序能夠在循環(huán)執(zhí)行過程中隨時按需為新分區(qū)提供枚舉器。基本上,每當循環(huán)添加一個新并行任務時,它都會為該任務請求一個新分區(qū)。動態(tài)數量的分區(qū)程序在本質上也是負載平衡的。
2)????????Parallel.ForEach還支持集合源為Partitioner<TSource>類型的重載,此重載不提供迭代索引。其中Partitioner<TSource>表示將一個數據源拆分成多個分區(qū)的特定方式。
| 1 2 3 4 5 6 7 8 9 10 11 12 | public?abstract?class?Partitioner<TSource>???? // partitioner [pa:'ti??n?]瓜分者,分割者 { ????protected?Partitioner(); ????// 獲取是否可以動態(tài)創(chuàng)建附加分區(qū)。 ????public?virtual?bool?SupportsDynamicPartitions { get; } ????// 將基礎集合分區(qū)成給定數目的分區(qū),參數partitionCount為要創(chuàng)建的分區(qū)數。 ????// 返回一個包含 partitionCount 枚舉器的列表。 ????public?abstract?IList<IEnumerator<TSource>> GetPartitions(int?partitionCount); ????// 創(chuàng)建一個可將基礎集合分區(qū)成可變數目的分區(qū)的對象。 ????// 返回一個可針對基礎數據源創(chuàng)建分區(qū)的對象。 ????public?virtual?IEnumerable<TSource> GetDynamicPartitions(); } |
示例見:CustomerPartitioner.cs
3)????????Parallel.ForEach還支持集合源為OrderablePartitioner<TSource>類型的重載。OrderablePartitioner<TSource>表示將一個可排序數據源拆分成多個分區(qū)的特定方式,因此次重載提供迭代索引。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public?abstract?class?OrderablePartitioner<TSource> : Partitioner<TSource> { ????// 從派生類中的構造函數進行調用以便使用索引鍵上指定的約束初始化 OrderablePartitioner<TSource> ????protected?OrderablePartitioner(bool?keysOrderedInEachPartition ????????, bool?keysOrderedAcrossPartitions, bool?keysNormalized); ????// 獲取是否按鍵增加的順序生成每個分區(qū)中的元素。 ????public?bool?KeysOrderedInEachPartition { get; } ????// 獲取前一分區(qū)中的元素是否始終排在后一分區(qū)中的元素之前。 ????public?bool?KeysOrderedAcrossPartitions { get; } ????// 獲取是否規(guī)范化順序鍵。如果為 true,則所有順序鍵為 [0 .. numberOfElements-1]。 ????// 如果為 false,順序鍵仍必須互不相同,但只考慮其相對順序,而不考慮其絕對值。 ????public?bool?KeysNormalized { get; } ? ????// 將基礎集合分區(qū)成給定數目的可排序分區(qū)。 ????public?override?IList<IEnumerator<TSource>> GetPartitions(int?partitionCount); ????// 創(chuàng)建一個可將基礎集合分區(qū)成可變數目的分區(qū)的對象。 ????public?override?IEnumerable<TSource> GetDynamicPartitions(); ????// 創(chuàng)建一個可將基礎集合分區(qū)成可變數目的分區(qū)的對象。 ????public?virtual?IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions(); ????// 將基礎集合分區(qū)成指定數目的可排序分區(qū)。 ????public?abstract?IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int?partitionCount); } |
三個bool值為true所要遵循的規(guī)則:
a)????????KeysOrderedInEachPartition?:每個分區(qū)返回具有不斷增加的鍵索引的元素。
b)????????KeysOrderedAcrossPartitions?:對于返回的所有分區(qū),分區(qū)?i?中的鍵索引大于分區(qū)?i-1?中的鍵索引。
c)????????KeysNormalized?:所有鍵索引將從零開始單調遞增(沒有間隔)。
示例見:CustomerOrderablePartitioner.cs
4)????????ForEach中的3個委托調用流程:(委托中注意并行訪問問題)
a)?????????對于Parallel.ForEach()使用IEnumerable<TSource>集合重載的循環(huán),會轉化為Parallel.For()循環(huán)調用邏輯。
b)?????????對于使用OrderablePartitioner<TSource>或Partitioner<TSource>派生類構造的自定義分區(qū)的循環(huán)邏輯如下:
??????????????????????????????????????i.??????????????分區(qū)依據:由OrderablePartitioner<TSource>或Partitioner<TSource>派生類提供自定義分區(qū)算法,注意要重寫動態(tài)數量分區(qū)相關方法。
????????????????????????????????????ii.??????????????在各個線程中,先取緩存中的enumerator,若沒有才會獲取動態(tài)分區(qū)(即每個線程的動態(tài)分區(qū)只會獲取一次)
??????????????????????????????????iii.??????????????三個委托之間的變量值傳遞由內部聲明局部變量支持。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | // 總體思路:依據自定義算法分區(qū),再由多個線程“并行”執(zhí)行下面代碼 // 第一步: Action action = ()=> { ????try ????{ ?????????localInit(); ? ?????????// 在各個線程中,先取緩存中的enumerator,若沒有才會獲取動態(tài)分區(qū)(即每個線程的動態(tài)分區(qū)只會獲取一次) ????????var?enumerator = OrderablePartitioner<TSource>.GetOrderableDynamicPartitions(); ????????// 若為Partitioner<TSource>對象,則var enumerator =???????? Partitioner<TSource>.GetDynamicPartitions(); ????????while(enumerator.MoveNext()) ????????{ ????????????body(); ????????} ????} ????catch(){} ????finaly ????{ ?????????localFinally(); ????} } // 第二步:再將action傳遞給Task的內部派生類ParallelForReplicatingTask, // 它根據最大并發(fā)級別(ParallelOptions. MaxDegreeOfParallelism)進行并行調用. |
5)????????分析一個重載
| 1 2 3 | public?static?ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source ????, ParallelOptions parallelOptions, Func<TLocal> localInit ????, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally); |
a)????????返回ParallelLoopResult結構;泛型參數TSource指定集合源元素的類型,泛型參數TLocal指定線程本地變量的類型。
b)????????Func<TSource, ParallelLoopState, long, TLocal, TLocal> body委托參數解析:TSource為集合迭代特定項;ParallelLoopState為循環(huán)迭代項狀態(tài);long為迭代索引;第一個TLocal為localInit委托返回的初始值;第二個TLocal為body委托自身返回值類型。
示例:(詳見:ParallelTest.cs)
| 1 2 3 4 5 6 7 8 9 10 11 12 | int[] nums = Enumerable.Range(100, 1000000).ToArray<int>(); long?total = 0; Parallel.ForEach<int, long>(nums, ?????() => { return?0; }, ?????(j, loop, index, subtotal) => ?????{ ?????????subtotal += j; ?????????Console.WriteLine("索引為{0},當前項值為{1}.", index.ToString(), j.ToString()); ?????????return?subtotal; ?????}, ?????(finalResult) => Interlocked.Add(ref?total, finalResult) ); |
6)????????將?ForEach?用于非泛型集合
可以用System.Linq命名空間中IEnumerable擴展API的?Cast<TResult>()?方法將集合轉換為泛型集合。
| 1 2 3 4 5 6 7 8 9 10 11 | // 擴展API public?static?class?Enumerable { ????…… ????public?static?IEnumerable<TResult> Cast<TResult>(this?IEnumerable source); } // 示例 System.Collections.ArrayList fruits = new?System.Collections.ArrayList(); fruits.Add("apple"); fruits.Add("mango"); IEnumerable<string> query = fruits.Cast<string>(); |
?
??????????????Parallel.Invoke??????????????
對給定的獨立任務提供潛在的并行執(zhí)行。
| 1 2 | public?static?void?Invoke(params?Action[] actions); public?static?void?Invoke(ParallelOptions parallelOptions, params?Action[] actions); |
Invoke內部通過Task.Factory.StartNew()來為每個委托參數創(chuàng)建并開啟任務并且在最后調用Task.WaitAll(Tasks[])來等待所有任務執(zhí)行完成,所以此方法在每個提供的操作都完成后才會返回,與完成是因為正常終止還是異常終止無關。
???????注意點:
1)????????如果使用Parallel.Invoke加載運行委托的時間迥異,那么依需要最長時間的委托才能返回控制;并且還要考慮邏輯內核的使用情況,因為可能出現有單獨一個委托被延遲到后面單獨執(zhí)行。
2)????????在并行可擴展方面具有一定的局限性,因為Parallel.Invoke調用的是固定數目的委托。
3)????????不能保證操作的執(zhí)行順序或是否并行執(zhí)行。
?
分區(qū)程序
若要對數據源操作進行并行化,其中一個必要步驟是將源分區(qū)為可由多個線程同時訪問的多個部分。
1.????????Parallel支持的兩種分區(qū)程序:
1)????????默認分區(qū)程序:”PLINQ并行查詢”或“并行循環(huán)”提供了默認的分區(qū)程序,該分區(qū)程序將以透明方式工作,即Parallel.For()?中提到的RangeManage分區(qū)對象。
2)????????自定義分區(qū)程序:在某些情況下(eg:一個自定義集合類,根據您對該類的內部結構的了解,您能夠采用比默認分區(qū)程序更有效的方式對其進行分區(qū)。或者,根據您對在源集合中的不同位置處理元素所花費時間的了解,您可能需要創(chuàng)建大小不同的范圍分區(qū)),可能值得通過繼承OrderablePartitioner<TSource>或??Partitioner<TSource>抽象類實現您自己的分區(qū)程序。
2.????????兩種分區(qū)類型
1)????????按范圍分區(qū)(屬于靜態(tài)數量的分區(qū)):
a)????????適用于數據和其他已建立索引的集合源(eg:IList集合);
b)????????并行循環(huán)或PLINQ查詢中的每個線程或任務分別接收唯一的開始和結束索引,以便在處理集合源時,不會覆蓋任何其他線程或被任何其他線程覆蓋;
c)????????同步開銷:涉及的唯一同步開銷是創(chuàng)建范圍的初始工作;
d)????????缺點:如果一個線程提前完成,它將無法幫助其他線程完成它們的工作。
示例關鍵代碼:
| 1 2 3 4 5 6 7 8 9 | var?rangePartitioner = Partitioner.Create(0, source.Length); double[] results = new?double[source.Length]; Parallel.ForEach(rangePartitioner, (range, loopState) => { ????for?(int?i = range.Item1; i < range.Item2; i++) ????{ ????????results[i] = source[i] * Math.PI; ????} }); |
注意這個示例用范圍還有一個優(yōu)勢:因為該示例主體開銷非常小,倘若不使用范圍分區(qū),那么頻繁調用主體委托會使并行循環(huán)效率更低。而依范圍分區(qū)后,就使得一個區(qū)只會產生一次主體委托調用的開銷。
2)????????按區(qū)塊分區(qū)(屬于動態(tài)數量的分區(qū)):
a)????????適用于長度未知的鏈接列表或其他集合;
b)????????并行循環(huán)或PLINQ查詢中的每個線程或任務分別處理一個區(qū)塊中一定數量的源元素,然后返回檢索其他元素。
c)????????區(qū)塊的大小可以任意(即使大小為1)。只要區(qū)塊不是太大,這種分區(qū)在本質上是負載平衡的,原因是為線程分配元素的操作不是預先確定的;
d)????????同步開銷:當線程需要獲取另一個區(qū)塊時,都會產生同步開銷;
示例關鍵代碼:(詳見MyDynamicOrderablePartitioner.cs文件)
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | // 分區(qū)程序 public?override?IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions() { ????return?new?ListDynamicPartitions(m_input); } ? // 枚舉對象 private?class?ListDynamicPartitions : IEnumerable<KeyValuePair<long, TSource>> { ????private?IList<TSource> m_input; ????private?int?m_pos = 0; ????public?IEnumerator<KeyValuePair<long, TSource>> GetEnumerator() ????{ ????????while?(true) ????????{ ????????????// 由于使用到公共資源只有m_pos值類型索引,所以只需要保證m_pos訪問的原子性 ????????????int?elemIndex = Interlocked.Increment(ref?m_pos) - 1; ????????????if?(elemIndex >= m_input.Count) ????????????{ ????????????????yield?break; ????????????} ????????????yield?return?new?KeyValuePair<long, TSource>(elemIndex, m_input[elemIndex]); ????????} ????} ????…… ?} |
?
自定義分區(qū)程序
我們已經知道通過繼承OrderablePartitioner<TSource>或??Partitioner<TSource>抽象類我們可以針對特定場合實現自己的分區(qū)程序。
下面展出一個示例,這個示例給我們展示了如何構建一個分區(qū)程序,這個示例為我們演示了“動態(tài)數量分區(qū)結合Parallel.ForEach()”和“靜態(tài)數量分區(qū)結合Parallel.Invoke()”的使用方式。
示例見:CustomerPartitioner.cs
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | class?SingleElementPartitioner<T> : Partitioner<T> { …… } ? public?static?void?Test() { ????String[] collection = new?string[]{"red", "orange", "yellow", "green", "blue", "indigo", ????????"violet", "black", "white", "grey"}; ????SingleElementPartitioner<string> myPart = new?SingleElementPartitioner<string>(collection); ? ????Console.WriteLine("示例:Parallel.ForEach"); ????Parallel.ForEach(myPart, item => ????????{ ????????????Console.WriteLine("? item = {0}, thread id = {1}" ????????????????, item, Thread.CurrentThread.ManagedThreadId); ????????} ????); ? ? ????Console.WriteLine("靜態(tài)數量的分區(qū):2個分區(qū),2個任務"); ????var?staticPartitions = myPart.GetPartitions(2); ????int?index = 0; ????Action staticAction = () => ????????{ ????????????int?myIndex = Interlocked.Increment(ref?index) - 1; ????????????var?myItems = staticPartitions[myIndex]; ????????????int?id = Thread.CurrentThread.ManagedThreadId; ? ????????????while?(myItems.MoveNext()) ????????????{ ????????????????// 保證多個線程有機會執(zhí)行 ????????????????Thread.Sleep(50); ????????????????Console.WriteLine("? item = {0}, thread id = {1}" ????????????????????, myItems.Current, Thread.CurrentThread.ManagedThreadId); ? ????????????} ????????????myItems.Dispose(); ????????}; ????Parallel.Invoke(staticAction, staticAction); ? ? ????Console.WriteLine("動態(tài)分區(qū): 3個任務 "); ????var?dynamicPartitions = myPart.GetDynamicPartitions(); ????Action dynamicAction = () => ????????{ ????????????var?enumerator = dynamicPartitions.GetEnumerator(); ????????????int?id = Thread.CurrentThread.ManagedThreadId; ? ????????????while?(enumerator.MoveNext()) ????????????{ ????????????????Thread.Sleep(50); ????????????????Console.WriteLine("? item = {0}, thread id = {1}", enumerator.Current, id); ????????????} ????????}; ????Parallel.Invoke(dynamicAction, dynamicAction, dynamicAction); } |
?
快速創(chuàng)建可排序分區(qū)
???????.NET為我們提供的System.Collections.Concurrent.Partitioner?對象可實現快速獲得可排序分區(qū)的方式。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | namespace?System.Collections.Concurrent { ????// 提供針對數組、列表和可枚舉項的常見分區(qū)策略,創(chuàng)建一個可排序分區(qū)程序。 ????public?static?class?Partitioner ????{ ????????// 參數: ????????// loadBalance:該值指示創(chuàng)建的分區(qū)程序是否應在各分區(qū)之間保持動態(tài)負載平衡,而不是靜態(tài)負載平衡。 ????????// EnumerablePartitionerOptions:控制分區(qū)緩沖行為的選項。 ????????// rangeSize:每個子范圍的大小。 ????????// 范圍:fromInclusive <= 范圍< toExclusive ? ????????public?static?OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source); ????????public?static?OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions); ????????public?static?OrderablePartitioner<TSource> Create<TSource>(IList<TSource> list, bool?loadBalance); ????????public?static?OrderablePartitioner<Tuple<int, int>> Create(int?fromInclusive, int?toExclusive); ????????public?static?OrderablePartitioner<Tuple<long, long>> Create(long?fromInclusive, long?toExclusive); ????????public?static?OrderablePartitioner<TSource> Create<TSource>(TSource[] array, bool?loadBalance); ????????public?static?OrderablePartitioner<Tuple<int, int>> Create(int?fromInclusive, int?toExclusive, int?rangeSize); ????????public?static?OrderablePartitioner<Tuple<long, long>> Create(long?fromInclusive, long?toExclusive, long?rangeSize); } ? ????[Flags][Serializable] ????public?enum?EnumerablePartitionerOptions ????{ ????????None = 0, ????????NoBuffering = 1, ????} } |
1.????????Partitioner.Create創(chuàng)建的分區(qū)與負載平衡
| Partitioner.Create重載 | 負載平衡 |
| Create<TSource>(IEnumerable<TSource>) | 始終 |
| Create<TSource>(TSource[], Boolean) Create<TSource>(IList<TSource>, Boolean) | 將布爾型參數指定為?true?時 |
| Create(Int32, Int32) Create(Int32, Int32, Int32) Create(Int64, Int64) Create(Int64, Int64, Int64) | 從不 |
2.????????EnumerablePartitionerOptions
將EnumerablePartitionerOptions枚舉傳遞給Partitioner.Create()用于指示在快速創(chuàng)建分區(qū)時是否啟用緩存提高來實現最優(yōu)性能。
1)????????當傳遞EnumerablePartitionerOptions.None時,指示默認為啟用緩存。在分好區(qū)后,每個線程會加鎖,在臨界區(qū)中,第一次迭代獲取該分區(qū)元素時,會獲取這一分區(qū)的所有迭代元素并緩存下來。
2)????????當傳遞EnumerablePartitionerOptions.NoBuffering時,指示為不啟用緩存。每個線程會加鎖,在臨界區(qū)中,每次迭代都從同一個集合源獲取需要的一個迭代元素,因為每次只獲取一個,所以也不會再進行分區(qū)。
?
處理并行循環(huán)中的異常
Parallel的For和?ForEach?重載沒有任何用于處理可能引發(fā)的異常的特殊機制。并行循環(huán)中的異常處理邏輯需要處理可能在多個線程上同時引發(fā)類似異常的情況,以及一個線程上引發(fā)的異常導致在另一個線程上引發(fā)另一個異常的情況。通過將循環(huán)中的所有異常包裝在?System.AggregateException?中。eg:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | try { ????// 在 ProcessDataInParallel 拋出 throw new ArgumentException(); ????ProcessDataInParallel(data); } catch?(AggregateException ae) { ????foreach?(var?ex in?ae.InnerExceptions) ????{ ????????if?(ex is?ArgumentException) ????????????Console.WriteLine(ex.Message); ????????else ????????????throw?ex; ????} } |
?
注意事項
1.????????在做性能測試時,請避免在循環(huán)內進行諸如?Console.WriteLine,Debug.Write?等調用。因為同步調用共享資源(如控制臺或文件系統(tǒng))將大幅降低并行循環(huán)的性能
2.????????將串行代碼轉化為并行代碼,需要檢查可并行化的熱點。
熱點指的是代碼中消費大量時間運行的部分,這是算法性能的瓶頸。如果熱點可以被分解為很多能并行運行的部分,那么熱點就可以獲得加速。但如果被分解為多部分代碼的單體并沒有消費大量的運行時間,那么TPL所引入的開銷就有可能會完全消減并行化帶來的加速,甚至可能導致并行化的代碼比串行化代碼運行得還慢。(TPL所引入的開銷:在轉化過程中,我們常常需要將方法的局部變量變?yōu)槲蟹椒ǖ膬炔孔兞恳詣?chuàng)建安全無狀態(tài)的并行化代碼,這樣的變化會讓每次迭代執(zhí)行更多指令;另外還增加了大量的內存分配操作,這也會致使垃圾回收器(GC)觸發(fā)的更頻繁)
3.????????避免過度并行化
倘若對操作過度并行化,那么并行循環(huán)很可能比順序循環(huán)的運行速度還慢。規(guī)則:
a)????????嵌套循環(huán)中只對外部循環(huán)進行并行化。
b)????????對于body委托開銷小而循環(huán)次數多的情況,可以采取按范圍分區(qū)的方式。
c)????????循環(huán)中很多次迭代都不執(zhí)行。
4.????????不要調用非線程安全的方法。對于線程安全方法的調用也要清楚內部同步消耗,來判斷是否應該使用并行化方式。
5.????????避免在UI線程上執(zhí)行并行循環(huán)。應該使用任務封裝并行循環(huán),比如:
| 1 2 3 4 5 6 7 8 9 | private?void?button1_Click(object?sender, EventArgs e) { ????Task.Factory.StartNew(() => ????????Parallel.For(0, N, i => ????????{ ????????????button1.Invoke((Action)delegate?{ DisplayProgress(i); }); ????????}) ????); } |
6.????????在由?Parallel.Invoke?調用的委托中等待時要小心
在某些情況下,當等待任務時,該任務可在正在執(zhí)行等待操作的線程上以同步方式執(zhí)行(詳見:局部隊列內聯(lián)機制)。這樣可提高性能,因為它利用了將以其他方式阻止的現有線程,因此不需要附加線程。但此性能優(yōu)化在某些情況下可能會導致死鎖。例如,兩個任務可能運行相同的委托代碼,該代碼在事件發(fā)生時發(fā)出信號,并等待另一個任務發(fā)出信號。如果在相同線程上將第二個任務內聯(lián)到第一個,并且第一個任務進入等待狀態(tài),則第二個任務將永遠無法發(fā)出其事件信號。為了避免發(fā)生這種情況,您可以在等待操作上指定超時,或使用?Thread?或?ThreadPool?來確保任務不會發(fā)生內聯(lián)。
7.????????不要假定?ForEach、For?和?ForAll?的迭代始終并行執(zhí)行
請務必謹記,For、ForEach?或?ForAll<TSource>?循環(huán)中的迭代不一定并行執(zhí)行。因此,您應避免編寫任何依賴于并行執(zhí)行的正確性或依賴于按任何特定順序執(zhí)行迭代的代碼。例如,此代碼有可能會死鎖:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | ManualResetEventSlim mre = new?ManualResetEventSlim(); Enumerable.Range(0, Environment.ProcessorCount * 100) ????.AsParallel() ????.ForAll((j) => ????????{ ????????????if?(j == Environment.ProcessorCount) ????????????{ ????????????????Console.WriteLine("Set on {0} with value of {1}", ????????????????????Thread.CurrentThread.ManagedThreadId, j); ????????????????mre.Set(); ????????????} ????????????else ????????????{ ????????????????Console.WriteLine("Waiting on {0} with value of {1}", ????????????????????Thread.CurrentThread.ManagedThreadId, j); ????????????????mre.Wait(); ????????????} ????????}); //deadlocks |
在此示例中,一個迭代設置事件,而所有其他迭代則等待事件。??在事件設置迭代完成之前,任何等待迭代均無法完成。但是,在事件設置迭代有機會執(zhí)行之前,等待迭代可能會阻止用于執(zhí)行并行循環(huán)的所有線程。這將導致死鎖–事件設置迭代將從不執(zhí)行,并且等待迭代將從不覺醒。
?
?
?
???????本節(jié)博文內容到此結束,主要是解說了Parallel處理數據并行化的方式、Parallel迭代原理、分區(qū)原理、自定義分區(qū)以及使用Parallel的注意事項。接下來我會寫一篇關于Task類的博文,敬請觀賞。若此文對你有幫助,還請園友多幫推薦、推薦……
?
?
參考資料:MSDN
?
總結
以上是生活随笔為你收集整理的【转】1.5异步编程:.NET4.X 数据并行的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2022上半年5000元档安卓旗舰手机推
- 下一篇: 【转】ASP.NET 2.0 - 导航(