今天我們就來談談平行擴展的關鍵組件之一PLINQ(Parallel LINQ)。微軟對PLINQ在Parallel FX中的定位是:PLINQ是TPL(Task Parallel Library)的一個高層應用。由於目前微軟對TPL研發的時間還比較短,這個社區預覽版的TPL版本的質量還是比較低的,而且微軟發布這個版本的目的也是為了更好的獲得開發社區的反饋信息,為了讓PLINQ有更高的質量,所以目前PLINQ還是基於ThreadPool的實現,而不是基於TPL的API的。不過這只是內部實現不同而已,以後正式發布的時候PLINQ的對外接口的變更應該不會太大。
如何使用PLINQ?
1 添加System.Threading.dll到引用中
2 通過調用System.Linq.ParallelQuery.AsParallel擴展方法,將數據封裝到IParallelEnumerable中。
基於聲明方式的數據並行性
調用AsParallel擴展方法可使得編譯器使用System.Linq.ParallelEnumerable版本的查詢運算符,而不是System.Linq.Enumerable的。熟悉LINQ的人都知道,查詢表達式在編譯時都將轉化成對擴展方法的調用。對於LINQ而言,所有的擴展方法都封裝在System.Linq.Enumerable靜態類中,該類定義的都是針對IEnumerable數據源的擴展方法。而對於PLINQ,所有的擴展方法都封裝在System.Linq.ParallelEnumerable靜態類中,而且該類針對的都是IParallelEnumerable數據源的擴展方法,是System.Linq.Enumerable靜態類擴展方法的鏡像,只不過是通過並行方式對查詢進行評估。IParallelEnumerable接口從IEnumerable接口繼承,所以PLINQ也具有LINQ的延遲執行的特點,以及執行foreach。
為了讓大家清晰知道系統是如何將使用System.Linq.Enumerable版本的查詢運算符變成System.Linq.ParallelEnumerable版本的,我們先來看看System.Linq.ParallelQuery.AsParallel方法:
public static IParallelEnumerable AsParallel(IEnumerable source)
很顯然這個方法就是將IEnumerable的數據源轉化成IParallelEnumerable以使得使用平行版本的運算。這就是平行架構中通過AsParallel的聲明來使用並行使用數據的方式,也是PLINQ的編程模型。
所以這種基於聲明方式的數據並行性使得從LINQ到PLINQ的轉化非常容易,例如我們有這樣的LINQ代碼片段:
string[] words = new[] { "Welcome", "to", "Beijing" };
(from word in words select Process(word)).ToArray();
我們很容易就可以將其變成PLINQ版本:
string[] words = new[] { "Welcome", "to", "Beijing" };
(from word in words.AsParallel() select Process(word)).ToArray();
當然,如果你是通過查詢操作(就是直接調用靜態擴展函數),而不是使用查詢表達式(有時候查詢表達式沒有提供相應的表達式語句,例如C#3.0中沒有提供Skip和Take相對應的查詢表達式語句,我們只能通過直接調用查詢操作函數)的情況下,將LINQ遷移到PLINQ,我們除了要調用AsParallel方法,還需要將直接調用Enumerable的方法改成對ParallelEnumerable的調用,例如:
IEnumerable data = ...;
var q = Enumerable.Select(Enumerable.OrderBy(
Enumerable.Where(data, (x) => p(x)),(x) => k(x)),(x) => f(x));
foreach (var e in q) a(e);
要使用 PLINQ,必須按如下方式重新編寫該查詢:
IEnumerable data = ...;
var q = ParallelEnumerable.Select(ParallelEnumerable.OrderBy(
ParallelEnumerable.Where(data.AsParallel(), (x) => p(x)),
(x) => k(x)),(x) => f(x));
foreach (var e in q) a(e);
注意:有些查詢運算符是二元的,使用兩個IEnumerable作為輸入參數(例如Join),最左邊數據源的類型決定了使用LINQ還是PLINQ,因此你只需要在第一個數據源上調用AsParallel便能使查詢並行查詢,例如:
IEnumerable leftData = ..., rightData = ...;
var q = from x in leftData.AsParallel()
join y in rightData on x.a == y.b
select f(x, y);
PLINQ查詢處理模型
1 管道式(Pipelined Processing)
該模型是將查詢線程(運行查詢的線程)和枚舉線程(進行迭代輸出結果的線程)分開,處理器在有元素可用時就運行枚舉將輸出應用於foreach循環。也就是說不需要等到所有查詢結果完成才進行枚舉輸出,只要查詢結果能產生一個最終輸出結果時就會進行枚舉輸出。簡單說就是邊查詢邊輸出,這個模型的好處就是允許對輸出做更多增量處理,從而減少為了存放結果所需的內存,壞處是由於中間結果需要更多的同步而降低性能。PLINQ缺省的是采用此模型。
2 准動態(stop-and-go Processing)
在這種模型下啟動枚舉的線程會聯結所有其他線程來執行查詢。在所有查詢結果完成之後才進行枚舉輸出。這種模型的效率比管道式稍微高一些,因為這種模型需要同步的系統開銷減少了。在對查詢進行ToArray,ToList或者排序聚合操作時,系統將自動轉為這種模型處理。因為這些操作都需要產生所有輸出。具體在代碼中是通過調用IParallelEnumerable接口的GetEnumerator的重載方法並且傳遞false參數來使用這種模型的,該方法如下:
IEnumerable GetEnumerator(bool usePipelining)
3 反轉枚舉(Inverted Enumeration)
該模型會為並行運行的PLINQ提供一個Lambda表達式,集合中的每個元素都運行一次。這是最高效的一種模型,因為它將高成本運算的控制反轉給Lambda函數了。但注意的是在Lambda函數中不能使用共享狀態,否則可能會導致系統崩潰,因為PLINQ不知道如何進行並發同步控制。但有些不同的是,此模型不能簡單使用foreach循環,而必須使用特殊的ForAll API.例如:
string[] words = new[] { "Welcome", "to", "Beijing","OK","Hua","Ying","Ni" ,"2008"};
var lazyBeeQuery = from word in words.AsParallel() select word;
lazyBeeQuery.ForAll<string>(word => { Console.WriteLine(word); });
在我的機器上(雙核)的輸出結果是:
Hua
Welcome
Ying
Ni
2008
to
Beijing
OK
細心的人可能會發現其順序和數組的順序不同,這就是PLINQ並行運行的結果,可能在您的機器上可能結果又不同。
同時AsParallel重載方法提供一個參數來控制查詢的並行度(就是多少個線程被用於查詢),該方法定義如下:
public static IParallelEnumerable AsParallel(
IEnumerable source,int degreeOfParallelism)
如果你希望在使用管道式處理時有一個單獨的線程專門用於枚舉輸出,你可以將degreeOfParallelism參數賦值為(Enviorment.ProcessCount-1)即可。
輸出結果順序
由於並行的原因輸出結果可能和原有的數據在數據源中的順序不一樣,例如:
string[] words = new[] { "Welcome", "to", "Beijing","OK","Hua","Ying","Ni" ,"2008"};
var lazyBeeQuery = from word in words.AsParallel() select word;
foreach (string word in lazyBeeQuery)
{
Console.WriteLine(word);
}
這時的輸出結果可能是:
Welcome
Hua
to
Ying
Beijing
Ni
OK
2008
如果我們希望輸出結果和原有的數據在數據源中的順序保持一致,可以使用AsParallel的帶有ParallelQueryOptions.PreserveOrdering參數的重載版本,例如上例中就可以改成如下就可以使輸出順序和原有結構一致:
var lazyBeeQuery = from word in words.AsParallel(ParallelQueryOptions.PreserveOrdering) select word;
注意:1 ParallelQueryOptions.PreserveOrdering參數的使用對ForAll API不起作用(目前是這樣,以後不知道是否會做改動)。
2 使用這個保留順序的選項會影響查詢的性能和擴展能力,因為PLINQ將從邏輯上在末尾增加一個排序操作,而排序是一個無法隨處理器數量的增加而顯著提高性能的運算符,所以要在必須的時候才用。
並發異常
在順序執行LINQ的時候,任何異常都會停止後續查詢的運行。但在PLINQ中,由於是並行運行的,某一線程產生了異常,系統會嘗試盡快終止其他線程的運行,在所有線程關閉之後,產生的所有異常將會放到System.Threading.AggregateException中,你可以通過InnerExceptions屬性來得到所有異常只讀集合ReadOnlyCollection。