程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> 基於C++11的線程池

基於C++11的線程池

編輯:C++入門知識

本篇系C++ socket網絡爬蟲(1)的姊妹篇,寫網絡爬蟲怎麼能少得了線程呢

 

源代碼地址:http://files.cnblogs.com/magicsoar/ThreadPoolProject.rar

*需要C++11的支持,在vs2013下編譯通過

運行效果

背景

在傳統的收到任務即創建線程的情況下,我們每收到一個任務,就創建一個線程,執行任務,銷毀線程,

我們把這三個過程所用的時間分別記做T1,T2,T3

任務本身所用的時間僅占T2/(T1+T2+T3),這在任務本身所用時間很短的情況下, 效率是很低的

此外,通常操作系統所能創建的線程數量都是有限的,並不能無限制的創建線程。

 

而在線程池中,我們通常會預先創建m個線程,放到空閒容器中,當有任務來臨時,線程池會從空閒的線程中挑選一個線程來執行該任務,

在執行完畢後再將其放回空閒容器中

 

C++11

在C++11中,C++對線程提供了一個很高的抽象,並沒有很好的提供優先級控制等功能,需要調用std::thread::native_handle(),獲取原生線程對象

運行平台特定的操作,但這就喪失了std::thread在不同平台上代碼層面的一致性。

所以在項目中實現了對std::thread二次封裝,並提供了基本的優先級控制

 

項目概述

項目中有一個主線程,即運行程序時創建的線程可以從用戶那裡獲取任務,還有一個管理線程,用於進行線程池中線程的調度,還有初始化線程池時創建的若干空閒線程,用於執行任務

 

項目中主要有以下幾個類:

Task:任務類,內有任務的優先級,和一個純虛Run方法,我們需要派生Task,將要完成的任務寫到Run方法中

MyThread:線程類,封裝了C++11的thread,每一個線程可以關聯一個Task對象,執行其Run方法

BusyThreadContainer:工作容器類,采用std::list<MyThread*>實現,儲存工作狀態的線程

IdleThreadContainer:空閒容器類,采用std::vector<MyThread*>實現,儲存處於空閒狀態的線程

TaskContainer:任務容器類,采用priority_queue<Task*>實現,儲存所有用戶添加未執行的任務

MyThreadPool:線程池類,用於從用戶獲取任務,管理任務,實現對線程池中線程的調度

 

類圖如下

Task類

namespace
{
    enum  PRIORITY
    {

        MIN = 1, NORMAL = 25, MAX = 50
    };
}

class Task
{
    
public:
    Task()
    {

    }
    void SetPriority(int priority)
    {
        if (priority>(PRIORITY::MAX))
        {
            priority = (PRIORITY::MAX);
        }
        else if (priority>(PRIORITY::MAX))
        {
            priority = (PRIORITY::MIN);
        }
    }    
    virtual void Run() = 0;
protected:
    int priority_;
};
 

void SetPriority(int priority) :設置線程的優先級,數值在1-50之間,值越大,優先級越高

virtual void run() = 0:線程執行的方法,用戶需要重寫為自己的方法

 

MyThread類

class MyThread
{
    friend bool operator==(MyThread my1, MyThread my2);
    friend bool operator!=(MyThread my1, MyThread my2);
public:
    MyThread(MyThreadPool *pool);
    void Assign(Task *Task);
    void Run();
    void StartThread();
    int getthreadid();
    void setisdetach(bool isdetach);    
private:
    MyThreadPool *mythreadpool_;
    static int  s_threadnumber;
    bool isdetach_;
    Task *task_;
    int threadid_;
    std::thread thread_;
};

方法:

MyThread(MyThreadPool *pool):構造一個MyThread對象,將自己與指定的線程池相關聯起來

void Assign(Task *Task):將一個任務與該線程相關聯起來

void Run():調用了Task的Run方法,同時在Task的Run方法結束後將自己從工作容器移回空閒容器

void StartThread():執行線程的Run方法,即執行了Task的Run方法

int getthreadid():獲取線程的id號

void setisdetach(bool isdetach):設置線程在運行的時候是join還是detach的

 

BusyThreadContainer類

class BusyThreadContainer
{
    
public:
    BusyThreadContainer();
    ~BusyThreadContainer();
    void push(MyThread *m);
    std::list<MyThread*>::size_type size();
    void erase(MyThread *m);

private:
    std::list<MyThread*> busy_thread_container_;
    typedef std::list<MyThread*> Container;
    typedef Container::iterator Iterator;
};
 

void push(MyThread *m):將一個線程放入工作容器中

void erase(MyThread *m):刪除一個指定的線程

std::list<MyThread*>::size_type size():返回工作容器的大小

 

 

IdleThreadContainer類

class IdleThreadContainer
{
    
public:
    IdleThreadContainer();
    ~IdleThreadContainer();
    std::vector<MyThread*>::size_type size();
    void push(MyThread *m);
    void assign(int n,MyThreadPool* m);    
    MyThread* top();
    void pop();
    void erase(MyThread *m);
private:
    std::vector<MyThread*> idle_thread_container_;
    typedef std::vector<MyThread*> Container;
    typedef Container::iterator Iterator;
};

~IdleThreadContainer(); :負責析構空閒容器中的線程

void push(MyThread *m):將一個線程放回空閒容器中

void assign(int n,MyThreadPool* m):創建n個線程與線程池m相關聯的線程放入空閒容器中

MyThread* top():返回位於空閒容器頂端的線程

void pop():彈出空閒容器頂端的線程

void erase(MyThread *m):刪除一個指定的線程

 

 

 

TaskContainer類

class TaskContainer
{
public:
    TaskContainer();
    ~TaskContainer();
    void push(Task *);
    Task* top();
    void pop();
    std::priority_queue<Task*>::size_type size();
private:
    std::priority_queue<Task*> task_container_;
};

void push(Task *):將一個任務放入任務容器中

Task* top():返回任務容器頂端的任務

void pop():將任務容器頂端的線程彈出

std::priority_queue<Task*>::size_type size():返回任務容器的大小

 

MyThreadPool類

class MyThreadPool
{
public:
    
    MyThreadPool(){}
    MyThreadPool(int number);
    ~MyThreadPool();
    void AddTask(Task *Task,int priority);
    void AddIdleThread(int n);
    void RemoveThreadFromBusy(MyThread *myThread);
    void Start();
    void EndMyThreadPool();private:
    BusyThreadContainer busy_thread_container_;
    IdleThreadContainer idle_thread_container_;
    bool issurvive_;
    TaskContainer task_container_;
    std::thread thread_this_;
    std::mutex busy_mutex_;
    std::mutex idle_mutex_;
    std::mutex task_mutex_;
    int number_of_thread_;
};

MyThreadPool(int number):構造MyThreadPool,創建包含number個線程的空閒容器

void AddTask(Task *Task,int priority):添加一個優先級為priority的任務到任務容器中

void AddIdleThread(int n):在創建n個空閒線程到空閒容器中

void RemoveThreadFromBusy(MyThread *myThread):將一個線程從工作容器中刪除,並移回空閒容器中

void Start():判斷是否有空閒線程,如有將任務從從任務容器中提出,放入空閒容器中,等待執行

void EndMyThreadPool():結束線程池的運行

 

派生自Task的MyTask類

class MyTask :public Task
{
    friend bool operator<(MyTask  &lv,MyTask &rv)
    {
        return lv.priority_ < rv.priority_;
    }
public:
    MyTask();
    ~MyTask();
    virtual void Run();
    void setdata(int d);
private:
    int data_;
};
MyTask::MyTask()
{
}
MyTask::~MyTask()
{
}
void MyTask::setdata(int d)
{
    data_ = d;
}
void MyTask::Run()
{
    std::cout << "Hello I am "<<data_ << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

friend bool operator<(MyTask &lv,MyTask &rv) :用於確定任務在任務容器中的位置

Run:自定義的Run方法

void setdata(int d):設置數據

 

關鍵代碼分析:

void MyThread::Run()

void MyThread::Run()
{
    cout <<"Thread:"<< threadid_ << " run ";
    task_->Run();
    mythreadpool_->RemoveThreadFromBusy(this);
}

調用了Task的Run方法,同時在Task的Run方法結束後,通知線程池將自己從工作容器中移回空閒容器

 

void MyThread::StartThread()

void MyThread::StartThread()
{
    thread_ = thread(&MyThread::Run, this);
    if (isdetach_ == true)
        thread_.detach();
    else
        thread_.join();
}

將MyThread的Run方法與thread_相綁定,this表示類的Run方法的第一個隱含的參數

然後根據isdetach的值,判斷是否detach() or join()

 

void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread)

void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread)
{

    busy_mutex_.lock();
    cout << "Thread:" << myThread->getthreadid()<< " remove from busylist" << endl;
    busy_thread_container_.erase(myThread);
    busy_mutex_.unlock();

    idle_mutex_.lock();
    idle_thread_container_.push(myThread);
    idle_mutex_.unlock();
}

將一個線程從任務容器中移除,並將其放回空閒容器中,

使用busy_mutex_和idle_mutex_進行加鎖和解鎖,確保數據的一致性

 

MyThreadPool::MyThreadPool(int number)

MyThreadPool::MyThreadPool(int number)
{
    issurvive_ = true;
    number_of_thread_ = number;
    idle_thread_container_.assign(number, this);
    thread_this_ =thread(&MyThreadPool::Start, this);
    thread_this_.detach();
}

MyThreadPool的構造函數,創建number個空閒線程與空閒容器中,同時創建管理線程thread_this,用於進行線程池中線程的調度

 

void MyThreadPool::Start()

void MyThreadPool::Start()
{
    
    while (true)
    {
        if (issurvive_==false)
        {
            busy_mutex_.lock();
            if (busy_thread_container_.size()!=0)
            {
                busy_mutex_.unlock();
                continue;
            }
            busy_mutex_.unlock();
            break;
        }
        idle_mutex_.lock();
        if (idle_thread_container_.size() == 0)
        {
            idle_mutex_.unlock();
            continue;
        }
        idle_mutex_.unlock();
        task_mutex_.lock();
        if (task_container_.size() == 0)
        {
            task_mutex_.unlock();
            continue;
        }
        Task *b = task_container_.top();;
        task_container_.pop();
        task_mutex_.unlock();
        
        idle_mutex_.lock();
        MyThread *mythread = idle_thread_container_.top();;
        idle_thread_container_.pop();
        mythread->Assign(b);
        idle_mutex_.unlock();

        busy_mutex_.lock();
        busy_thread_container_.push(mythread);
        busy_mutex_.unlock();
        mythread->StartThread();
    }
}

管理線程對應的Start方法,內有一個死循環,不停的判斷任務容器中是否有任務,和是否有空閒線程來執行任務,若有,則將任務從

任務容器中提出,從空閒線程中提取出一個空閒線程與其綁定,執行該任務,同時將該線程從空閒容器移動到工作容器中。

當線程池想要結束運行時,即survive為false時,首先要判斷工作容器是否為空,若不為空,則代表還有任務正在被線程執行,線程池不能結束運行

否則可以結束線程池的運行,跳出死循環

 

int main()

int main()
{
    MyThreadPool mythreadPool(10);

    MyTask j[50];
    for (int i = 0; i < 50;i++)
    {
        j[i].setdata(i);
    }
    for (int i = 0; i < 50; i++)
    {
        mythreadPool.AddTask(&j[i],i);
    }
    int i;
    //按100添加一個任務
    //按-1結束線程池
    while (true)
    {
        cin >> i;    
        if (i == 100)
        {
            MyTask j;
            j.setdata(i);
            mythreadPool.AddTask(&j, i);
        }
        if (i == -1)
        {        
            mythreadPool.EndMyThreadPool();
            break;
        }        
    }
    system("pause");
}

創建了一個含有10個空閒線程的線程池,和50個MyTask任務,並將其放入線程池中等待運行

在循環中,用戶輸入100可以再添加一個任務到線程池中等待運行,輸入-1結束線程池的運行。

運行結果如下

線程池使用後記

線程池並不是萬能的,線程池減少了創建與銷毀線程本身對任務照成的影響,但如果任務本身的運行時間很長,那麼這些開銷相當於任務本身執行開銷而言是可以忽略的。那麼我們也可以

選擇“即時創建,即時銷毀”的策略

線程池通常適合下面的幾個場合:

(1)  單位時間內處理的任務數較多,且每個任務的執行時間較短

(2)  對實時性要求較高的任務,如果接受到任務後在創建線程,再執行任務,可能滿足不了實時要求,因此必須采用線程池進行預創建。

 

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved