在面對相互獨立的數據或者相互獨立的任務時,也許正是Parallel登場的時候。
比如說有一個盒子的集合,分別讓盒子旋轉一定的角度。
void RotateBox(IEnumerable<Box> boxes, float degree){Parallel.ForEach(boxes, box => box.Rotate(degree));}
如果並行任務中的一個任務出現異常,需要結束出問題的任務呢?
Parallel.ForEach為我們提供了一個重載方法,可以控制任務是否繼續。
void RotateBoxes(IEnumerable<Box> boxes){Parallel.ForEach(boxes, (box, state) => {if(!box.IsInvertible){state.Stop();}else{box.Invert();}});}
如果想取消整個並行任務呢?
Parallel.ForEach也為我們提供一個重載方法,可以接收一個ParallelOption的形參,通過設置ParallelOption的CancellationToken屬性來取消整個並行過程。
void RotateBoxes(IEnumerable<Box> boxes,float degrees, CancellationToken token){Paralle.ForEach(boxes,new ParallelOptions{CancellationToken=token},box => box.Rotate(degrees));}
在使用的時候,一般先定義個全局CancellationTokenSource類型的變量。
static CancellationTokenSource token = new CancellationTokenSource();
然後,在某個並行任務中設置取消。
token.Cancel();
最後,再把這個token賦值給以上方法的CancellationToken屬性。
各個並行任務如何共享狀態,共享一個變量呢?
int InvertBoxes(IEnumerable<Box> boxes){object mutex = new object();//用來鎖int nonInvertibleCount = 0; //各任務共享的變量Paralle.ForEach(boxes, box =>{if(box.IsInvertible){box.Invert();}else{lock(mutex){++nonInvertibleCount;}}});return nonInvertibleCount;}
可見,對於各並行線程共享的變量,需要加一個線程鎖,以防止多個線程同時操作共享變量。
另外,Parallel.ForEach提供了一個重載,其中localFinally形參接收一個委托類型,通過該委托讓並行任務共享一個變量。比如:
static int ParallelSum(IEnumerable<int> values){object mutex = new object();int result = 0;Parallel.ForEach(source: values,LocalInit: () => 0,body: (item, state, localVlaue) => localValue + item,localFinally: localValue => {lock(mutex){result += localValue;}});return result;}
當然,也別忘了PLINQ也支持並行:
static int ParalleSum(IEnumerable<int> values){return values.AsParallel().Sum();}
PLINQ的Aggregate方法也可實現:
static int ParallelSum(IEnumerable<int> values){return values.AsParallel().Aggregate(seed: 0,func: (sum, item) => sum + item;);}
以上,是對相互獨立數據的處理。
那麼,如何處理相互獨立的多個任務呢?
通過Parallel.Invoke方法可以實現。
static voiD ProcessArray(double[] array){Parallel.Invoke(() => ProcessPartialArray(array, 0, array.Length/2),() => ProcessPartialArray(array, array.Length/2, array.Length));}static void ProcessPartialArray(double[] array, int begin, int end){}
使用Parallel.Invoke方法還可以讓一個Action或這方法執行多次。
static void Do20(Action action){//讓某個Action執行20次Action[] actions = Enumerable.Repeat(action, 20).ToArray();Parallel.Invoke(actions);}
Parallel.Invoke方法也提供了重載,接收ParallelOption類型的形參,用來控制取消整個並行過程。
static void Do20(Action action){//讓某個Action執行20次Action[] actions = Enumerable.Repeat(action, 20).ToArray();Parallel.Invoke(new ParallelOptions{CancellationToken = token},actions);}
參考資料:C#並發編程經典實例