程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> .net線程池內幕,.net線程內幕

.net線程池內幕,.net線程內幕

編輯:C#入門知識

.net線程池內幕,.net線程內幕


本文通過對.NET4.5的ThreadPool源碼的分析講解揭示.NET線程池的內幕,並總結ThreadPool設計的好與不足。

線程池的作用
線程池,顧名思義,線程對象池。Task和TPL都有用到線程池,所以了解線程池的內幕有助於你寫出更好的程序。由於篇幅有限,在這裡我只講解以下核心概念:

  • 線程池的大小
  • 如何調用線程池添加任務
  • 線程池如何執行任務

Threadpool也支持操控IOCP的線程,但在這裡我們不研究它,涉及到task和TPL的會在其各自的博客中做詳解。

線程池的大小
不管什麼池,總有尺寸,ThreadPool也不例外。ThreadPool提供了4個方法來調整線程池的大小:

  • SetMaxThreads
  • GetMaxThreads
  • SetMinThreads
  • GetMinThreads

SetMaxThreads指定線程池最多可以有多少個線程,而GetMaxThreads自然就是獲取這個值。SetMinThreads指定線程池中最少存活的線程的數量,而GetMinThreads就是獲取這個值。
為何要設置一個最大數量和有一個最小數量呢?原來線程池的大小取決於若干因素,如虛擬地址空間的大小等。比如你的計算機是4g內存,而一個線程的初始堆棧大小為1m,那麼你最多能創建4g/1m的線程(忽略操作系統本身以及其他進程內存分配);正因為線程有內存開銷,所以如果線程池的線程過多而又沒有被完全使用,那麼這就是對內存的一種浪費,所以限制線程池的最大數是很make sense的。
那麼最小數又是為啥?線程池就是線程的對象池,對象池的最大的用處是重用對象。為啥要重用線程,因為線程的創建與銷毀都要占用大量的cpu時間。所以在高並發狀態下,線程池由於無需創建銷毀線程節約了大量時間,提高了系統的響應能力和吞吐量。最小數可以讓你調整最小的存活線程數量來應對不同的高並發場景。

如何調用線程池添加任務
線程池主要提供了2個方法來調用:QueueUserWorkItem和UnsafeQueueUserWorkItem。
兩個方法的代碼基本一致,除了attribute不同,QueueUserWorkItem可以被partial trust的代碼調用,而UnsafeQueueUserWorkItem只能被full trust的代碼調用。

1 public static bool QueueUserWorkItem(WaitCallback callBack)
2 {
3 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
4 return ThreadPool.QueueUserWorkItemHelper(callBack, (object) null, ref stackMark, true);
5 }

QueueUserWorkItemHelper首先調用ThreadPool.EnsureVMInitialized()來確保CLR虛擬機初始化(VM是一個統稱,不是單指java虛擬機,也可以指CLR的execution engine),緊接著實例化ThreadPoolWorkQueue,最後調用ThreadPoolWorkQueue的Enqueue方法並傳入callback和true。

 1 [SecurityCritical]
 2 public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
 3 {
 4 ThreadPoolWorkQueueThreadLocals queueThreadLocals = (ThreadPoolWorkQueueThreadLocals) null;
 5 if (!forceGlobal)
 6 queueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals;
 7 if (this.loggingEnabled)
 8 FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback);
 9 if (queueThreadLocals != null)
10 {
11 queueThreadLocals.workStealingQueue.LocalPush(callback);
12 }
13 else
14 {
15 ThreadPoolWorkQueue.QueueSegment comparand = this.queueHead;
16 while (!comparand.TryEnqueue(callback))
17 {
18 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null);
19 for (; comparand.Next != null; comparand = this.queueHead)
20 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, comparand.Next, comparand);
21 }
22 }
23 this.EnsureThreadRequested();
24 }

ThreadPoolWorkQueue主要包含2個“queue”(實際是數組),一個為QueueSegment(global work queue),另一個是WorkStealingQueue(local work queue)。兩者具體的區別會在Task/TPL裡講解,這裡暫不解釋。
由於forceGlobal是true,所以執行到了comparand.TryEnqueue(callback),也就是QueueSegment.TryEnqueue。comparand先從隊列的頭(queueHead)開始enqueue,如果不行就繼續往下enqueue,成功後再賦值給queueHead。
讓我們來看看QueueSegment的源代碼:

 1 public QueueSegment()
 2 {
 3 this.nodes = new IThreadPoolWorkItem[256];
 4 }
 5 
 6 public bool TryEnqueue(IThreadPoolWorkItem node)
 7 {
 8 int upper;
 9 int lower;
10 this.GetIndexes(out upper, out lower);
11 while (upper != this.nodes.Length)
12 {
13 if (this.CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower))
14 {
15 Volatile.Write<IThreadPoolWorkItem>(ref this.nodes[upper], node);
16 return true;
17 }
18 }
19 return false;
20 }

這個所謂的global work queue實際上是一個IThreadPoolWorkItem的數組,而且限死256,這是為啥?難道是因為和IIS線程池(也只有256個線程)對齊?使用interlock和內存寫屏障volatile.write來保證nodes的正確性,比起同步鎖性能有很大的提高。最後調用EnsureThreadRequested,EnsureThreadRequested會調用QCall把請求發送至CLR,由CLR調度ThreadPool。

線程池如何執行任務
線程被調度後通過ThreadPoolWorkQueue的Dispatch方法來執行callback。

 1 internal static bool Dispatch()
 2 {
 3 ThreadPoolWorkQueue threadPoolWorkQueue = ThreadPoolGlobals.workQueue;
 4 int tickCount = Environment.TickCount;
 5 threadPoolWorkQueue.MarkThreadRequestSatisfied();
 6 threadPoolWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, (EventKeywords) 18);
 7 bool flag1 = true;
 8 IThreadPoolWorkItem callback = (IThreadPoolWorkItem) null;
 9 try
10 {
11 ThreadPoolWorkQueueThreadLocals tl = threadPoolWorkQueue.EnsureCurrentThreadHasQueue();
12 while ((long) (Environment.TickCount - tickCount) < (long) ThreadPoolGlobals.tpQuantum)
13 {
14 try
15 {
16 }
17 finally
18 {
19 bool missedSteal = false;
20 threadPoolWorkQueue.Dequeue(tl, out callback, out missedSteal);
21 if (callback == null)
22 flag1 = missedSteal;
23 else
24 threadPoolWorkQueue.EnsureThreadRequested();
25 }
26 if (callback == null)
27 return true;
28 if (threadPoolWorkQueue.loggingEnabled)
29 FrameworkEventSource.Log.ThreadPoolDequeueWorkObject((object) callback);
30 if (ThreadPoolGlobals.enableWorkerTracking)
31 {
32 bool flag2 = false;
33 try
34 {
35 try
36 {
37 }
38 finally
39 {
40 ThreadPool.ReportThreadStatus(true);
41 flag2 = true;
42 }
43 callback.ExecuteWorkItem();
44 callback = (IThreadPoolWorkItem) null;
45 }
46 finally
47 {
48 if (flag2)
49 ThreadPool.ReportThreadStatus(false);
50 }
51 }
52 else
53 {
54 callback.ExecuteWorkItem();
55 callback = (IThreadPoolWorkItem) null;
56 }
57 if (!ThreadPool.NotifyWorkItemComplete())
58 return false;
59 }
60 return true;
61 }
62 catch (ThreadAbortException ex)
63 {
64 if (callback != null)
65 callback.MarkAborted(ex);
66 flag1 = false;
67 }
68 finally
69 {
70 if (flag1)
71 threadPoolWorkQueue.EnsureThreadRequested();
72 }
73 return true;
74 }

while語句判斷如果執行時間少於30ms會不斷繼續執行下一個callback。這是因為大多數機器線程切換大概在30ms,如果該線程只執行了不到30ms就在等待中斷線程切換那就太浪費CPU了,浪費可恥啊!
Dequeue負責找到需要執行的callback:

 1 public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal)
 2 {
 3 callback = (IThreadPoolWorkItem) null;
 4 missedSteal = false;
 5 ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue1 = tl.workStealingQueue;
 6 workStealingQueue1.LocalPop(out callback);
 7 if (callback == null)
 8 {
 9 for (ThreadPoolWorkQueue.QueueSegment comparand = this.queueTail; !comparand.TryDequeue(out callback) && comparand.Next != null && comparand.IsUsedUp(); comparand = this.queueTail)
10 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueTail, comparand.Next, comparand);
11 }
12 if (callback != null)
13 return;
14 ThreadPoolWorkQueue.WorkStealingQueue[] current = ThreadPoolWorkQueue.allThreadQueues.Current;
15 int num = tl.random.Next(current.Length);
16 for (int length = current.Length; length > 0; --length)
17 {
18 ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue2 = Volatile.Read<ThreadPoolWorkQueue.WorkStealingQueue>(ref current[num % current.Length]);
19 if (workStealingQueue2 != null && workStealingQueue2 != workStealingQueue1 && workStealingQueue2.TrySteal(out callback, ref missedSteal))
20 break;
21 ++num;
22 }
23 }

因為我們把callback添加到了global work queue,所以local work queue(workStealingQueue.LocalPop(out callback))找不到callback,local work queue查找callback會在task裡講解。接著又去global work queue查找,先從global work queue的起始位置查找直至尾部,因此global work quque裡的callback是FIFO的執行順序。

 1 public bool TryDequeue(out IThreadPoolWorkItem node)
 2 {
 3 int upper;
 4 int lower;
 5 this.GetIndexes(out upper, out lower);
 6 while (lower != upper)
 7 {
 8 // ISSUE: explicit reference operation
 9 // ISSUE: variable of a reference type
10 int& prevUpper = @upper;
11 // ISSUE: explicit reference operation
12 int newUpper = ^prevUpper;
13 // ISSUE: explicit reference operation
14 // ISSUE: variable of a reference type
15 int& prevLower = @lower;
16 // ISSUE: explicit reference operation
17 int newLower = ^prevLower + 1;
18 if (this.CompareExchangeIndexes(prevUpper, newUpper, prevLower, newLower))
19 {
20 SpinWait spinWait = new SpinWait();
21 while ((node = Volatile.Read<IThreadPoolWorkItem>(ref this.nodes[lower])) == null)
22 spinWait.SpinOnce();
23 this.nodes[lower] = (IThreadPoolWorkItem) null;
24 return true;
25 }
26 }
27 node = (IThreadPoolWorkItem) null;
28 return false;
29 }

使用自旋鎖和內存讀屏障來避免內核態和用戶態的切換,提高了獲取callback的性能。如果還是沒有callback,那麼就從所有的local work queue裡隨機選取一個,然後在該local work queue裡“偷取”一個任務(callback)。
拿到callback後執行callback.ExecuteWorkItem(),通知完成。

總結
ThreadPool提供了方法調整線程池最少活躍的線程來應對不同的並發場景。ThreadPool帶有2個work queue,一個golbal一個local。執行時先從local找任務,接著去global,最後才會去隨機選取一個local偷一個任務,其中global是FIFO的執行順序。Work queue實際上是數組,使用了大量的自旋鎖和內存屏障來提高性能。但是在偷取任務上,是否可以考慮得更多,隨機選擇一個local太隨意。首先要考慮偷取的隊列上必須有可執行任務;其次可以選取一個不在調度中的線程的local work queue,這樣降低了自旋鎖的可能性,加快了偷取的速度;最後,偷取的時候可以考慮像golang一樣偷取別人queue裡一半的任務,因為執行完偷到的這一個任務之後,下次該線程再次被調度到還是可能沒任務可執行,還得去偷取別人的任務,這樣既浪費CPU時間,又讓任務在線程上分布不均勻,降低了系統吞吐量!

另外,如果禁用log和ETW trace,可以使ThreadPool的性能更進一步。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved