介紹
並發與協調運行時(Concurrency and Coordination Runtime,CCR)是一個.NET平台 上的異步信息傳遞類庫,提供了一套細小而強大的基礎功能,能夠使用不同的方式來組織 應用程序。應用程序通過有效使用CCR可以獲得更好的響應能力,以及更好的伸縮性及容 錯性。而它最神奇的地方則在於,開發人員獲得這些便利的同時,還減少(甚至完全消除 )了對線程、鎖、互斥體(mutex)或其他同步元素的直接操作(或捕獲錯誤)。
如果您的應用程序是單線程的,CCR可以使您的程序提高響應能力,並且更充分利用 CPU核心——同時讓您的代碼庫從概念上保持整潔。如果您的程序已經啟用了多線程,那 麼CCR能夠在簡化您代碼庫的同時,保持(甚至改進)程序的吞吐量。
簡單說來,CCR提供了如下功能:
一個簡單而高性能的消息傳遞實現。使用非常輕量級且類型安全的通道,以面向 Action的視角把各種對象連接了起來。
一套基礎的調度約束。調度幾乎就是CCR的全部。您可以創建各種任務,把消息傳遞給 進程中的其他組件,並且通過一些叫做仲裁器(Arbiter)的對象來聲明一些約束,以此 對請求進行處理。在執行您的代碼之前,CCR能夠保證這些約束已經得到滿足。
為失敗處理提供了一種更好的模型。CCR提供了“Causality”,意味著為一系列相關 異步子任務提供一個上下文,這樣某個任務失敗時(例如拋出了異常),那麼這樣的失敗 能在單獨的地方進行隔離處理,這與發起任務的線程並沒有關系。
更好的使用已有的(或將來會有的)計算能力。CCR能夠使用現有的線程池進行調度, 如果您需要的話,也可以使用它自帶的實現,這在某些情況下能夠帶來更好的性能。自然 ,這個機制只會對您的代碼產生細微的影響。
使異步IO操作得以簡化。提高獨立進程的伸縮性與性能的關鍵,最終往往歸結於高效 的I/O密集型操作。I/O密集型操作往往會比計算密集型操作要慢許多倍,而一個阻塞的 I/O操作會浪費有效的資源(在這裡就是線程),使它們無法處理其他等待的任務。使用 異步的I/O操作能夠釋放這些資源,讓它們去處理其他任務,直到I/O操作完成。然而,一 系列的異步操作需要將其“開始”及“完成”分開,這使編碼難度變得很大。CCR使用特 別的C#迭代器實現,使開發人員能夠輕松控制此類操作。
通過“異步消息傳遞”機制,我們的組件通過發送數據的方式與另一個組件進行通信 ,更值得一提的是,數據與後續回復沒有確定的臨時關系。一個已發送的消息可能需要在 未來某個時候才會得到處理,也只有到那個時候消息才會得到回復。
這種異步消息傳遞模型是大部分情況下跨進程(inter-process)計算的基礎,不過與 之相比,在現實應用中使用CCR來進行純粹的進程內部(intra-process)通信往往可以得 到更好的保證,不像前者在很多情況下都可能失敗。因此,CCR不僅可以用於低級I/O操作 ,對於構建伸縮性強的分布式系統也可以提供很好的輔助。
CCR的基礎類型
CCR由幾種基礎類型組成:
任務(Task):任意一段用於執行的代碼。
任務隊列(TaskQueue):確切地說,是“分發隊列(DispatcherQueue)”。一個待 執行任務的列表。一個CLR線程池或CCR線程池(即Dispatcher)的線程會從隊列中取出任 務並加以執行。
端口(Port):進程內部連接各組件的消息隊列。簡單來說這只是個鏈表(linked- list),生成者在端口內放置消息。CCR提供了泛型的重載,可以提供類型安全的操作。
仲裁器(Arbiters):CCR的基礎元素,它們將端口和任務連接起來。它們定義了一些 約束,用於在消息到達端口後確定需要創建的任務,以及使用哪個任務隊列進行接受。 CCR中內置了多種仲裁器,其中大部分可以進行組合,以此獲得更靈活的功能。
了解了這些基本概念之後,我們來看一些簡單的CCR代碼。首先,我們來定義一個簡單 的C#控制台應用程序,它會用來運行所有的示例。注意在這個程序中,我們使用了CCR自 定義的線程池(Dispatcher)並與我們的任務隊列綁定。這意味著隊列中任務會被自定義 線程池中的線程執行。
static void Main(string[] args)
{
using (var dr = new Dispatcher())
{
using (var taskQueue = new DispatcherQueue("samples", dr))
{
// Examples will go here...
// Need a blocking call to prevent the application
// exiting.
Console.ReadLine();
}
}
}
盡管示例中只使用了一個任務隊列,但是在實際應用中還是建議使用多個隊列。CCR在 獲取任務時會使用輪詢策略來訪問多個任務隊列,避免任何一個隊列處於饑餓狀態。
首先,我們直接把一個任務放入隊列。這是CCR中執行一個任務最簡單的方法,我們這 裡連端口都沒有用到。Arbiter類包含了一系列簡化開發的方法,例如FromHandler直接從 一個委托對象來創建任務——在這裡我們使用匿名方法來構建該對象。這樣任務就被放入 了任務隊列,可以由分發器來執行了。
// Enqueue a task directly
taskQueue.Enqueue(Arbiter.FromHandler(() =>
Console.WriteLine("Hello, world.")));
大多數情況下我們不太會如此直接地向隊列中放入任務,一般來說總有一個端口在工 作。在下一段代碼中,我們會定義一個端口,一個仲裁器,然後向端口裡發送消息。這個 示例中我們使用String強類型的端口,這樣委托的簽名也需要接受一個字符串。
// Post a message to a port to schedule a task.
var port = new Port<string>();
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));
port.Post("Hello (again), world");
這裡發生了一些不那麼簡單的東西,需要花一點時間才能了解整個過程。 port.Receive()調用創建了一個可能是最簡的仲裁器,或者說是一個“接受者”,一個消 息達到端口時它即會生效。消息到達之後,便會創建一個任務,這個任務的功能是調用委 托對象,並使用剛才的消息作為參數。 Arbiter.Activate()調用將創建的任務和特定的 任務隊列綁定在一起。
要理解CCR仲裁器,最關鍵的一點是它們永遠不會阻塞線程。一旦接收器無法獲得數據 時,線程就會被釋放,可用於處理其他正在等待的任務。
仲裁器可以在任何時間創建,這是CCR中一個重要的概念。因此,如下所示,即使我們 把上面示例中最後兩行代碼的次序交換,其效果也是一樣的。
// Post a message to a port to schedule a task.
var port = new Port();
port.Post("Hello (again), world");
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));
現在我們來少許修改一下示例——我們向端口中放入兩條消息再使接收器生效,來看 看會發生什麼……
// Post a message to a port to schedule a task.
var port = new Port();
port.Post("Hello (again), world");
port.Post("Hello (thrice), world");
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));
現在如果您運行程序,就會發現只打印了一條消息。這是因為port.Receive()調用是 一個擴展方法,它簡化了以下語法,但是並不完全相等:
Arbiter.Activate(
taskQueue,
Arbiter.Receive(false, port, Console.WriteLine));
這裡最為關鍵的是傳遞給Arbiter.Receive()調用的第一個(Boolean)參數。它表明 這個接收器是臨時的,處理完一條消息後就會拋棄。如果我們希望處理所有達到該端口的 消息,我們可以將這個參數設為true。
// Post a message to a port to schedule a task.
var port = new Port();
port.Post("Hello (again), world");
port.Post("Hello (thrice), world");
Arbiter.Activate(
taskQueue,
Arbiter.Receive(true, port, Console.WriteLine));
上面的代碼有時候會打印出奇怪的結果——兩行內容的順序不一致了。這究竟是怎麼 回事呢?
在CCR中,一旦某個仲裁器(這裡是個“接受”操作)被滿足之後,它會創建一個任務 來處理相關消息。除非這個仲裁器被嵌套在另一個更大的組合內,這些任務都會被放入任 務隊列中等待執行。在我們上面的示例中,這個持久地接受器會立即分別為兩條消息產生 一個任務。當存在可用線程的時候,這兩個任務將並發被處理,因此並不保證兩者的順序 。
CCR線程池的實現與CLR線程池有幾點不同。最重要的一點是它包含固定數量的線程 ,這在創建時便確定下來。如果線程執行的操作不會阻塞,那麼就不會有什麼問題。但是 如果您必須發起阻塞的請求,那還是使用CLR線程池對任務隊列進行調度為好,因為它能 夠動態的增長和收縮。這樣的任務隊列可以使用 DispatcherQueue默認構造函數來創建。
有幾種辦法可以保證消息的順序。可能最簡單的方法就是在循環中使用一個臨時接收 器,這樣就能一次只處理一條消息。幸運的是,CCR包含了一個叫做迭代式任務 (Iterative Task)的強大的機制,可以讓我們較為自然的實現這個要求。這需要使用C# 迭代器功能,我們來看一個示例:
首先,我們將目前的Arbiter.Activate替換成以下調用:
Arbiter.Activate(
taskQueue,
new Arbiter<Arbiter<string>>(port, ProcessMessages));
這段代碼建立了一個名為ProcessMessages的迭代式任務,定義如下:
static IEnumerator<ITask> ProcessMessages(Port<string> port)
{
while (true)
yield return port.Receive(Console.WriteLine);
}
這個方法為一個無限循環,等待(但不阻塞)接受操作以獲得滿足。接受到消息時委 托將被調用,並繼續循環。如果我們希望在端口接受到一個空字符串時跳出循環,我們可 以編寫如下代碼(請注意我們使用了Lambda表達式構建了一個匿名委托來處理消息):
static IEnumerator> ProcessMessages(Port port)
{
bool fDone = false;
while (!fDone)
{
yield return port.Receive(message =>
{
if (String.IsNullOrEmpty(message))
fDone = true;
else
Console.WriteLine(message);
});
}
Console.WriteLine("Finished");
}
迭代器是CCR工具箱中非常強大的工具——它大大簡化了順序調用的異步操作的編碼工 作,使非阻塞操作的使用非常接近於同步調用方式。例如,一個級別高的任務可以返回如 下的ProcessMessages():
static IEnumerator<ITask> TopLevelTask(Port<static> port)
{
// Yield to the nested task
yield return new IterativeTask
Console.WriteLine("Finished nested task.");
}
到目前為止,我們只看到了簡單接受器的使用——當單個消息到達單個端口時仲裁器 便會安排一個任務。那麼現在就可以來看一下更高級的仲裁方式了——其提供了一種黏著 劑,可以將接受器進行嵌套以建立更強大的功能。
“選擇器(Choice)”是其中最常用的功能之一,它會從多個接受器選擇一個,並且 只選擇其中一個接受器來處理。例如,以下迭代任務會在處理前等待一個字符串或一個信 號。
static IEnumerator<ITask> ProcessChoice(PortSet<string, EmptyValue> portSet)
{
bool fDone = false;
while (!fDone)
{
yield return portSet.Choice(
message => Console.WriteLine(message),
signal => fDone = true);
}
Console.WriteLine("Finished");
}
選擇器一般用於確定一個異步操作的成功或者失敗,不過您也可以用它來選擇一個或 任意數量的選擇器。
端口集(PortSet)是對一個或多個獨立端口的包裝,使它們能作為一個整體來接受消 息。一個典型的示例便是CCR中的SuccessFailurePort,它繼承了PortSet。
另一個常用的仲裁器是級聯(Join)。它會在兩個內嵌的接受器都得到滿足的情況下 被激活。下面的示例便演示了這種方式:
var port1 = new Port<int>();
var port2 = new Port<int>();
port1.Post(1);
port2.Post(3);
Arbiter.Activate(
taskQueue,
Arbiter.JoinedReceive(false, port1, port2, (x, y) =>
{
Console.WriteLine(x + y);
}));
在資源有限的情況下,級聯可以非常有效地控制訪問。第一個端口包含了對資源的請 求,另一個則包含了有效的資源。使用級聯之後,我們可以限制請求只在有空閒資源的時 候才進行處理。另一個高級別的仲裁方式是“交織(Interleave)”。這在概念上與讀寫 鎖(read-write lock)較為接近,只能在非阻塞的異步語法中使用。讀取任務能夠與其 他讀取任務同時運行,但是寫入任務(它比讀取的優先級高)只能在沒有其他任務執行的 情況下進行。以下是這種仲裁器的聲明,它用於保護某種概念上的“緩存”:
var updatePort = new Port<UpdateCache>();
var queryPort = new Port<QueryCache>();
var stopPort = new Port<Shutdown>();
var interleave = new Interleave(
new TeardownReceiverGroup(
Arbiter.Receive(false, stopPort, ClearDownCache)),
new ExclusiveReceiverGroup(
Arbiter.Receive(true, updatePort, OnUpdateCache)),
new ConcurrentReceiverGroup(
Arbiter.Receive(true, queryPort, OnQueryCache)));
Arbiter.Activate(taskQueue, interleave);
在這裡,持久接受器被放入合適的組中,這便開始了“交織”仲裁。任何屬於 ConcurrentReceiverGroup()的接受器能夠讓關聯的任務之間並發執行。相反, ExclusiveReceiverGroup中的接受器只能獨立於其他接受器執行。此外對於放入該組的接 受器,我們可以限制它們完全按照消息傳遞的順序來執行任務。任何屬於 TeardownReceiverGroup組中的接受器會在關閉“交織”仲裁時,也就是在最後被調用— —因此這樣的接受器不能是持久化的。
“交織”仲裁使用輪詢的方式對各接受器進行較為公平的調度。此外關於執行的順序 ,即使在ExclusiveReceiverGroup內部也是各端口獨立的。對於發送至相互無關的端口的 兩條消息,它們的執行順序並不確保與它們的到達順序相同。
之前提到過,CCR迭代任務能讓我們用接近於普通阻塞式同步操作的方式,來編寫邏輯 上是順序執行的非阻塞異步操作。這樣的異步操作一般為I/O密集型操作,可能是一個Web 請求,一個數據庫操作,或基礎文件I/O等。由於現在我們可以更好地控制這些操作,現 在編寫這種異步I/O操作變得愈發簡單,並且能有效地提高應用程序的吞吐量和伸縮性。
在APM世界中連接BeginXXX和EndXXX的重要模式為AsyncCallback委托,它的形式是:
public delegate void AsyncCallback(IAsyncResult ar);
CCR便基於此,Port<IAsyncResult> 的Post操作與之正好吻合。使用這種這種 方式,我們可以用如下的迭代任務進行異步文件復制:
static IEnumerator<ITask> Copy(FileStream source, FileStream target)
{
var buffer = new byte byte[128 * 1024];
var port = new Port
var bytesRead = 0;
do
{
source.BeginRead(buffer, 0, buffer.Length, port.Post, null);
yield return Arbiter.Receive(
false, port, iar => bytesRead = source.EndRead (iar));
target.BeginWrite(buffer, 0, bytesRead, port.Post, null);
yield return Arbiter.Receive(
false, port, iar => target.EndWrite(iar));
} while (bytesRead > 0);
}
從根本上說,這些異步操作在完成時都會向我們的端口中傳遞一個IAsyncResult對象 。在這裡,我們會等待端口的接受操作得到滿足,這意味著異步操作已經完成,可以接著 下一步繼續開始。這完全是一個異步實現(我們完全可能使用少量的線程來執行數千個這 樣的操作),但是我們這些代碼的意圖可謂一目了然 ——我們讀取一塊數據,再寫入一 塊數據,反復不斷直到完成。
為了使文件復制的示例盡可能保持簡單,上面的代碼省略了處理異常的代碼,不過強 壯的代碼必須能夠處理讀寫操作可能引發的異常。在這裡無法運用普通的 try/catch,因 為這種異常處理方式具有線程線程相關性,而CCR任務可能被運行於任何可用的線程之上 ——事實上對於這種迭代任務,任務的每個“步驟”都可能與之前在不同的線程上執行。
在CCR中處理異常有兩種基本的方式。第一種是顯式地在每個操作中將錯誤進行捕獲, 並由端口傳播出去。然而這會讓調用者和被調用者雙方造成可觀的代碼膨脹。如果要在前 例中進行顯式地錯誤處理,文件訪問方式就會發生根本性的改變。下面的代碼展示了文件 讀取方面需要進行的改變:
static IEnumerator<ITask> Copy(
FileStream source, FileStream target,
SuccessFailurePort resultPort)
{
var buffer = new byte [128 * 1024];
var port = new Port<IAsyncResult>();
var bytesRead = 0;
do
{
// Deal with a failure on the BeginRead
try
{
source.BeginRead(buffer, 0, buffer.Length, port.Post, null);
}
catch (Exception e)
{
resultPort.Post(e);
yield break;
}
// Deal with a failure on the EndRead
yield return Arbiter .Receive(
false, port, iar =>
{
try
{
bytesRead = source.EndRead(iar);
}
catch (Exception e)
{
resultPort.Post(e);
}
});
// Stop processing if there is a failure.
if (bytesRead == 0)
yield break;
// And so on for the write...
} while (bytesRead > 0);
resultPort.Post(new SuccessResult ());
}
很顯然,這個方式十分笨重,而且非常容易出錯——這不是代碼中應該使用的做法, 我們要把錯誤處理路徑獨立出來。
值得慶幸的是,CCR為錯誤處理提供了一種干淨而強大的支持,這種機制被稱作為“因 果關系(Causality)”。它不僅讓我們的代碼從顯式處理異常的繁雜中釋放出來,還在 聲明後繼續保持原有任務的執行路徑。這使我們能夠建立統一的錯誤處理代碼來應對任意 復雜的異步操作。
建立Causality的典型方式是在初始化分發器之後為它的異常端口附加一個委托對象, 以此來通知CCR處理異常的方式。
var exceptionPort = new Port<Exception>();
Arbiter.Activate(taskQueue, exceptionPort.Receive(e =>
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("Caught " + e.ToString());
Console.ResetColor();
}));
Dispatcher.AddCausality(new Causality("Example", exceptionPort));
這樣,我們可以用普通方式創建任務或傳遞消息了,Causality會捕獲任何任務在執行 過程中產生的異常,並由Causality交給異常端口,並在相關的委托對象上執行。
結論
CCR使您的應用程序能夠用它提供的這些方式來表現,這包括一些基礎的數據依賴關系 ,在運行時調度數據以使用有效的CPU核。這些作法直接將您從顯式的線程和鎖控制中釋 放開來,同時讓您的應用程序能夠完全利用日益強大的多核運算資源。