PLINQ可以簡化對一個序列或一個組中所有成員應用同一個函數的過程,這個過程稱之為規約操作。類似Sum()函數就是一個規約操作。PLINQ提供一個可重載Aggregate的接口,這裡用戶可以定義自己的規約函數。
規約操作是對每一個成員進行的操作,當操作完成後有可能需要將操作結果進行匯總得到一個最終的結果,這個就是聚合的概念。
示例中要求計算 1 到 50000000中能被5整除的數除以PI以後得到的平均數。它可以用LINQ完成,也可以用PLINQ完成。
代碼示例:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;
namespace Sample6_2_plinq_calculate
{
    /// <summary>
    /// Runs the same reduction twice - once with sequential LINQ and once with PLINQ -
    /// and prints each result together with the elapsed time.
    /// The reduction is: average of (x / PI) over every x in 1..NUM_INTS that is divisible by 5.
    /// </summary>
    class Program
    {
        // Inclusive upper bound of the generated integer range.
        static int NUM_INTS = 50000000;

        /// <summary>Builds the sequential input: the integers 1..NUM_INTS.</summary>
        /// <returns>A lazily evaluated sequence of integers.</returns>
        static IEnumerable<int> GenerateInputeData()
        {
            // The element type must be spelled out: query operators (Where/Select/Average)
            // are defined on IEnumerable<T>, not on the non-generic IEnumerable.
            return Enumerable.Range(1, NUM_INTS);
        }

        /// <summary>Builds the parallel input: the integers 1..NUM_INTS as a PLINQ source.</summary>
        /// <returns>A parallel query over the same range as <see cref="GenerateInputeData"/>.</returns>
        static ParallelQuery<int> GenerateInputeData4Parallel()
        {
            return ParallelEnumerable.Range(1, NUM_INTS);
        }

        /// <summary>
        /// Entry point: times the sequential query, then the parallel query,
        /// and waits for a line of input before exiting.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            var seqTarget = GenerateInputeData();
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST NORMAL LINQ");
            Console.WriteLine("============================================================");

            var swatchpn = Stopwatch.StartNew();
            var seqQuery = (from intNum in seqTarget
                            where intNum % 5 == 0
                            select intNum / Math.PI).Average();
            swatchpn.Stop();
            // Placeholders only: the text produced is the same as the original
            // concatenation, but the value can never be mistaken for format syntax.
            Console.WriteLine("LINQ Result: {0} LINQ Use Time: {1}", seqQuery, swatchpn.Elapsed);

            var palTarget = GenerateInputeData4Parallel();
            Console.WriteLine("\n\n");
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST PARALLEL LINQ");
            Console.WriteLine("============================================================");

            var swatchp = Stopwatch.StartNew();
            // palTarget is already a ParallelQuery<int>, so no AsParallel() call is needed.
            var palQuery = (from intNum in palTarget
                            where intNum % 5 == 0
                            select intNum / Math.PI).Average();
            swatchp.Stop();
            Console.WriteLine("PLINQ Result: {0} LINQ Use Time: {1}", palQuery, swatchp.Elapsed);

            // Keep the console window open until the user presses Enter.
            Console.ReadLine();
        }
    }
}
測試結果:
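代碼示例會計算一個數組的標準偏差、偏度和峰度,來說明聚合的使用。
順便補補數學吧(設 $n$ 為樣本個數,$\bar{x}$ 為平均值):
標準偏差:$s = \sqrt{\dfrac{\sum_{i=1}^{n}(x_i-\bar{x})^2}{n-1}}$
偏度:$\dfrac{n}{(n-1)(n-2)}\sum_{i=1}^{n}\left(\dfrac{x_i-\bar{x}}{s}\right)^3$
峰度:$\dfrac{n(n+1)}{(n-1)(n-2)(n-3)}\sum_{i=1}^{n}\left(\dfrac{x_i-\bar{x}}{s}\right)^4-\dfrac{3(n-1)^2}{(n-2)(n-3)}$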
關於Aggregate 函數的參數說明參考
https://msdn.microsoft.com/en-us/zh-en/library/dd383667(v=vs.110).aspx
關於參數的簡單說明:
seed:是累加器初始化的值。 update accumulator function:對數組中每一個值進行運算,PLINQ中由於它是對數據源進行了分區然後並行運算的,這一步產生的結果其實是保存的每一個分區的計算結果。 combine accumulator function:將每一分區的計算結果進行累加,得到一個總的數組的累加結果。 result selector:對累加結果進行運算,得到最終的結果,也就是返回值。示例的重點並不是各種數字運算,而是說明Aggregate() 可以對數據源每一個元素運算後將結果進行匯總再次運算,它可以在一個步驟中完成,省去了分別編寫的麻煩。而且它對數據運算時是數據分區,任務並行的。
下面是計算的代碼示例:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;
namespace Sample6_2_plink_aggregate
{
    /// <summary>
    /// Demonstrates the four-argument PLINQ Aggregate overload by computing the sample
    /// standard deviation, skewness and kurtosis of a small integer array.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point: computes the statistics, prints them, and waits for a line of
        /// input before exiting.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            int[] inputInts = { 0, 3, 4, 8, 15, 22, 34, 57, 68, 32, 30 };

            // Sample size, read once and held as a double so that every formula below
            // is evaluated in floating point rather than integer arithmetic.
            double n = inputInts.Length;

            var mean = inputInts.AsParallel().Average();

            // s = sqrt( sum((x - mean)^2) / (n - 1) )
            var standarddeviation = inputInts.AsParallel().Aggregate(
                // seed
                0d,
                // update accumulator function: invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(thisNumber - mean, 2),
                // combine accumulator function: invoked on the accumulator yielded by each partition
                (total, thisTask) => total + thisTask,
                // result selector: transforms the final accumulator value into the result
                finalSum => Math.Sqrt(finalSum / (n - 1)));

            // skewness = n / ((n - 1)(n - 2)) * sum(((x - mean) / s)^3)
            var skewness = inputInts.AsParallel().Aggregate(
                // seed
                0d,
                // update accumulator function
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean) / standarddeviation, 3),
                // combine accumulator function
                (total, thisTask) => total + thisTask,
                // result selector
                finalSum => (finalSum * n) / ((n - 1) * (n - 2)));

            // kurtosis = n(n + 1) / ((n - 1)(n - 2)(n - 3)) * sum(((x - mean) / s)^4)
            //            - 3(n - 1)^2 / ((n - 2)(n - 3))
            var kurtosis = inputInts.AsParallel().Aggregate(
                // seed
                0d,
                // update accumulator function
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean) / standarddeviation, 4),
                // combine accumulator function
                (total, thisTask) => total + thisTask,
                // result selector; the correction term squares (n - 1), not (n - 2)
                finalSum => ((finalSum * n * (n + 1)) / ((n - 1) * (n - 2) * (n - 3)))
                            - ((3 * Math.Pow(n - 1, 2)) / ((n - 2) * (n - 3))));

            Console.WriteLine("============================================================");
            Console.WriteLine("TEST Parallel LINQ Calculate Result");
            Console.WriteLine("============================================================");
            Console.WriteLine("Mean : {0}", mean);
            Console.WriteLine("Standard Deviaton : {0}", standarddeviation);
            Console.WriteLine("Skewness : {0}", skewness);
            Console.WriteLine("Kurtosis : {0}", kurtosis);

            // Keep the console window open until the user presses Enter.
            Console.ReadLine();
        }
    }
}
PLINQ同樣也可以和其他形式的並發任務一起使用,例如在計算標準偏差、偏度和峰度的過程中。
實際的執行順序是 平均值 => 標準偏差 => 偏度 => 峰度
但根據運算的公式,完全可以把偏度和峰度進行並行化處理,標準偏差是它們公共的輸入:
平均值 => 標準偏差 => 偏度
           => 峰度
它們完全可以使用ContinueWith操作,如果有超時控制或取消需要的話,可以使用WithCancellation() 接口。
代碼示例:
代碼中用函數將 PLINQ 的操作又進行了封裝,然後用Task的方式進行並行化的調用。deferredCancelTask 是一個搗亂任務,如果把注釋打開,在2秒時它會發出一個Cancel信號,取消任務的執行,並且在異常處理時打印任務的狀態。
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;
namespace Sample6_4_parallel_task_with_plinq
{
    /// <summary>
    /// Combines Tasks with PLINQ: mean -> standard deviation -> (skewness | kurtosis),
    /// where skewness and kurtosis are two continuations of the standard-deviation task.
    /// Every PLINQ query observes the same cancellation token.
    /// </summary>
    class Program
    {
        // Shared data source for all calculations: the integers 1..100,000,000.
        private static ParallelQuery<int> inputInts =
            ParallelEnumerable.Range(1, 100000000);

        /// <summary>Computes the arithmetic mean of <c>inputInts</c>.</summary>
        /// <param name="ct">Token observed by the PLINQ query.</param>
        /// <returns>The mean of the source values.</returns>
        private static double CalculateMean(System.Threading.CancellationToken ct)
        {
            return inputInts.WithCancellation(ct).Average();
        }

        /// <summary>
        /// Computes the sample standard deviation: sqrt( sum((x - mean)^2) / (n - 1) ).
        /// </summary>
        /// <param name="ct">Token observed by the PLINQ queries.</param>
        /// <param name="mean">Mean of the source values.</param>
        /// <returns>The sample standard deviation.</returns>
        private static double CalculateStandardDeviation(System.Threading.CancellationToken ct, double mean)
        {
            // Count once, up front, and keep it as a double for the arithmetic below.
            double n = inputInts.WithCancellation(ct).Count();

            return inputInts.WithCancellation(ct).Aggregate(
                // seed
                0d,
                // update accumulator function: invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(thisNumber - mean, 2),
                // combine accumulator function: invoked on the accumulator yielded by each partition
                (total, thisTask) => total + thisTask,
                // result selector: transforms the final accumulator value into the result
                finalSum => Math.Sqrt(finalSum / (n - 1)));
        }

        /// <summary>
        /// Computes the sample skewness: n / ((n - 1)(n - 2)) * sum(((x - mean) / s)^3).
        /// </summary>
        /// <param name="ct">Token observed by the PLINQ queries.</param>
        /// <param name="mean">Mean of the source values.</param>
        /// <param name="standarddeviation">Sample standard deviation of the source values.</param>
        /// <returns>The sample skewness.</returns>
        private static double CalculateSkewness(System.Threading.CancellationToken ct, double mean, double standarddeviation)
        {
            // n must be a double: with 100,000,000 elements, (n - 1) * (n - 2) is about
            // 1e16, which does not fit in an int and would silently wrap around.
            double n = inputInts.WithCancellation(ct).Count();

            return inputInts.WithCancellation(ct).Aggregate(
                // seed
                0d,
                // update accumulator function
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean) / standarddeviation, 3),
                // combine accumulator function
                (total, thisTask) => total + thisTask,
                // result selector
                finalSum => (finalSum * n) / ((n - 1) * (n - 2)));
        }

        /// <summary>
        /// Computes the sample excess kurtosis:
        /// n(n + 1) / ((n - 1)(n - 2)(n - 3)) * sum(((x - mean) / s)^4) - 3(n - 1)^2 / ((n - 2)(n - 3)).
        /// </summary>
        /// <param name="ct">Token observed by the PLINQ queries.</param>
        /// <param name="mean">Mean of the source values.</param>
        /// <param name="standarddeviation">Sample standard deviation of the source values.</param>
        /// <returns>The sample excess kurtosis.</returns>
        private static double CalculateKurtosis(System.Threading.CancellationToken ct, double mean, double standarddeviation)
        {
            // Same reason as in CalculateSkewness: the denominators overflow int for this data size.
            double n = inputInts.WithCancellation(ct).Count();

            return inputInts.WithCancellation(ct).Aggregate(
                // seed
                0d,
                // update accumulator function
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean) / standarddeviation, 4),
                // combine accumulator function
                (total, thisTask) => total + thisTask,
                // result selector; the correction term squares (n - 1), not (n - 2)
                finalSum => ((finalSum * n * (n + 1)) / ((n - 1) * (n - 2) * (n - 3)))
                            - ((3 * Math.Pow(n - 1, 2)) / ((n - 2) * (n - 3))));
        }

        /// <summary>
        /// Entry point: wires the four calculations together as a task graph, starts it,
        /// prints the results (or the task statuses if the work was cancelled), and waits
        /// for a line of input before exiting.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST Parallel TASK work with PLINQ");
            Console.WriteLine("============================================================");

            var cts = new System.Threading.CancellationTokenSource();
            var ct = cts.Token;

            // Task<double> (not plain Task) so that .Result is available to the continuations.
            var meanTask = new Task<double>(() => CalculateMean(ct), ct);
            var stDevTask = meanTask.ContinueWith(
                (t) => { return CalculateStandardDeviation(ct, t.Result); },
                TaskContinuationOptions.OnlyOnRanToCompletion);
            // Skewness and kurtosis both hang off the standard-deviation task,
            // so they are free to run concurrently with each other.
            var skewnessTask = stDevTask.ContinueWith(
                (t) => { return CalculateSkewness(ct, meanTask.Result, t.Result); },
                TaskContinuationOptions.OnlyOnRanToCompletion);
            var kurtosisTask = stDevTask.ContinueWith(
                (t) => { return CalculateKurtosis(ct, meanTask.Result, t.Result); },
                TaskContinuationOptions.OnlyOnRanToCompletion);

            // Uncomment the next line to request cancellation after two seconds.
            //var deferredCancelTask = Task.Factory.StartNew(() => { System.Threading.Thread.Sleep(2000); cts.Cancel();});

            try
            {
                meanTask.Start();
                Task.WaitAll(skewnessTask, kurtosisTask);
                Console.WriteLine("Mean : {0}", meanTask.Result);
                Console.WriteLine("Standard Deviaton : {0}", stDevTask.Result);
                Console.WriteLine("Skewness : {0}", skewnessTask.Result);
                Console.WriteLine("Kurtosis : {0}", kurtosisTask.Result);
            }
            catch (AggregateException aex)
            {
                bool cancelled = false;
                foreach (var ex in aex.Flatten().InnerExceptions)
                {
                    // TaskCanceledException derives from OperationCanceledException, so this
                    // covers both a cancelled task and a cancelled PLINQ query.
                    if (ex is OperationCanceledException)
                    {
                        cancelled = true;
                    }
                    else
                    {
                        // Anything else is a real failure; report it instead of dropping it.
                        Console.WriteLine(ex.ToString());
                    }
                }

                if (cancelled)
                {
                    Console.WriteLine("Mean Task: {0}", meanTask.Status);
                    Console.WriteLine("Standard Deviation Task: {0}", stDevTask.Status);
                    Console.WriteLine("Skewness Task: {0}", skewnessTask.Status);
                    Console.WriteLine("Kurtosis Task: {0}", kurtosisTask.Status);
                }
            }

            // Keep the console window open until the user presses Enter.
            Console.ReadLine();
        }
    }
}