日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

dnet 并行编程学习总结

發(fā)布時間:2025/4/14 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 dnet 并行编程学习总结 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

.Net并行編程高級教程--Parallel

http://www.cnblogs.com/stoneniqiu/p/4857021.html


一直覺得自己對并發(fā)了解不夠深入,特別是看了《代碼整潔之道》覺得自己有必要好好學(xué)學(xué)并發(fā)編程,

因為性能也是衡量代碼整潔的一大標(biāo)準(zhǔn)。而且在《失控》這本書中也多次提到并發(fā),不管是計算機還是

生物都并發(fā)處理著各種事物。人真是奇怪,當(dāng)你關(guān)注一個事情的時候,你會發(fā)現(xiàn)周圍的事物中就常出現(xiàn)

那個事情。所以好奇心驅(qū)使下學(xué)習(xí)并發(fā)。便有了此文。

一、理解硬件線程和軟件線程

? ? ?多核處理器帶有一個以上的物理內(nèi)核--物理內(nèi)核是真正的獨立處理單元,多個物理內(nèi)核使得多條指

令能夠同時并行運行。硬件線程也稱為邏輯內(nèi)核,一個物理內(nèi)核可以使用超線程技術(shù)提供多個硬件線程

。所以一個硬件線程并不代表一個物理內(nèi)核;Windows中每個運行的程序都是一個進程,每一個進程都會

創(chuàng)建并運行一個或多個線程,這些線程稱為軟件線程。硬件線程就像是一條泳道,而軟件線程就是在其

中游泳的人。

二、并行場合

? ? .Net Framework4 引入了新的Task Parallel Library(任務(wù)并行庫,TPL),它支持?jǐn)?shù)據(jù)并行、任務(wù)

并行和流水線。讓開發(fā)人員應(yīng)付不同的并行場合。

數(shù)據(jù)并行:有大量數(shù)據(jù)需要處理,并且必須對每一份數(shù)據(jù)執(zhí)行同樣的操作。比如通過256bit的密鑰對100

個Unicode字符串進行AES算法加密。
任務(wù)并行:通過任務(wù)并發(fā)運行不同的操作。例如生成文件散列碼,加密字符串,創(chuàng)建縮略圖。
流水線:這是任務(wù)并行和數(shù)據(jù)并行的結(jié)合體。
? ? TPL引入了System.Threading.Tasks ,主類是Task,這個類表示一個異步的并發(fā)的操作,然而我們不

一定要使用Task類的實例,可以使用Parallel靜態(tài)類。它提供了Parallel.Invoke, Parallel.For?

Parallel.Forecah 三個方法。

三、Parallel.Invoke

? ? ?試圖讓很多方法并行運行的最簡單的方法就是使用Parallel類的Invoke方法。例如有四個方法:

WatchMovie
HaveDinner
ReadBook
WriteBlog
? ? 通過下面的代碼就可以使用并行。

?System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);
? 這段代碼會創(chuàng)建指向每一個方法的委托。Invoke方法接受一個Action的參數(shù)組。

1
public static void Invoke(params Action[] actions);
? 用lambda表達式或匿名委托可以達到同樣的效果。

System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () =>?
ReadBook(), delegate() { WriteBlog(); });
?1.沒有特定的執(zhí)行順序。

? ?Parallel.Invoke方法只有在4個方法全部完成之后才會返回。它至少需要4個硬件線程才足以讓這4個
方法并發(fā)運行。但并不保證這4個方法能夠同時啟動運行,如果一個或者多個內(nèi)核處于繁忙狀態(tài),那么底
層的調(diào)度邏輯可能會延遲某些方法的初始化執(zhí)行。

給方法加上延時,就可以看到必須等待最長的方法執(zhí)行完成才回到主方法。

這樣會造成很多邏輯內(nèi)核處于長時間閑置狀態(tài)。

四、Parallel.For

Parallel.For為固定數(shù)目的獨立For循環(huán)迭代提供了負載均衡 (即將工作分發(fā)到不同的任務(wù)中執(zhí)行,這樣
所有的任務(wù)在大部分時間都可以保持繁忙) 的并行執(zhí)行。從而能盡可能地充分利用所有的可用的內(nèi)核。
我們比較下下面兩個方法,一個使用For循環(huán),一個使用Parallel.For ?都是生成密鑰在轉(zhuǎn)換為十六進制
字符串。

private static void GenerateAESKeys(){var sw = Stopwatch.StartNew();for (int i = 0; i < NUM_AES_KEYS; i++){var aesM = new AesManaged();aesM.GenerateKey();byte[] result = aesM.Key;string hexStr = ConverToHexString(result);}Console.WriteLine("AES:"+sw.Elapsed.ToString());}private static void ParallelGenerateAESKeys(){var sw = Stopwatch.StartNew();System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) =>{var aesM = new AesManaged();aesM.GenerateKey();byte[] result = aesM.Key;string hexStr = ConverToHexString(result);});Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString());}private static int NUM_AES_KEYS = 100000;static void Main(string[] args){Console.WriteLine("執(zhí)行"+NUM_AES_KEYS+"次:");GenerateAESKeys();ParallelGenerateAESKeys();Console.ReadKey();}


執(zhí)行1000000次

這里并行的時間是串行的一半。
?
五、Parallel.ForEach

在Parallel.For中,有時候?qū)扔醒h(huán)進行優(yōu)化可能會是一個非常復(fù)雜的任務(wù)。Parallel.ForEach為固

定數(shù)目的獨立For Each循環(huán)迭代提供了負載均衡的并行執(zhí)行,且支持自定義分區(qū)器,讓使用者可以完全

掌握數(shù)據(jù)分發(fā)。實質(zhì)就是將所有要處理的數(shù)據(jù)區(qū)分為多個部分,然后并行運行這些串行循環(huán)。

修改上面的代碼:

? System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? var aesM = new AesManaged();
? ? ? ? ? ? ? ? Console.WriteLine("AES Range({0},{1} 循環(huán)開始時間:

{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay);

? ? ? ? ? ? ? ? for (int i = range.Item1; i < range.Item2; i++)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? aesM.GenerateKey();
? ? ? ? ? ? ? ? ? ? byte[] result = aesM.Key;
? ? ? ? ? ? ? ? ? ? string hexStr = ConverToHexString(result);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Console.WriteLine("AES:"+sw.Elapsed.ToString());
? ? ? ? ? ? });

