問:我目前正在使用 Microsoft .NET Framework ThreadPool,在使用過程中我遇到了一種情況,不知道該如何解決。我需要處理一個較大批次的已在隊列中的工作項,在第一個批次開始處理後第二個批次(規模稍小)到達。最初,較大批次中的一些工作將被分配給 ThreadPool 中的所有工作線程。但是,當第二批到達後,我想平均分配,使每一批都得到同樣的服務,而不是先到的批次得到所有線程。
當其中一批完成時,我希望無論哪一個仍需處理的批次,都能夠獲得所有工作線程的關注。我該如何在 ThreadPool 頂層對此類批處理功能進行分層呢?
答:在以前的專欄中,我曾介紹過如何在現有 .NET ThreadPool 的頂層對各種類型的功能進行分層。在 2004 年 10 月刊的《MSDN 雜志》中,我介紹了如何使 ThreadPool 支持等待排隊的工作項(請參見“ThreadPoolWait 和 HandleLeakTracker”)。在 2004 年 11 月刊中,我介紹了如何添加對工作項優先級的支持(請參見“ThreadPoolPriority 和 MethodImplAttribute”)。在 2006 年 3 月刊中,我介紹了如何添加取消支持(請參見“可以取消的線程池”)。將來,我可以回顧 2009 年 1 月這一期,並提到我曾介紹過如何在 ThreadPool 中添加輪詢調度支持。
您需要解決的問題首先是要了解 ThreadPool 分派工作的原理。在內部,它維護一個已被加入其隊列的工作項隊列。當池中的某個線程可以執行工作時,它會返回到此工作隊列並提取下一個項目。此處理順序並未記錄在案,因此是非常不可靠的(因為它可能而且很可能會在未來版本中有所變更)。
現在它可以通過一種非常簡單的方式來實現:先進先出 (FIFO) 隊列。因此,要排隊的第一個工作將成為被線程所選中的第一個工作。在您的批處理情況中,這意味著第一批中的所有工作在隊列中都將排在第二批所有工作的前面。因此,第一批的所有工作都將在第二批的所有工作之前進行分派。對於某些情況這是合理且最佳的方法。而對於您的情況,還需要進行更多的控制。
從 ThreadPool 獲得這種控制的最簡單方法之一是將您自己的委托替換為用戶真正需要執行的委托。例如,假設您想捕獲已排隊工作中拋出的所有未處理異常並針對每個異常引發一個事件。要執行此操作,您可以編寫如圖 1 所示的代碼。然後,使用 ExceptionThreadPool.QueueUserWorkItem 而不是使用 ThreadPool.QueueUserWorkItem。此工作仍將由 ThreadPool 來執行,但池中的線程實際執行的是您排隊的委托,而非用戶提供的委托。接下來在調用您的委托時將會調用用戶提供的委托,而且會捕獲所有異常並引發目標事件。
圖 1 填充 ThreadPool
public static class ExceptionThreadPool { public static void QueueUserWorkItem( WaitCallback callback, object state) { ThreadPool.QueueUserWorkItem(delegate { try { callback(state); } catch (Exception exc) { var handler = UnhandledException; if (handler != null) handler(null, new UnhandledExceptionEventArgs(exc, false)); } }); } public static event UnhandledExceptionEventHandler UnhandledException; }
請注意,此方法雖然功能很強大,但卻需要付出一定的代價:需要分配額外的委托、需要調用額外的委托等等。成本是否過高只能根據您和您所處的環境來確定,但與從頭編寫自己的線程池相比,這種分層通常更具成本效益。
當然,這是一個非常簡單的示例,但您也可以用來處理很復雜的工作。我之前引用的優先級池示例將用戶提供的委托存儲在它自己的數據結構中。然後它將返回的替代委托在池中排隊並在這些數據結構中搜索要執行的正確委托,而且更傾向於首先執行優先級較高的委托。您可以使用類似的方法來解決您的批處理困境。
您可以花些時間想象一下,如果不是只有單個隊列,而是每個批次都有一個隊列的情形。各個批次會將工作置入其自身的相關隊列中。然後,您可以使用池中的線程在所有隊列之間進行輪詢。排列到實際 ThreadPool 隊列中的委托將返回到您的數據結構中,它會從下一個要檢查的隊列開始查找工作。如果發現,則從該隊列開始執行。否則,它將繼續查找下一個隊列。
通過這種方式,您可以在各個批次之間提供公平的調度。如果只有一批,則線程將始終從該隊列提取工作。如果有多批,則線程將訪問每個隊列,並給予大致相同的待遇。圖 2 為您提供了此解決方案的概觀。
圖 2 RoundRobinThreadPool 方法
為使工作進行下去,您首先需要一個數據結構來存儲用戶提供的委托。我所采用的表現形式如圖 3 所示。此數據結構包含三個屬性。前兩個屬性您應該很熟悉;它們是用戶為 QueueUserWorkItem 提供的狀態,所以您理所當然需要對其進行緩存。不過,第三個屬性可能會有些陌生。在 .NET 中與每個執行線程都相關聯的是 System.Threading.ExecutionContext,它代表一些信息,例如當前用戶、與邏輯執行線程相關聯的任何狀態、代碼訪問安全信息等。這種跨異步執行點的上下文流動非常重要。
圖 3 捕獲工作項
internal class WorkItem { public WaitCallback WaitCallback; public object State; public ExecutionContext Context; private static ContextCallback _contextCallback = state => { var item = (WorkItem)state; item.WaitCallback(item.State); }; public void Execute() { if (Context != null) ExecutionContext.Run(Context, _contextCallback, this); else WaitCallback(State); } }
例如,如果您要模擬特定的 Windows 身份標識並調用 ThreadPool.QueueUserWorkItem,則您排隊的工作應使用相同的 Windows 身份標識來執行。否則將會出現潛在的安全漏洞。從 .NET Framework 2.0 開始,此上下文會默認自動跨代碼中的所有異步點流動:ThreadPool.QueueUserWorkItem、新建線程、異步委托調用等。
但是對於此處討論的實現,您應遵守一組不同的規則。通過從委托的執行順序更改其排隊順序,此 ExecutionContext 流與用戶提供的工作項之間不再存在直接的對應關系。因此,您的實現需要正確緩存 ExecutionContext 與用戶提供的委托,然後使用捕獲的上下文來運行該委托。
現在,您已經有了一個工作項,讓我們來看一看它將被放入的隊列(如圖 4 所示)。RoundRobinThreadPool.Queue 數據結構本身相當簡單。在內部,它包含一個用來存儲為其提供的所有工作項的 Queue<WorkItem>、一個對所關聯的 RoundRobinThreadPool 實例的引用,以及一個表示 Dispose 方法是否在隊列中被調用過的布爾值。然後,它會為 QueueUserWorkItem 方法提供與 ThreadPool 相同的簽名。
圖 4 輪詢隊列
public sealed class RoundRobinThreadPool { private List<Queue> _queues; ... public sealed class Queue : IDisposable { internal Queue(RoundRobinThreadPool pool) { _pool = pool; } internal Queue<WorkItem> _workItems = new Queue<WorkItem>(); private RoundRobinThreadPool _pool; internal bool _disposed; public void QueueUserWorkItem(WaitCallback callback) { QueueUserWorkItem(callback, null); } public void QueueUserWorkItem(WaitCallback callback, object state) { if (_disposed) throw new ObjectDisposedException(GetType().Name); var item = new WorkItem { Context = ExecutionContext.Capture(), WaitCallback = callback, State = state }; lock (_pool._queues) _workItems.Enqueue(item); _pool.NotifyNewWorkItem(); } public void Dispose() { if (!_disposed) { lock (_pool._queues) { if (_workItems.Count == 0) _pool.RemoveQueueNeedsLock(this); } _disposed = true; } } } }
調用 QueueUserWorkItem 時,用戶提供的回調和狀態(以及當前的 ExecutionContext)將被捕獲到 WorkItem 中。此工作隨後將被存儲到泛型隊列中,並且相關池會得到新工作已到達的通知。您務必要注意到這裡使用了一個鎖來保護工作項隊列,因為 QueueUserWorkItem 可以同時從多個線程進行調用,而您需要確保維持不變性。
還要注意,被鎖定的對象是一個來自池的全局隊列列表。對於整個實現,我使用的是一個粗粒度鎖。更有效的實現可能會使用更細粒度的鎖定,例如針對每個隊列使用單個鎖,而不是整個 RoundRobinThreadPool 使用一個。為了便於實現和出於方便考慮,我選擇使用單一鎖。
當不再需要此隊列時會使用 Dispose 方法。在典型的批處理方案中,將會創建一個隊列、將工作排列到該隊列中,然後釋放隊列。如果 Dispose 方法只是將此隊列從池中刪除,那麼即使在其中仍有要處理的工作項,該隊列仍可能會被刪除。
這時 Dispose 會做兩件事情。首先,它會檢查是否存在任何剩余的工作項。如果不存在,隊列會調用池以將自身刪除。其次,它將自身標記為已釋放。您稍後會看到當池遇到已釋放但尚未刪除的池時,它是如何處理這一情況的。
圖 5 顯示實現的其余部分,即 RoundRobinThreadPool 類本身。該池包含四個字段:
池(同時還作為先前提及的鎖)所維護的各個隊列的列表。
池的默認隊列。
代表要在其中搜索工作的下一個隊列的整數。
已實際排列到 ThreadPool 中的回調委托。
圖 5 RoundRobinThreadPool
public sealed class RoundRobinThreadPool { private List<Queue> _queues; private Queue _defaultQueue; private int _nextQueue; private WaitCallback _callback; public RoundRobinThreadPool() { _queues = new List<Queue>(); _callback = DequeueAndExecuteWorkItem; _nextQueue = 0; _defaultQueue = CreateQueue(); } public Queue CreateQueue() { var createdQueue = new Queue(this); lock (_queues) _queues.Add(createdQueue); return createdQueue; } public void QueueUserWorkItem(WaitCallback callback) { QueueUserWorkItem(callback, null); } public void QueueUserWorkItem(WaitCallback callback, object state) { _defaultQueue.QueueUserWorkItem(callback, state); } private void RemoveQueueNeedsLock(Queue queue) { int index = _queues.IndexOf(queue); if (_nextQueue >= index) _nextQueue--; _queues.RemoveAt(index); } private void NotifyNewWorkItem() { ThreadPool.UnsafeQueueUserWorkItem(_callback, null); } private void DequeueAndExecuteWorkItem(object ignored) { WorkItem item = null; lock (_queues) { var searchOrder = Enumerable.Range(_nextQueue, _queues.Count - _nextQueue). Concat(Enumerable.Range(0, _nextQueue)); foreach (int i in searchOrder) { var items = _queues[i]._workItems; if (items.Count > 0) { item = items.Dequeue(); _nextQueue = i; if (queue._disposed && items.Count == 0) RemoveQueueNeedsLock(_queues[i]); break; } } _nextQueue = (_nextQueue + 1) % _queues.Count; } if (item != null) item.Execute(); } ... // RoundRobinThreadPool.Queue and .WorkItem, already shown }
初始化 RoundRobinThreadPool 時,將會配置所有這些狀態。特別是,會通過調用 CreateQueue 方法來初始化默認隊列。此 CreateQueue 方法與公開的、允許開發人員向池中添加其他隊列(例如,當需要自己獨立隊列的新工作批次到達時)的方法完全相同。它只實例化新 RoundRobinThreadPool.Queue 實例(圖 3 中探討的類型)、將其添加到隊列列表中,然後再返回它。
為了方便使用,RoundRobinThreadPool 公開了其自身的 QueueUserWorkItem 方法;這些方法只針對在池實例化時創建的默認隊列。
下面介紹 NotifyNewWorkItem 方法。您可能會記得,如果針對某個隊列調用了 QueueUserWorkItem,則在存儲工作項之後,該隊列會在池中調用此 NotifyNewWorkItem 方法。此方法只是委托給實際的 ThreadPool,它會提交一個將會返回給 DequeueAndExecuteWorkItem 方法(稍後介紹)的委托,顧名思義,此方法將會取消排隊並執行輪詢池中的工作項。
請注意,NotifyNewWorkItem 調用的是 ThreadPool.UnsafeQueueUserWorkItem,而不是 ThreadPool.QueueUserWorkItem。"Unsafe" 前綴只是意味著它不是流動 ExecutionContext;不這樣做可得到一些性能優勢。由於此實現已開始對 ExecutionContext 流進行手動處理,因此 ThreadPool 也不需要嘗試執行此操作。
DequeueAndExecuteWorkItem 是真正的奇妙所在。此方法首先生成隊列搜索順序。搜索順序從要檢查的下一個隊列開始一直到列表結束,然後再循環回來,從列表的起始處到隊列起始處。為了簡化實現,故使用 LINQ Enumerable.Range 方法生成了兩個列表,然後再使用 LINQ Enumerable.Concat 方法將其連接在一起。
確定了搜索順序後,即可開始查找工作項。每個隊列都會按指定的順序進行檢查,找到工作項後,即會將其刪除並更新下一個指針。然後會使用圖 3 中所示的 Execute 方法來調用此工作項。
此處有一行特別有趣的代碼,它用來檢查剛剛從中檢索到某個項目的隊列是否已釋放且為空。如果池發現了此類隊列,就會意識到不會再有任何項目添加到其中(因為它已被釋放),因此不需要再保留這個池。此時,將會使用 RemoveQueueNeedsLock 把目標隊列從隊列列表中刪除,並可能會更新下一個隊列指針以防止其超出范圍。
請注意,此方法沒有在內部使用鎖,而是訪問共享狀態;因此我用 "NeedsLock" 後綴命名此方法,以提醒我自己在持有鎖時需要調用此方法。您會注意到,針對 RemoveQueueNeedLock 的兩個調用站點(一個在隊列的 Dispose 方法中,另一個在池的 DequeueAndExecuteWorkItem 方法中)在持有隊列鎖的時候都會調用此方法。
隨著此實現的完成,您現在可以在代碼中對此進行測試。在以下示例中,我創建了 RoundRobinThreadPool 的一個靜態實例。如果有一批工作到達並需要進行處理,這時將會創建一個新的隊列、將所有工作都排列到此隊列中,然後釋放隊列:
private static RoundRobinThreadPool _pool = new RoundRobinThreadPool(); ... private void ProcessBatch(Batch b) { using(var queue = _pool.CreateQueue()) { foreach(var item in b) { queue.QueueUserWorkItem(() => ProcessItem(item)); } } }
即使會首先安排第一批到達的所有工作,但只要第二批到達,它也將會得到基本相同的處理資源。
可以在此實現中添加更多的點綴性功能,從性能角度來看,實現很可能會大為改觀。但是,幾乎不需要任何代碼您即可創建一個抽象,從而利用 .NET ThreadPool 及其提供的所有能力和功能,而您仍可以獲得您所尋求的基於批次的公平性。