C#並行編程-PLINQ:聲明式數據並行
背景
通過LINQ可以方便的查詢並處理不同的數據源,使用Parallel LINQ (PLINQ)來充分獲得並行化所帶來的優勢。
PLINQ不僅實現了完整的LINQ操作符,而且還添加了一些用於執行並行的操作符,與對應的LINQ相比,通過PLINQ可以獲得明顯的加速,但是具體的加速效果還要取決於具體的場景,不過在並行化的情況下一段會加速。
如果一個查詢涉及到大量的計算和內存密集型操作,而且順序並不重要,那麼加速會非常明顯,然而,如果順序很重要,那麼加速就會受到影響。
AsParallel() 啟用查詢的並行化
下面貼代碼,看下效果,詳情見注釋:
復制代碼
class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
/*向集合中添加多條數據 可以修改數據量查看Linq和Plinq的性能*/
Parallel.For(0, 600000, (num) =>
{
products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
});
/*采用LINQ查詢符合條件的數據*/
Stopwatch sw = new Stopwatch();
sw.Restart();
var productListLinq = from product in products
where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
select product;
Console.WriteLine("采用Linq 查詢得出數量為:{0}", productListLinq.Count());
sw.Stop();
Console.WriteLine("采用Linq 耗時:{0}", sw.ElapsedMilliseconds);
/*采用PLINQ查詢符合條件的數據*/
sw.Restart();
var productListPLinq = from product in products.AsParallel() /*AsParallel 試圖利用運行時所有可用的邏輯內核,從而使運行的速度比串行的版本要快 但是需要注意開銷所帶來的性能損耗*/
where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
select product;
Console.WriteLine("采用PLinq 查詢得出數量為:{0}", productListPLinq.Count());
sw.Stop();
Console.WriteLine("采用PLinq 耗時:{0}", sw.ElapsedMilliseconds);
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}
復制代碼
當前模擬的數據量比較少,數據量越多,采用並行化查詢的效果越明顯
AsOrdered()與orderby
AsOrdered:保留查詢的結果按源序列排序,在並行查詢中,多條數據會被分在多個區域中進行查詢,查詢後再將多個區的數據結果合並到一個結果集中並按源序列順序返回。
orderby:將返回的結果集按指定順序進行排序
下面貼代碼方便大家理解:
class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<string> products = new ConcurrentQueue<string>();
products.Enqueue("E");
products.Enqueue("F");
products.Enqueue("B");
products.Enqueue("G");
products.Enqueue("A");
products.Enqueue("C");
products.Enqueue("SS");
products.Enqueue("D");
/*不采用並行化 其數據輸出結果 不做任何處理 */
var productListLinq = from product in products
where (product.Length == 1)
select product;
string appendStr = string.Empty;
foreach (string str in productListLinq)
{
appendStr += str + " ";
}
Console.WriteLine("不采用並行化 輸出:{0}", appendStr);
/*不采用任何排序策略 其數據輸出結果 是直接將分區數據結果合並起來 不做任何處理 */
var productListPLinq = from product in products.AsParallel()
where (product.Length == 1)
select product;
appendStr = string.Empty;
foreach (string str in productListPLinq)
{
appendStr += str + " ";
}
Console.WriteLine("不采用AsOrdered 輸出:{0}", appendStr);
/*采用 AsOrdered 排序策略 其數據輸出結果 是直接將分區數據結果合並起來 並按原始數據順序排序*/
var productListPLinq1 = from product in products.AsParallel().AsOrdered()
where (product.Length == 1)
select product;
appendStr = string.Empty;
foreach (string str in productListPLinq1)
{
appendStr += str + " ";
}
Console.WriteLine("采用AsOrdered 輸出:{0}", appendStr);
/*采用 orderby 排序策略 其數據輸出結果 是直接將分區數據結果合並起來 並按orderby要求進行排序*/
var productListPLinq2 = from product in products.AsParallel()
where (product.Length == 1)
orderby product
select product;
appendStr = string.Empty;
foreach (string str in productListPLinq2)
{
appendStr += str + " ";
}
Console.WriteLine("采用orderby 輸出:{0}", appendStr);
Console.ReadLine();
}
}
在PLINQ查詢中,AsOrdered()和orderby子句都會降低運行速度,所以如果順序並不是必須的,那麼在請求特定順序的結果之前,將加速效果與串行執行的性能進行比較是非常重要的。
在PLINQ查詢中,AsOrdered()和orderby子句都會降低運行速度,所以如果順序並不是必須的,那麼在請求特定順序的結果之前,將加速效果與串行執行的性能進行比較是非常重要的。
指定執行模式 WithExecutionMode
對串行化代碼進行並行化,會帶來一定的額外開銷,Plinq查詢執行並行化也是如此,在默認情況下,執行PLINQ查詢的時候,.NET機制會盡量避免高開銷的並行化算法,這些算法有可能會將執行的性能降低到地獄串行執行的性能。
.NET會根據查詢的形態做出決策,並不開了數據集大小和委托執行的時間,不過也可以強制並行執行,而不用考慮執行引擎分析的結果,可以調用WithExecutionMode方法來進行設置。、
下面貼代碼,方便大家理解
復制代碼
class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
/*向集合中添加多條數據*/
Parallel.For(0, 6000000, (num) =>
{
products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
});
/*采用並行化整個查詢 查詢符合條件的數據*/
Stopwatch sw = new Stopwatch();
sw.Restart();
var productListLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism)
where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
select product;
Console.WriteLine("采用並行化整個查詢 查詢得出數量為:{0}", productListLinq.Count());
sw.Stop();
Console.WriteLine("采用並行化整個查詢 耗時:{0}", sw.ElapsedMilliseconds);
/*采用默認設置 由.NET進行決策 查詢符合條件的數據*/
sw.Restart();
var productListPLinq = from product in products.AsParallel().WithExecutionMode(ParallelExecutionMode.Default)
where (product.Name.Contains("1") && product.Name.Contains("2") && product.Category.Contains("1") && product.Category.Contains("2"))
select product;
Console.WriteLine("采用默認設置 由.NET進行決策 查詢得出數量為:{0}", productListPLinq.Count());
sw.Stop();
Console.WriteLine("采用默認設置 由.NET進行決策 耗時:{0}", sw.ElapsedMilliseconds);
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}通過PLINQ執行歸約操作
PLINQ可以簡化對一個序列或者一個組中所有成員應用一個函數的過程,這個過程稱之為歸約操作,如在PLINQ查詢中使用類似於Average,Max,Min,Sum之類的聚合函數就可以充分利用並行所帶來好處。
並行執行的規約和串行執行的規約的執行結果可能會不同,因為在操作不能同時滿足可交換和可傳遞的情況下產生攝入,在每次執行的時候,序列或組中的元素在不同並行任務中分布可能也會有區別,因而在這種操作的情況下可能會產生不同的最終結果,因此,一定要通過對於的串行版本來興義原始的數據源,這樣才能幫助PLINQ獲得最優的執行結果。
下面貼代碼:
復制代碼
class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<int> products = new ConcurrentQueue<int>();
/*向集合中添加多條數據*/
Parallel.For(0, 6000000, (num) =>
{
products.Enqueue(num);
});
/*采用LINQ 返回 IEumerable<int>*/
var productListLinq = (from product in products
select product).Average();
Console.WriteLine("采用Average計算平均值:{0}", productListLinq);
/*采用PLINQ 返回 ParallelQuery<int>*/
var productListPLinq = (from product in products.AsParallel()
select product).Average();
Console.WriteLine("采用Average計算平均值:{0}", productListPLinq);
Console.ReadLine();
}
}
復制代碼
如上述代碼所示
在LINQ版本中,該方法會返回一個 IEumerable<int>,即調用 Eumerable.Range方法生成指定范圍整數序列的結果,
在PLINQ版本中,該方法會返回一個 ParallelQuery<int>,即調用並行版本中System.Linq.ParallelEumerable的ParallelEumerable.Range方法,通過這種方法得到的結果序列也是並行序列,可以再PLINQ中並行運行。
如果想對特定數據源進行LINQ查詢時,可以定義為 private IEquatable<int> products
如果想對特定數據源進行PLINQ查詢時,可以定義為 private ParallelQuery<int> products
並發PLINQ任務
復制代碼
class MRESDemo
{
/*code:釋迦苦僧*/
static void Main()
{
ConcurrentQueue<Product> products = new ConcurrentQueue<Product>();
/*向集合中添加多條數據*/
Parallel.For(0, 600000, (num) =>
{
products.Enqueue(new Product() { Category = "Category" + num, Name = "Name" + num, SellPrice = num });
});
CancellationTokenSource cts = new CancellationTokenSource();
/*創建tk1 任務 查詢 符合 條件的數據*/
Task<ParallelQuery<Product>> tk1 = new Task<ParallelQuery<Product>>((ct) =>
{
Console.WriteLine("開始執行 tk1 任務", products.Count);
Console.WriteLine("tk1 任務中 數據結果集數量為:{0}", products.Count);
var result = products.AsParallel().Where(p => p.Name.Contains("1") && p.Name.Contains("2"));
return result;
}, cts.Token);
/*創建tk2 任務,在執行tk1任務完成 基於tk1的結果查詢 符合 條件的數據*/
Task<ParallelQuery<Product>> tk2 = tk1.ContinueWith<ParallelQuery<Product>>((tk) =>
{
Console.WriteLine("開始執行 tk2 任務", products.Count);
Console.WriteLine("tk2 任務中 數據結果集數量為:{0}", tk.Result.Count());
var result = tk.Result.Where(p => p.Category.Contains("1") && p.Category.Contains("2"));
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
/*創建tk3 任務,在執行tk1任務完成 基於tk1的結果查詢 符合 條件的數據*/
Task<ParallelQuery<Product>> tk3 = tk1.ContinueWith<ParallelQuery<Product>>((tk) =>
{
Console.WriteLine("開始執行 tk3 任務", products.Count);
Console.WriteLine("tk3 任務中 數據結果集數量為:{0}", tk.Result.Count());
var result = tk.Result.Where(p => p.SellPrice > 1111 && p.SellPrice < 222222);
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
tk1.Start();
Task.WaitAll(tk1, tk2, tk3);
Console.WriteLine("tk2任務結果輸出,篩選後記錄總數為:{0}", tk2.Result.Count());
Console.WriteLine("tk3任務結果輸出,篩選後記錄總數為:{0}", tk3.Result.Count());
tk1.Dispose();
tk2.Dispose();
tk3.Dispose();
cts.Dispose();
Console.ReadLine();
}
}
class Product
{
public string Name { get; set; }
public string Category { get; set; }
public int SellPrice { get; set; }
}