從執(zhí)行結(jié)果可以看出,分了13個段執(zhí)行的。

第二次執(zhí)行還是13個段。速度上稍微有差異。開始沒有指定分區(qū)數(shù),Partitioner.Create使用的是內(nèi)置

默認值。

而且我們發(fā)現(xiàn)這些分區(qū)并不是同時執(zhí)行的,大致是分了三個時間段執(zhí)行。而且執(zhí)行順序是不同的??偟?

時間和Parallel.For的方法差不多。

?public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source,?

Action<TSource> body)
Parallel.ForEach方法定義了source和Body兩個參數(shù)。source是指分區(qū)器。提供了分解為多個分區(qū)的數(shù)

據(jù)源。body是要調(diào)用的委托。它接受每一個已定義的分區(qū)作為參數(shù)。一共有20多個重載,在上面的例子

中,分區(qū)的類型為Tuple<int,int>,是一個二元組類型。此外,返回一個ParallelLoopResult的值。

Partitioner.Create 創(chuàng)建分區(qū)是根據(jù)邏輯內(nèi)核數(shù)及其他因素決定。

?public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int?
toExclusive)
? ? {
? ? ? int num = 3;
? ? ? if (toExclusive <= fromInclusive)
? ? ? ? throw new ArgumentOutOfRangeException("toExclusive");
? ? ? int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount *?
num);
? ? ? if (rangeSize == 0)
? ? ? ? rangeSize = 1;
? ? ? return Partitioner.Create<Tuple<int, int>>(Partitioner.CreateRanges(fromInclusive,?
toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering);
? ? }

因此我們可以修改分區(qū)數(shù)目,rangesize大致為250000左右。也就是說我的邏輯內(nèi)核是4.

? ?var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1;
? ?System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS +?
1,rangesize), range =>
再次執(zhí)行:

分區(qū)變成了四個,時間上沒有多大差別(第一個時間是串行時間)。我們看見這四個分區(qū)幾乎是同時執(zhí)

行的。大部分情況下,TPL在幕后使用的負載均衡機制都是非常高效的,然而對分區(qū)的控制便于使用者對

自己的工作負載進行分析,來改進整體的性能。

Parallel.ForEach也能對IEnumerable<int>集合進行重構(gòu)。Enumerable.Range生產(chǎn)了序列化的數(shù)目。但

這樣就沒有上面的分區(qū)效果。

?private static void ParallelForEachGenerateMD5HasHes()
? ? ? ? {
? ? ? ? ? ? var sw = Stopwatch.StartNew();
? ? ? ? ? ? System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS),?

number =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? var md5M = MD5.Create();
? ? ? ? ? ? ? ? byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
? ? ? ? ? ? ? ? byte[] result = md5M.ComputeHash(data);
? ? ? ? ? ? ? ? string hexString = ConverToHexString(result);
? ? ? ? ? ? });
? ? ? ? ? ? Console.WriteLine("MD5:"+sw.Elapsed.ToString());
? ? ? ? }

六、從循環(huán)中退出

和串行運行中的break不同,ParallelLoopState 提供了兩個方法用于停止Parallel.For 和?

Parallel.ForEach的執(zhí)行。

Break:讓循環(huán)在執(zhí)行了當(dāng)前迭代后盡快停止執(zhí)行。比如執(zhí)行到100了,那么循環(huán)會處理掉所有小于100的

迭代。
Stop:讓循環(huán)盡快停止執(zhí)行。如果執(zhí)行到了100的迭代,那不能保證處理完所有小于100的迭代。
修改上面的方法:執(zhí)行3秒后退出。

? private static void ParallelLoopResult(ParallelLoopResult loopResult)
? ? ? ? {
? ? ? ? ? ? string text;
? ? ? ? ? ? if (loopResult.IsCompleted)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? text = "循環(huán)完成";
? ? ? ? ? ? }
? ? ? ? ? ? else
? ? ? ? ? ? {
? ? ? ? ? ? ? ? if (loopResult.LowestBreakIteration.HasValue)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? text = "Break終止";
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? else
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? text = "Stop 終止";
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? Console.WriteLine(text);
? ? ? ? }

? ? ? ? private static void ParallelForEachGenerateMD5HasHesBreak()
? ? ? ? {
? ? ? ? ? ? var sw = Stopwatch.StartNew();
? ? ? ? ? ? var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1,?
NUM_AES_KEYS), (int number,ParallelLoopState loopState) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? var md5M = MD5.Create();
? ? ? ? ? ? ? ? byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
? ? ? ? ? ? ? ? byte[] result = md5M.ComputeHash(data);
? ? ? ? ? ? ? ? string hexString = ConverToHexString(result);
? ? ? ? ? ? ? ? if (sw.Elapsed.Seconds > 3)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? loopState.Stop();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
? ? ? ? ? ? ParallelLoopResult(loopresult);
? ? ? ? ? ? Console.WriteLine("MD5:" + sw.Elapsed);
? ? ? ? }

七、捕捉并行循環(huán)中發(fā)生的異常。

? 當(dāng)并行迭代中調(diào)用的委托拋出異常,這個異常沒有在委托中被捕獲到時,就會變成一組異常,新的

System.AggregateException負責(zé)處理這一組異常。

?private static void ParallelForEachGenerateMD5HasHesException()
? ? ? ? {
? ? ? ? ? ? var sw = Stopwatch.StartNew();
? ? ? ? ? ? var loopresult = new ParallelLoopResult();
? ? ? ? ? ? try
? ? ? ? ? ? {
? ? ? ? ? ? ? ? loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1,?
NUM_AES_KEYS), (number, loopState) =>
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? var md5M = MD5.Create();
? ? ? ? ? ? ? ? ? ? byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
? ? ? ? ? ? ? ? ? ? byte[] result = md5M.ComputeHash(data);
? ? ? ? ? ? ? ? ? ? string hexString = ConverToHexString(result);
? ? ? ? ? ? ? ? ? ? if (sw.Elapsed.Seconds > 3)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? throw new TimeoutException("執(zhí)行超過三秒");
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
? ? ? ? ? ? }
? ? ? ? ? ? catch (AggregateException ex)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? foreach (var innerEx in ?ex.InnerExceptions)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? Console.WriteLine(innerEx.ToString());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ?
? ? ? ? ? ? ParallelLoopResult(loopresult);
? ? ? ? ? ? Console.WriteLine("MD5:" + sw.Elapsed);
? ? ? ? }


