最近打算做一個服務器端程序,每來一個客戶端請求就新開一個線程進行處理。在網上查了一些資料後,準備使用線程池來做這個東西。使用C++11新的庫處理線程問題比以前簡單了許多。我在網上找到一份線程池的實現:http://blog.csdn.net/goldenhawking/article/details/7919547 。在這個線程池實現中,每一個線程都維護一個任務隊列,我覺得這麼做不利於任務調度:有可能某個線程的待執行任務列表很長,而其他線程卻在休眠。
下面是我自己實現的一個線程池,任務隊列由線程池類來分配,每個線程只有拿到任務的時候才執行,其他時間是阻塞的。另外,如果沒有任務需要執行,那麼分配任務的線程也會阻塞,直到來了新任務。 代碼在VS2013下測試通過。
本人菜鳥一枚,歡迎指出錯誤。
threadInstance.h
#pragma once

#include <iostream>
#include <thread>
#include <mutex>
#include <functional>
#include <list>
#include <atomic>
#include <vector>
#include <algorithm>
#include <memory>
#include <condition_variable>


class ThreadPool;


/// One worker of a ThreadPool: owns a std::thread that executes run().
///
/// Life cycle: construct -> begin() starts the thread -> set_task() publishes
/// a task -> join() requests termination -> the destructor waits for the
/// thread to finish.
class ThreadInstance
{
public:
    /// @param theadPool  Pool this worker belongs to (not owned). The pointer
    ///                   is only stored here; it is not dereferenced by any
    ///                   member defined in this header.
    ThreadInstance(ThreadPool *theadPool)
        : m_stop(false), m_pthread(nullptr), m_threadPool(theadPool)
    {
    }

    /// Requests termination and then waits for the worker thread to exit.
    ///
    /// The stop request is issued here as well because waiting on a thread
    /// that was never told to stop blocks the destroying thread forever.
    virtual ~ThreadInstance()
    {
        if (m_pthread != nullptr)
        {
            join();
            if (m_pthread->joinable())
            {
                m_pthread->join();
            }
        }
    }

    /// Starts the worker thread, which executes run(). Calling begin() again
    /// is a no-op: replacing a running std::thread object would leak it.
    void begin()
    {
        if (m_pthread != nullptr)
        {
            return;
        }
        m_pthread.reset(new std::thread(&ThreadInstance::run, this));
    }

    /// Thread entry point; defined out of line.
    void run();

    /// Requests termination: raises the stop flag, drops any pending task and
    /// wakes one waiter on m_cond_task_ready.
    ///
    /// Despite its name this function does NOT wait for the thread; the
    /// destructor does. Calling it more than once is harmless.
    void join()
    {
        std::lock_guard<std::mutex> lg(m_mutex_task);
        // Written under the lock so the flag, the cleared task and the
        // notification are observed together by a waiter using this mutex.
        m_stop = true;
        m_task = nullptr;
        m_cond_task_ready.notify_one();
    }

    /// Stores the next task to execute. The caller is expected to notify
    /// m_cond_task_ready afterwards.
    ///
    /// @param task  Callable to store; taken by const reference so lvalues
    ///              and temporaries are both accepted.
    void set_task(const std::function<void(void)> &task)
    {
        std::lock_guard<std::mutex> lg(m_mutex_task);
        m_task = task;
    }

public:
    /// Condition variable signalled when a task or a stop request is ready.
    std::condition_variable m_cond_task_ready;

protected:
    /// Flag used to terminate the thread.
    std::atomic<bool> m_stop;
    /// Mutex intended for waiting on m_cond_task_ready; not used by any
    /// member defined in this header.
    std::mutex m_mutex_cond;

    /// Task to be executed; empty when nothing is pending.
    std::function<void(void)> m_task;
    /// Mutex protecting m_task.
    std::mutex m_mutex_task;
    /// Worker thread; null until begin() has been called.
    std::unique_ptr<std::thread> m_pthread;
    /// Owning pool (not owned by this object).
    ThreadPool *m_threadPool;
};
threadInstance.cpp
1 #include "threadInstance.h" 2 #include "threadPool.h" 3 4 void ThreadInstance::run() 5 { 6 7 while (true) 8 { 9 //auto x = std::this_thread::get_id(); 10 std::unique_lock<std::mutex> lck(m_mutex_cond); 11 m_cond_task_ready.wait(lck); 12 13 if (m_stop) 14 { 15 break; 16 } 17 m_task(); 18 19 //shared_ptr<ThreadInstance> ptr(this); 20 m_threadPool->append_free_thread(this); 21 } 22 23 }
threadPool.h
#pragma once


#include <thread>
#include <mutex>
#include <functional>
#include <list>
#include <atomic>
#include <vector>
#include <algorithm>
#include <memory>
#include <condition_variable>


/// Counting semaphore used to represent free threads and pending tasks.
class Semaphore {
public:
    /// @param value  Initial number of permits.
    Semaphore(int value = 1) : count(value)
    {}

