【转】细说.NET中的多线程 (三 使用Task)
上一節(jié)我們介紹了線程池相關(guān)的概念以及用法。我們可以發(fā)現(xiàn)ThreadPool. QueueUserWorkItem是一種起了線程之后就不管了的做法。但是實(shí)際應(yīng)用過程,我們往往會有更多的需求,比如如何更簡單的知道線程池里面的某些線程什么時(shí)候結(jié)束,線程結(jié)束后如何執(zhí)行別的任務(wù)。Task可以說是ThreadPool的升級版,在線程任務(wù)調(diào)度,并行編程中都有很大的作用。
創(chuàng)建并且初始化Task
使用lambda表達(dá)式創(chuàng)建Task
| 1 2 3 4 | Task.Factory.StartNew(() => Console.WriteLine("Hello from a task!")); ? var?task =?new?Task(() => Console.Write("Hello")); task.Start(); |
用默認(rèn)參數(shù)的委托創(chuàng)建Task
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | using?System; using?System.Threading.Tasks; ? namespace?MultiThread { ????class?ThreadTest ????{ ????????static?void?Main() ????????{ ????????????var?task = Task.Factory.StartNew(state => Greet("Hello"),?"Greeting"); ????????????Console.WriteLine(task.AsyncState);???// Greeting ????????????task.Wait(); ????????} ? ????????static?void?Greet(string?message) { Console.Write(message); } ? ????} } |
這種方式的一個(gè)優(yōu)點(diǎn)是,task.AsyncState作為一個(gè)內(nèi)置的屬性,可以在不同線程中獲取參數(shù)的狀態(tài)。
System.Threading.Tasks.TaskCreateOptions
創(chuàng)建Task的時(shí)候,我們可以指定創(chuàng)建Task的一些相關(guān)選項(xiàng)。在.Net 4.0中,有如下選項(xiàng):
LongRunning
用來表示這個(gè)Task是長期運(yùn)行的,這個(gè)參數(shù)更適合block線程。LongRunning線程一般回收的周期會比較長,因此CLR可能不會把它放到線程池中進(jìn)行管理。
PreferFairness
表示讓Task盡量以公平的方式運(yùn)行,避免出現(xiàn)某些線程運(yùn)行過快或者過慢的情況。
AttachedToParent
表示創(chuàng)建的Task是當(dāng)前線程所在Task的子任務(wù)。這一個(gè)用途也很常見。
下面的代碼是創(chuàng)建子任務(wù)的示例:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | using?System; using?System.Threading; using?System.Threading.Tasks; ? namespace?MultiThread { ????class?ThreadTest ????{ ????????public?static?void?Main(string[] args) ????????{ ????????????Task parent = Task.Factory.StartNew(() => ????????????{ ????????????????Console.WriteLine("I am a parent"); ? ????????????????Task.Factory.StartNew(() =>????????// Detached task ????????????????{ ????????????????????Console.WriteLine("I am detached"); ????????????????}); ? ????????????????Task.Factory.StartNew(() =>????????// Child task ????????????????{ ????????????????????Console.WriteLine("I am a child"); ????????????????}, TaskCreationOptions.AttachedToParent); ????????????}); ? ????????????parent.Wait(); ? ????????????Console.ReadLine(); ????????} ? ????} } |
如果你等待你一個(gè)任務(wù)結(jié)束,你必須同時(shí)等待任務(wù)里面的子任務(wù)結(jié)束。這一點(diǎn)很重要,尤其是你在使用Continue的時(shí)候。(后面會介紹)
等待Task
在ThreadPool內(nèi)置的方法中無法實(shí)現(xiàn)的等待,在Task中可以很簡單的實(shí)現(xiàn)了:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | using?System; using?System.Threading; using?System.Threading.Tasks; ? namespace?MultiThread { ????class?ThreadTest ????{ ????????static?void?Main() ????????{ ????????????var?t1 = Task.Run(() => Go(null)); ????????????var?t2 = Task.Run(() => Go(123)); ????????????Task.WaitAll(t1, t2);//等待所有Task結(jié)束 ????????????//Task.WaitAny(t1, t2);//等待任意Task結(jié)束 ????????} ? ????????static?void?Go(object?data)???// data will be null with the first call. ????????{ ????????????Thread.Sleep(5000); ????????????Console.WriteLine("Hello from the thread pool! "?+ data); ????????} ????} } |
注意:
當(dāng)你調(diào)用一個(gè)Wait方法時(shí),當(dāng)前的線程會被阻塞,直到Task返回。但是如果Task還沒有被執(zhí)行,這個(gè)時(shí)候系統(tǒng)可能會用當(dāng)前的線程來執(zhí)行調(diào)用Task,而不是新建一個(gè),這樣就不需要重新創(chuàng)建一個(gè)線程,并且阻塞當(dāng)前線程。這種做法節(jié)省了創(chuàng)建新線程的開銷,也避免了一些線程的切換。但是也有缺點(diǎn),當(dāng)前線程如果和被調(diào)用的Task同時(shí)想要獲得一個(gè)lock,就會導(dǎo)致死鎖。
Task異常處理
當(dāng)?shù)却粋€(gè)Task完成的時(shí)候(調(diào)用Wait或者或者訪問Result屬性的時(shí)候),Task任務(wù)中沒有處理的異常會被封裝成AggregateException重新拋出,InnerExceptions屬性封裝了各個(gè)Task沒有處理的異常。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | using?System; using?System.Threading.Tasks; ? namespace?MultiThreadTest { ????class?Program ????{ ????????static?void?Main(string[] args) ????????{ ????????????int?x = 0; ????????????Task<int> calc = Task.Factory.StartNew(() => 7 / x); ????????????try ????????????{ ????????????????Console.WriteLine(calc.Result); ????????????} ????????????catch?(AggregateException aex) ????????????{ ????????????????Console.Write(aex.InnerException.Message);??// Attempted to divide by 0 ????????????} ????????} ????} } |
對于有父子關(guān)系的Task,子任務(wù)未處理的異常會逐層傳遞到父Task,并且最后包裝在AggregateException中。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | using?System; using?System.Threading.Tasks; ? namespace?MultiThreadTest { ????class?Program ????{ ????????static?void?Main(string[] args) ????????{ ????????????TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; ????????????var?parent = Task.Factory.StartNew(() => ????????????{ ????????????????Task.Factory.StartNew(() =>???// Child ????????????????{ ????????????????????Task.Factory.StartNew(() => {?throw?null; }, atp);???// Grandchild ????????????????}, atp); ????????????}); ? ????????????// The following call throws a NullReferenceException (wrapped ????????????// in nested AggregateExceptions): ????????????parent.Wait(); ????????} ????} } |
取消Task
如果想要支持取消任務(wù),那么在創(chuàng)建Task的時(shí)候,需要傳入一個(gè)CancellationTokenSouce
示例代碼:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | using?System; using?System.Threading; using?System.Threading.Tasks; ? namespace?MultiThreadTest { ????class?Program ????{ ????????static?void?Main(string[] args) ????????{ ????????????var?cancelSource =?new?CancellationTokenSource(); ????????????CancellationToken token = cancelSource.Token; ? ????????????Task task = Task.Factory.StartNew(() => ????????????{ ????????????????// Do some stuff... ????????????????token.ThrowIfCancellationRequested();??// Check for cancellation request ????????????????// Do some stuff... ????????????}, token); ????????????cancelSource.Cancel(); ? ????????????try ????????????{ ????????????????task.Wait(); ????????????} ????????????catch?(AggregateException ex) ????????????{ ????????????????if?(ex.InnerException?is?OperationCanceledException) ????????????????????Console.Write("Task canceled!"); ????????????} ? ????????????Console.ReadLine(); ????????} ????} } |
任務(wù)的連續(xù)執(zhí)行
Continuations
任務(wù)調(diào)度也是常見的需求,Task支持一個(gè)任務(wù)結(jié)束之后執(zhí)行另一個(gè)任務(wù)。
| 1 2 | Task task1 = Task.Factory.StartNew(() => Console.Write("antecedant..")); Task task2 = task1.ContinueWith(task =>Console.Write("..continuation")); |
Continuations 和Task<TResult>
Task也有帶返回值的重載,示例代碼如下:
| 1 2 3 4 | Task.Factory.StartNew<int>(() => 8) ????.ContinueWith(ant => ant.Result * 2) ????.ContinueWith(ant => Math.Sqrt(ant.Result)) ????.ContinueWith(ant => Console.WriteLine(ant.Result));???// output 4 |
子任務(wù)
前面提到了,當(dāng)你等待一個(gè)任務(wù)的時(shí)候,同時(shí)需要等待它的子任務(wù)完成。
下面代碼演示了帶子任務(wù)的Task:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | using?System; using?System.Threading.Tasks; using?System.Threading; ? namespace?MultiThreadTest { ????class?Program ????{ ????????public?static?void?Main(string[] args) ????????{ ????????????Task<int[]> parentTask = Task.Factory.StartNew(() => ????????????{ ????????????????int[] results =?new?int[3]; ? ????????????????Task t1 =?new?Task(() => { Thread.Sleep(3000); results[0] = 0; }, TaskCreationOptions.AttachedToParent); ????????????????Task t2 =?new?Task(() => { Thread.Sleep(3000); results[1] = 1; }, TaskCreationOptions.AttachedToParent); ????????????????Task t3 =?new?Task(() => { Thread.Sleep(3000); results[2] = 2; }, TaskCreationOptions.AttachedToParent); ? ????????????????t1.Start(); ????????????????t2.Start(); ????????????????t3.Start(); ? ????????????????return?results; ????????????}); ? ????????????Task finalTask = parentTask.ContinueWith(parent => ????????????{ ????????????????foreach?(int?result?in?parent.Result) ????????????????{ ????????????????????Console.WriteLine(result); ????????????????} ????????????}); ? ????????????finalTask.Wait(); ????????????Console.ReadLine(); ????????} ????} } |
這段代碼的輸出結(jié)果是: 1,2,3
FinalTask會等待所有子Task結(jié)束后再執(zhí)行。
TaskFactory
關(guān)于TaskFactory,上面的例子中我們使用了System.Threading.Tasks .Task.Factory屬性來快速的創(chuàng)建Task。當(dāng)然你也可以自己創(chuàng)建TaskFactory,你可以指定自己的TaskCreationOptions,TaskContinuationOptions來使得通過你的Factory創(chuàng)建的Task默認(rèn)行為不同。
.Net中有一些默認(rèn)的創(chuàng)建Task的方式,由于TaskFactory創(chuàng)建Task的默認(rèn)行為不同可能會導(dǎo)致一些不容易發(fā)現(xiàn)的問題。
如在.NET 4.5中,Task加入了一個(gè)Run的靜態(tài)方法:
Task.Run(someAction);
如果你用這個(gè)方法代替上面例子中的Task.Factory.StartNew,就無法得到正確的結(jié)果。原因是Task.Run創(chuàng)建Task的行為默認(rèn)是默認(rèn)是拒絕添加子任務(wù)的。上面的代碼等價(jià)于:
????Task.Factory.StartNew(someAction, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
你也可以創(chuàng)建具有自己默認(rèn)行為的TaskFactory。
?
無論ThreadPool也好,或者Task,微軟都是在想進(jìn)辦法來實(shí)現(xiàn)線程的重用,來節(jié)省不停的創(chuàng)建銷毀線程帶來的開銷。線程池內(nèi)部的實(shí)現(xiàn)可能在不同版本中有不同的機(jī)制。如果可能的話,使用線程池來管理線程仍然是建議的選擇。
?
我們主要介紹了一下Task的基本用法,在我們編程過程中,有一些使用Task來提升程序性能的場景往往是很相似的,微軟為了簡化編程,在System.Threading.Tasks.Parallel中封裝了一系列的并行類,內(nèi)部也是通過Task來實(shí)現(xiàn)的。
Parallel的For,Foreach,Invoke 方法
在編程過程中,我們經(jīng)常會用到循環(huán)語句:
?
| 1 2 3 4 | for?(int?i = 0; i < 10; i++) { ????DoSomeWork(i); } |
如果循環(huán)過程中的工作可以是并行的話,那么我們可以用如下語句:
?
| 1 | Parallel.For(0, 10, i => DoSomeWork(i)); |
我們也經(jīng)常會使用Foreach來遍歷某個(gè)集合:
?
| 1 2 3 4 | foreach?(var?item?in?collection) { ????DoSomeWork(item); } |
如果我們用一個(gè)線程池來執(zhí)行里面的任務(wù),那么我們可以寫成:
?
| 1 | Parallel.ForEach(collection, item => DoSomeWork(item)); |
最后,如果你想并行的執(zhí)行幾個(gè)不同的方法,你可以:
?
| 1 | Parallel.Invoke(Method1, Method2, Method3); |
如果你看下后臺的實(shí)現(xiàn),你會發(fā)現(xiàn)基本都是基于Task的線程池,當(dāng)然你也可以通過手動(dòng)創(chuàng)建一個(gè)Task集合,然后等待所有的任務(wù)結(jié)束來實(shí)現(xiàn)同樣的功能。上面的Parallel.For和Parallel.Forach方法并不意味著你可以尋找你代碼里面所有用到For和Foreach方法,并且替代他們,因?yàn)槊恳粋€(gè)任務(wù)都會分配一個(gè)委托,并且在線程池里執(zhí)行,如果委托里面的任務(wù)是線程不安全的,你可能還需要lock來保證線程安全,使用lock本身就會造成性能上的損耗。如果每一個(gè)任務(wù)都是需要長時(shí)間執(zhí)行并且線程安全的,Parallel會給你帶來不錯(cuò)的性能提升。對于短任務(wù),或者線程不安全的任務(wù),你需要權(quán)衡下,你是否真的需要使用Parallel。
作者:獨(dú)上高樓
出處:http://www.cnblogs.com/myprogram/
本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責(zé)任的權(quán)利。
總結(jié)
以上是生活随笔為你收集整理的【转】细说.NET中的多线程 (三 使用Task)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网店推好评返现卡被罚1万元!你会因为红包
- 下一篇: 【转】一个ASP.NET MVC中aja