結(jié)果:

?異常出現(xiàn)了好幾次。

?八、指定并行度。

TPL的方法總會試圖利用所有可用的邏輯內(nèi)核來實現(xiàn)最好的結(jié)果,但有時候你并不希望在并行循環(huán)中使用

所有的內(nèi)核。比如你需要留出一個不參與并行計算的內(nèi)核,來創(chuàng)建能夠響應(yīng)用戶的應(yīng)用程序,而且這個

內(nèi)核需要幫助你運行代碼中的其他部分。這個時候一種好的解決方法就是指定最大并行度。

這需要創(chuàng)建一個ParallelOptions的實例,設(shè)置MaxDegreeOfParallelism的值。

?private static void ParallelMaxDegree(int maxDegree)
? ? ? ? {
? ? ? ? ? ? var parallelOptions = new ParallelOptions();
? ? ? ? ? ? parallelOptions.MaxDegreeOfParallelism = maxDegree;

? ? ? ? ? ? var sw = Stopwatch.StartNew();
? ? ? ? ? ? System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int?

i) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? var aesM = new AesManaged();
? ? ? ? ? ? ? ? aesM.GenerateKey();
? ? ? ? ? ? ? ? byte[] result = aesM.Key;
? ? ? ? ? ? ? ? string hexStr = ConverToHexString(result);
? ? ? ? ? ? });
? ? ? ? ? ? Console.WriteLine("AES:" + sw.Elapsed.ToString());
? ? ? ? }

調(diào)用:如果在四核微處理器上運行,那么將使用3個內(nèi)核。

?ParallelMaxDegree(Environment.ProcessorCount - 1);

時間上大致慢了點(第一次Parallel.For 3.18s),但可以騰出一個內(nèi)核來處理其他的事情。

小結(jié):這次學(xué)習(xí)了Parallel相關(guān)方法以及如何退出并行循環(huán)和捕獲異常、設(shè)置并行度,還有并行相關(guān)的

知識。園子里也有類似的博客。但作為自己知識的管理,在這里梳理一遍。
========

C#并行編程-Task

http://www.cnblogs.com/lonelyxmas/p/3959303.html


菜鳥學(xué)習(xí)并行編程,參考《C#并行編程高級教程.PDF》,如有錯誤,歡迎指正。

任務(wù)簡介

TPL引入新的基于任務(wù)的編程模型,通過這種編程模型可以發(fā)揮多核的功效,提升應(yīng)用程序的性能,不需

要編寫底層復(fù)雜且重量級的線程代碼。

但需要注意:任務(wù)并不是線程(任務(wù)運行的時候需要使用線程,但并不是說任務(wù)取代了線程,任務(wù)代碼

是使用底層的線程(軟件線程,調(diào)度在特定的硬件線程或邏輯內(nèi)核上)運行的,任務(wù)與線程之間并沒有

一對一的關(guān)系。)

創(chuàng)建一個新的任務(wù)時,調(diào)度器(調(diào)度器依賴于底層的線程池引擎)會使用工作竊取隊列找到一個最合適

的線程,然后將任務(wù)加入隊列,任務(wù)所包含的代碼會在一個線程中運行。如圖:

System.Threading.Tasks.Task

一個Task表示一個異步操作,Task提供了很多方法和屬性,通過這些方法和屬性能夠?qū)ask的執(zhí)行進行

控制,并且能夠獲得其狀態(tài)信息。

Task的創(chuàng)建和執(zhí)行都是獨立的,因此可以對關(guān)聯(lián)操作的執(zhí)行擁有完全的控制權(quán)。

使用Parallel.For、Parallel.ForEach的循環(huán)迭代的并行執(zhí)行,TPL會在后臺創(chuàng)建

System.Threading.Tasks.Task的實例。

使用Parallel.Invoke時,TPL也會創(chuàng)建與調(diào)用的委托數(shù)目一致的System.Threading.Tasks.Task的實例。

注意項

程序中添加很多異步的操作作為Task實例加載的時候,為了充分利用運行時所有可用的邏輯內(nèi)核,任務(wù)

調(diào)度器會嘗試的并行的運行這些任務(wù),也會嘗試在所有的可用內(nèi)核上對工作進行負載均衡。

但在實際的編碼過程當(dāng)中,并不是所有的代碼片段都能夠方便的用任務(wù)來運行,因為任務(wù)會帶來額外的

開銷,盡管這種開銷比添加線程所帶來的開銷要小,但是仍然需要將這個開銷考慮在內(nèi)。

Task狀態(tài)與生命周期

一個Task實例只會完成其生命周期一次,當(dāng)Task到達它的3種肯呢過的最終狀態(tài)之一是,就無法回到之前

的任何狀態(tài)

下面貼代碼,詳解見注釋,方便大家理解Task的狀態(tài):

使用任務(wù)來對代碼進行并行化

使用Parallel.Invoke可以并行加載多個方法,使用Task實例也能完成同樣的工作,下面貼代碼:

等待任務(wù)完成Task.WaitAll
Task.WaitAll 方法,這個方法是同步執(zhí)行的,在Task作為參數(shù)被接受,所有Task結(jié)束其執(zhí)行前,主線程

不會繼續(xù)執(zhí)行下一條指令,下面貼代碼

Task.WaitAll 限定等待時長

如圖10毫秒沒有完成任務(wù),則輸出了****

通過取消標(biāo)記取消任務(wù)

通過取消標(biāo)記來中斷Task實例的執(zhí)行。 CancellationTokenSource,CancellationToken下的IsCanceled

屬性標(biāo)志當(dāng)前是否已經(jīng)被取消,取消任務(wù),任務(wù)也不一定會馬上取消,下面貼代碼:

Task異常處理 當(dāng)很多任務(wù)并行運行的時候,可能會并行發(fā)生很多異常。Task實例能夠處理一組一組的異

常,這些異常有System.AggregateException類處理

Task返回值 ?Task<TResult>

通過延續(xù)串聯(lián)多個任務(wù)

ContinueWith:創(chuàng)建一個目標(biāo)Task完成時,異步執(zhí)行的延續(xù)程序,await,如代碼所示:

TaskContinuationOptions

TaskContinuationOptions參數(shù),可以控制延續(xù)另一個任的任務(wù)調(diào)度和執(zhí)行的可選行為。下面看代碼:

TaskContinuationOptions 屬性有很多,如下所示

?關(guān)于并行編程中的Task就寫到這,如有問題,請指正。
========

C# 并行編程 之 并發(fā)集合 (.Net Framework 4.0)

http://blog.csdn.net/wangzhiyu1980/article/details/45497907


此文為個人學(xué)習(xí)《C#并行編程高級教程》的筆記,總結(jié)并調(diào)試了一些文章中的代碼示例。 在以后開發(fā)過

程中可以加以運用。

對于并行任務(wù),與其相關(guān)緊密的就是對一些共享資源,數(shù)據(jù)結(jié)構(gòu)的并行訪問。經(jīng)常要做的就是對一些隊

列進行加鎖-解鎖,然后執(zhí)行類似插入,刪除等等互斥操作。 .NetFramework 4.0 中提供了一些封裝好

的支持并行操作數(shù)據(jù)容器,可以減少并行編程的復(fù)雜程度。

基本信息
.NetFramework中并行集合的名字空間: System.Collections.Concurrent

并行容器:

ConcurrentQueue
ConcurrentStack
ConcurrentBag : 一個無序的數(shù)據(jù)結(jié)構(gòu)集,當(dāng)不需要考慮順序時非常有用。
BlockingCollection : 與經(jīng)典的阻塞隊列數(shù)據(jù)結(jié)構(gòu)類似
ConcurrentDictionary

這些集合在某種程度上使用了無鎖技術(shù)(CAS Compare-and-Swap和內(nèi)存屏障 Memory Barrier),與加互斥

鎖相比獲得了性能的提升。但在串行程序中,最好不用這些集合,它們必然會影響性能。

關(guān)于CAS:?
http://www.tuicool.com/articles/zuui6z
http://www.360doc.com/content/11/0914/16/7656248_148221200.shtml
關(guān)于內(nèi)存屏障
http://en.wikipedia.org/wiki/Memory_barrier


用法與示例
ConcurrentQueue
其完全無鎖,但當(dāng)CAS面臨資源競爭失敗時可能會陷入自旋并重試操作。


Enqueue:在隊尾插入元素
TryDequeue:嘗試刪除隊頭元素,并通過out參數(shù)返回
TryPeek:嘗試將對頭元素通過out參數(shù)返回,但不刪除該元素。


程序示例:

using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_1_concurrent_queue { class Program { internal static ConcurrentQueue<int> _TestQueue; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestQueue.Enqueue(i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); for (; ; ) { IsDequeuue = _TestQueue.TryDequeue(out i); if (IsDequeuue) System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "====="); if (i == 99) break; } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestQueue = new ConcurrentQueue<int>(); Console.WriteLine("Sample 3-1 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 3-1 Main }"); Console.ReadKey(); } } }


ConcurrentStack
其完全無鎖,但當(dāng)CAS面臨資源競爭失敗時可能會陷入自旋并重試操作。

Push:向棧頂插入元素
TryPop:從棧頂彈出元素,并且通過out 參數(shù)返回
TryPeek:返回棧頂元素,但不彈出。

程序示例:

using System; ?
using System.Text; ?
??
using System.Threading.Tasks; ?
using System.Collections.Concurrent; ?
??
namespace Sample4_2_concurrent_stack ?
{ ?
? ? class Program ?
? ? { ?
? ? ? ? internal static ConcurrentStack<int> _TestStack; ?
??
? ? ? ? class ThreadWork1 ?// producer ?
? ? ? ? { ?
? ? ? ? ? ? public ThreadWork1() ?
? ? ? ? ? ? { } ?
??
? ? ? ? ? ? public void run() ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 run { "); ?
? ? ? ? ? ? ? ? for (int i = 0; i < 100; i++) ?
? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 producer: " + i); ?
? ? ? ? ? ? ? ? ? ? _TestStack.Push(i); ?
? ? ? ? ? ? ? ? } ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 run } "); ?
? ? ? ? ? ? } ?
? ? ? ? } ?
??
? ? ? ? class ThreadWork2 ?// consumer ?
? ? ? ? { ?
? ? ? ? ? ? public ThreadWork2() ?
? ? ? ? ? ? { } ?
??
? ? ? ? ? ? public void run() ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? int i = 0; ?
? ? ? ? ? ? ? ? bool IsDequeuue = false; ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 run { "); ?
? ? ? ? ? ? ? ? for (; ; ) ?
? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? IsDequeuue = _TestStack.TryPop(out i); ?
? ? ? ? ? ? ? ? ? ? if (IsDequeuue) ?
? ? ? ? ? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " ??


=====" + i); ?
??
? ? ? ? ? ? ? ? ? ? if (i == 99) ?
? ? ? ? ? ? ? ? ? ? ? ? break; ?
? ? ? ? ? ? ? ? } ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 run } "); ?
? ? ? ? ? ? } ?
? ? ? ? } ?
??
? ? ? ? static void StartT1() ?
? ? ? ? { ?
? ? ? ? ? ? ThreadWork1 work1 = new ThreadWork1(); ?
? ? ? ? ? ? work1.run(); ?
? ? ? ? } ?
??
? ? ? ? static void StartT2() ?
? ? ? ? { ?
? ? ? ? ? ? ThreadWork2 work2 = new ThreadWork2(); ?
? ? ? ? ? ? work2.run(); ?
? ? ? ? } ?
? ? ? ? static void Main(string[] args) ?
? ? ? ? { ?
? ? ? ? ? ? Task t1 = new Task(() => StartT1()); ?
? ? ? ? ? ? Task t2 = new Task(() => StartT2()); ?
??
? ? ? ? ? ? _TestStack = new ConcurrentStack<int>(); ?
??
? ? ? ? ? ? Console.WriteLine("Sample 4-1 Main {"); ?
??
? ? ? ? ? ? Console.WriteLine("Main t1 t2 started {"); ?
? ? ? ? ? ? t1.Start(); ?
? ? ? ? ? ? t2.Start(); ?
? ? ? ? ? ? Console.WriteLine("Main t1 t2 started }"); ?
??
? ? ? ? ? ? Console.WriteLine("Main wait t1 t2 end {"); ?
? ? ? ? ? ? Task.WaitAll(t1, t2); ?
? ? ? ? ? ? Console.WriteLine("Main wait t1 t2 end }"); ?
??
? ? ? ? ? ? Console.WriteLine("Sample 4-1 Main }"); ?
??
? ? ? ? ? ? Console.ReadKey(); ?
? ? ? ? } ?
? ? } ?
} ?