    /// Blocks until a permit is available, then takes it.
    ///
    /// The wait is predicate-based: a spurious wakeup re-checks the permit
    /// count and goes back to sleep instead of proceeding without a permit.
    void wait(){
        std::unique_lock<std::mutex> lock(m_mutex);
        condition.wait(lock, [this] { return count > 0; });
        --count;
    }

    /// Releases one permit and wakes one waiter, if any.
    void signal(){
        std::lock_guard<std::mutex> lock(m_mutex);
        ++count;
        condition.notify_one();
    }

private:
    int count;                          ///< permits currently available
    std::mutex m_mutex;                 ///< protects count
    std::condition_variable condition;  ///< signalled whenever count grows
};

class ThreadInstance;


/// The thread pool class. Member functions other than count() are defined
/// out of line.
class ThreadPool
{
public:
    ThreadPool(int nThreads);
    ~ThreadPool();
public:
    /// Total number of threads held in m_vec_threads.
    size_t count() const { return m_vec_threads.size(); }

    /// Stops the pool (original author's note: "wait until all threads is
    /// terminated").
    void join_all();

    /// Appends a task to the thread pool.
    void append(std::function< void(void) > func);
    /// Starts the service.
    void start();
    /// Appends a free thread to the free-thread list.
    void append_free_thread(ThreadInstance* pfThread);

protected:
    /// Function to be executed in a separate thread.
    void start_thread();

public:
    /// Number of threads.
    int m_n_threads;
    /// Flag used to stop the thread pool service.
    std::atomic<bool> m_stop;

    /// Constructed from the thread count by the pool's constructor.
    Semaphore m_sem_free_threads;
    /// Signalled once per appended task.
    Semaphore m_sem_task;


    /// List containing all the free threads.
    std::list<ThreadInstance*> m_list_free_threads;
    /// Vector containing all the threads.
    std::vector<ThreadInstance* > m_vec_threads;

    /// Tasks waiting to be handed to a thread.
    std::list<std::function<void(void)>> m_list_tasks;
};
threadPool.cpp
#include "threadPool.h"
#include "threadInstance.h"

#include <map>
#include <utility>

// Not used in this file; kept because they have external linkage and other
// translation units may refer to them.
std::mutex cond_mutex;
std::condition_variable cond_incoming_task;

namespace
{
    // Guards m_list_tasks and m_list_free_threads. Both lists are mutated
    // concurrently: tasks are pushed by callers of append() and popped by the
    // dispatcher; free workers are pushed by the workers themselves and
    // popped by the dispatcher. The mutex lives at file scope (shared by all
    // pools) because the class declaration offers no mutex member.
    std::mutex g_lists_mutex;

    // Guards the dispatcher registry below.
    std::mutex g_dispatchers_mutex;

    // Dispatcher thread of every started pool, keyed by pool address, so the
    // destructor can join it instead of leaving a detached thread that may
    // touch a destroyed pool. Allocated once and intentionally never freed:
    // destroying a map that still holds a joinable std::thread at process
    // exit (a leaked pool) would call std::terminate.
    std::map<const ThreadPool*, std::thread>& dispatchers()
    {
        static std::map<const ThreadPool*, std::thread>* const registry =
            new std::map<const ThreadPool*, std::thread>();
        return *registry;
    }
}

/// Creates nThreads workers; none of them runs until start() is called.
/// Initializers are listed in member declaration order.
ThreadPool::ThreadPool(int nThreads)
    : m_n_threads(nThreads),
      m_stop(false),
      m_sem_free_threads(nThreads),
      m_sem_task(0)
{
    for (int i = 0; i < nThreads; i++)
    {
        ThreadInstance* worker = new ThreadInstance(this);
        m_vec_threads.push_back(worker);
        m_list_free_threads.push_back(worker);
    }
}

/// Stops the service, waits for the dispatcher, then destroys the workers.
ThreadPool::~ThreadPool()
{
    // Always request shutdown: destroying a worker that was never told to
    // stop would block forever while joining its thread.
    join_all();

    // The dispatcher must be gone before any member is torn down.
    std::thread dispatcher;
    {
        std::lock_guard<std::mutex> lock(g_dispatchers_mutex);
        auto it = dispatchers().find(this);
        if (it != dispatchers().end())
        {
            dispatcher = std::move(it->second);
            dispatchers().erase(it);
        }
    }
    if (dispatcher.joinable())
    {
        dispatcher.join();
    }

    // Iterate the container itself rather than counting to m_n_threads, which
    // does not match the vector size when the requested count was negative.
    for (ThreadInstance* worker : m_vec_threads)
    {
        delete worker;
    }
}

/// Starts the dispatcher on its own thread so the caller is not blocked.
/// Calling start() on a pool that is already running is a no-op.
void ThreadPool::start()
{
    std::lock_guard<std::mutex> lock(g_dispatchers_mutex);
    std::thread& slot = dispatchers()[this];
    if (slot.joinable())
    {
        return;
    }
    slot = std::thread(&ThreadPool::start_thread, this);
}

