在上一篇文章演示了並行的流水線操作(生產者和消費者並行同時執行),C#是通過BlockingCollection這個線程安全的對象作為Buffer,並且結合Task來實現的。但是上一篇文章有個缺陷,在整個流水線上,生產者和消費者是唯一的。本文將演示多個消費者多個生產者同時並行執行。
與前一篇文章演示的流水線思想類似,不同之處就是本文的topic:消費者和生產者有多個,以buffer1為例,起生產者有兩個,消費者有兩個,現在有三個緯度的並行:
class PiplelineDemo { private int seed; public PiplelineDemo() { seed = 10; } public void Action11(BlockingCollection<string> output) { for (var i = 0; i < seed; i++) { output.Add(i.ToString());//initialize data to buffer1 } } public void Action12(BlockingCollection<string> output) { for (var i = 0; i < seed; i++) { output.Add(i.ToString());//initialize data to buffer1 } } public void Action21(BlockingCollection<string> input, BlockingCollection<string> output) { foreach (var item in input.GetConsumingEnumerable()) { var itemToInt = int.Parse(item); output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2 } } public void Action22(BlockingCollection<string> input, BlockingCollection<string> output) { foreach (var item in input.GetConsumingEnumerable()) { var itemToInt = int.Parse(item); output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2 } } public void Action31(BlockingCollection<string> input, BlockingCollection<string> output) { foreach (var item in input.GetConsumingEnumerable()) { output.Add((item));// add new data to buffer3 } } public void Action32(BlockingCollection<string> input, BlockingCollection<string> output) { foreach (var item in input.GetConsumingEnumerable()) { output.Add((item));// add new data to buffer3 } } public void Pipeline() { var buffer1 = new BlockingCollection<string>(seed * 2); var buffer2 = new BlockingCollection<string>(seed * 2); var buffer3 = new BlockingCollection<string>(seed * 2); var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); var stage11 = taskFactory.StartNew(() => Action11(buffer1)); var stage12 = taskFactory.StartNew(() => Action12(buffer1)); Task.Factory.ContinueWhenAll(new Task[] { stage11, stage12 }, (tasks) => { buffer1.CompleteAdding(); }); var stage21 = taskFactory.StartNew(() => Action21(buffer1, buffer2)); var stage22 = taskFactory.StartNew(() => Action22(buffer1, buffer2)); Task.Factory.ContinueWhenAll(new Task[] { stage21, stage22 }, (tasks) => { buffer2.CompleteAdding(); }); var stage31 = taskFactory.StartNew(() => Action31(buffer2, buffer3)); var stage32 = taskFactory.StartNew(() => Action32(buffer2, buffer3)); Task.Factory.ContinueWhenAll(new Task[] { stage31, stage32 }, (tasks) => { buffer3.CompleteAdding(); }); Task.WaitAll(stage11, stage12, stage21, stage22, stage31, stage32); foreach (var item in buffer3.GetConsumingEnumerable())//print data in buffer3 { Console.WriteLine(item); } } }