測試中一個有趣的現(xiàn)象:


雖然生產(chǎn)者已經(jīng)在棧中插入值已經(jīng)到了25,但消費者第一個出棧的居然是4,而不是25。很像是出錯了。


但仔細想想入棧,出棧和打印語句是兩個部分,而且并不是原子操作,出現(xiàn)這種現(xiàn)象應(yīng)該也算正常。


Sample 3-1 Main {
Main t1 t2 started {
Main t1 t2 started }
Main wait t1 t2 end {
ThreadWork1 run {
ThreadWork1 producer: 0
ThreadWork2 run {
ThreadWork1 producer: 1
ThreadWork1 producer: 2
ThreadWork1 producer: 3
ThreadWork1 producer: 4
ThreadWork1 producer: 5
ThreadWork1 producer: 6
ThreadWork1 producer: 7
ThreadWork1 producer: 8
ThreadWork1 producer: 9
ThreadWork1 producer: 10
ThreadWork1 producer: 11
ThreadWork1 producer: 12
ThreadWork1 producer: 13
ThreadWork1 producer: 14
ThreadWork1 producer: 15
ThreadWork1 producer: 16
ThreadWork1 producer: 17
ThreadWork1 producer: 18
ThreadWork1 producer: 19
ThreadWork1 producer: 20
ThreadWork1 producer: 21
ThreadWork1 producer: 22
ThreadWork1 producer: 23
ThreadWork1 producer: 24
ThreadWork1 producer: 25
ThreadWork2 consumer: 16 ? =====4
ThreadWork2 consumer: 625 ? =====25
ThreadWork2 consumer: 576 ? =====24
ThreadWork2 consumer: 529 ? =====23
ThreadWork1 producer: 26
ThreadWork1 producer: 27
ThreadWork1 producer: 28


ConcurrentBag
一個無序的集合,程序可以向其中插入元素,或刪除元素。
在同一個線程中向集合插入,刪除元素的效率很高。


?Add:向集合中插入元素
?TryTake:從集合中取出元素并刪除
?TryPeek:從集合中取出元素,但不刪除該元素。


程序示例:


[csharp] view plain copy 在CODE上查看代碼片派生到我的代碼片
using System; ?
using System.Text; ?
??
using System.Threading.Tasks; ?
using System.Collections.Concurrent; ?
??
namespace Sample4_3_concurrent_bag ?
{ ?
? ? class Program ?
? ? { ?
? ? ? ? internal static ConcurrentBag<int> _TestBag; ?
??
? ? ? ? class ThreadWork1 ?// producer ?
? ? ? ? { ?
? ? ? ? ? ? public ThreadWork1() ?
? ? ? ? ? ? { } ?
??
? ? ? ? ? ? public void run() ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 run { "); ?
? ? ? ? ? ? ? ? for (int i = 0; i < 100; i++) ?
? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 producer: " + i); ?
? ? ? ? ? ? ? ? ? ? _TestBag.Add(i); ?
? ? ? ? ? ? ? ? } ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 run } "); ?
? ? ? ? ? ? } ?
? ? ? ? } ?
??
? ? ? ? class ThreadWork2 ?// consumer ?
? ? ? ? { ?
? ? ? ? ? ? public ThreadWork2() ?
? ? ? ? ? ? { } ?
??
? ? ? ? ? ? public void run() ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? int i = 0; ?
? ? ? ? ? ? ? ? int nCnt = 0; ?
? ? ? ? ? ? ? ? bool IsDequeuue = false; ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 run { "); ?
? ? ? ? ? ? ? ? for (;;) ?
? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? IsDequeuue = _TestBag.TryTake(out i); ?
? ? ? ? ? ? ? ? ? ? if (IsDequeuue) ?
? ? ? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " ??


=====" + i); ?
? ? ? ? ? ? ? ? ? ? ? ? nCnt++; ?
? ? ? ? ? ? ? ? ? ? } ?
??
? ? ? ? ? ? ? ? ? ? if (nCnt == 99) ?
? ? ? ? ? ? ? ? ? ? ? ? break; ?
? ? ? ? ? ? ? ? } ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 run } "); ?
? ? ? ? ? ? } ?
? ? ? ? } ?
??
? ? ? ? static void StartT1() ?
? ? ? ? { ?
? ? ? ? ? ? ThreadWork1 work1 = new ThreadWork1(); ?
? ? ? ? ? ? work1.run(); ?
? ? ? ? } ?
??
? ? ? ? static void StartT2() ?
? ? ? ? { ?
? ? ? ? ? ? ThreadWork2 work2 = new ThreadWork2(); ?
? ? ? ? ? ? work2.run(); ?
? ? ? ? } ?
? ? ? ? static void Main(string[] args) ?
? ? ? ? { ?
? ? ? ? ? ? Task t1 = new Task(() => StartT1()); ?
? ? ? ? ? ? Task t2 = new Task(() => StartT2()); ?
??
? ? ? ? ? ? _TestBag = new ConcurrentBag<int>(); ?
??
? ? ? ? ? ? Console.WriteLine("Sample 4-3 Main {"); ?
??
? ? ? ? ? ? Console.WriteLine("Main t1 t2 started {"); ?
? ? ? ? ? ? t1.Start(); ?
? ? ? ? ? ? t2.Start(); ?
? ? ? ? ? ? Console.WriteLine("Main t1 t2 started }"); ?
??
? ? ? ? ? ? Console.WriteLine("Main wait t1 t2 end {"); ?
? ? ? ? ? ? Task.WaitAll(t1, t2); ?
? ? ? ? ? ? Console.WriteLine("Main wait t1 t2 end }"); ?
??
? ? ? ? ? ? Console.WriteLine("Sample 4-3 Main }"); ?
??
? ? ? ? ? ? Console.ReadKey(); ?
? ? ? ? } ?
? ? } ?
} ?