/// Dispatcher loop: starts every worker, then repeatedly pairs one free
/// worker with one pending task until the pool is stopped.
void ThreadPool::start_thread()
{
    // m_vec_threads never changes after construction, so it can be walked
    // without the lock (it holds the same workers as the initial free list).
    for (ThreadInstance* worker : m_vec_threads)
    {
        worker->begin();
    }

    while (true)
    {
        // Block until there is both a free worker and a pending task.
        m_sem_free_threads.wait();
        m_sem_task.wait();

        // join_all() sets the flag before signalling both semaphores.
        if (m_stop)
        {
            break;
        }

        ThreadInstance* worker = nullptr;
        std::function<void(void)> task;
        {
            std::lock_guard<std::mutex> lock(g_lists_mutex);
            worker = m_list_free_threads.front();
            m_list_free_threads.pop_front();
            task = std::move(m_list_tasks.front());
            m_list_tasks.pop_front();
        }

        // Hand over the task and wake the worker, outside the list lock.
        worker->set_task(task);
        worker->m_cond_task_ready.notify_one();
    }
}

/// Queues a task; it is dispatched once a worker is free.
void ThreadPool::append(std::function< void(void) > func)
{
    {
        std::lock_guard<std::mutex> lock(g_lists_mutex);
        m_list_tasks.push_back(std::move(func));
    }
    m_sem_task.signal();
}

/// Puts a worker back on the free list. Called from worker threads, possibly
/// several at once, so the list must be locked.
void ThreadPool::append_free_thread(ThreadInstance* pfThread)
{
    {
        std::lock_guard<std::mutex> lock(g_lists_mutex);
        m_list_free_threads.push_back(pfThread);
    }
    m_sem_free_threads.signal();
}

/// Requests shutdown of the dispatcher and of every worker. It only signals;
/// the waiting happens in the destructors. Safe to call more than once.
void ThreadPool::join_all()
{
    // Set the flag first so the dispatcher stops pairing tasks with workers.
    m_stop = true;

    for (ThreadInstance* worker : m_vec_threads)
    {
        worker->join();
    }

    // Release the dispatcher from whichever semaphore it is blocked on.
    m_sem_free_threads.signal();
    m_sem_task.signal();
}
用於測試的main函數
1 #include <iostream> 2 #include <thread> 3 #include <mutex> 4 #include <functional> 5 #include <list> 6 #include <atomic> 7 #include <vector> 8 #include <algorithm> 9 #include <memory> 10 #include <condition_variable> 11 12 13 #include "threadPool.h" 14 //#include <vld.h> 15 16 using namespace std; 17 class A 18 { 19 public: 20 A() 21 {} 22 ~A(){} 23 public: 24 void foo(int k) 25 { 26 //sleep for a while 27 std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 900 + 100)); 28 std::cout << "k = " << k << std::endl; 29 30 } 31 }; 32 33 //a function which will be executed in sub thread. 34 void hello() 35 { 36 //sleep for a while 37 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 38 cout << "hello \n"; 39 } 40 41 //let's test the thread. 42 int main() 43 { 44 srand(0); 45 46 ThreadPool g_threadPool(3); 47 A a; 48 49 g_threadPool.append(&hello); 50 51 //append object method with copy-constructor(value-assignment) 52 g_threadPool.append(std::bind(&A::foo, a, 1)); 53 g_threadPool.append(std::bind(&A::foo, a, 2)); 54 g_threadPool.append(std::bind(&A::foo, a, 3)); 55 g_threadPool.append(std::bind(&A::foo, a, 4)); 56 57 //auto beg = std::chrono::high_resolution_clock().now(); 58 59 g_threadPool.start(); 60 61 //std::this_thread::sleep_for(std::chrono::milliseconds(5000)); 62 63 g_threadPool.append(&hello); 64 //append object method with address assignment, will cause the objects' member increase. 65 g_threadPool.append(std::bind(&A::foo, &a, 5)); 66 g_threadPool.append(std::bind(&A::foo, &a, 6)); 67 g_threadPool.append(std::bind(&A::foo, &a, 7)); 68 g_threadPool.append(std::bind(&A::foo, &a, 8)); 69 70 //std::this_thread::sleep_for(std::chrono::seconds(5)); 71 char temp; 72 cin >> temp; 73 if (temp == 'e') 74 { 75 g_threadPool.join_all(); 76 } 77 78 //auto end = std::chrono::high_resolution_clock().now(); 79 //auto dd = std::chrono::duration_cast<chrono::seconds>(end - beg); 80 //cout << dd.count() << endl; 81 82 return 0; 83 }
問別人問題,還這麼牛叉,真心佩服
不要給線程派任務,讓線程空閒的時候,自己去領任務
戶提供連接,也就是50個線程。多余的其它客戶連接會被阻塞直到有空余的連接出現。其實就是所謂的“線程池”的概念,你可以搜搜這方面的內容,很多很多的。