什麼是計算限制的異步操作,當線程在要使用CPU進行計算的時候,那麼就叫計算限制。
而對應的IO限制就是線程交給IO設備(鍵鼠,網絡,文件等)。
第25章線程基礎講了用專用的線程進行計算限制的操作,但是創建專用線程開銷大,而且太多的線程也浪費內存資源,那麼本章就討論一種更好的方法,即線程池技術。
CLR線程池
CLR包含了代碼來管理它自己的線程池。線程池是應用程序能使用的線程集合,每個CLR一個線程池,這個線程池由CLR上所有的AppDomain共享。
CLR初始化時線程池中沒有線程。
在線程池內部維護著一個操作請求隊列,應用程序執行異步操作時,就調用某方法將一個記錄項追加到線程池隊列中。
線程池的代碼從這個隊列中提取記錄項,將這個記錄項派發給一個線程池線程,如果線程池沒有線程,就創建一個新線程。
當線程池線程完成任務後,線程不會被銷毀。相反,線程會返回線程池,在那裡進入空閒狀態,等待響應另一個請求。、
由於線程池不銷毀自身,並且再次做異步操作不用創建新的線程,所以不再產生額外的性能損失。
如果應用程序向線程池發送了很多記錄項到線程池隊列,線程池最開始會嘗試只用一個線程服務所有記錄項,然而如果添加記錄項的速度超過了線程池線程處理記錄項的速度,就會創建額外的線程。
如果不再往線程池中發送請求,池中存在大量什麼都不做的線程。那麼這些閒置的線程池線程會在一段時間後自己醒來終止自己並釋放資源。
寫到這裡就應該很清楚了,如果垃圾回收器是幫我們自動回收垃圾,那麼線程池技術就是幫我們自動管理線程。
用線程池技術執行簡單的計算限制操作
不多做解釋,直接上代碼更易懂,可對比上一章的用專用線程來進行計算限制的異步操作的代碼,這樣更易於理解:
static void Main(string[] args) { ThreadPool.QueueUserWorkItem(線程回調函數, "hello"); Console.WriteLine("記錄項進入線程池隊列"); Console.Read(); } private static void 線程回調函數(Object 狀態參數) { Thread.Sleep(10000); if (狀態參數.GetType() == typeof(string)) { Console.WriteLine("這是一個字符串"); } else { Console.WriteLine("未識別"); } }
執行上下文
每個線程都關聯一個執行上下文數據結構。
執行上下文包括的東西有
安全設置(壓縮站、Thread的Principal屬性和Windows身份),
宿主設置(參見System.Threading.HostExecutionContextManager)
以及邏輯調用上下文數據(參見Sysyem.Runtime.Remoting.Messaging.CallContext的LogicalSetData和LogicalGetData方法)。
線程執行代碼時,一些操作可能會用到執行上下文結構。
而每當一個線程使用另一個線程執行任務時,前者的執行上下文會復制給後者的執行上下文。這樣就確保了執行任何操作都使用相同的安全設置和宿主設置。還確保了在線程中邏輯調用上下文中存儲的任何數據都適用於被使用的另一個線程。
默認情況下,CLR自動造成初始線程的執行上下文復制到任何輔助線程。
這會造成性能影響,因為收集上下文信息並復制到輔助線程,花費不少時間,如果輔助線程中還有輔助線程,那麼開銷更大。
System.Threading命名空間有一個ExecutionContext類,也就是執行上下文類,它允許你控制線程的執行上下文是否復制到另一個線程。
常用的方法有三個,SuppressFlow(取消復制執行上下文),RestoreFlow(恢復復制執行上下文),IsFlowSuppressed(是否上下文復制被取消).
上代碼,更簡單:
static void Main(string[] args) { CallContext.LogicalSetData("操作", "將一個鍵值對放入執行上下文中"); ThreadPool.QueueUserWorkItem( state=>Console.WriteLine("第一次"+CallContext.LogicalGetData("操作")) ); ExecutionContext.SuppressFlow();//取消執行上下文在異步線程間的復制 ThreadPool.QueueUserWorkItem( state => Console.WriteLine("第二次" + CallContext.LogicalGetData("操作")) ); ExecutionContext.RestoreFlow();//恢復執行上下文在異步線程間的復制 ThreadPool.QueueUserWorkItem( state => Console.WriteLine("第三次" + CallContext.LogicalGetData("操作")) ); Console.Read(); }
代碼運行結果如下:
因為是異步操作,所以執行順序不同,但是我們這裡僅僅關注執行結果就行了,第二次確實沒有將執行上下文復制到另一個線程中。
另外這裡不僅僅是指線程池,專用線程也是一樣的。
協作式取消和超時
.NET提供了標准的協作式取消操作模式,意味著要取消的操作必須顯示支持取消。
也就是說無論執行操作的代碼,還是取消操作的代碼,都必須使用本節提到的類型。
取消操作首先要創建一個CancellationTokenSource對象。
這個對象包含了和管理取消有關的所有狀態。可從此對象的Token屬性獲取一個或多個CancellationToken實例,並傳給操作,使操作可以取消。
而CancellationToken是輕量級的值類型,包含單個私有字段即對其CancellationTokenSource對象的引用。
在計算限制操作的循環中,可以定時調用CancellationToken的IsCancellationRequested屬性,了解循環是否應該提前終止,從而終止計算限制的操作。
以下為演示代碼:
static void Main(string[] args) { var cts = new CancellationTokenSource(); ThreadPool.QueueUserWorkItem(state => Farm(cts.Token, 850));//Farm一個治療指環 Console.WriteLine("按回車取消Farm"); Console.ReadLine(); cts.Cancel();// 取消Farm操作 Console.Read(); } //Farm指定數量的錢就返回 private static void Farm(CancellationToken token,int money) { var currentMoney = 0; while (currentMoney < money) { if (token.IsCancellationRequested) { Console.WriteLine("確定取消Farm"); break; } currentMoney += 50; Console.WriteLine("Troy已經Farm了" + currentMoney + "金"); Thread.Sleep(1000);//一秒鐘補一個兵 } }
上效果圖:
而如果要Farm操作不允許被取消,可以傳CancellationToken.None。
可通過調用CancellationToken的Register方法登記一個或多個在取消操作時調用的函數。、
可通過CancellationTokenSource的CreateLinkedTokenSource函數鏈接其他CancellationTokenSource對象來創建一個新的對象A,如果任意一個被鏈接的對象取消,那麼A也會被取消。
可通過傳給CancellationTokenSource的構造器延時變量,表示在指定的一段時間後CancellationTokenSource自動取消。
任務
ThreadPool的QueueUserWorkItem方法發起一次異步的計算限制操作,然而並沒有機制讓我們知道什麼時候這個操作完成,也沒有機制在操作完成時獲取返回值。
為了克服這個限制並解決其它一些問題,微軟引入了任務的概念。(通過System.Threading.Task命名空間中的類型來使用任務)
以下代碼為線程池玩法和任務玩法的對比
ThreadPool.QueueUserWorkItem(線程回調函數, "hello");//線程池玩法 new Task(線程回調函數, "hello").Start();//任務的玩法1 Task.Run(() => 線程回調函數("hello"));//任務的玩法2
在構造Task對象的時候,還可以傳遞CancellationToken,用於任務取消,也可以傳遞TaskCreationOptions標志來控制Task的執行方式。
接下來就寫段代碼,看看人物是如何等待任務完成並獲取結果的
static void Main(string[] args) { Task<Tuple<Boolean, String>> myTask = new Task<Tuple<bool, string>>(賞金任務, 100); myTask.Start(); Thread.Sleep(10000); Console.WriteLine("任務進行中"); myTask.Wait();//顯示等待任務結束 Console.WriteLine("任務結果為:" + myTask.Result.Item2); Console.ReadLine(); } private static Tuple<Boolean, String> 賞金任務(object state) { Console.WriteLine("Troy接手了這個賞金任務,並獲取了{0}金",state.ToString()); return new Tuple<bool, string>(true, "成功"); }
Tuple<Boolean, string>為任務返回的結果類型,給Task的泛型變量就應該和所調用函數的返回值一樣。
結果如下:
從這個結果我們了解到任務確實是異步執行了,並且確實返回了正確的結果給myTask.Result。
當調用Wait()函數時當前線程會阻塞,直到任務結束。(如果沒用start,直接wait,那麼任務也會執行。只不過此時線程不會被阻塞,它會直接執行任務並立即返回)
除了等待單個任務,實際上Task還提供了WaitAny和WaitAll兩個靜態方法來阻塞線程,等待一個Task數組,直到數組中的所有Task完成。
取消任務
前面講到任務也可以傳CancellationToken,用於取消任務。
在其它的地方一樣,不過在判斷任務是否取消的地方,應該用CancellationToken對象的ThrowIfCancellationRequested()方法而不是用IsCancellationRequested進行判斷。
原因是不像線程池的QueueUserWorkItem,任務有辦法表示完成,也可以返回一個值,所以需要采用一種方式將已完成的任務和出錯的任務區分開。
而讓任務拋出異常,就可以知道任務沒有一直運行到結束。
上代碼:
static void Main(string[] args) { var cts = new CancellationTokenSource(); Task<Tuple<Boolean, String>> myTask = Task.Run(()=>賞金任務(cts.Token,100), cts.Token ); Thread.Sleep(5000); cts.Cancel(); try { Console.WriteLine("任務結果為:" + myTask.Result.Item2); } catch (AggregateException ex) { //將任何OperationCanceledException對象都視為已處理 //其他任何異常都造成拋出一個新的AggregateException //其中只包含未處理的異常 ex.Handle(e => e is OperationCanceledException);//對異常集合的每個異常都調用處理程序 Console.WriteLine("取消任務"); } catch { Console.WriteLine("未知異常"); } Console.Read(); } private static Tuple<Boolean, String> 賞金任務(CancellationToken ct,object state) { for (int i = 0; i < 100; i++) { ct.ThrowIfCancellationRequested(); Console.WriteLine("Troy接手了這個賞金任務,並獲取了{0}金", state.ToString()); Thread.Sleep(1000); } return new Tuple<Boolean, String>(true, "成功"); }
上結果:
任務完成時自動啟動新任務
伸縮性好的軟件不應該使用線程阻塞。
調用Wait或者在任務尚未完成時查詢任務的Result屬性,極有可能造成線程池創建新的線程。
以下方法可以知道任務在什麼時候結束,且不使用阻塞。
Task<Tuple<Boolean, String>> myTask = Task.Run(()=>賞金任務(cts.Token,100), cts.Token );//創建並啟動一個任務 Task myTask1 = myTask.ContinueWith(task => Console.WriteLine("任務結果為:" + task.Result.Item2));
且還可以傳給它TaskContinuationOptions位標志來控制繼續的任務。默認情況下,不指定任何TaskContinuationOptions位標志,那麼無論第一個任務取消還是失敗,都會繼續執行第二個任務。
任務調用的函數中創建的任務,被稱為子任務,有一些關於父任務和子任務的處理,用TaskContinuationOptions或之前介紹的TaskCreationOptions來控制。
實際上Task對象內部有一個ContinueWith任務的集合,也就是說一個Task可以多次ContinueWith,這個Task在任務完成後會執行所有的ContinueWith任務。
任務的內部揭秘
每個Task對象內部都有一組字段,這些字段構成了任務的狀態。
任務雖然很有用,但是它也是有代價的。必須為所有這些狀態分配內存,如果不需要任務的附加功能(也就是知道何時結束且可以返回值),那麼使用ThreadPool的QueueUserWorkItem能獲得更好的資源利用率。
Task對象的只讀屬性Status返回一個TaskStatus枚舉值,該枚舉值表明了任務正處於一個怎樣的狀態。
當任務創建後,狀態為Created,啟動後為WatingToRun,實際在一個線程中運行後為Running,停止運行並等待任何子任務時為WaitingForChildrenToComplete。
完成時進入一下狀態:RanToCompletion(完成),Canceled(取消),Faulted(出錯)。
如果任務出錯,可查詢任務的Exception屬性獲取任務拋出的未處理異常。其總是返回一個AggregateException對象,其InnerExceptions集合包含了所有未處理異常。
調用ContinueWith,ContinueWhenAll,ContinueWhenAny或FromAsync等方法創建的Task對象處於WatingForActivation狀態。該狀態表示任務隱式創建,並會自動開始。
任務工廠
有時需要創建一組共享相同配置的Task對象。為避免機械地將相同的參數傳遞給每個Task的構造器,可創建一個任務工廠來封裝通用配置。
而TaskFactory類型就是這個目的。
創工廠類時,要向構造器傳遞所有要創建的任務都具有的默認值,也就是CancellationToken,TaskScheduler,TaskCreationOption和TaskContinuationOptions。
來個簡單演示:
var tf = new TaskFactory<Int32>( cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default ); //用任務工廠創建三個任務 var childTasks = new[] { tf.StartNew(()=> { Console.WriteLine("任務1");return 1; }), tf.StartNew(()=> { Console.WriteLine("任務2");return 2; }), tf.StartNew(()=> { Console.WriteLine("任務3");return 3; }) }; tf.ContinueWhenAll(childTasks, completedTask => completedTask .Where(t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result), CancellationToken.None) .ContinueWith(t => Console.WriteLine("最後的任務返回結果為" + t.Result), TaskContinuationOptions.ExecuteSynchronously); Console.Read();
這只是最基礎的用法,取消的時候只需要傳給他一個Token,那麼一旦取消,整個task數組中的任務都會取消。
任務調度器
任務基礎結構非常靈活,其中TaskScheduler對象功不可沒。
此對象負責執行被調度的任務。FCL提供了兩個派生自TaskScheduler的類型:線程池任務調度器和同步上下文調度器。默認情況下所有應用程序使用的都是線程池任務調度器。
同步上下文任務調度器適合提供了圖形用戶界面的應用程序。它將所有任務都調度給程序的GUI線程,使所有任務代碼都能成功更新UI組件。該調度不使用線程池。
可執行TaskScheduler的靜態FromCurrentSynchronizationContext()方法來獲取對同步上下文任務調度器的引用。
這個玩法貌似我用不到,做web的啊,而且看了例子很簡單,所以這裡就不寫了。
Parallel的靜態For,ForEach和Invoke方法
Parallel就是並行的意思。
主要是用於將一些常見的for或者foreach循環用任務進行多線程化,以提升性能。
System.Threading.Tasks.Parallel類封裝了這些情形,例如下面的代碼:
for (int i = 0; i < 1000; i++) DoSomething(i);//for循環做某事 Parallel.For(0, 1000, i => DoSomething(i));//Parallel替代方案,線程池並行處理工作 foreach (var item in collection) DoSomething(item);//foreach循環做某事 Parallel.ForEach(collection, l => DoSomething(l));//Parallel替代方案,線程池並行處理工作 //如果可以用For而不是ForEach,那麼就用For,因為更快 //順序執行所有方法 Method1(); Method2(); Method3(); //Parallel替代方案,順序執行所有方法 Parallel.Invoke(() => Method1(), () => Method2(), () => Method3());
如果調用線程在線程池執行完任務之前執行完了自己的那部分工作,那麼調用線程會掛起,等待任務完成。
然而調用Parallel的方法時,請注意所做的工作一定要能並行執行,如果必須要順序執行,那麼還是用原來的for循環比較好。
如果有大量的工作項(也就是循環次數很多),或者是每次循環做的事情涉及大量工作,那麼用Parallel性能會得到很大提升,反之,性能可能得不償失。
Parallel的方法都可以接收一個ParallelOptions對象,這個對象可以對Parallel的工作方式做一些配置。
還可以傳遞一個ParallelloopState對象來控制循環任務的執行。
此對象的Stop方法,讓循環停止,Break讓循環不再處理後面的工作。
並行語言集成查詢(PLINQ)
LINQ提供了一簡捷的語法來查詢數據集合。然而其只能一個線程順序處理數據集合中的所有項。這就是順序查詢。
而要提高性能,可以使用PLINQ,也就是並行LINQ。它將順序查詢轉換為並行查詢。
靜態System.Linq.ParallelEnumerable類(在System.Core.dll中定義)實現了PLINQ的所有功能,所以必須通過C#的using指令將System.Linq命名空間導入源代碼。
而所有的Where,Select之類方法的並行版本,都是System.Linq.ParallelQuery<T>類型的擴展方法。
下面舉個簡單的例子:
List<string> nameList = new List<string>() { "Troy", "小二", "小三", "小四" }; var query = from name in nameList.AsParallel()//啟用查詢的並行化,將其轉換為ParallelQuery<string> let myName = "我叫" + name where name == "Troy" select myName; Parallel.ForEach(query, l => Console.WriteLine(l)); //query.ForAll(l => Console.WriteLine(l));//也可以用這行代碼替代上一句,ParallelQuery有個ForAll方法,為每個查詢的結果執行內容 Console.Read();
以上例子只是為了演示玩法,並不考慮效率。
通過上面的例子其實可以發現,PLINQ和LINQ沒有任何區別,只要將集合調用AsParallel()即可。
而如果要講並行查詢再轉換為並行查詢,那麼可以用AsSequential()。
上面的這個例子用順序查詢實際上快得多。而且Console內部會對線程同步,確保每次只有一個線程來訪問控制台窗口,所以這裡用並行操作實際上還會損壞性能。
用PLINQ因為是並行處理數據,所以返回的都是無序結構,如果要保持順序,那麼應該調用AsOrdered方法,調用後會成組處理數據,然後組合並後保持順序,可以想象這也會損耗性能。
而且以下操作符也會聲稱不排序的操作:Distinct,Except,Intersect,Union,Join,GroupBy,GroupJoin和ToLookup.如果這些操作後還要排序,那麼又要調用AsOrdered方法。
同事PLINQ提供了一些額外的方法:
WithCancellation(允許取消),
WithDegreeOfParallelism(指定最多的線程數),
WithExecutionMode(傳遞ParallelExecutionMode標志),
WithMergeOptions(PLINQ讓多個線程處理數據後會合並,所以可傳參ParallelMergeOptions位標志,控制結果的緩沖和合並方式。有緩沖傾向於加快速度,無緩沖傾向於節約內存)。
執行定時計算限制操作
System.Threading命名空間有一個Timer類,可執行定時操作。
在內部,線程池為所有Timer對象都只使用一個線程,此線程知道下一個Timer對象在什麼時候到期。到期後,線程會被喚醒,在內部調用線程池的QueueUserWorkItem,將工作項加入線程池隊列。
這個點就不多講了,很常見。(垃圾回收那一章講過如果Timer對象在代碼上看起來沒被使用會導致被回收,所以要有變量保持Timer對象存活)
如果要定時執行某操作,可以使用Task的靜態Delay方法和C#的async和await關鍵字。(下一章會講到,這裡只給一個簡單例子)
static void Main(string[] args) { asyncDoSomething(); Console.Read(); } private static async void asyncDoSomething() { while (true) { Console.WriteLine("time is {0}", DateTime.Now); //不阻塞線程的前提下延遲兩秒 await Task.Delay(2000);//await允許線程返回 //2秒後某個線程會在await後介入並繼續循環 } }
線程池如何管理線程
CLR允許開發人員設置線程池要創建的最大線程數。(然而如果設定了這個值,那麼就可能發生饑餓和死鎖)。
默認的最大線程數目前為1000個左右。
可通過Threadpool類的一些靜態方法如GetMaxThreads,SetMinThreads來限制線程池的線程數,然而作者並不建議這麼做。
Threadpool.QueueUserWorkItem方法和Timer類總是將工作項放在一個線程池全局隊列中的(用的先入先出模式),所以多個工作者線程可能同時從這個隊列中取工作項。為了保證多個工作者線程不會取到一個工作項,所以實際上所有工作者線程都競爭同一個線程同步鎖。
而對於任務,非工作者線程調用一個任務時(用非默認的TaskScheduler任務調度器),任務被放進全局隊列。
而工作者線程調度Task時,都有自己的本地隊列。工作者線程准備處理工作項時,先檢查本地隊列,由於工作者線程是唯一允許訪問自身的本地隊列,所以這裡不需要線程同步鎖。(所以在本地隊列刪除和增加Task的速度很快,本地隊列的處理用的是後入先出模式)
如果某個工作者線程的本地隊列空了,那麼它會從其它隊列找工作項去執行,並要求獲取一個線程同步鎖。
如果所有工作者線程的本地隊列都空了,那麼這個時候才檢查全局隊列。
如果全局隊列也空了,那麼工作者線程會進入睡眠,等待事情發生。
如果睡眠事件太長,會自動喚醒,並銷毀自身。