BlockingCollection
一個支持界限和阻塞的容器


Add :向容器中插入元素
TryTake:從容器中取出元素并刪除
TryPeek:從容器中取出元素,但不刪除。
CompleteAdding:告訴容器,添加元素完成。此時如果還想繼續(xù)添加會發(fā)生異常。
IsCompleted:告訴消費線程,生產(chǎn)者線程還在繼續(xù)運行中,任務(wù)還未完成。


示例程序:


程序中,消費者線程完全使用 ?while (!_TestBCollection.IsCompleted) 作為退出運行的判斷條件。
在Worker1中,有兩條語句被注釋掉了,當(dāng)i 為50時設(shè)置CompleteAdding,但當(dāng)繼續(xù)向其中插入元素時,


系統(tǒng)拋出異常,提示無法再繼續(xù)插入。


[csharp] view plain copy 在CODE上查看代碼片派生到我的代碼片
using System; ?
using System.Text; ?
??
using System.Threading.Tasks; ?
using System.Collections.Concurrent; ?
??
namespace Sample4_4_concurrent_bag ?
{ ?
? ? class Program ?
? ? { ?
? ? ? ? internal static BlockingCollection<int> _TestBCollection; ?
??
? ? ? ? class ThreadWork1 ?// producer ?
? ? ? ? { ?
? ? ? ? ? ? public ThreadWork1() ?
? ? ? ? ? ? { } ?
??
? ? ? ? ? ? public void run() ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 run { "); ?
? ? ? ? ? ? ? ? for (int i = 0; i < 100; i++) ?
? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 producer: " + i); ?
? ? ? ? ? ? ? ? ? ? _TestBCollection.Add(i); ?
? ? ? ? ? ? ? ? ? ? //if (i == 50) ?
? ? ? ? ? ? ? ? ? ? // ? ?_TestBCollection.CompleteAdding(); ?
? ? ? ? ? ? ? ? } ?
? ? ? ? ? ? ? ? _TestBCollection.CompleteAdding(); ?
??
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 run } "); ?
? ? ? ? ? ? } ?
? ? ? ? } ?
??
? ? ? ? class ThreadWork2 ?// consumer ?
? ? ? ? { ?
? ? ? ? ? ? public ThreadWork2() ?
? ? ? ? ? ? { } ?
??
? ? ? ? ? ? public void run() ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? int i = 0; ?
? ? ? ? ? ? ? ? int nCnt = 0; ?
? ? ? ? ? ? ? ? bool IsDequeuue = false; ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 run { "); ?
? ? ? ? ? ? ? ? while (!_TestBCollection.IsCompleted) ?
? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? IsDequeuue = _TestBCollection.TryTake(out i); ?
? ? ? ? ? ? ? ? ? ? if (IsDequeuue) ?
? ? ? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " ??


=====" + i); ?
? ? ? ? ? ? ? ? ? ? ? ? nCnt++; ?
? ? ? ? ? ? ? ? ? ? } ?
? ? ? ? ? ? ? ? } ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 run } "); ?
? ? ? ? ? ? } ?
? ? ? ? } ?
??
? ? ? ? static void StartT1() ?
? ? ? ? { ?
? ? ? ? ? ? ThreadWork1 work1 = new ThreadWork1(); ?
? ? ? ? ? ? work1.run(); ?
? ? ? ? } ?
??
? ? ? ? static void StartT2() ?
? ? ? ? { ?
? ? ? ? ? ? ThreadWork2 work2 = new ThreadWork2(); ?
? ? ? ? ? ? work2.run(); ?
? ? ? ? } ?
? ? ? ? static void Main(string[] args) ?
? ? ? ? { ?
? ? ? ? ? ? Task t1 = new Task(() => StartT1()); ?
? ? ? ? ? ? Task t2 = new Task(() => StartT2()); ?
??
? ? ? ? ? ? _TestBCollection = new BlockingCollection<int>(); ?
??
? ? ? ? ? ? Console.WriteLine("Sample 4-4 Main {"); ?
??
? ? ? ? ? ? Console.WriteLine("Main t1 t2 started {"); ?
? ? ? ? ? ? t1.Start(); ?
? ? ? ? ? ? t2.Start(); ?
? ? ? ? ? ? Console.WriteLine("Main t1 t2 started }"); ?
??
? ? ? ? ? ? Console.WriteLine("Main wait t1 t2 end {"); ?
? ? ? ? ? ? Task.WaitAll(t1, t2); ?
? ? ? ? ? ? Console.WriteLine("Main wait t1 t2 end }"); ?
??
? ? ? ? ? ? Console.WriteLine("Sample 4-4 Main }"); ?
??
? ? ? ? ? ? Console.ReadKey(); ?
? ? ? ? } ?
? ? } ?
} ?


當(dāng)然可以嘗試在Work1中注釋掉 CompleteAdding 語句,此時Work2陷入循環(huán)無法退出。


ConcurrentDictionary
對于讀操作是完全無鎖的,當(dāng)很多線程要修改數(shù)據(jù)時,它會使用細粒度的鎖。


AddOrUpdate:如果鍵不存在,方法會在容器中添加新的鍵和值,如果存在,則更新現(xiàn)有的鍵和值。
GetOrAdd:如果鍵不存在,方法會向容器中添加新的鍵和值,如果存在則返回現(xiàn)有的值,并不添加新值



TryAdd:嘗試在容器中添加新的鍵和值。
TryGetValue:嘗試根據(jù)指定的鍵獲得值。
TryRemove:嘗試刪除指定的鍵。
TryUpdate:有條件的更新當(dāng)前鍵所對應(yīng)的值。
GetEnumerator:返回一個能夠遍歷整個容器的枚舉器。


程序示例:


[csharp] view plain copy 在CODE上查看代碼片派生到我的代碼片
using System; ?
using System.Text; ?
??
using System.Threading.Tasks; ?
using System.Collections.Concurrent; ?
? ??
namespace Sample4_5_concurrent_dictionary ?
{ ?
? ? class Program ?
? ? { ?
? ? ? ? internal static ConcurrentDictionary<int, int> _TestDictionary; ?
??
? ? ? ? class ThreadWork1 ?// producer ?
? ? ? ? { ?
? ? ? ? ? ? public ThreadWork1() ?
? ? ? ? ? ? { } ?
??
? ? ? ? ? ? public void run() ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 run { "); ?
? ? ? ? ? ? ? ? for (int i = 0; i < 100; i++) ?
? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 producer: " + i); ?
? ? ? ? ? ? ? ? ? ? _TestDictionary.TryAdd(i, i); ?
? ? ? ? ? ? ? ? } ?
??
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork1 run } "); ?
? ? ? ? ? ? } ?
? ? ? ? } ?
??
? ? ? ? class ThreadWork2 ?// consumer ?
? ? ? ? { ?
? ? ? ? ? ? public ThreadWork2() ?
? ? ? ? ? ? { } ?
??
? ? ? ? ? ? public void run() ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? int i = 0, nCnt = 0; ?
? ? ? ? ? ? ? ? int nValue = 0; ?
? ? ? ? ? ? ? ? bool IsOk = false; ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 run { "); ?
? ? ? ? ? ? ? ? while (nCnt < 100) ?
? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? IsOk = _TestDictionary.TryGetValue(i, out nValue); ?
? ? ? ? ? ? ? ? ? ? if (IsOk) ?
? ? ? ? ? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " ??


=====" + i); ?
? ? ? ? ? ? ? ? ? ? ? ? nValue = nValue * nValue; ?
? ? ? ? ? ? ? ? ? ? ? ? _TestDictionary.AddOrUpdate(i, nValue, (key, value) => { return?


value = nValue; }); ?
? ? ? ? ? ? ? ? ? ? ? ? nCnt++; ?
? ? ? ? ? ? ? ? ? ? ? ? i++; ?
? ? ? ? ? ? ? ? ? ? } ?
? ? ? ? ? ? ? ? } ?
? ? ? ? ? ? ? ? System.Console.WriteLine("ThreadWork2 run } "); ?
? ? ? ? ? ? } ?
? ? ? ? } ?
??
? ? ? ? static void StartT1() ?
? ? ? ? { ?
? ? ? ? ? ? ThreadWork1 work1 = new ThreadWork1(); ?
? ? ? ? ? ? work1.run(); ?
? ? ? ? } ?
??
? ? ? ? static void StartT2() ?
? ? ? ? { ?
? ? ? ? ? ? ThreadWork2 work2 = new ThreadWork2(); ?
? ? ? ? ? ? work2.run(); ?
? ? ? ? } ?
? ? ? ? static void Main(string[] args) ?
? ? ? ? { ?
? ? ? ? ? ? Task t1 = new Task(() => StartT1()); ?
? ? ? ? ? ? Task t2 = new Task(() => StartT2()); ?
? ? ? ? ? ? bool bIsNext = true; ?
? ? ? ? ? ? int ?nValue = 0; ?
??
? ? ? ? ? ? _TestDictionary = new ConcurrentDictionary<int, int>(); ?
??
? ? ? ? ? ? Console.WriteLine("Sample 4-5 Main {"); ?
??
? ? ? ? ? ? Console.WriteLine("Main t1 t2 started {"); ?
? ? ? ? ? ? t1.Start(); ?
? ? ? ? ? ? t2.Start(); ?
? ? ? ? ? ? Console.WriteLine("Main t1 t2 started }"); ?
??
? ? ? ? ? ? Console.WriteLine("Main wait t1 t2 end {"); ?
? ? ? ? ? ? Task.WaitAll(t1, t2); ?
? ? ? ? ? ? Console.WriteLine("Main wait t1 t2 end }"); ?
??
? ? ? ? ? ? foreach (var pair in _TestDictionary) ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? Console.WriteLine(pair.Key + " : " + pair.Value); ?
? ? ? ? ? ? } ? ?
? ? ? ? ? ??


System.Collections.Generic.IEnumerator<System.Collections.Generic.KeyValuePair<int, int>> ??
? ? ? ? ? ? ? ? enumer = _TestDictionary.GetEnumerator(); ?
??
? ? ? ? ? ? while (bIsNext) ?
? ? ? ? ? ? { ?
? ? ? ? ? ? ? ? bIsNext = enumer.MoveNext(); ?
? ? ? ? ? ? ? ? Console.WriteLine("Key: " + enumer.Current.Key + ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? " ?Value: " + enumer.Current.Value); ?
??
? ? ? ? ? ? ? ? _TestDictionary.TryRemove(enumer.Current.Key, out nValue); ?
? ? ? ? ? ? } ?
??
? ? ? ? ? ? Console.WriteLine("\n\nDictionary Count: " + _TestDictionary.Count); ?
??
? ? ? ? ? ? Console.WriteLine("Sample 4-5 Main }"); ?
??
? ? ? ? ? ? Console.ReadKey(); ?
? ? ? ? } ?
? ? } ?
}?
========

C#并行編程-PLINQ:聲明式數(shù)據(jù)并行

http://www.cnblogs.com/woxpp/p/3951096.html
目錄


C#并行編程-相關(guān)概念


C#并行編程-Parallel


C#并行編程-Task


C#并行編程-并發(fā)集合


C#并行編程-線程同步原語


C#并行編程-PLINQ:聲明式數(shù)據(jù)并行


背景


通過LINQ可以方便的查詢并處理不同的數(shù)據(jù)源,使用Parallel LINQ (PLINQ)來充分獲得并行化所帶來的


優(yōu)勢。


PLINQ不僅實現(xiàn)了完整的LINQ操作符,而且還添加了一些用于執(zhí)行并行的操作符,與對應(yīng)的LINQ相比,通


過PLINQ可以獲得明顯的加速,但是具體的加速效果還要取決于具體的場景,不過在并行化的情況下一段


會加速。


如果一個查詢涉及到大量的計算和內(nèi)存密集型操作,而且順序并不重要,那么加速會非常明顯,然而,


如果順序很重要,那么加速就會受到影響。


AsParallel() 啟用查詢的并行化


下面貼代碼,看下效果,詳情見注釋:


?View Code


當(dāng)前模擬的數(shù)據(jù)量比較少,數(shù)據(jù)量越多,采用并行化查詢的效果越明顯


AsOrdered()與orderby
AsOrdered:保留查詢的結(jié)果按源序列排序,在并行查詢中,多條數(shù)據(jù)會被分在多個區(qū)域中進行查詢,查


