C#并行编程(3):并行循环
初識并行循環(huán)
并行循環(huán)主要用來處理數(shù)據(jù)并行的,如,同時對數(shù)組或列表中的多個數(shù)據(jù)執(zhí)行相同的操作。
在C#編程中,我們使用并行類System.Threading.Tasks.Parallel提供的靜態(tài)方法Parallel.For和Parallel.ForEach來實現(xiàn)并行循環(huán)。從方法名可以看出,這兩個方法是對常規(guī)循環(huán)for和foreach的并行化。
簡單用法
使用并行循環(huán)時需要傳入循環(huán)范圍(集合)和操作數(shù)據(jù)的委托Action<T>:
Parallel.For(0, 100, i => { Console.WriteLine(i); });Parallel.ForEach(Enumerable.Range(0, 100), i => { Console.WriteLine(i); });
使用場景
對于數(shù)據(jù)的處理需要耗費較長時間的循環(huán)適宜使用并行循環(huán),利用多線程加快執(zhí)行速度。
對于簡單的迭代操作,且迭代范圍較小,使用常規(guī)循環(huán)更好好,因為并行循環(huán)涉及到線程的創(chuàng)建、上下文切換和銷毀,使用并行循環(huán)反而影響執(zhí)行效率。
對于迭代操作簡單但迭代范圍很大的情況,我們可以對數(shù)據(jù)進行分區(qū),再執(zhí)行并行循環(huán),減少線程數(shù)量。
循環(huán)結(jié)果
Parallel.For和Parallel.ForEach方法的所有重載有著同樣的返回值類型ParallelLoopResult,并行循環(huán)結(jié)果包含循環(huán)是否完成以及最低迭代次數(shù)兩項信息。
下面的例子使用Parallel.ForEach展示了并行循環(huán)的結(jié)果。
ParallelLoopResult result = Parallel.ForEach(Enumerable.Range(0, 100), (i,loop) =>{
Console.WriteLine(i + 1);
Thread.Sleep(100);
if (i == 30)
{
loop.Break();
}
});
Console.WriteLine($"{result.IsCompleted}-{result.LowestBreakIteration}");
值得一提的是,循環(huán)的Break()和Stop()只能盡早地跳出或者停止循環(huán),而不能立即停止。
取消循環(huán)操作
有時候,我們需要在中途取消循環(huán)操作,但又不知道確切條件是什么,比如用戶觸發(fā)的取消。這時候,可以利用循環(huán)的ParallelOptions傳入一個CancellationToken,同時使用異常處理捕獲OperationCanceledException以進行取消后的處理。下面是一個簡單的例子。
public static CancellationTokenSource CTSource { get; set; } = new CancellationTokenSource();
public static void CancelParallelLoop()
{
Task.Factory.StartNew(() =>
{
try
{
Parallel.ForEach(Enumerable.Range(0, 100), new ParallelOptions { CancellationToken = CTSource.Token },
i =>
{
Console.WriteLine(i + 1);
Thread.Sleep(1000);
});
}
catch (OperationCanceledException oce)
{
Console.WriteLine(oce.Message);
}
});
}static void Main(string[] args)
{
ParallelDemo.CancelParallelLoop();
Thread.Sleep(3000);
ParallelDemo.CTSource.Cancel();
Console.ReadKey();
}
循環(huán)異常收集
并行循環(huán)執(zhí)行過程中,可以捕獲并收集迭代操作引發(fā)的異常,循環(huán)結(jié)束時拋出一個AggregateException異常,并將收集到的異常賦給它的內(nèi)部異常集合InnerExceptions。外部使用時,捕獲AggregateException,即可進行并行循環(huán)的異常處理。
下面的例子模擬了并行循環(huán)的異常拋出、收集及處理的過程。
public static void CaptureTheLoopExceptions()
{
ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
Parallel.ForEach(Enumerable.Range(0, 100), i =>
{
try
{
if (i % 10 == 0)
{
throw new Exception($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] had thrown a exception. [{i}]");
}
Console.WriteLine(i + 1);
Thread.Sleep(100);
}
catch (Exception ex)
{
exceptions.Enqueue(ex);
}
});
if?(!exceptions.IsEmpty)
{
throw new AggregateException(exceptions);
}
}
外部處理方式
try{
ParallelDemo.CaptureTheLoopExceptions();
}
catch (AggregateException aex)
{
foreach (Exception ex in aex.InnerExceptions)
{
Console.WriteLine(ex.Message);
}
}
分區(qū)并行處理
當(dāng)循環(huán)操作很簡單,迭代范圍很大的時候,ParallelLoop提供一種分區(qū)的方式來優(yōu)化循環(huán)性能。下面的例子展示了分區(qū)循環(huán)的使用,同時也能比較幾種循環(huán)方式的執(zhí)行效率。
public static void PartationParallelLoop(int rangeSize = 10000, int opDuration = 1)
{
Stopwatch watch0 = Stopwatch.StartNew();
Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize), EnumerablePartitionerOptions.None),
i =>
{
Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
Thread.Sleep(opDuration);
});
watch0.Stop();
Stopwatch watch1 = Stopwatch.StartNew();
Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize),EnumerablePartitionerOptions.NoBuffering),
i =>
{
Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
Thread.Sleep(opDuration);
});
watch1.Stop();
Stopwatch watch2 = Stopwatch.StartNew();
Parallel.ForEach(Enumerable.Range(0, rangeSize),
i =>
{
Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
Thread.Sleep(opDuration);
});
watch2.Stop();
Stopwatch watch3 = Stopwatch.StartNew();
foreach (int i in Enumerable.Range(0, rangeSize))
{
Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
Thread.Sleep(opDuration);
}
watch2.Stop();
Console.WriteLine();
Console.WriteLine($"PartationParallelLoopWithBuffer => {watch0.ElapsedMilliseconds}ms");
Console.WriteLine($"PartationParallelLoopWithoutBuffer => {watch1.ElapsedMilliseconds}ms");
Console.WriteLine($"NormalParallelLoop => {watch2.ElapsedMilliseconds}ms");
Console.WriteLine($"NormalLoop => {watch3.ElapsedMilliseconds}ms");
}
在?I7-7700HQ?+?16GB?配置?VS調(diào)試模式下得到下面一組測試結(jié)果。
| 10000,1 | 10527 | 11799 | 11155 | 19434 |
| 10000,1 | 9513 | 11442 | 11048 | 19354 |
| 10000,1 | 9871 | 11391 | 14782 | 19154 |
| 100,1000 | 9107 | 5951 | 5081 | 100363 |
| 100,1000 | 9086 | 5974 | 5187 | 100162 |
| 100,1000 | 9208 | 5125 | 5255 | 100239 |
| 100,1 | 350 | 439 | 243 | 200 |
| 100,1 | 390 | 227 | 166 | 198 |
| 100,1 | 466 | 225 | 84 | 197 |
應(yīng)該根據(jù)不同的應(yīng)用場景選擇合適的循環(huán)策略,具體如何選擇,朋友們可自行體會~
原文地址:https://www.cnblogs.com/chenbaoshun/p/10572639.html
.NET社區(qū)新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?
總結(jié)
以上是生活随笔為你收集整理的C#并行编程(3):并行循环的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 江湖召集:.NET开发者们看过来,这场长
- 下一篇: 浅谈C#在网络波动时防重复提交