示例中要求計算 1 到 50000000中能被5整除的數除以PI以後得到的平均數。它可以用LINQ完成,也可以用PLINQ完成。
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;
namespace Sample6_2_plinq_calculate
class Program
static int NUM_INTS = 50000000;
static IEnumerable GenerateInputeData()
return Enumerable.Range(1, NUM_INTS);
static ParallelQuery GenerateInputeData4Parallel()
return ParallelEnumerable.Range(1, NUM_INTS);
static void Main(string[] args)
var seqTarget = GenerateInputeData();
Console.WriteLine("TEST NORMAL LINQ");
var swatchpn = Stopwatch.StartNew();
var seqQuery = (from intNum in seqTarget
where ((intNum % 5) == 0)
select (intNum / Math.PI)).Average();
Console.WriteLine("LINQ Result: " + seqQuery + " LINQ Use Time: {0}", swatchpn.Elapsed);
var palTarget = GenerateInputeData4Parallel();
Console.WriteLine("TEST PARALLEL LINQ");
var swatchp = Stopwatch.StartNew();
var palQuery = (from intNum in palTarget.AsParallel()
where ((intNum % 5) == 0)
select (intNum / Math.PI)).Average();
Console.WriteLine("PLINQ Result: " + palQuery + " LINQ Use Time: {0}", swatchp.Elapsed);
關於Aggregate 函數的參數說明參考
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;
namespace Sample6_2_plink_aggregate
class Program
static void Main(string[] args)
int[] inputInts = {0,3,4,8,15,22,34,57,68,32,30};
var mean = inputInts.AsParallel().Average();
var standarddeviation = inputInts.AsParallel().Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => Math.Sqrt((finalSum / (inputInts.Count()-1)))
var skewness = inputInts.AsParallel().Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => (finalSum * inputInts.Count()) / ((inputInts.Count()-1)*(inputInts.Count()-2))
var kurtosis = inputInts.AsParallel().Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => ((finalSum * inputInts.Count() * (inputInts.Count() + 1)) /
((inputInts.Count() - 1) * (inputInts.Count() - 2) * (inputInts.Count() - 3))) -
(3 * Math.Pow((inputInts.Count() - 2), 2)) /
((inputInts.Count() - 2) * (inputInts.Count() - 3))
Console.WriteLine("TEST Parallel LINQ Calculate Result");
Console.WriteLine("Mean : {0}", mean);
Console.WriteLine("Standard Deviaton : {0}", standarddeviation);
Console.WriteLine("Skewness : {0}", skewness);
Console.WriteLine("Kurtosis : {0}", kurtosis);
PLINQ同樣也可以和其他形式的並發任務一起使用。例如在計算 標准偏差,偏度和峰度的過程中。
實際的執行順序是 平均值 => 標准偏差 => 偏度 => 峰度
平均值 => 標准偏差 => 偏度
=> 峰度
它們完全可以使用ContinueWith操作,如果有超時控制或取消需要的話,可以使用WithCancellation() 接口。
代碼中用函數將 PLINQ 的操作又進行了封裝,然後用Task的方式進行並行化的調用。deferredCancelTask 是一個搗亂任務,如果把注釋打開,在2秒時它會發出一個Cancel信號,取消任務的執行,並且在異常處理時打印任務的狀態。
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;
namespace Sample6_4_parallel_task_with_plinq
class Program
private static ParallelQuery inputInts =
ParallelEnumerable.Range(1, 100000000);
private static double CalculateMean(System.Threading.CancellationToken ct)
return inputInts.AsParallel().WithCancellation(ct).Average();
private static double CalculateStandardDeviation(System.Threading.CancellationToken ct, double mean)
return inputInts.AsParallel().WithCancellation(ct).Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => Math.Sqrt((finalSum / (inputInts.Count() - 1)))
private static double CalculateSkewness(System.Threading.CancellationToken ct, double mean, double standarddeviation)
return inputInts.AsParallel().WithCancellation(ct).Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => (finalSum * inputInts.Count()) / ((inputInts.Count() - 1) * (inputInts.Count() - 2))
private static double CalculateKurtosis(System.Threading.CancellationToken ct, double mean, double standarddeviation)
return inputInts.AsParallel().WithCancellation(ct).Aggregate(
0d, // seed
// update accumulator function
// An accumulator function to be invoked on each element in a partition
(subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4),
// combine accumulator function
// An accumulator function to be invoked on the yielded accumulator result from each partition.
(total, thisTask) => total + thisTask,
// result selector
// A function to transform the final accumulator value into the result value.
(finalSum) => ((finalSum * inputInts.Count() * (inputInts.Count() + 1)) /
((inputInts.Count() - 1) * (inputInts.Count() - 2) * (inputInts.Count() - 3))) -
(3 * Math.Pow((inputInts.Count() - 2), 2)) /
((inputInts.Count() - 2) * (inputInts.Count() - 3))
static void Main(string[] args)
Console.WriteLine("TEST Parallel TASK work with PLINQ");
var cts = new System.Threading.CancellationTokenSource();
var ct = cts.Token;
var TaskMean = new Task(()=> CalculateMean(ct), ct);
var TaskSTDev = TaskMean.ContinueWith((t) => { return CalculateStandardDeviation(ct, t.Result); },
var TaskSkewness = TaskSTDev.ContinueWith((t) => { return CalculateSkewness(ct, TaskMean.Result, t.Result); },
var TaskKurtosis = TaskSTDev.ContinueWith((t) => { return CalculateKurtosis(ct, TaskMean.Result, t.Result); },
//var deferredCancelTask = Task.Factory.StartNew(() => { System.Threading.Thread.Sleep(2000); cts.Cancel();});
Task.WaitAll(TaskSkewness, TaskKurtosis);
Console.WriteLine("Mean : {0}", TaskMean.Result);
Console.WriteLine("Standard Deviaton : {0}", TaskSTDev.Result);
Console.WriteLine("Skewness : {0}", TaskSkewness.Result);
Console.WriteLine("Kurtosis : {0}", TaskKurtosis.Result);
catch(AggregateException aex)
foreach (var ex in aex.InnerExceptions)
if (ex is TaskCanceledException)
Console.WriteLine("Mean Task: {0}", TaskMean.Status);
Console.WriteLine("Standard Deviation Task: {0}", TaskSTDev.Status);
Console.WriteLine("Skewness Task: {0}", TaskSkewness.Status);
Console.WriteLine("Kurtosis Task: {0}", TaskKurtosis.Status);