詢后再將多個區(qū)的數(shù)據(jù)結(jié)果合并到一個結(jié)果集中并按源序列順序返回。


orderby:將返回的結(jié)果集按指定順序進行排序


下面貼代碼方便大家理解:


在PLINQ查詢中,AsOrdered()和orderby子句都會降低運行速度,所以如果順序并不是必須的,那么在請


求特定順序的結(jié)果之前,將加速效果與串行執(zhí)行的性能進行比較是非常重要的。


指定執(zhí)行模式 WithExecutionMode


對串行化代碼進行并行化,會帶來一定的額外開銷,Plinq查詢執(zhí)行并行化也是如此,在默認情況下,執(zhí)


行PLINQ查詢的時候,.NET機制會盡量避免高開銷的并行化算法,這些算法有可能會將執(zhí)行的性能降低到


地獄串行執(zhí)行的性能。


.NET會根據(jù)查詢的形態(tài)做出決策,并不開了數(shù)據(jù)集大小和委托執(zhí)行的時間,不過也可以強制并行執(zhí)行,


而不用考慮執(zhí)行引擎分析的結(jié)果,可以調(diào)用WithExecutionMode方法來進行設(shè)置。、


下面貼代碼,方便大家理解


通過PLINQ執(zhí)行歸約操作


PLINQ可以簡化對一個序列或者一個組中所有成員應(yīng)用一個函數(shù)的過程,這個過程稱之為歸約操作,如在


PLINQ查詢中使用類似于Average,Max,Min,Sum之類的聚合函數(shù)就可以充分利用并行所帶來好處。


并行執(zhí)行的規(guī)約和串行執(zhí)行的規(guī)約的執(zhí)行結(jié)果可能會不同,因為在操作不能同時滿足可交換和可傳遞的


情況下產(chǎn)生攝入,在每次執(zhí)行的時候,序列或組中的元素在不同并行任務(wù)中分布可能也會有區(qū)別,因而


在這種操作的情況下可能會產(chǎn)生不同的最終結(jié)果,因此,一定要通過對于的串行版本來興義原始的數(shù)據(jù)


源,這樣才能幫助PLINQ獲得最優(yōu)的執(zhí)行結(jié)果。


下面貼代碼:


如上述代碼所示


在LINQ版本中,該方法會返回一個 IEumerable<int>,即調(diào)用 Eumerable.Range方法生成指定范圍整數(shù)


序列的結(jié)果,
在PLINQ版本中,該方法會返回一個 ParallelQuery<int>,即調(diào)用并行版本中


System.Linq.ParallelEumerable的ParallelEumerable.Range方法,通過這種方法得到的結(jié)果序列也是


并行序列,可以再PLINQ中并行運行。


如果想對特定數(shù)據(jù)源進行LINQ查詢時,可以定義為 ?private IEquatable<int> products


如果想對特定數(shù)據(jù)源進行PLINQ查詢時,可以定義為 private ParallelQuery<int> products


并發(fā)PLINQ任務(wù)


如代碼所示tk1,tk2,tk3三個任務(wù),tk2,tk3任務(wù)的運行需要基于tk1任務(wù)的結(jié)果,因此,參數(shù)中指定了


TaskContinuationOptions.OnlyOnRanToCompletion,通過這種方式,每個被串聯(lián)的任務(wù)都會等待之前的


任務(wù)完成之后才開始執(zhí)行,tk2,tk3在tk1執(zhí)行完成后,這兩個任務(wù)的PLINQ查詢可以并行運行,并將會可


能地使用多個邏輯內(nèi)核。


取消PLINQ WithCancellation


通過WithCancellation取消當(dāng)前PLINQ正在執(zhí)行的查詢操作,代碼如下:


指定查詢時所需的并行度 WithDegreeOfParallelism


默認情況下,PLINQ總是會試圖利用所有的可用邏輯內(nèi)核達到最佳性能,在程序中我們可以利用


WithDegreeOfParallelism方法指定一個不同最大并行度。


下面貼代碼:


好處:如果計算機有8個可用的邏輯內(nèi)核,PLINQ查詢最多運行4個并發(fā)任務(wù),這樣可用使用


Parallel.Invoke 加載多個帶有不同并行度的PLINQ查詢,有一些PLINQ查詢的可擴展性有限,因此這些


選項可用讓您充分利用額外的內(nèi)核。


使用ForAll 并行遍歷結(jié)果


下面貼代碼:


ForAll是并行,foreach是串行,如果需要以特定的順序處理數(shù)據(jù),那么必須使用上述串行循環(huán)或方法。


WithMergeOptions


通過WithMergeOptions擴展方法提示PLINQ應(yīng)該優(yōu)先使用哪種方式合并并行結(jié)果片段,如下:


下面貼代碼查看下差異:


需要注意的是:每一個選項都有其優(yōu)點和缺點,因此一定奧測量顯示第一個結(jié)果的時間以及完成整個查


詢所需要的時間,這點很重要 。


使用PLINQ執(zhí)行MapReduce算法 ILookup IGrouping


mapreduce ,也稱為Map/reduce 或者Map&Reduce ,是一種非常流行的框架,能夠充分利用并行化處理巨


大的數(shù)據(jù)集,MapReduce的基本思想非常簡單:將數(shù)據(jù)處理問題分解為以下兩個獨立且可以并行執(zhí)行的操


作:


映射(Map)-對數(shù)據(jù)源進行操作,為每一個數(shù)據(jù)項計算出一個鍵值對。運行的結(jié)果是一個鍵值對的集合


,根據(jù)鍵進行分組。


規(guī)約(Reduce)-對映射操作產(chǎn)生的根據(jù)鍵進行分組的所有鍵值對進行操作,對每一個組執(zhí)行歸約操作,


這個操作可以返回一個或多個值。


下面貼代碼,方便大家理解,但是該案列所展示的并不是一個純粹的MapReduce算法實現(xiàn):


關(guān)于PLINQ:聲明式數(shù)據(jù)并行就寫到這,主要是PLINQ下的查詢注意項和查詢調(diào)優(yōu)的一些擴展方法。如有問


題,歡迎指正。
========

總結(jié)

以上是生活随笔為你收集整理的dnet 并行编程学习总结的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。