程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> Parallel Programming-實現並行操作的流水線(生產者、消費者),parallelprogramming

Parallel Programming-實現並行操作的流水線(生產者、消費者),parallelprogramming

編輯:C#入門知識

Parallel Programming-實現並行操作的流水線(生產者、消費者),parallelprogramming


本文介紹如何使用C#實現並行執行的流水線(生產者消費者):

 

一、流水線示意圖

上圖演示了流水線,action1接收input,然後產生結果保存在buffer1中,action2讀取buffer1中由action1產生的數據,以此類推指導action4完成產生Output。

以上也是典型的生產者消費者模式。

上面的模式如果使用普通常規的串行執行是很簡單的,按部就班按照流程圖一步一步執行即可。如果為了提高效率,想使用並行執行,也就是說生產者和消費者同時並行執行,該怎麼辦麼?

二、實現並行流水線

2.1 代碼

class PiplelineDemo
    {
        private int seed;
        public PiplelineDemo()
        {
            seed = 10;
        }

        public void Action1(BlockingCollection<string> output)
        {
            try
            {
                for (var i = 0; i < seed; i++)
                {
                    output.Add(i.ToString());//initialize data to buffer1
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        public void Action2(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            try
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    var itemToInt = int.Parse(item);
                    output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        public void Action3(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            try
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    output.Add(item);//set data into buffer3
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        public void Pipeline()
        {
            var buffer1 = new BlockingCollection<string>(seed);
            var buffer2 = new BlockingCollection<string>(seed);
            var buffer3 = new BlockingCollection<string>(seed);
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            var stage1 = taskFactory.StartNew(() => Action1(buffer1));
            var stage2 = taskFactory.StartNew(() => Action2(buffer1, buffer2));
            var stage3 = taskFactory.StartNew(() => Action3(buffer2, buffer3));

            Task.WaitAll(stage1, stage2, stage3);
            foreach(var item in buffer3.GetConsumingEnumerable())//print data in buffer3
            {
                Console.WriteLine(item);
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            new PiplelineDemo().Pipeline();
            Console.Read();
        }
    }

2.2 運行結果

預期打印出了0-9自我相乘的結果。

2.3 代碼解釋

代碼本身的邏輯和本文開始的流程圖是一一對應的。

BlockingCollection<T>是.Net裡面的一個線程安全集合。實現了IProducerConsumerCollection<T>.

GetConsumingEnumberable是一個非常強大的東東,專門寫一片文章介紹介紹。

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved