本篇系C++ socket網絡爬蟲(1)的姊妹篇,寫網絡爬蟲怎麼能少得了線程呢
源代碼地址:http://files.cnblogs.com/magicsoar/ThreadPoolProject.rar
*需要C++11的支持,在vs2013下編譯通過
運行效果
背景
在傳統的收到任務即創建線程的情況下,我們每收到一個任務,就創建一個線程,執行任務,銷毀線程,
我們把這三個過程所用的時間分別記做T1,T2,T3
任務本身所用的時間僅占T2/(T1+T2+T3),這在任務本身所用時間很短的情況下, 效率是很低的
此外,通常操作系統所能創建的線程數量都是有限的,並不能無限制的創建線程。
而在線程池中,我們通常會預先創建m個線程,放到空閒容器中,當有任務來臨時,線程池會從空閒的線程中挑選一個線程來執行該任務,
在執行完畢後再將其放回空閒容器中
C++11
在C++11中,C++對線程提供了一個很高的抽象,並沒有很好的提供優先級控制等功能,需要調用std::thread::native_handle(),獲取原生線程對象
運行平台特定的操作,但這就喪失了std::thread在不同平台上代碼層面的一致性。
所以在項目中實現了對std::thread二次封裝,並提供了基本的優先級控制
項目概述
項目中有一個主線程,即運行程序時創建的線程可以從用戶那裡獲取任務,還有一個管理線程,用於進行線程池中線程的調度,還有初始化線程池時創建的若干空閒線程,用於執行任務
項目中主要有以下幾個類:
Task:任務類,內有任務的優先級,和一個純虛Run方法,我們需要派生Task,將要完成的任務寫到Run方法中
MyThread:線程類,封裝了C++11的thread,每一個線程可以關聯一個Task對象,執行其Run方法
BusyThreadContainer:工作容器類,采用std::list<MyThread*>實現,儲存工作狀態的線程
IdleThreadContainer:空閒容器類,采用std::vector<MyThread*>實現,儲存處於空閒狀態的線程
TaskContainer:任務容器類,采用priority_queue<Task*>實現,儲存所有用戶添加未執行的任務
MyThreadPool:線程池類,用於從用戶獲取任務,管理任務,實現對線程池中線程的調度
類圖如下
Task類
namespace { enum PRIORITY { MIN = 1, NORMAL = 25, MAX = 50 }; } class Task { public: Task() { } void SetPriority(int priority) { if (priority>(PRIORITY::MAX)) { priority = (PRIORITY::MAX); } else if (priority>(PRIORITY::MAX)) { priority = (PRIORITY::MIN); } } virtual void Run() = 0; protected: int priority_; };
void SetPriority(int priority) :設置線程的優先級,數值在1-50之間,值越大,優先級越高
virtual void run() = 0:線程執行的方法,用戶需要重寫為自己的方法
MyThread類
class MyThread { friend bool operator==(MyThread my1, MyThread my2); friend bool operator!=(MyThread my1, MyThread my2); public: MyThread(MyThreadPool *pool); void Assign(Task *Task); void Run(); void StartThread(); int getthreadid(); void setisdetach(bool isdetach); private: MyThreadPool *mythreadpool_; static int s_threadnumber; bool isdetach_; Task *task_; int threadid_; std::thread thread_; };
方法:
MyThread(MyThreadPool *pool):構造一個MyThread對象,將自己與指定的線程池相關聯起來
void Assign(Task *Task):將一個任務與該線程相關聯起來
void Run():調用了Task的Run方法,同時在Task的Run方法結束後將自己從工作容器移回空閒容器
void StartThread():執行線程的Run方法,即執行了Task的Run方法
int getthreadid():獲取線程的id號
void setisdetach(bool isdetach):設置線程在運行的時候是join還是detach的
BusyThreadContainer類
class BusyThreadContainer { public: BusyThreadContainer(); ~BusyThreadContainer(); void push(MyThread *m); std::list<MyThread*>::size_type size(); void erase(MyThread *m); private: std::list<MyThread*> busy_thread_container_; typedef std::list<MyThread*> Container; typedef Container::iterator Iterator; };
void push(MyThread *m):將一個線程放入工作容器中
void erase(MyThread *m):刪除一個指定的線程
std::list<MyThread*>::size_type size():返回工作容器的大小
IdleThreadContainer類
class IdleThreadContainer { public: IdleThreadContainer(); ~IdleThreadContainer(); std::vector<MyThread*>::size_type size(); void push(MyThread *m); void assign(int n,MyThreadPool* m); MyThread* top(); void pop(); void erase(MyThread *m); private: std::vector<MyThread*> idle_thread_container_; typedef std::vector<MyThread*> Container; typedef Container::iterator Iterator; };
~IdleThreadContainer(); :負責析構空閒容器中的線程
void push(MyThread *m):將一個線程放回空閒容器中
void assign(int n,MyThreadPool* m):創建n個線程與線程池m相關聯的線程放入空閒容器中
MyThread* top():返回位於空閒容器頂端的線程
void pop():彈出空閒容器頂端的線程
void erase(MyThread *m):刪除一個指定的線程
TaskContainer類
class TaskContainer { public: TaskContainer(); ~TaskContainer(); void push(Task *); Task* top(); void pop(); std::priority_queue<Task*>::size_type size(); private: std::priority_queue<Task*> task_container_; };
void push(Task *):將一個任務放入任務容器中
Task* top():返回任務容器頂端的任務
void pop():將任務容器頂端的線程彈出
std::priority_queue<Task*>::size_type size():返回任務容器的大小
MyThreadPool類
class MyThreadPool { public: MyThreadPool(){} MyThreadPool(int number); ~MyThreadPool(); void AddTask(Task *Task,int priority); void AddIdleThread(int n); void RemoveThreadFromBusy(MyThread *myThread); void Start(); void EndMyThreadPool();private: BusyThreadContainer busy_thread_container_; IdleThreadContainer idle_thread_container_; bool issurvive_; TaskContainer task_container_; std::thread thread_this_; std::mutex busy_mutex_; std::mutex idle_mutex_; std::mutex task_mutex_; int number_of_thread_;
};
MyThreadPool(int number):構造MyThreadPool,創建包含number個線程的空閒容器
void AddTask(Task *Task,int priority):添加一個優先級為priority的任務到任務容器中
void AddIdleThread(int n):在創建n個空閒線程到空閒容器中
void RemoveThreadFromBusy(MyThread *myThread):將一個線程從工作容器中刪除,並移回空閒容器中
void Start():判斷是否有空閒線程,如有將任務從從任務容器中提出,放入空閒容器中,等待執行
void EndMyThreadPool():結束線程池的運行
派生自Task的MyTask類
class MyTask :public Task { friend bool operator<(MyTask &lv,MyTask &rv) { return lv.priority_ < rv.priority_; } public: MyTask(); ~MyTask(); virtual void Run(); void setdata(int d); private: int data_; };
MyTask::MyTask() { } MyTask::~MyTask() { } void MyTask::setdata(int d) { data_ = d; } void MyTask::Run() { std::cout << "Hello I am "<<data_ << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); }
friend bool operator<(MyTask &lv,MyTask &rv) :用於確定任務在任務容器中的位置
Run:自定義的Run方法
void setdata(int d):設置數據
關鍵代碼分析:
void MyThread::Run()
void MyThread::Run() { cout <<"Thread:"<< threadid_ << " run "; task_->Run(); mythreadpool_->RemoveThreadFromBusy(this); }
調用了Task的Run方法,同時在Task的Run方法結束後,通知線程池將自己從工作容器中移回空閒容器
void MyThread::StartThread()
void MyThread::StartThread() { thread_ = thread(&MyThread::Run, this); if (isdetach_ == true) thread_.detach(); else thread_.join(); }
將MyThread的Run方法與thread_相綁定,this表示類的Run方法的第一個隱含的參數
然後根據isdetach的值,判斷是否detach() or join()
void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread)
void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread) { busy_mutex_.lock(); cout << "Thread:" << myThread->getthreadid()<< " remove from busylist" << endl; busy_thread_container_.erase(myThread); busy_mutex_.unlock(); idle_mutex_.lock(); idle_thread_container_.push(myThread); idle_mutex_.unlock(); }
將一個線程從任務容器中移除,並將其放回空閒容器中,
使用busy_mutex_和idle_mutex_進行加鎖和解鎖,確保數據的一致性
MyThreadPool::MyThreadPool(int number)
MyThreadPool::MyThreadPool(int number) { issurvive_ = true; number_of_thread_ = number; idle_thread_container_.assign(number, this); thread_this_ =thread(&MyThreadPool::Start, this); thread_this_.detach(); }
MyThreadPool的構造函數,創建number個空閒線程與空閒容器中,同時創建管理線程thread_this,用於進行線程池中線程的調度
void MyThreadPool::Start()
void MyThreadPool::Start() { while (true) { if (issurvive_==false) { busy_mutex_.lock(); if (busy_thread_container_.size()!=0) { busy_mutex_.unlock(); continue; } busy_mutex_.unlock(); break; } idle_mutex_.lock(); if (idle_thread_container_.size() == 0) { idle_mutex_.unlock(); continue; } idle_mutex_.unlock(); task_mutex_.lock(); if (task_container_.size() == 0) { task_mutex_.unlock(); continue; } Task *b = task_container_.top();; task_container_.pop(); task_mutex_.unlock(); idle_mutex_.lock(); MyThread *mythread = idle_thread_container_.top();; idle_thread_container_.pop(); mythread->Assign(b); idle_mutex_.unlock(); busy_mutex_.lock(); busy_thread_container_.push(mythread); busy_mutex_.unlock(); mythread->StartThread(); } }
管理線程對應的Start方法,內有一個死循環,不停的判斷任務容器中是否有任務,和是否有空閒線程來執行任務,若有,則將任務從
任務容器中提出,從空閒線程中提取出一個空閒線程與其綁定,執行該任務,同時將該線程從空閒容器移動到工作容器中。
當線程池想要結束運行時,即survive為false時,首先要判斷工作容器是否為空,若不為空,則代表還有任務正在被線程執行,線程池不能結束運行
否則可以結束線程池的運行,跳出死循環
int main()
int main() { MyThreadPool mythreadPool(10); MyTask j[50]; for (int i = 0; i < 50;i++) { j[i].setdata(i); } for (int i = 0; i < 50; i++) { mythreadPool.AddTask(&j[i],i); } int i; //按100添加一個任務 //按-1結束線程池 while (true) { cin >> i; if (i == 100) { MyTask j; j.setdata(i); mythreadPool.AddTask(&j, i); } if (i == -1) { mythreadPool.EndMyThreadPool(); break; } } system("pause"); }
創建了一個含有10個空閒線程的線程池,和50個MyTask任務,並將其放入線程池中等待運行
在循環中,用戶輸入100可以再添加一個任務到線程池中等待運行,輸入-1結束線程池的運行。
運行結果如下
線程池使用後記
線程池並不是萬能的,線程池減少了創建與銷毀線程本身對任務照成的影響,但如果任務本身的運行時間很長,那麼這些開銷相當於任務本身執行開銷而言是可以忽略的。那麼我們也可以
選擇“即時創建,即時銷毀”的策略
線程池通常適合下面的幾個場合:
(1) 單位時間內處理的任務數較多,且每個任務的執行時間較短
(2) 對實時性要求較高的任務,如果接受到任務後在創建線程,再執行任務,可能滿足不了實時要求,因此必須采用線程池進行預創建。