本文通過對.NET4.5的ThreadPool源碼的分析講解揭示.NET線程池的內幕,並總結ThreadPool設計的好與不足。
線程池的作用
線程池,顧名思義,線程對象池。Task和TPL都有用到線程池,所以了解線程池的內幕有助於你寫出更好的程序。由於篇幅有限,在這裡我只講解以下核心概念:
Threadpool也支持操控IOCP的線程,但在這裡我們不研究它,涉及到task和TPL的會在其各自的博客中做詳解。
線程池的大小
不管什麼池,總有尺寸,ThreadPool也不例外。ThreadPool提供了4個方法來調整線程池的大小:
SetMaxThreads指定線程池最多可以有多少個線程,而GetMaxThreads自然就是獲取這個值。SetMinThreads指定線程池中最少存活的線程的數量,而GetMinThreads就是獲取這個值。
為何要設置一個最大數量和有一個最小數量呢?原來線程池的大小取決於若干因素,如虛擬地址空間的大小等。比如你的計算機是4g內存,而一個線程的初始堆棧大小為1m,那麼你最多能創建4g/1m的線程(忽略操作系統本身以及其他進程內存分配);正因為線程有內存開銷,所以如果線程池的線程過多而又沒有被完全使用,那麼這就是對內存的一種浪費,所以限制線程池的最大數是很make sense的。
那麼最小數又是為啥?線程池就是線程的對象池,對象池的最大的用處是重用對象。為啥要重用線程,因為線程的創建與銷毀都要占用大量的cpu時間。所以在高並發狀態下,線程池由於無需創建銷毀線程節約了大量時間,提高了系統的響應能力和吞吐量。最小數可以讓你調整最小的存活線程數量來應對不同的高並發場景。
如何調用線程池添加任務
線程池主要提供了2個方法來調用:QueueUserWorkItem和UnsafeQueueUserWorkItem。
兩個方法的代碼基本一致,除了attribute不同,QueueUserWorkItem可以被partial trust的代碼調用,而UnsafeQueueUserWorkItem只能被full trust的代碼調用。
1 public static bool QueueUserWorkItem(WaitCallback callBack) 2 { 3 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; 4 return ThreadPool.QueueUserWorkItemHelper(callBack, (object) null, ref stackMark, true); 5 }
QueueUserWorkItemHelper首先調用ThreadPool.EnsureVMInitialized()來確保CLR虛擬機初始化(VM是一個統稱,不是單指java虛擬機,也可以指CLR的execution engine),緊接著實例化ThreadPoolWorkQueue,最後調用ThreadPoolWorkQueue的Enqueue方法並傳入callback和true。
1 [SecurityCritical] 2 public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) 3 { 4 ThreadPoolWorkQueueThreadLocals queueThreadLocals = (ThreadPoolWorkQueueThreadLocals) null; 5 if (!forceGlobal) 6 queueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals; 7 if (this.loggingEnabled) 8 FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback); 9 if (queueThreadLocals != null) 10 { 11 queueThreadLocals.workStealingQueue.LocalPush(callback); 12 } 13 else 14 { 15 ThreadPoolWorkQueue.QueueSegment comparand = this.queueHead; 16 while (!comparand.TryEnqueue(callback)) 17 { 18 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null); 19 for (; comparand.Next != null; comparand = this.queueHead) 20 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, comparand.Next, comparand); 21 } 22 } 23 this.EnsureThreadRequested(); 24 }
ThreadPoolWorkQueue主要包含2個“queue”(實際是數組),一個為QueueSegment(global work queue),另一個是WorkStealingQueue(local work queue)。兩者具體的區別會在Task/TPL裡講解,這裡暫不解釋。
由於forceGlobal是true,所以執行到了comparand.TryEnqueue(callback),也就是QueueSegment.TryEnqueue。comparand先從隊列的頭(queueHead)開始enqueue,如果不行就繼續往下enqueue,成功後再賦值給queueHead。
讓我們來看看QueueSegment的源代碼:
1 public QueueSegment() 2 { 3 this.nodes = new IThreadPoolWorkItem[256]; 4 } 5 6 public bool TryEnqueue(IThreadPoolWorkItem node) 7 { 8 int upper; 9 int lower; 10 this.GetIndexes(out upper, out lower); 11 while (upper != this.nodes.Length) 12 { 13 if (this.CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower)) 14 { 15 Volatile.Write<IThreadPoolWorkItem>(ref this.nodes[upper], node); 16 return true; 17 } 18 } 19 return false; 20 }
這個所謂的global work queue實際上是一個IThreadPoolWorkItem的數組,而且限死256,這是為啥?難道是因為和IIS線程池(也只有256個線程)對齊?使用interlock和內存寫屏障volatile.write來保證nodes的正確性,比起同步鎖性能有很大的提高。最後調用EnsureThreadRequested,EnsureThreadRequested會調用QCall把請求發送至CLR,由CLR調度ThreadPool。
線程池如何執行任務
線程被調度後通過ThreadPoolWorkQueue的Dispatch方法來執行callback。
1 internal static bool Dispatch() 2 { 3 ThreadPoolWorkQueue threadPoolWorkQueue = ThreadPoolGlobals.workQueue; 4 int tickCount = Environment.TickCount; 5 threadPoolWorkQueue.MarkThreadRequestSatisfied(); 6 threadPoolWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, (EventKeywords) 18); 7 bool flag1 = true; 8 IThreadPoolWorkItem callback = (IThreadPoolWorkItem) null; 9 try 10 { 11 ThreadPoolWorkQueueThreadLocals tl = threadPoolWorkQueue.EnsureCurrentThreadHasQueue(); 12 while ((long) (Environment.TickCount - tickCount) < (long) ThreadPoolGlobals.tpQuantum) 13 { 14 try 15 { 16 } 17 finally 18 { 19 bool missedSteal = false; 20 threadPoolWorkQueue.Dequeue(tl, out callback, out missedSteal); 21 if (callback == null) 22 flag1 = missedSteal; 23 else 24 threadPoolWorkQueue.EnsureThreadRequested(); 25 } 26 if (callback == null) 27 return true; 28 if (threadPoolWorkQueue.loggingEnabled) 29 FrameworkEventSource.Log.ThreadPoolDequeueWorkObject((object) callback); 30 if (ThreadPoolGlobals.enableWorkerTracking) 31 { 32 bool flag2 = false; 33 try 34 { 35 try 36 { 37 } 38 finally 39 { 40 ThreadPool.ReportThreadStatus(true); 41 flag2 = true; 42 } 43 callback.ExecuteWorkItem(); 44 callback = (IThreadPoolWorkItem) null; 45 } 46 finally 47 { 48 if (flag2) 49 ThreadPool.ReportThreadStatus(false); 50 } 51 } 52 else 53 { 54 callback.ExecuteWorkItem(); 55 callback = (IThreadPoolWorkItem) null; 56 } 57 if (!ThreadPool.NotifyWorkItemComplete()) 58 return false; 59 } 60 return true; 61 } 62 catch (ThreadAbortException ex) 63 { 64 if (callback != null) 65 callback.MarkAborted(ex); 66 flag1 = false; 67 } 68 finally 69 { 70 if (flag1) 71 threadPoolWorkQueue.EnsureThreadRequested(); 72 } 73 return true; 74 }
while語句判斷如果執行時間少於30ms會不斷繼續執行下一個callback。這是因為大多數機器線程切換大概在30ms,如果該線程只執行了不到30ms就在等待中斷線程切換那就太浪費CPU了,浪費可恥啊!
Dequeue負責找到需要執行的callback:
1 public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal) 2 { 3 callback = (IThreadPoolWorkItem) null; 4 missedSteal = false; 5 ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue1 = tl.workStealingQueue; 6 workStealingQueue1.LocalPop(out callback); 7 if (callback == null) 8 { 9 for (ThreadPoolWorkQueue.QueueSegment comparand = this.queueTail; !comparand.TryDequeue(out callback) && comparand.Next != null && comparand.IsUsedUp(); comparand = this.queueTail) 10 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueTail, comparand.Next, comparand); 11 } 12 if (callback != null) 13 return; 14 ThreadPoolWorkQueue.WorkStealingQueue[] current = ThreadPoolWorkQueue.allThreadQueues.Current; 15 int num = tl.random.Next(current.Length); 16 for (int length = current.Length; length > 0; --length) 17 { 18 ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue2 = Volatile.Read<ThreadPoolWorkQueue.WorkStealingQueue>(ref current[num % current.Length]); 19 if (workStealingQueue2 != null && workStealingQueue2 != workStealingQueue1 && workStealingQueue2.TrySteal(out callback, ref missedSteal)) 20 break; 21 ++num; 22 } 23 }
因為我們把callback添加到了global work queue,所以local work queue(workStealingQueue.LocalPop(out callback))找不到callback,local work queue查找callback會在task裡講解。接著又去global work queue查找,先從global work queue的起始位置查找直至尾部,因此global work quque裡的callback是FIFO的執行順序。
1 public bool TryDequeue(out IThreadPoolWorkItem node) 2 { 3 int upper; 4 int lower; 5 this.GetIndexes(out upper, out lower); 6 while (lower != upper) 7 { 8 // ISSUE: explicit reference operation 9 // ISSUE: variable of a reference type 10 int& prevUpper = @upper; 11 // ISSUE: explicit reference operation 12 int newUpper = ^prevUpper; 13 // ISSUE: explicit reference operation 14 // ISSUE: variable of a reference type 15 int& prevLower = @lower; 16 // ISSUE: explicit reference operation 17 int newLower = ^prevLower + 1; 18 if (this.CompareExchangeIndexes(prevUpper, newUpper, prevLower, newLower)) 19 { 20 SpinWait spinWait = new SpinWait(); 21 while ((node = Volatile.Read<IThreadPoolWorkItem>(ref this.nodes[lower])) == null) 22 spinWait.SpinOnce(); 23 this.nodes[lower] = (IThreadPoolWorkItem) null; 24 return true; 25 } 26 } 27 node = (IThreadPoolWorkItem) null; 28 return false; 29 }
使用自旋鎖和內存讀屏障來避免內核態和用戶態的切換,提高了獲取callback的性能。如果還是沒有callback,那麼就從所有的local work queue裡隨機選取一個,然後在該local work queue裡“偷取”一個任務(callback)。
拿到callback後執行callback.ExecuteWorkItem(),通知完成。
總結
ThreadPool提供了方法調整線程池最少活躍的線程來應對不同的並發場景。ThreadPool帶有2個work queue,一個golbal一個local。執行時先從local找任務,接著去global,最後才會去隨機選取一個local偷一個任務,其中global是FIFO的執行順序。Work queue實際上是數組,使用了大量的自旋鎖和內存屏障來提高性能。但是在偷取任務上,是否可以考慮得更多,隨機選擇一個local太隨意。首先要考慮偷取的隊列上必須有可執行任務;其次可以選取一個不在調度中的線程的local work queue,這樣降低了自旋鎖的可能性,加快了偷取的速度;最後,偷取的時候可以考慮像golang一樣偷取別人queue裡一半的任務,因為執行完偷到的這一個任務之後,下次該線程再次被調度到還是可能沒任務可執行,還得去偷取別人的任務,這樣既浪費CPU時間,又讓任務在線程上分布不均勻,降低了系統吞吐量!
另外,如果禁用log和ETW trace,可以使ThreadPool的性能更進一步。