C# 多线程六之Task(任务)三之任务工厂
1、知識(shí)回顧,簡(jiǎn)要概述
前面兩篇關(guān)于Task的隨筆,C# 多線程五之Task(任務(wù))一?和?C# 多線程六之Task(任務(wù))二,介紹了關(guān)于Task的一些基本的用法,以及一些使用的要點(diǎn),如果都看懂了,本文將介紹另一個(gè)Task的特殊用法,前面介紹了,如何通過一個(gè)父任務(wù)創(chuàng)建多個(gè)子任務(wù),且這些子任務(wù)都必須要支持取消的例子,常規(guī)做法是,通過new 一個(gè)Task數(shù)組對(duì)象,然后在該對(duì)象的內(nèi)部創(chuàng)建多個(gè)Task任務(wù),然后給這些任務(wù)指定TaskCreationOptions.AttachedToParent,這樣所有的子任務(wù)都關(guān)聯(lián)到了父任務(wù),接著給這些子任務(wù),綁定一個(gè)CancellationToken類實(shí)例,當(dāng)其中一個(gè)子任務(wù)發(fā)生異常時(shí),調(diào)用CancellationToken類實(shí)例的Cancel方法,將其余的子任務(wù)全都取消,大致代碼如下:
static void Main(string[] args){var parentTask = new Task<int[]>(() =>{var results = new int[3];var cancelTokenSource = new CancellationTokenSource();var childTasks = new Task[] {new Task(() => results[0] = ChildThreadOne(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent),new Task(() => results[1] = ChildThreadTwo(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent),new Task(() => results[2] = ChildThreadThree(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent),};//開啟所有的子任務(wù)childTasks.ForEach(f => { f.Start(); });//如果有子任務(wù)發(fā)生異常,那么通過取消信號(hào)量終止所有的任務(wù)childTasks.ForEach(f =>{f.ContinueWith(task=> cancelTokenSource.Cancel(), TaskContinuationOptions.OnlyOnFaulted);});return results;});parentTask.Start();parentTask.ContinueWith(x =>{Console.WriteLine("當(dāng)父任務(wù)執(zhí)行完畢時(shí),CLR會(huì)喚起一個(gè)新線程,將父任務(wù)的返回值(子任務(wù)的返回值)輸出,所以這里不會(huì)有任何的線程發(fā)生阻塞");foreach (var re in parentTask.Result){Console.WriteLine("子任務(wù)的返回值分別為:{0}", re);}});Console.WriteLine("主線程不會(huì)阻塞,它會(huì)繼續(xù)執(zhí)行");Console.ReadKey();//必須加這行代碼,因?yàn)門ask時(shí)線程池線程,屬于后臺(tái)線程 }/// <summary>/// 子任務(wù)一/// </summary>static int ChildThreadOne(CancellationToken token){Thread.Sleep(3000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務(wù)一完成了計(jì)算任務(wù),并返回值:{0}", 6);return 6;}/// <summary>/// 子任務(wù)二/// </summary>static int ChildThreadTwo(CancellationToken token){Thread.Sleep(2000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();throw new Exception("模擬拋出異常");}/// <summary>/// 子任務(wù)三/// </summary>static int ChildThreadThree(CancellationToken token){Thread.Sleep(3000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務(wù)三完成了計(jì)算任務(wù),并返回值:{0}", 6);return 6;}}/// <summary>/// Linq擴(kuò)展/// </summary>public static class LinqExtension{public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action){foreach (var item in enumerators){action(item);}}}這里需要注意,這里給父任務(wù)parentTask開啟了三個(gè)子任務(wù),并且通過TaskCreationOptions.AttachedToParent指定了所有的子任務(wù)不能獨(dú)立于父任務(wù)運(yùn)行,并且給所有的子任務(wù),傳遞了CancellationToken信號(hào)量,當(dāng)其中一個(gè)子任務(wù)發(fā)生異常時(shí),所有其余的子任務(wù)都終止,但是你必須知道的是,你沒有判斷哪個(gè)任務(wù)會(huì)被終止,因?yàn)槿绻恢付ň€程優(yōu)先級(jí),哪怕制定了優(yōu)先級(jí),你也無(wú)法確定的判斷某個(gè)計(jì)算任務(wù)在什么時(shí)候會(huì)調(diào)度完,所以我給正常的執(zhí)行的任務(wù),Sleep了三秒,拋出異常的任務(wù)Sleep了兩秒,所以所有的子線程都無(wú)法執(zhí)行完畢.
?
2、代碼重構(gòu)
?ok,雖然上面的代碼很好的完成了我們?cè)诖a層面的需求,但是處于對(duì)代碼的重用性考慮,有沒有發(fā)現(xiàn)這個(gè)問題:
這塊操作,可以重構(gòu)的,因?yàn)樗械膮?shù)都一樣,當(dāng)然你可以去抽象一個(gè)共有的方法,里面放一個(gè)Func委托,當(dāng)然把參數(shù)抽象出來(lái),形成一個(gè)公共的方法,像下面這樣做:
class Program{private static CancellationTokenSource cancelTokenSource = new CancellationTokenSource();private static TaskCreationOptions taskCreationOptions = TaskCreationOptions.AttachedToParent;static void Main(string[] args){var parentTask = new Task<int[]>(() =>{var results = new int[3];var childTasks = new Task[] {ExecuteChildThread(task=> results[0]=ChildThreadOne(cancelTokenSource.Token)),ExecuteChildThread(task=> results[1]=ChildThreadTwo(cancelTokenSource.Token)),ExecuteChildThread(task=> results[2]=ChildThreadThree(cancelTokenSource.Token))};//開啟所有的子任務(wù)childTasks.ForEach(f => { f.Start(); });//如果有子任務(wù)發(fā)生異常,那么通過取消信號(hào)量終止所有的任務(wù)childTasks.ForEach(f =>{f.ContinueWith(task=> cancelTokenSource.Cancel(), TaskContinuationOptions.OnlyOnFaulted);});return results;});parentTask.Start();parentTask.ContinueWith(x =>{Console.WriteLine("當(dāng)父任務(wù)執(zhí)行完畢時(shí),CLR會(huì)喚起一個(gè)新線程,將父任務(wù)的返回值(子任務(wù)的返回值)輸出,所以這里不會(huì)有任何的線程發(fā)生阻塞");foreach (var re in parentTask.Result){Console.WriteLine("子任務(wù)的返回值分別為:{0}", re);}});Console.WriteLine("主線程不會(huì)阻塞,它會(huì)繼續(xù)執(zhí)行");Console.ReadKey();//必須加這行代碼,因?yàn)門ask時(shí)線程池線程,屬于后臺(tái)線程 }/// <summary>/// 子任務(wù)一/// </summary>static int ChildThreadOne(CancellationToken token){Thread.Sleep(2000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務(wù)一完成了計(jì)算任務(wù),并返回值:{0}", 6);return 6;}/// <summary>/// 子任務(wù)二/// </summary>static int ChildThreadTwo(CancellationToken token){Thread.Sleep(2000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務(wù)二完成了計(jì)算任務(wù),并返回值:{0}", 6);return 6;}/// <summary>/// 子任務(wù)三/// </summary>static int ChildThreadThree(CancellationToken token){Thread.Sleep(2000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務(wù)三完成了計(jì)算任務(wù),并返回值:{0}", 6);return 6;}/// <summary>/// 創(chuàng)建一個(gè)通用的子線程方法,里面封裝了所有子線程的需要設(shè)置的公共參數(shù)/// </summary>/// <param name="func"></param>/// <returns></returns>static Task<int> ExecuteChildThread(Func<CancellationToken, int> func){var t=new Task<int>(()=>func.Invoke(cancelTokenSource.Token), cancelTokenSource.Token, taskCreationOptions);return t;}}/// <summary>/// Linq擴(kuò)展/// </summary>public static class LinqExtension{public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action){foreach (var item in enumerators){action(item);}}}ok,通過對(duì)子任務(wù)的抽象,你可以這么干,但是MS提供了更好的辦法,你又何必重復(fù)造輪子呢?而且這里存在著潛在的多線程爭(zhēng)用問題,
所有的線程都用到了這兩個(gè)全局變量,最好加個(gè)鎖,但是加了鎖之后,性能就會(huì)受到影響.
但是奇怪的是,我無(wú)法重現(xiàn),如果你能重現(xiàn)那是最好的,下面就開始介紹Ms提供的任務(wù)工廠
?
3、任務(wù)工廠實(shí)戰(zhàn)
下面再次對(duì)上面的方法進(jìn)行重構(gòu),用任務(wù)工廠的方式,首先使用TaskFactory任務(wù)工廠的前提你必須清楚,就是創(chuàng)建的子任務(wù),必須是一組共享配置的子任務(wù)對(duì)象集,所以,如果當(dāng)中如果某個(gè)子任務(wù)需要使用特殊的配置,那就不能使用任務(wù)工廠,也不是不能使用,就是那個(gè)子任務(wù)你必須獨(dú)立出來(lái),不能放到任務(wù)工廠里面.ok,了解了前提條件后,開始實(shí)踐,代碼如下:
class Program{static void Main(string[] args){var parentTask=Task.Run(()=> {var cts = new CancellationTokenSource();//通過TaskFactory設(shè)置子任務(wù)的公共參數(shù)var tf = new TaskFactory<int>(cts.Token,TaskCreationOptions.AttachedToParent,TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Default);//通過TaskFactory設(shè)置所有的子任務(wù),這些子任務(wù)共享上面公共參數(shù)var childTasks = new Task<int>[] {tf.StartNew(() => ChildThreadOne(cts.Token)),tf.StartNew(() => ChildThreadTwo(cts.Token)),tf.StartNew(() => ChildThreadThree(cts.Token))};//如果子任務(wù)發(fā)生異常,則向余下沒有執(zhí)行完畢的子任務(wù)傳遞取消執(zhí)行的信號(hào),如果有子任務(wù)執(zhí)行完畢了,那就沒有辦法了childTasks.ForEach(f =>{f.ContinueWith(childTask => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);});//遍歷所有通過TaskFactory創(chuàng)建的子任務(wù),然后篩選出沒有被取消和沒有發(fā)生異常的子任務(wù),或者這些任務(wù)中的最大返回值//這個(gè)任務(wù)不阻塞線程,只有當(dāng)所有的子任務(wù)執(zhí)行完畢之后,CLR會(huì)喚起線程池中的一個(gè)新線程來(lái)執(zhí)行這個(gè)操作//通過給喚起子線程設(shè)置CancellationToken.None,來(lái)達(dá)到這個(gè)線程不會(huì)被任何因素來(lái)取消該線程的目的var tfTask = tf.ContinueWhenAll(childTasks,completedTasks => completedTasks.Where(completedTask => !completedTask.IsCanceled && !completedTask.IsFaulted).Max(completedTask => completedTask.Result), CancellationToken.None);//輸出所有符合要求的子任務(wù)集合的返回值集合中的最大值,并指定該任務(wù),在tfTask任務(wù)的基礎(chǔ)上同步執(zhí)行的效果通過TaskContinuationOptions.ExecuteSynchronouslytfTask.ContinueWith(childTasksCompleteTask =>{Console.WriteLine("The Max Return Value is {0}", childTasksCompleteTask.Result);},TaskContinuationOptions.ExecuteSynchronously);});Console.WriteLine("主線程繼續(xù)做它的事情");Console.ReadKey();//必須加這行代碼,因?yàn)門ask時(shí)線程池線程,屬于后臺(tái)線程 }/// <summary>/// 子任務(wù)一/// </summary>static int ChildThreadOne(CancellationToken token){var returnValue = 6;Thread.Sleep(3000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務(wù)一完成了計(jì)算任務(wù),并返回值:{0}", returnValue);return returnValue;}/// <summary>/// 子任務(wù)二/// </summary>static int ChildThreadTwo(CancellationToken token){var returnValue = 66;Thread.Sleep(3000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務(wù)二完成了計(jì)算任務(wù),并返回值:{0}", returnValue);return returnValue;}/// <summary>/// 子任務(wù)三/// </summary>static int ChildThreadThree(CancellationToken token){Thread.Sleep(2000);//模擬長(zhǎng)時(shí)間計(jì)算操作throw new Exception("模擬拋出異常");}}/// <summary>/// Linq擴(kuò)展/// </summary>public static class LinqExtension{public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action){foreach (var item in enumerators){action(item);}}}因?yàn)槲医o異常線程設(shè)置了2秒的休眠時(shí)間,正常子線程設(shè)置了3秒的休眠時(shí)間,所以所有的線程都沒有執(zhí)行完畢,就被取消掉了.如果修改下正常線程的休眠時(shí)間為1秒,將會(huì)得到以下的輸出:
so,TaskFactory完美的完成了它的任務(wù),且不會(huì)有任務(wù)線程發(fā)生阻塞的情況。
?
4、如何解決任務(wù)工廠拋出的異常
我發(fā)現(xiàn)一個(gè)很奇怪的問題,就是當(dāng)當(dāng)外部通過一個(gè)Task.Run創(chuàng)建的父任務(wù),無(wú)法獲取TaskFactory下子任務(wù)集群拋出的異常,代碼如下:
class Program{static void Main(string[] args){var pTask = Task.Run(() =>{var cts = new CancellationTokenSource();//通過TaskFactory設(shè)置子任務(wù)的公共參數(shù)var tf = new TaskFactory<int>(cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);//通過TaskFactory設(shè)置所有的子任務(wù),這些子任務(wù)共享上面公共參數(shù)var childTasks = new Task<int>[] {tf.StartNew(() => ChildThreadOne(cts.Token)),tf.StartNew(() => ChildThreadTwo(cts.Token)),tf.StartNew(() => ChildThreadThree(cts.Token))};});pTask.ContinueWith(tasks =>{var exceptions = tasks.Exception;foreach (var ex in exceptions.InnerExceptions){Console.WriteLine(ex);}},TaskContinuationOptions.OnlyOnFaulted);Console.WriteLine("主線程繼續(xù)做它的事情");Console.ReadKey();//必須加這行代碼,因?yàn)門ask時(shí)線程池線程,屬于后臺(tái)線程 }/// <summary>/// 子任務(wù)一/// </summary>static int ChildThreadOne(CancellationToken token){var returnValue = 6;Thread.Sleep(3000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務(wù)一完成了計(jì)算任務(wù),并返回值:{0}", returnValue);return returnValue;}/// <summary>/// 子任務(wù)二/// </summary>static int ChildThreadTwo(CancellationToken token){var returnValue = 66;Thread.Sleep(3000);//模擬長(zhǎng)時(shí)間計(jì)算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務(wù)二完成了計(jì)算任務(wù),并返回值:{0}", returnValue);return returnValue;}/// <summary>/// 子任務(wù)三/// </summary>static int ChildThreadThree(CancellationToken token){Thread.Sleep(2000);//模擬長(zhǎng)時(shí)間計(jì)算操作throw new Exception("模擬拋出異常");}}/// <summary>/// Linq擴(kuò)展/// </summary>public static class LinqExtension{public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action){foreach (var item in enumerators){action(item);}}}很其怪,不過這說(shuō)明,外部的父任務(wù),無(wú)法和TaskFactory建立關(guān)聯(lián),如果你們能找到方法,歡迎在下面評(píng)論區(qū)評(píng)論,因?yàn)檫@個(gè)所以,要處理子任務(wù)拋出的異常.只能通過過濾異常子任務(wù),然后在子任務(wù)里單獨(dú)記錄日志的方式,去處理:
暫時(shí)沒有想到更好的辦法.
?
轉(zhuǎn)載于:https://www.cnblogs.com/GreenLeaves/p/10088635.html
總結(jié)
以上是生活随笔為你收集整理的C# 多线程六之Task(任务)三之任务工厂的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第四课:PHP 变量
- 下一篇: 进程初识和multiprocessing