Microsoft .NET Framework 4 中引入的任務並行庫 (TPL) 能夠讓應用程序開發者創建解決方案, 以利用多核計算機中的並行處理能力。但是在許多情況下,垂直擴展的能力(添加多個核心)受到多種 因素的約束,包括成本和主機托管限制。在這種情況下,如果需要擴展能力,則可以將數據處理在服務 器陣列中分配;雲托管就是這樣的例子。在本文中,我將描述一套概念性解決方案的主要方面(包括實 施),通過使用 .NET Framework 4.5 的多項新功能完成上述情景。
基本前提條件
我將 要描述的方法要求使用除 TPL 以外的多種技術,包括:
任務並行庫 (TPL)
Windows Communication Foundation (WCF)
托管可擴展性框架 (MEF)
請注意,我僅在我要試圖解決的問題中討論這些方面。我假設你非常了解這些技術。
遠程任 務客戶端、任務協調器及任務執行節點
遠程任務客戶端是客戶端側層,將隱藏由於使用分布式 環境的語義而產生的復雜性。遠程任務客戶端直接與任務協調器交互,然後協調器成為底層基礎結構的 入口點。總體而言,任務協調器具有以下屬性:
任務協調器是與客戶端進行通訊的唯一聯絡點。
協調器公開必要的服務,以請求在可擴展的平台上執行任務,以及取消特定任務。
協調器處理任務執行請求的限制和隊列,支持環境的健康操作。
任務執行節點是任務運行流程的主機。由 TPL 執行的任務的實際實施駐留在任務執行節點。
以下是這些邏輯層和主要方面及信息流:
遠程任務客戶端請求執行一項或多項任務。
任務協調器向任務執行節點提交請求。
任務執行節點執行任務並更新任務協調器中每項請求的狀態。
任務協調器使用每項請求的執行結果,更新客戶端。
任務執行節點駐留在負載平衡器的後面,因此可以按需要添加更多節點,從而能夠實現水平擴展。
圖 1 顯示了邏輯層和信息流。
圖 1 水平擴展任務
注意任務執行節點如何更新任務協調器,然後任務協調器更新遠 程任務客戶端。我將要描述一項基於客戶端與任務協調器之間雙向通訊的實施,以及基於任務協調器與 任務執行節點之間雙向通訊的實施。根據 WCF 定義,這表示使用雙工通道,使任務執行節點回調任務 協調器,然後任務協調器執行相同操作以更新客戶端。我將展示使用 WebSockets 實現這種雙向通訊方 法。WebSockets 傳輸是作為 .NET Framework 4.5 的一個新綁定來實施的,可在 Windows 8 上使用。 有關此綁定的詳情,請訪問 bit.ly/SOLNiU。
客戶端與任務協調器
既然已經理解了三大 邏輯層——遠程任務客戶端、任務協調器及任務執行節點,現在讓我們開始討論遠程任務客戶端的實施 。注意當我在本文中使用“客戶端”一詞,我是指遠程任務客戶端。
如上所述,客戶端的價值 主張就是隱藏底層部件復雜性的能力。一方面,客戶端通過提供 API 來實現這一點,給人一種在本地 執行任務的印象,盡管事實上可能在其他地方執行。圖 2 中的代碼顯示了 RemoteTaskClient 類的公 共方法。
圖 2 RemoteTaskClient 類的公共方法
public class RemoteTaskClient<TResult> : IDisposable
{
public void AddRequest(string typeName,
string[] parameters, CancellationToken tk)
{...}
public void AddRequest(string typeName, string[] parameters)
{...}
public Task<TResult>[] SubmitRequests()
{...}
public RemoteTaskClient(string taskCoodinatorEndpointAddress)
{...}
public void Dispose()
{...}
}
您可以使用 AddRequest 方法添加遠程執行的請求。 對於每項請 求,您需要指定 typeName(這是實際實施的類型,包含一個委托,基礎結構將此委托當作 TPL 任務進 行遠程運行)及相關參數。 然後您可以通過 SubmitRequest 方法提交請求。 提交請求的結果是 TPL 任務陣列,每個請求一個任務。 這種方法允許您將所產生的 TPL 任務當作本地任務來管理。 例如, 您可以提交多個請求,然後等待請求完成,例如:
using (var c = new RemoteTaskClient<int>("..."))
{
c.AddRequest("...", null);
c.AddRequest("...", null);
var ts = c.SubmitRequests();
Task.WaitAll(ts);
foreach (var t in ts)
Console.WriteLine(t.Result);
}
在詳細討論 RemoteTaskClient 的實施之前,讓我們看看任務協調器公開的服務操作和數據約定。 在回顧 RemoteTaskClient 的實施之前理解這些約定將為您提供額外的上下文知識,因為客戶端的實施依賴於 這些服務。
圖 3 中的代碼顯示了任務協調器向客戶端公開的服務操作。 通過 SubmitRequest 操作,客戶端能夠請求執行一項或多項 TPL 任務。 客戶端也可以通過 CancelTask 操作,請求取消某 項尚未完成的 TPL 任務。 注:UpdateStatus 操作是一項回調。 通過在客戶端側實施這種回調約定, 任務協調器將更新客戶端的狀態。
圖 3 服務操作
[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
public interface ITaskCoordinator
{
[OperationContract(IsOneWay = true)]
void SubmitRequest(List<STask> stask);
[OperationContract]
bool CancelTask(string Id);
}
public interface ITaskUpdateCallback
{
[OperationContract (IsOneWay = true)]
void UpdateStatus (string id, STaskStatus status, string result);
}
讓我們看 看代表了任務執行請求的數據約定。 這是客戶端發送給任務協調器的數據實體,然後任務協調器將向 任務執行節點提交請求,而請求就在任務執行節點執行。 圖 4 中顯示的 STask 類模擬了一項任務執 行請求。 客戶端使用 STaskTypeName 和 STaskParameters 屬性,可以設置想要執行的任務類型及相 關參數。 任務協調器將使用屬性 ID 作為唯一標識符,邏輯層可以使用此 ID 將請求與系統中運行的 實際 TPL 任務進行關聯。
圖 4 STask 類
[DataContract]
public class STask
{
[DataMember]
public string Id
{ get; set; }
[DataMember]
public string STaskTypeName
{ get; set; }
[DataMember]
public string[] STaskParameters
{ get; set; }
}
現在讓我們回到 RemoteTaskClient,討論我如何計劃將本地 TPL 任務與任務執行節點中的執行結果進行關聯。 TPL 有 一個很方便的類,TaskCompletionSource<TResult>,我可以用它來創建一個 TPL 任務並控制其 生命周期。 此機制讓我標記特定任務何時完成、取消或出錯。 其中的意義在於每項進入任務執行節點 (通過任務協調器)的請求必須與一個 TaskCompletionSource 實例相關聯。 為此,我實施了 ClientRequestInfo 類,如圖 5 所示。
圖 5 ClientRequestInfo 類
internal class ClientRequestInfo<TResult>
{
internal STask TaskExecutionRequest
{ get; set; }
internal TaskCompletionSource<TResult> CompletionSource
{ get; set; }
internal ClientRequestInfo(string typeName, string[] args)
{
TaskExecutionRequest = new STask()
{Id = Guid.NewGuid().ToString(), STaskTypeName =typeName,
STaskParameters = args };
CompletionSource = new TaskCompletionSource<TResult>();
}
}
圖 6 顯示了此類構造函數的實施。
圖 6 ClientRequestInfo 構造函數
ITaskCoordinator _client;
ConcurrentDictionary<string, ClientRequestInfo<TResult>>
_requests = new ConcurrentDictionary<string,
ClientRequestInfo<TResult>>();
public RemoteTaskClient(string taskCoordinatorEndpointAddress)
{
var factory = new DuplexChannelFactory<ITaskCoordinator>
(new InstanceContext(new CallbackHandler<TResult>(_requests)),
new NetHttpBinding(),
new EndpointAddress(taskCoordinatorEndpointAddress));
_client = factory.CreateChannel();
((IClientChannel) _client).Open();
}
注意:我正在打開通向任務協調器的雙工通 道,並創建 CallbackHandler 類型的回調實例。 CallbackHandler 收到 parameter _requests,裡面 包含 ClientRequestInfo 實例。 基本原理是 _requests 字典裝有客戶端請求的所有活動實例(以及 與其相關的 TaskCompletionSource 實例),而 CallbackHandler 將處理來自任務協調器的更新。 因 為多個服務請求將更新 _requests 字典,所以我需要保證線程安全性,因此需要創建此實例作為 ConcurrentDictionary 的實例。
圖 7 顯示了 CallbackHandler 類的實施。
圖 7 CallbackHandler 類
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler<TResult> : ITaskUpdateCallback
{
ConcurrentDictionary<string, ClientRequestInfo<TResult>> _requests;
public void UpdateStatus(string id, STaskStatus status, Object result)
{
ClientRequestInfo<TResult> info;
if (_requests.TryRemove(id, out info))
{
switch (status)
{
case STaskStatus.
Completed: info.CompletionSource.SetResult(
(TResult)result);
break;
case STaskStatus.Canceled:
info.CompletionSource.SetCanceled();
break;
case STaskStatus.Faulted:
info.CompletionSource.SetException(
(Exception)result);
break;
}
}
}
internal CallbackHandler(ConcurrentDictionary<string,
ClientRequestInfo<TResult>> requests)
{
requests = requests;
}
}
接下來,讓我們看看 AddRequest 和 SubmitRequest 方法的實施,如圖 8 所示。
圖 8 AddRequest 和 SubmitRequest 方法
public void AddRequest(string typeName, string[] parameters,
CancellationToken tk)
{
var info = new ClientRequestInfo<TResult>(typeName, args);
_buffer.Add(info);
tk.Register(()=> _client.CancelTask (info.TaskExecutionRequest.Id));
}
public void AddRequest(string typeName, string[] parameters)
{
_buffer.Add(new ClientRequestInfo<TResult>(typeName, parameters));
}
public Task<TResult>[] SubmitRequests()
{
if (_buffer.Count == 0)
return null;
var req = _buffer.Select((r) =>
{
_requests.TryAdd(r.TaskExecutionRequest.Id, r);
return r.TaskExecutionRequest;
});
_client.SubmitRequest(req.ToList<STask>());
var ret = _buffer.Select(r =>
r.CompletionSource.Task).ToArray<Task<TResult>>();
_buffer.Clear();
return ret;
}
跟蹤客戶端請求
如上節所示,客戶端僅與任務協調器進行交互 ,因此任務協調器的責任是處理客戶端的請求,然後用 TPL 任務的執行結果更新客戶端。 對於客戶端 ,這需要以某種形式保留原來的請求。 還需要記錄相應的回調實例(允許與客戶端進行通訊);通向 與連接相關的任務執行節點的通道(正如下文所示,在取消情景中需要);唯一標識符,將所有與任務 執行節點單一調用相關的任務執行請求進行分組(以確定何時不再需要此通道);以及執行的狀態和結 果。 圖 9 顯示了 STaskInfo 類的定義,此實體將保存此信息。 此外,我還使用一個 ConcurrentDictionary<TKey,TValue> 實例,作為持久性機制。
圖 9 STaskInfo 和 CoordinatorContext 類
public class STaskInfo
{
public string ExecutionRequestId
{ get; set; }
public STask ClientRequest
{ get; set; }
public ITaskUpdateCallback CallbackChannel
{ get; private set; }
public ITaskExecutionNode ExecutionRequestChannel
{ get; set; }
public STaskInfo(ITaskUpdateCallback callback)
{
CallbackChannel = callback;
}
}
public static class CoordinatorContext
{
...
private static readonly ConcurrentDictionary<string, STaskInfo>
_submissionTracker =
new ConcurrentDictionary<string, STaskInfo>();
...
}
最後,注意 _submissionTracker 包含在 CoordinatorContext 類中。 我將使用這個 類來實施任務協調器的主要功能。
處理客戶端請求
任務協調器是客戶端的唯一入口點, 即任務協調器必須能夠處理盡可能多的客戶端請求,同時防止任務執行節點飽和(在資源方面)。 這 並非像看起來那麼容易。 為了更好地解釋潛在的變化,讓我們看看一個簡單的解決方案:
任務協調器公開服務操作,客戶端通過服務操作提交任務執行請求。
任務協調器向任務執行節點提交這些請求以進行執行,並記錄這些請求,即持續保持這種狀態。
圖 10 顯示了此提交過程的基本實施。
圖 10 實施提交過程
public class TaskCoordinatorService : ITaskCoordinator
{
...
public void SubmitRequest(List<STask> stasks)
{
CoordinatorContext.SendTasksToTaskHandler(stasks);
}
...
}
public static class CoordinatorContext
{
...
internal static void SendTaskRequestToTaskExecutionNode(List<STask> stasks)
{
var clientFactory = //Client factory creation logic..
var channel = clientFactory.CreateChannel();
foreach (var stask in stasks)
_submissionTracker.TryAdd(stask.Id, stask);
try
{
((IClientChannel)channel).Open ();
channel.Start(stasks);
}
catch (CommunicationException ex)
{
// Error handling and logging ...
}
finally
{
if (((IClientChannel)channel).State != CommunicationState.Faulted)
((IClientChannel)channel).Close();
}
}
...
}
然而,這個簡單的實施在某些情景下並不順利:
如果客戶端在一個請求中提交大量任務,所有任務將最終進入一個任務執行節點,導致不能均勻利 用可用資源(假設有多個任務執行節點可用)。
在峰值負載情景下,如果執行 TPL 任務的數量超過了這些資源可以處理的限值,那麼系統可能會耗 盡任務執行節點中的可用資源。當作為 TPL 任務執行的任務綁定到特定資源(例如內存),那麼在峰 值情況下會增加系統無響應的風險,便會出現這種情況。
限制
解決此類難題的一種方法是“管理”經過系統的任務執行請求。在這種情況下,您可以 將任務協調器看作是限制控制器。但是在討論限制流程之前,讓我們回顧一下限制的語義,我將使用限 制語義連同限制流程一起降低這些風險。
可以通過對任務協調器在一個請求中向任務執行節點 提交的任務執行請求數量設置上限,降低第一個情景的風險。我將此限制稱為 maxSTasksPerRequest。使用此方法,負載平衡器算法將能夠在可用的任務執行節點之間進行負載 的平衡分布。
第二個情景更具挑戰。一個貌似合理的解決方案是對任務執行節點執行的任務數 設定上限。我將此限制稱為 maxNumberOfTasks。
除了此限制之外,此解決方案還可以設置另一 個限制,以根據類型限制所執行任務的數量。為了解釋這種方法有用的原因,讓我們考慮一種情景,即 任務執行節點部署了兩種類型的任務 T1 和 T2。T1 是綁定 CPU,T2 是綁定磁盤 I/O。在此情景中, 客戶端提交執行 T1 任務請求的吞吐量更有可能受到活動任務(受到相同類型約束的局限)的影響,因 此 T1 任務數量越多,影響越大。因為 T2 任務受不同約束的局限,因為對 T1 任務的影響並不相同。 能夠按類型限制任務執行表明我可以控制在任何特定時間可以運行多少個 T1 任務,以便最大程度利用 CPU 資源,以及總體吞吐量。我將此限制稱為 maxNumberOfTasksByType。
隊列和限制
理解了限制的語義以及限制如何能夠有效地保持任務執行節點的健康操作之後,讓我們看看當達到限制 指定的限值時會發生什麼情況,即實際的限制流程。
一種辦法是引發例外情況。然而,這會影 響解決方案的總體吞吐量,因為客戶端會產生檢查特定錯誤或故障然後重新提交請求的開銷,直至任務 協調器成功處理請求為止。一種替代方法是使用服務器側隊列,暫時不向客戶端發出請求,一個類似監 視器的進程(提交器進程)定期讀取隊列的請求,然後提交給任務執行節點。我將使用提交器流程執行 實際的限制,因為提交器考慮以下的規則來讀取請求隊列:
對可以取消 maxSTasksPerRequest 隊列的請求數量設置上限。
如果達到 maxNumberOfTasks 限制,則停止取消隊列請求,請求隊列將保持現狀。
如果達到 maxNumberOfTasksByType 限制,則取消隊列,然後重新將請求加入請求隊列中。將請求 重新加入隊列可以繼續處理其他類型的任務。這種戰略為隊列中所有任務的執行提供了平等的機會。但 是在某些情況下,您可以考慮使用優先級隊列。您可以在 bit.ly/NF0xQq找到很好的參考材料。
圖 11 闡明了這一過程。
圖 11 提交過程
我將描述此過程的實施,首先顯示 SubmitRequest 服務操作的代碼 (請看 圖 12),此服務操作收到客戶端的請求後,將請求重新加入請求隊列。
圖 12 SubmitRequest 服務操作
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskCoordinatorService : ITaskCoordinator
{
public void SubmitRequest(List<STask> stasks)
{
CoordinatorContext.EnqueueRequestsInRequestQ(stasks);
}
...
}
public static class CoordinatorContext
{
...
internal static void EnqueueRequestsInRequestQ(List<STask> stasks)
{
var callback =
OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
foreach (var stask in stasks)
_requestQ.Enqueue(new STaskInfo(callback) { ClientRequest = stask });
}
...
}
接下來,讓我們看看 提交器流程的實施,如圖 13 所示。
圖 13 提交器實施
public static class CoordinatorContext
{
...
static CoordinatorContext()
{
Submitter(...);
}
private static async void Submitter(int interval)
{
while (true)
{
await Task.Delay(interval);
SendTaskRequestToTaskExecutionNode(
GetTasksFromRequestQ());
}
}
...
}
在 圖 12 和 圖 13中,您可以看到服務操作將請求重新加入(寫入)隊列,然後把提 交器任務從請求隊列中取消隊列(讀取)。 在此情景中,您需要確保底層數據結構——隊列——是線 程安全的。 幸運的是,有一個專為此目的而建的類 ConcurrentQueue<T>。 因此我將使用此類 型的一個實例作為請求的底層存儲庫。
public static class CoordinatorContext
{
...
private static readonly ConcurrentQueue<STaskInfo> _requestQ =
new ConcurrentQueue<STaskInfo>();
...
}
現在,讓我們回顧一下 GetTasksFromRequestQ 方法的 實施,此方法在執行間隔期間讀取任務。 正是通過這種方法實施限制流程,並應用我之前描述的限制 。 圖 14 顯示了這種流程的實施。
圖 14 GetTasksFromRequestQ 實施
public static class CoordinatorContext
{
...internal static List<STaskInfo> GetTasksFromRequestQ()
{
var ret = new List<STaskInfo>();
var maxSTasksPerRequest = //From a configuration
var maxNumberOfTasks = //From a configuration
var count = // Count of submitted or executing tasks
var countByType = // Enumerable of count by type
for (int i = 0; i < maxSTasksPerRequest; i++)
{
STaskInfo info;
if (count + i == maxNumberOfTasks || !_requestQ.TryDequeue(out info))
return ret;
var countTT = // Count of submitted or executing tasks of
// the type of the current item
if (countTT == GetMaxNumberOfTasksByType(info.ClientRequest.STaskTypeName))
{ _requestQ.Enqueue(info); }
else ret.Add(info);
}
return ret;
}
}
private static int GetMaxNumberOfTasksByType(string taskTypeName)
{
// Logic to read from a configuration repository the value by task type name
}
...
}
圖 14 中的實施目標 是獲得允許此流程評估限制條件的數量。 圖 15 顯示了可以按照 _submissionTracker 執行的貌似合 理的 LINQ 查詢,以及一個列表(包含返回項目 (ret) 以獲得這些值)。 注意這種方法可以成功,但 會降低性能。 如是這樣,您也可以實施一套線程安全的計數器,計數器遞增或遞減提交跟蹤器實例中 增加或刪除的項目,並使用這些計數器,而不是直接查詢並發字典。
圖 15 限制值
var countByType = (from t in _submissionTracker.Values
group t by t.ClientRequest.STaskTypeName into g
select new
{
TypeName = g.Key,
Count = g.Count()
});
var count = countByType.Sum(c => c.Count);
var countTT = (from tt in countByType
where tt.TypeName == info.ClientRequest.STaskTypeName
select tt.Count).SingleOrDefault()+
ret.Where((rt) => rt.ClientRequest.STaskTypeName ==
info.ClientRequest.STaskTypeName)
.Count();
向任務執行節點發送 請求並處理結果
到目前為止,我已經討論任務協調器如何管理請求。 讓我們看看任務協調器如 何向任務執行節點提交請求,現在考慮一下限制流程。 為了提供更好的上下文,讓我們首先回顧一下 任務執行節點公開的服務操作(通過負載平衡器):
[ServiceContract( CallbackContract = typeof(ITaskUpdateCallback))]
public interface ITaskExecutionNode
{
[OperationContract]
void Start(List<STask> stask);
[OperationContract]
void Cancel(string Id);
}
正如 名稱所示,這些操作的目的是開啟一個任務執行請求列表,並請求取消特定任務。 服務約定利用相同 的回調約定,以通過約定的實施,更新任務協調器。
圖 16 顯示了更新的 SendTaskToTaskExecutionNode 方法實施,在此方法中,任務協調器將 STaskInfo 實例保存在 _submissionTracker,並調用任務執行節點中的 Start 服務操作。
圖 16 SendTaskToTaskExecutionNode 方法和支持方法
internal static void SendTaskRequestToTaskExecutionNode(List<STaskInfo> staskInfos)
{
if (staskInfos.Count() == 0)
return;
var channel = new DuplexChannelFactory<ITaskExecutionNode>(
new InstanceContext(new CallbackHandler()),
new NetHttpBinding(), new EndpointAddress(“http://.../”))
.CreateChannel();
try
{
var requestId = Guid.NewGuid().ToString();
var reqs = staskInfos.Select(s => AddRequestToTracker (requestId,s, channel))
.Where(s => s != null);
((IChannel)channel).Open();
channel.Start(reqs.ToList<STask>());
}
catch (CommunicationException ex)
{
foreach (var stask in staskInfos)
HandleClientUpdate(stask.ClientRequest.Id, STaskStatus.Faulted, ex);
}
}
private static STask AddRequestToTracker(string requestId,
STaskInfo info, ITaskExecutionNode channel)
{
info.ExecutionRequestId = requestId;
info.ExecutionRequestChannel = channel;
if (_submissionTracker.TryAdd(info.ClientRequest.Id, info))
return info.ClientRequest;
HandleClientUpdate (info.ClientRequest.Id, STaskStatus.Faulted,
new Exception(“Failed to add “));
return null;
}
注:SendTaskToTaskExecutionNode 方法創建一個回調實例,以處理任務執行節點中任 務執行的結果:
[ServiceBehavior (ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler : ITaskUpdateCallback
{
public void UpdateStatus(string id, STaskStatus status, string result)
{
CoordinatorContext.HandleClientUpdate (id, status, result);
}
}
CallbackHandler 通過調用 HandleClientUpdate 方法處理回調操作。 此方法從 submitterTracker 檢索並刪除相應的 STaskInfo 實例,並對客戶端執行回調,以更新結果。 此外,如果這是組中的最後一個請求,則關閉任務協調器 與任務執行節點之間的通道。 圖 17 顯示了 HandleClientUpdate 方法的實施。
圖 17 HandleClientUpdate 方法和支持方法
internal async static void HandleClientUpdate(
string staskId, STaskStatus status, object result)
{
STaskInfo info;
if (! _submissionTracker.TryGetValue(staskId, out info))
throw new Exception(“Could not get task from the tracker”);
try
{
await Task.Run(() =>
info.CallbackChannel.UpdateStatus (info.ClientRequest.Id, status, result));
RemoveComplete (info.ClientRequest.Id);
}
catch(AggregateException ex)
{
// ...
}
}
private static void RemoveComplete(string staskId)
{
STaskInfo info;
if (!_submissionTracker.TryRemove(staskId, out info))
throw new Exception(“Failed to be removed from the tracking collection”);
if (_submissionTracker.Values.Where((t) => t.ExecutionRequestId ==
info.ExecutionRequestId).Count() == 0)
CloseTaskRequestChannel((IChannel)info.ExecutionRequestChannel);
}
private static void CloseTaskRequestChannel(IChannel channel)
{
if (channel != null && channel.State != CommunicationState.Faulted)
channel.Close();
}
任務實施器
在客戶端代碼中, typeName 是在添加請求時必需的一個參數。 這個值最終傳送到任務執行節點。 typeName 的值是一個 接口的實施的類型名稱,此接口公開一個函數委托,此函數委托將一個當作並行任務運行的功能進行封 裝,並駐留在所有任務執行節點。 我將這個接口稱為 IRunnableTask。 此接口的實施器應當從客戶端 接收一個取消令牌作為參數以及一系列參數。 此委托還會返回任務的結果。 這就是該接口:
public interface IRunnableTask
{
Func<Object> Run(CancellationToken ct, params string[] taskArgs );
}
在任務執行節點中啟動任務
總體 而言,任務執行節點負責將任務執行請求轉變成 TPL 可以執行的實際任務,即啟動 TPL 任務。 圖 18 顯示了這種流程的實施,我將在下文討論。
圖 18 啟動任務
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskExecutionNodeHandler : ITaskExecutionNode
{
public void Start(List<STask> stasks)
{
var callback =
OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
foreach (var t in stasks)
TaskExecutionContext.Start(t,callback);
}
...
}
public static class TaskExecutionContext
{
...
internal static void Start(STask stask, ITaskUpdateCallback callback)
{
try
{
// Step 1.a
var rtasks = CompositionUtil.ContainerInstance.GetExports<IRunnableTask>();
// Step 1.b
var rtask = from t in rtasks
where t.Value.GetType().FullName == stask.STaskTypeName
select t.Value;
// Step 2
var cs = new CancellationTokenSource();
var ct = cs.Token;
TaskExecutionContext._cancellationSources.TryAdd(stask.Id, cs);
// Step 3
Task<Object>
.Run(rtask.First().Run(ct, stask.STaskParameters), ct)
.ContinueWith(tes => UpdateStatus(tes, stask, callback));
}
catch (Exception ex)
{
...
}
}
...
}
第 1 步(a 和 b): 在此階段,任務執行節 點需要創建一個 IRunnableTask 實例,此實例將返回一個委托,當作客戶端請求的任務類型來運行。 為此,我利用 MEF 和 .NET Framework 4.5 中的一個新功能,以實現無屬性的配置方法。 圖 19中的 代碼創建一個容器實例,導出“extensions”目錄中 IRunnableTask 的所有實施。欲知 MEF 及無屬性 配置方法的詳情,請參閱 2012 年 6 月 MSDN 雜志文章《無屬性 MEF 配置方法》 msdn.microsoft.com/magazine/jj133818。
圖 19 創建容器
internal static class CompositionUtil
{
private readonly static Lazy<CompositionContainer> _container =
new Lazy<CompositionContainer>(() =>
{
var builder = new RegistrationBuilder();
builder.ForTypesDerivedFrom<IRunnableTask>()
.Export<IRunnableTask>()
.SetCreationPolicy(CreationPolicy.NonShared);
var cat = new DirectoryCatalog(“extensions”, builder);
return new CompositionContainer(cat, true, null);
}
,true);
internal static CompositionContainer ContainerInstance
{
get { return _container.Value; }
}
}
接下來,讓我們回到圖 18 中的代碼。 此代 碼使用容器獲得 IRunnableTask 類型的導出,然後選擇類型名稱匹配客戶端請求的實例。 注:我作出 重要假設,即只有一個任務實例對應客戶端請求的類型。 因此,我使用 LINQ 查詢返回的第一個實例 。
步驟 2: 實際創建 TPL 任務之前,此代碼創建取消令牌源和取消令牌。 我將會在 ConcurrentDictionary<TKey,TValue> 一個實例中記錄取消源。 當客戶端請求取消時,任務執 行節點將使用這個取消源列表。 以下為此實例的定義:
public static class TaskExecutionContext
{
...
private readonly static ConcurrentDictionary<string,
CancellationTokenSource> _cancellationSources =
new ConcurrentDictionary<string, CancellationTokenSource>();
...
}
步驟 3: 這時,我用所創建的取消 令牌,運行任務。 此任務之後是一個持續任務。持續任務是必要的,因為一旦完成 TPL 任務(無論成 功或出錯),必須通過服務調用,用執行結果更新任務協調器。 如 圖 20 所示,我將任務協調器的更 新流程封裝到委托中,此委托將 TPL 任務作為參數,任務執行請求及回調實例接收到任務協調器。
圖 20 封裝更新流程
private static Action<Task<Object>, STask,
ITaskUpdateCallback> UpdateStatus = (t, st, cb) =>
{
try
{
STaskStatus s;
Object r = null;
switch (t.Status)
{
case TaskStatus.Canceled: s = STaskStatus.Canceled;
break;
case TaskStatus.Faulted:
s = STaskStatus.Faulted;
r = t.Exception.Flatten();
break;
case TaskStatus.RanToCompletion:
s = STaskStatus.Completed;
r = t.Result;
break;
default:
s = STaskStatus.Faulted;
r = new Exception("Invalid Status");
break;
}
CancellationTokenSource cs;
TaskExecutionContext._cancellationSources.TryRemove(st.Id, out cs);
cb.UpdateStatus(st.Id, s, r);
}
catch (Exception ex)
{
// Error handling
}
};
請求和處理取消
TPL 提供了實施任務取消的機制。 為此,此委托 封裝作為 TPL 任務運行的實際流程,需要響應取消請求並終止執行。 欲知任務取消的詳情,請參閱 MSDN 庫文章《任務取消》bit.ly/NYVTO0。
IRunnableTask 接口中的其中一個參數是取 消令牌。 任務執行節點將為每個任務創建一個令牌,由接口的實施器來決定何時檢查取消請求並平穩 終止流程。 圖 21中的代碼顯示了一個簡單任務,此任務計算一個范圍內偶數的數量,同時檢查是否請 求了取消。
圖 21 檢查取消
public class MySimpleCTask : IRunnableTask
{
public Func<Object> Run(Nullable<CancellationToken> ct,
params string[] taskArgs)
{
var j = int.Parse(taskArgs[0]);
var z = 0;
return (() =>
{
for (int i = 0; i < j; i++)
{
if (i % 2 != 0)
{
z++;
ct.Value.ThrowIfCancellationRequested();
}
}
return z;
});
}
}
正如我討論客戶端時所示,您可以添加一個帶有取消令牌的請求,然後客戶端在內部執 行必要的訂閱。 所以當提出取消時,取消請求被發送到任務協調器。 收到取消請求後,任務協調器檢 查此請求是否提交到任務執行節點,然後發送取消請求。 然後任務執行節點尋找對應客戶端 ID 所請 求的任務的取消源。 向任務執行節點提交取消請求相對簡單,您只需找到響應請求的通道,因為任務 協調器最初通過此通道提交任務執行請求。 這些通道需要對回調保持開啟,回調會更新執行請求的狀 態。
圖 22 顯示了在任務協調器中實施服務操作。
圖 22 在任務協調器中實施服務操作
public class TaskCoordinatorService : ITaskCoordinator
{
...
public bool CancelTask(string Id)
{
return CoordinatorContext.CancelTask(Id);
}
...}
public static class CoordinatorContext
{
...
internal static bool CancelTask(string Id)
{
STaskInfo info;
if(_submissionTracker.TryGetValue(
Id, out info) && info.ExecutionRequestChannel != null)
{
info.ExecutionRequestChannel.Cancel(Id);
return true;
}
return false;
}
...
}
最後,圖 23 顯示了在任務執行節點中實施服務操作。
圖 23 在任務執行節點 中實施服務操作
class CancellationHandler : ICancellationHandler
{
public void Cancel(STask stask)
{
TaskExecutionContext.CanceTask(stask.Id);
}
}
public static class TaskExecutionContext
{
...
internal static void CancelTask(string Id)
{
CancellationTokenSource tknSrc;
if (_cancellationSources.TryGetValue(Id, out tknSrc))
tknSrc.Cancel(); }
...
}
任務協調器的擴展能力及其他考慮因素
值得注意的是,此實施假設任務 協調器在單一節點上運行,但是可以擴展任務協調器(這至少需要進行以下的變更):
需要引 入一個負載平衡器,以評估任務協調器。
如上所述,限制方法的關鍵是准確統計正在運行的任務數 量(總數和字節數)。 在一個以上節點作為任務協調器運行的情景中,這些計數器將需要集中維護( 例如在數據庫中),同時仍然能夠以同步方式更新或讀取(避免爭用條件、死鎖等等)。
最後 ,注意如同任何開發方法,需要從風險和數值方面權衡其他可能滿足您需求並且現成可用的備選方法。 例如,您可能想考慮 Microsoft HPC 服務器之類的技術作為貌似合理的解決方案,以應付多種情景, 您可以根據本文描述的方法進行解決。
優化資源
TPL 提供了必要的基礎結構,以便最大 程度優化多核計算機的 CPU 資源使用率,也可用於實施在多個計算機邊界之間進行擴展的方法。 這可 以有助於工作負載自動化和批處理情景,不僅在一台多核服務器中需要並行性,在多台服務器中也需要 並行性。
為了實現這種水平擴展能力,需要考慮多種構架因素。 主要因素是: 需要在現有資 源中平衡負載,同時能夠在現有服務器場添加更多資源,以及根據需要執行的任務的語義來限制資源。 微軟開發工具的技術提供必要的構建基塊,以實施考慮了上述主要因素的構架。
下載代碼示例 :http://archive.msdn.microsoft.com/mag201210TPL