程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> C++實現一個多線程同步方式的協同工作程序示例,工作程序示例

C++實現一個多線程同步方式的協同工作程序示例,工作程序示例

編輯:C++入門知識

C++實現一個多線程同步方式的協同工作程序示例,工作程序示例


多線程並發程序與協同程序其實是不同的概念。多線程並發是多個執行序同時運行,而協同程序是多個執行序列相互協作,同一時刻只有一個執行序列。今天想到的是將兩者結合起來,拿現實生活中的例子來說,假設一個班級有100個學生,一個老師要批改100個學生的作業,有時老師太忙或者趕時間會叫幾個同學幫忙批改,等所有同學都批改完後都交到老師手中,老師在下次上課的時候將作業本一起發給班上的學生。。。。其實在並發編程的時候也可以借鑒這一個思想和模式,特別是網絡服務器開發的過程中,並發與協同經常出現,於是今天寫了一個簡單的程序模擬了這種情形,當然這個程序本身並沒有任何意義,只是記錄下這種思想,個人一直都覺得,程序開發中,思想是最為重要的,用什麼語言來實現只是表現上不同,今天記錄下來,日後的開發過程中,在適當地方以此思想為基礎,根據項目需要進行拓展!

  1 //--------------------------------------------------------------
  2   開發工具:Visual Studio 2012
  3 //---------------------------------------------------------------
  4 //C++
  5 #include <iostream>
  6 #include <memory>
  7 #include <thread>
  8 #include <mutex>
  9 #include <condition_variable>
 10 #include <queue>
 11 #include <vector>
 12 
 13 using namespace std;
 14 
 15 //windows
 16 #include <windows.h>
 17 
 18 
 19 /************************************************
 20     [示例]實現一個多線程方式下的協同工作程序
 21 
 22     當一個線程(相對的主線程)在完成一個任務的時
 23     候,有時候為了提高效率,可以充分利用多核CPU的
 24     優勢可以將手中的任務分成多個部分,分發給比較
 25     空閒的輔助線程來幫助處理,並且主線程要等待所
 26     有的輔助線程都處理完成後,對所有任務進行一次
 27     匯總,才能進行下一步操作,此時就需要一個同步的
 28     多線程協同工作類。
 29 *************************************************/
 30 
 31 
 32 //定義一個求累積和的任務類
 33 class CSumTask
 34 {
 35 public:
 36     CSumTask(double dStart,double dEnd);
 37     ~CSumTask();
 38     double DoTask();
 39     double GetResult();
 40 private:
 41     double m_dMin;
 42     double m_dMax;
 43     double m_dResult;
 44 };
 45 
 46 CSumTask::CSumTask(double dStart,double dEnd):m_dMin(dStart),m_dMax(dEnd),m_dResult(0.0)
 47 {
 48 
 49 }
 50 CSumTask::~CSumTask()
 51 {
 52 
 53 }
 54 double CSumTask::DoTask()
 55 {
 56     
 57     for(double dNum = m_dMin;dNum <= m_dMax;++dNum)
 58     {
 59         m_dResult += dNum;
 60     }
 61     return m_dResult;
 62 }
 63 
 64 double CSumTask::GetResult()
 65 {
 66     return m_dResult;
 67 }
 68 
 69 
 70 //定義一個任務管理者
 71 class CTaskManager
 72 {
 73 public:
 74     CTaskManager();
 75     ~CTaskManager();
 76     size_t Size();
 77     void AddTask(const std::shared_ptr<CSumTask> TaskPtr);
 78     std::shared_ptr<CSumTask> PopTask();
 79 protected:
 80     std::queue<std::shared_ptr<CSumTask>> m_queTask;
 81 };
 82 
 83 CTaskManager::CTaskManager()
 84 {
 85 
 86 }
 87 
 88 CTaskManager::~CTaskManager()
 89 {
 90 
 91 }
 92 
 93 size_t CTaskManager::Size()
 94 {
 95     return m_queTask.size();
 96 }
 97 
 98 void CTaskManager::AddTask(const std::shared_ptr<CSumTask> TaskPtr)
 99 {
100     m_queTask.push(std::move(TaskPtr));
101 }
102 
103 std::shared_ptr<CSumTask> CTaskManager::PopTask()
104 {
105     std::shared_ptr<CSumTask> tmPtr = m_queTask.front();
106     m_queTask.pop();
107     return tmPtr;
108 }
109 
110 
111 //協同工作線程管理類,負責創建協同工作線程並接受來自主線程委托的任務進行處理
112 class CWorkThreadManager
113 {
114 public:
115     CWorkThreadManager(unsigned int uiThreadSum );
116     ~CWorkThreadManager();
117     bool AcceptTask(std::shared_ptr<CSumTask> TaskPtr);
118     bool StopAll(bool bStop);
119     unsigned int ThreadNum();
120 protected:
121     std::queue<std::shared_ptr<CSumTask>> m_queTask;
122     std::mutex m_muTask;
123     int m_iWorkingThread;
124     int m_iWorkThreadSum;
125     std::vector<std::shared_ptr<std::thread>> m_vecWorkers;
126 
127     void WorkThread(int iWorkerID);
128     bool m_bStop;
129     std::condition_variable_any m_condPop;
130     std::condition_variable_any m_stopVar;
131 };
132 
133 CWorkThreadManager::~CWorkThreadManager()
134 {
135 
136 }
137 unsigned int CWorkThreadManager::ThreadNum()
138 {
139     return m_iWorkThreadSum;
140 }
141 
142 CWorkThreadManager::CWorkThreadManager(unsigned int uiThreadSum ):m_bStop(false),m_iWorkingThread(0),m_iWorkThreadSum(uiThreadSum)
143 {
144     //創建工作線程
145     for(int i = 0; i < m_iWorkThreadSum;++i)
146     {
147         std::shared_ptr<std::thread> WorkPtr(new std::thread(&CWorkThreadManager::WorkThread,this,i+1)); 
148         m_vecWorkers.push_back(WorkPtr);
149     }
150     
151 }
152 
153 bool CWorkThreadManager::AcceptTask(std::shared_ptr<CSumTask> TaskPtr)
154 {
155     std::unique_lock<std::mutex>    muLock(m_muTask);
156     if(m_iWorkingThread >= m_iWorkThreadSum)
157     {
158         return false;            //當前已沒有多余的空閒的線程處理任務
159     }
160     m_queTask.push(TaskPtr);
161     m_condPop.notify_all();
162     return true;
163 }
164 
165  void CWorkThreadManager::WorkThread(int iWorkerID)
166  {
167      while(!m_bStop)
168      {
169          std::shared_ptr<CSumTask> TaskPtr;
170          bool bDoTask = false;
171          {
172             std::unique_lock<std::mutex>    muLock(m_muTask);
173             while(m_queTask.empty() && !m_bStop)
174             {
175                 m_condPop.wait(m_muTask);
176             }
177             if(!m_queTask.empty())
178             {
179                 TaskPtr = m_queTask.front();
180                 m_queTask.pop();
181                 m_iWorkingThread++;
182                 bDoTask = true;
183             }
184              
185          }
186         //處理任務
187          if(bDoTask)
188          {
189              TaskPtr->DoTask();
190              {
191                  std::unique_lock<std::mutex>    muLock(m_muTask);
192                  m_iWorkingThread--;
193                  cout<<">>>DoTask in thread ["<<iWorkerID<<"]\n";
194              }
195          }
196          m_stopVar.notify_all();
197      }
198  }
199 
200  bool CWorkThreadManager::StopAll(bool bStop)
201  {
202      {
203          std::unique_lock<std::mutex>    muLock(m_muTask);
204          while(m_queTask.size()>0 || m_iWorkingThread>0)
205          {
206              m_stopVar.wait(m_muTask);
207              cout<<">>>Waiting finish....\n";
208          }
209         cout<<">>>All task finished!\n";
210         
211      }
212 
213      m_bStop = true;
214      m_condPop.notify_all();
215      //等待所有線程關閉
216      for(std::vector<std::shared_ptr<std::thread>>::iterator itTask = m_vecWorkers.begin();itTask != m_vecWorkers.end();++itTask)
217      {
218         (*itTask)->join();
219      }
220      return true;
221  }
222 
223 
224  /**************************************
225   [示例程序說明]
226 
227       每個任務對象表示求1+2+....+1000的累
228   積和,現在有2000個這樣的任務,需要將每個
229   任務進行計算,然後將所有的結果匯總求和。
230       利用多線程協同工作類對象輔助完成每
231   個任務結果計算,主線程等待所有結果完成
232   後將所有結果匯總求和。
233  ****************************************/
234 
235 
236 int main(int arg,char *arv[])
237 {
238 
239     std::cout.sync_with_stdio(true);
240     CTaskManager TaskMgr;
241     CWorkThreadManager WorkerMgr(5);
242     std::vector<std::shared_ptr<CSumTask>> vecResultTask;
243 
244     for(int i = 0; i < 2000; ++i)
245     {
246         std::shared_ptr<CSumTask> TaskPtr(new CSumTask(1.0,1000.0));
247         TaskMgr.AddTask(TaskPtr);
248         vecResultTask.push_back(TaskPtr);
249     }
250 
251     //
252     DWORD dStartTime = ::GetTickCount();
253     while(TaskMgr.Size()>0)
254     {
255         std::shared_ptr<CSumTask> WorkPtr = TaskMgr.PopTask();
256         if(!WorkerMgr.AcceptTask(WorkPtr))
257         {
258             //輔助線程此刻處於忙碌狀態(沒有空閒幫忙),自己處理該任務
259             WorkPtr->DoTask();
260             cout<<">>>DoTask in thread [0]\n";
261         }
262     }
263     WorkerMgr.StopAll(true);                    //等待所有的任務完成
264 
265     //對所有結果求和
266     double dSumResult = 0.0;
267     for(std::vector<std::shared_ptr<CSumTask>>::iterator itTask = vecResultTask.begin();itTask != vecResultTask.end();++itTask)
268     {
269         dSumResult += (*itTask)->GetResult();
270     }
271 
272     DWORD dEndTime = ::GetTickCount();
273     cout<<"\n[Status]"<<endl;
274     cout<<"\tEvery task result:"<<vecResultTask[0]->GetResult()<<endl;
275     cout<<"\tTask num:"<<vecResultTask.size()<<endl;
276     cout<<"\tAll result sum:"<<dSumResult;
277     cout<<"\tCast to int,result:"<<static_cast<long long>(dSumResult)<<endl;
278     cout<<"\tWorkthread num:"<<WorkerMgr.ThreadNum()<<endl;
279     cout<<"\tTime of used:"<<dEndTime-dStartTime<<" ms"<<endl;
280     getchar();
281     return 0;
282 }

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved