多線程並發程序與協同程序其實是不同的概念。多線程並發是多個執行序同時運行,而協同程序是多個執行序列相互協作,同一時刻只有一個執行序列。今天想到的是將兩者結合起來,拿現實生活中的例子來說,假設一個班級有100個學生,一個老師要批改100個學生的作業,有時老師太忙或者趕時間會叫幾個同學幫忙批改,等所有同學都批改完後都交到老師手中,老師在下次上課的時候將作業本一起發給班上的學生。。。。其實在並發編程的時候也可以借鑒這一個思想和模式,特別是網絡服務器開發的過程中,並發與協同經常出現,於是今天寫了一個簡單的程序模擬了這種情形,當然這個程序本身並沒有任何意義,只是記錄下這種思想,個人一直都覺得,程序開發中,思想是最為重要的,用什麼語言來實現只是表現上不同,今天記錄下來,日後的開發過程中,在適當地方以此思想為基礎,根據項目需要進行拓展!
1 //-------------------------------------------------------------- 2 開發工具:Visual Studio 2012 3 //--------------------------------------------------------------- 4 //C++ 5 #include <iostream> 6 #include <memory> 7 #include <thread> 8 #include <mutex> 9 #include <condition_variable> 10 #include <queue> 11 #include <vector> 12 13 using namespace std; 14 15 //windows 16 #include <windows.h> 17 18 19 /************************************************ 20 [示例]實現一個多線程方式下的協同工作程序 21 22 當一個線程(相對的主線程)在完成一個任務的時 23 候,有時候為了提高效率,可以充分利用多核CPU的 24 優勢可以將手中的任務分成多個部分,分發給比較 25 空閒的輔助線程來幫助處理,並且主線程要等待所 26 有的輔助線程都處理完成後,對所有任務進行一次 27 匯總,才能進行下一步操作,此時就需要一個同步的 28 多線程協同工作類。 29 *************************************************/ 30 31 32 //定義一個求累積和的任務類 33 class CSumTask 34 { 35 public: 36 CSumTask(double dStart,double dEnd); 37 ~CSumTask(); 38 double DoTask(); 39 double GetResult(); 40 private: 41 double m_dMin; 42 double m_dMax; 43 double m_dResult; 44 }; 45 46 CSumTask::CSumTask(double dStart,double dEnd):m_dMin(dStart),m_dMax(dEnd),m_dResult(0.0) 47 { 48 49 } 50 CSumTask::~CSumTask() 51 { 52 53 } 54 double CSumTask::DoTask() 55 { 56 57 for(double dNum = m_dMin;dNum <= m_dMax;++dNum) 58 { 59 m_dResult += dNum; 60 } 61 return m_dResult; 62 } 63 64 double CSumTask::GetResult() 65 { 66 return m_dResult; 67 } 68 69 70 //定義一個任務管理者 71 class CTaskManager 72 { 73 public: 74 CTaskManager(); 75 ~CTaskManager(); 76 size_t Size(); 77 void AddTask(const std::shared_ptr<CSumTask> TaskPtr); 78 std::shared_ptr<CSumTask> PopTask(); 79 protected: 80 std::queue<std::shared_ptr<CSumTask>> m_queTask; 81 }; 82 83 CTaskManager::CTaskManager() 84 { 85 86 } 87 88 CTaskManager::~CTaskManager() 89 { 90 91 } 92 93 size_t CTaskManager::Size() 94 { 95 return m_queTask.size(); 96 } 97 98 void CTaskManager::AddTask(const std::shared_ptr<CSumTask> TaskPtr) 99 { 100 m_queTask.push(std::move(TaskPtr)); 101 } 102 103 std::shared_ptr<CSumTask> CTaskManager::PopTask() 104 { 105 std::shared_ptr<CSumTask> tmPtr = m_queTask.front(); 106 m_queTask.pop(); 107 return tmPtr; 108 } 109 110 111 //協同工作線程管理類,負責創建協同工作線程並接受來自主線程委托的任務進行處理 112 class CWorkThreadManager 113 { 114 public: 115 CWorkThreadManager(unsigned int uiThreadSum ); 116 ~CWorkThreadManager(); 117 bool AcceptTask(std::shared_ptr<CSumTask> TaskPtr); 118 bool StopAll(bool bStop); 119 unsigned int ThreadNum(); 120 protected: 121 std::queue<std::shared_ptr<CSumTask>> m_queTask; 122 std::mutex m_muTask; 123 int m_iWorkingThread; 124 int m_iWorkThreadSum; 125 std::vector<std::shared_ptr<std::thread>> m_vecWorkers; 126 127 void WorkThread(int iWorkerID); 128 bool m_bStop; 129 std::condition_variable_any m_condPop; 130 std::condition_variable_any m_stopVar; 131 }; 132 133 CWorkThreadManager::~CWorkThreadManager() 134 { 135 136 } 137 unsigned int CWorkThreadManager::ThreadNum() 138 { 139 return m_iWorkThreadSum; 140 } 141 142 CWorkThreadManager::CWorkThreadManager(unsigned int uiThreadSum ):m_bStop(false),m_iWorkingThread(0),m_iWorkThreadSum(uiThreadSum) 143 { 144 //創建工作線程 145 for(int i = 0; i < m_iWorkThreadSum;++i) 146 { 147 std::shared_ptr<std::thread> WorkPtr(new std::thread(&CWorkThreadManager::WorkThread,this,i+1)); 148 m_vecWorkers.push_back(WorkPtr); 149 } 150 151 } 152 153 bool CWorkThreadManager::AcceptTask(std::shared_ptr<CSumTask> TaskPtr) 154 { 155 std::unique_lock<std::mutex> muLock(m_muTask); 156 if(m_iWorkingThread >= m_iWorkThreadSum) 157 { 158 return false; //當前已沒有多余的空閒的線程處理任務 159 } 160 m_queTask.push(TaskPtr); 161 m_condPop.notify_all(); 162 return true; 163 } 164 165 void CWorkThreadManager::WorkThread(int iWorkerID) 166 { 167 while(!m_bStop) 168 { 169 std::shared_ptr<CSumTask> TaskPtr; 170 bool bDoTask = false; 171 { 172 std::unique_lock<std::mutex> muLock(m_muTask); 173 while(m_queTask.empty() && !m_bStop) 174 { 175 m_condPop.wait(m_muTask); 176 } 177 if(!m_queTask.empty()) 178 { 179 TaskPtr = m_queTask.front(); 180 m_queTask.pop(); 181 m_iWorkingThread++; 182 bDoTask = true; 183 } 184 185 } 186 //處理任務 187 if(bDoTask) 188 { 189 TaskPtr->DoTask(); 190 { 191 std::unique_lock<std::mutex> muLock(m_muTask); 192 m_iWorkingThread--; 193 cout<<">>>DoTask in thread ["<<iWorkerID<<"]\n"; 194 } 195 } 196 m_stopVar.notify_all(); 197 } 198 } 199 200 bool CWorkThreadManager::StopAll(bool bStop) 201 { 202 { 203 std::unique_lock<std::mutex> muLock(m_muTask); 204 while(m_queTask.size()>0 || m_iWorkingThread>0) 205 { 206 m_stopVar.wait(m_muTask); 207 cout<<">>>Waiting finish....\n"; 208 } 209 cout<<">>>All task finished!\n"; 210 211 } 212 213 m_bStop = true; 214 m_condPop.notify_all(); 215 //等待所有線程關閉 216 for(std::vector<std::shared_ptr<std::thread>>::iterator itTask = m_vecWorkers.begin();itTask != m_vecWorkers.end();++itTask) 217 { 218 (*itTask)->join(); 219 } 220 return true; 221 } 222 223 224 /************************************** 225 [示例程序說明] 226 227 每個任務對象表示求1+2+....+1000的累 228 積和,現在有2000個這樣的任務,需要將每個 229 任務進行計算,然後將所有的結果匯總求和。 230 利用多線程協同工作類對象輔助完成每 231 個任務結果計算,主線程等待所有結果完成 232 後將所有結果匯總求和。 233 ****************************************/ 234 235 236 int main(int arg,char *arv[]) 237 { 238 239 std::cout.sync_with_stdio(true); 240 CTaskManager TaskMgr; 241 CWorkThreadManager WorkerMgr(5); 242 std::vector<std::shared_ptr<CSumTask>> vecResultTask; 243 244 for(int i = 0; i < 2000; ++i) 245 { 246 std::shared_ptr<CSumTask> TaskPtr(new CSumTask(1.0,1000.0)); 247 TaskMgr.AddTask(TaskPtr); 248 vecResultTask.push_back(TaskPtr); 249 } 250 251 // 252 DWORD dStartTime = ::GetTickCount(); 253 while(TaskMgr.Size()>0) 254 { 255 std::shared_ptr<CSumTask> WorkPtr = TaskMgr.PopTask(); 256 if(!WorkerMgr.AcceptTask(WorkPtr)) 257 { 258 //輔助線程此刻處於忙碌狀態(沒有空閒幫忙),自己處理該任務 259 WorkPtr->DoTask(); 260 cout<<">>>DoTask in thread [0]\n"; 261 } 262 } 263 WorkerMgr.StopAll(true); //等待所有的任務完成 264 265 //對所有結果求和 266 double dSumResult = 0.0; 267 for(std::vector<std::shared_ptr<CSumTask>>::iterator itTask = vecResultTask.begin();itTask != vecResultTask.end();++itTask) 268 { 269 dSumResult += (*itTask)->GetResult(); 270 } 271 272 DWORD dEndTime = ::GetTickCount(); 273 cout<<"\n[Status]"<<endl; 274 cout<<"\tEvery task result:"<<vecResultTask[0]->GetResult()<<endl; 275 cout<<"\tTask num:"<<vecResultTask.size()<<endl; 276 cout<<"\tAll result sum:"<<dSumResult; 277 cout<<"\tCast to int,result:"<<static_cast<long long>(dSumResult)<<endl; 278 cout<<"\tWorkthread num:"<<WorkerMgr.ThreadNum()<<endl; 279 cout<<"\tTime of used:"<<dEndTime-dStartTime<<" ms"<<endl; 280 getchar(); 281 return 0; 282 }