程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> 關於C++ >> 深刻解析C++編程中線程池的應用

深刻解析C++編程中線程池的應用

編輯:關於C++

深刻解析C++編程中線程池的應用。本站提示廣大學習愛好者:(深刻解析C++編程中線程池的應用)文章只能為提供參考,不一定能成為您想要的結果。以下是深刻解析C++編程中線程池的應用正文


為何須要線程池
今朝的年夜多半收集辦事器,包含Web辦事器、Email辦事器和數據庫辦事器等都具有一個配合點,就是單元時光內必需處置數量偉大的銜接要求,但處置時光卻絕對較短。
傳 統多線程計劃中我們采取的辦事器模子則是一旦接收到要求以後,即創立一個新的線程,由該線程履行義務。義務履行終了後,線程加入,這就是是“即時創立,即 時燒毀”的戰略。雖然與創立過程比擬,創立線程的時光曾經年夜年夜的延長,然則假如提交給線程的義務是履行時光較短,並且履行次數極端頻仍,那末辦事器將處於 一直的創立線程,燒毀線程的狀況。
我們將傳統計劃中的線程履行進程分為三個進程:T1、T2、T3。

  1. T1:線程創立時光
  2. T2:線程履行時光,包含線程的同步等時光
  3.  T3:線程燒毀時光
  4. 那末我們可以看出,線程自己的開支所占的比例為(T1+T3) / (T1+T2+T3)。假如線程履行的時光很短的話,這比開支能夠占到20%-50%閣下。假如義務履行時光很頻仍的話,這筆開支將是弗成疏忽的。

     
    除此以外,線程池可以或許削減創立的線程個數。平日線程池所許可的並發線程是有上界的,假如同時須要並發的線程數跨越上界,那末一部門線程將會期待。而傳統計劃中,假如同時要求數量為2000,那末最壞情形下,體系能夠須要發生2000個線程。雖然這不是一個很年夜的數量,然則也有部門機械能夠達不到這類請求。
     
    是以線程池的湧現恰是著眼於削減線程池自己帶來的開支。線程池采取預創立的技巧,在運用法式啟動以後,將立刻創立必定數目的線程(N1),放入余暇隊列中。這些線程都是處於壅塞(Suspended)狀況,不用耗CPU,但占用較小的內存空間。當義務到來後,緩沖池選擇一個余暇線程,把義務傳入此線程中運轉。當N1個線程都在處置義務後,緩沖池主動創立必定數目的新線程,用於處置更多的義務。在義務履行終了後線程也不加入,而是持續堅持在池中期待下一次的義務。當體系比擬余暇時,年夜部門線程都一向處於暫停狀況,線程池主動燒毀一部門線程,收受接管體系資本。
     
    基於這類預創立技巧,線程池將線程創立和燒毀自己所帶來的開支分攤到了各個詳細的義務上,履行次數越多,每一個義務所分管到的線程自己開支則越小,不外我們別的能夠須要斟酌出來線程之間同步所帶來的開支

    構建線程池框架
    普通線程池都必需具有上面幾個構成部門:

    • 線程池治理器:用於創立並治理線程池
    • 任務線程: 線程池中現實履行的線程
    • 義務接口: 雖然線程池年夜多半情形下是用來支撐收集辦事器,然則我們將線程履行的義務籠統出來,構成義務接口,從而是的線程池與詳細的義務有關。
    • 義務隊列: 線程池的概念詳細到完成則能夠是隊列,鏈表之類的數據構造,個中保留履行線程。

    我們完成的通用線程池框架由五個主要部門構成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此以外框架中還包含線程同步應用的類CThreadMutex和CCondition。
     

    • CJob是一切的義務的基類,其供給一個接口Run,一切的義務類都必需從該類繼續,同時完成Run辦法。該辦法中完成詳細的義務邏輯。
    •  
    • CThread是Linux中線程的包裝,其封裝了Linux線程最常常應用的屬性和辦法,它也是一個籠統類,是一切線程類的基類,具有一個接口Run。
    •  
    • CWorkerThread是現實被調劑和履行的線程類,其從CThread繼續而來,完成了CThread中的Run辦法。
    •  
    • CThreadPool是線程池類,其擔任保留線程,釋放線程和調劑線程。
    •  
    • CThreadManage是線程池與用戶的直接接口,其屏障了外部的詳細完成。
    •  
    • CThreadMutex用於線程之間的互斥。
    •  
    • CCondition則是前提變量的封裝,用於線程之間的同步。

    CThreadManage直接跟客戶端打交道,其接收須要創立的線程初始個數,並接收客戶端提交的義務。這兒的義務是詳細的非籠統的義務。CThreadManage的外部現實上挪用的都是CThreadPool的相干操作。CThreadPool創立詳細的線程,並把客戶端提交的義務分發給CWorkerThread,CWorkerThread現實履行詳細的義務。
     
    懂得體系組件
    上面我們離開來懂得體系中的各個組件。
     
    CThreadManage
    CThreadManage的功效異常簡略,其供給最簡略的辦法,其類界說以下:
     

    class CThreadManage
    {
    private:
      CThreadPool*  m_Pool;
      int     m_NumOfThread;
     
    protected:
     
    public:
      CThreadManage();
      CThreadManage(int num);
      virtual ~CThreadManage();
     
      void   SetParallelNum(int num);  
      void  Run(CJob* job,void* jobdata);
      void  TerminateAll(void);
    };
    

     
    個中m_Pool指向現實的線程池;m_NumOfThread是初始創立時刻許可創立的並發的線程個數。別的Run和TerminateAll辦法也異常簡略,只是簡略的挪用CThreadPool的一些相干辦法罷了。其詳細的完成以下:
     

    CThreadManage::CThreadManage()
    {
      m_NumOfThread = 10;
      m_Pool = new CThreadPool(m_NumOfThread);
    }
     
    CThreadManage::CThreadManage(int num)
    {
      m_NumOfThread = num;
      m_Pool = new CThreadPool(m_NumOfThread);
    }
     
    CThreadManage::~CThreadManage()
    {
      if(NULL != m_Pool)
      delete m_Pool;
    }
     
    void CThreadManage::SetParallelNum(int num)
    {
      m_NumOfThread = num;
    }
     
    void CThreadManage::Run(CJob* job,void* jobdata)
    {
      m_Pool->Run(job,jobdata);
    }
     
    void CThreadManage::TerminateAll(void)
    {
      m_Pool->TerminateAll();
    }
    

     
    CThread
    CThread 類完成了對Linux中線程操作的封裝,它是一切線程的基類,也是一個籠統類,供給了一個籠統接口Run,一切的CThread都必需完成該Run辦法。CThread的界說以下所示:
     

    class CThread
    {
    private:
      int     m_ErrCode;
      Semaphore  m_ThreadSemaphore; //the inner semaphore, which is used to realize
      unsigned   long m_ThreadID;  
      bool     m_Detach;    //The thread is detached
      bool     m_CreateSuspended; //if suspend after creating
      char*    m_ThreadName;
      ThreadState m_ThreadState;   //the state of the thread
     
    protected:
      void   SetErrcode(int errcode){m_ErrCode = errcode;}
      static void* ThreadFunction(void*);
     
    public:
      CThread();
      CThread(bool createsuspended,bool detach);
      virtual ~CThread();
     
      virtual void Run(void) = 0;
      void   SetThreadState(ThreadState state){m_ThreadState = state;}
       bool   Terminate(void);  //Terminate the threa
      bool   Start(void);    //Start to execute the thread
      void   Exit(void);
      bool   Wakeup(void);
      ThreadState GetThreadState(void){return m_ThreadState;}
      int   GetLastError(void){return m_ErrCode;}
      void   SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);}
      char*  GetThreadName(void){return m_ThreadName;}
      int   GetThreadID(void){return m_ThreadID;}
      bool   SetPriority(int priority);
      int   GetPriority(void);
      int   GetConcurrency(void);
      void   SetConcurrency(int num);
      bool   Detach(void);
      bool   Join(void);
      bool   Yield(void);
      int   Self(void);
    };
    

     
    線程的狀況可以分為四種,余暇、勞碌、掛起、終止(包含正常加入和非正常加入)。因為今朝Linux線程庫不支撐掛起操作,是以,我們的此處的掛起操作相似於暫停。假如線程創立後不想立刻履行義務,那末我們可以將其“暫停”,假如須要運轉,則叫醒。有一點必需留意的是,一旦線程開端履行義務,將不克不及被掛起,其將一向履行義務至終了。
     
    線程類的相干操作均非常簡略。線程的履行進口是從Start()函數開端,其將挪用函數ThreadFunction,ThreadFunction再挪用現實的Run函數,履行現實的義務。
     
    CThreadPool
    CThreadPool是線程的承載容器,普通可以將其完成為客棧、單向隊列或許雙向隊列。在我們的體系中我們應用STL Vector對線程停止保留。CThreadPool的完成代碼以下:
     

    class CThreadPool
    {
    friend class CWorkerThread;
     
    private:
      unsigned int m_MaxNum;  //the max thread num that can create at the same time
      unsigned int m_AvailLow; //The min num of idle thread that shoule kept
      unsigned int m_AvailHigh;  //The max num of idle thread that kept at the same time
      unsigned int m_AvailNum; //the normal thread num of idle num;
      unsigned int m_InitNum; //Normal thread num;
     
    protected:
      CWorkerThread* GetIdleThread(void); 
      void  AppendToIdleList(CWorkerThread* jobthread);
      void  MoveToBusyList(CWorkerThread* idlethread);
      void  MoveToIdleList(CWorkerThread* busythread);
      void  DeleteIdleThread(int num);
      void  CreateIdleThread(int num);
     
    public:
      CThreadMutex m_BusyMutex;  //when visit busy list,use m_BusyMutex to lock and unlock
      CThreadMutex m_IdleMutex;  //when visit idle list,use m_IdleMutex to lock and unlock
      CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock
      CThreadMutex m_VarMutex;
      CCondition    m_BusyCond; //m_BusyCond is used to sync busy thread list
      CCondition    m_IdleCond; //m_IdleCond is used to sync idle thread list
      CCondition    m_IdleJobCond; //m_JobCond is used to sync job list
      CCondition    m_MaxNumCond;
    
      vector<CWorkerThread*>  m_ThreadList;
      vector<CWorkerThread*>  m_BusyList;   //Thread List
      vector<CWorkerThread*>  m_IdleList; //Idle List
    
      CThreadPool();
      CThreadPool(int initnum);
      virtual ~CThreadPool(); 
    
      void  SetMaxNum(int maxnum){m_MaxNum = maxnum;}
      int   GetMaxNum(void){return m_MaxNum;}
      void  SetAvailLowNum(int minnum){m_AvailLow = minnum;}
      int   GetAvailLowNum(void){return m_AvailLow;}
      void  SetAvailHighNum(int highnum){m_AvailHigh = highnum;}
      int   GetAvailHighNum(void){return m_AvailHigh;}
      int   GetActualAvailNum(void){return m_AvailNum;}
      int   GetAllNum(void){return m_ThreadList.size();}
      int   GetBusyNum(void){return m_BusyList.size();}
      void  SetInitNum(int initnum){m_InitNum = initnum;}
      int   GetInitNum(void){return m_InitNum;}
      void  TerminateAll(void);
      void  Run(CJob* job,void* jobdata);
    };
     
     
     
    CWorkerThread* CThreadPool::GetIdleThread(void)
     
    {
     
      while(m_IdleList.size() ==0 )
     
      m_IdleCond.Wait();
     
      
     
      m_IdleMutex.Lock();
     
      if(m_IdleList.size() > 0 )
     
      {
     
      CWorkerThread* thr = (CWorkerThread*)m_IdleList.front();
     
      printf("Get Idle thread %d\n",thr->GetThreadID());
     
      m_IdleMutex.Unlock();
     
      return thr;
     
      }
     
      m_IdleMutex.Unlock();
     
      return NULL; 
    }
      
    
    //create num idle thread and put them to idlelist
     
    void CThreadPool::CreateIdleThread(int num)
     
    {
     
      for(int i=0;i<num;i++){
     
      CWorkerThread* thr = new CWorkerThread();
     
      thr->SetThreadPool(this);
     
      AppendToIdleList(thr);
     
      m_VarMutex.Lock();
     
      m_AvailNum++;
     
      m_VarMutex.Unlock();
     
      thr->Start();    //begin the thread,the thread wait for job
     
      }
     
    }
     
    
     
    void CThreadPool::Run(CJob* job,void* jobdata)
     
    {
     
      assert(job!=NULL);
     
      
     
      //if the busy thread num adds to m_MaxNum,so we should wait
     
      if(GetBusyNum() == m_MaxNum)
     
        m_MaxNumCond.Wait();
     
     
     
      if(m_IdleList.size()<m_AvailLow)
     
      {
     
      if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
     
        CreateIdleThread(m_InitNum-m_IdleList.size());
     
      else
     
        CreateIdleThread(m_MaxNum-GetAllNum());
     
      }
     
     
     
      CWorkerThread* idlethr = GetIdleThread();
     
      if(idlethr !=NULL)
     
      {
     
      idlethr->m_WorkMutex.Lock();
     
      MoveToBusyList(idlethr);
     
      idlethr->SetThreadPool(this);
     
      job->SetWorkThread(idlethr);
     
      printf("Job is set to thread %d \n",idlethr->GetThreadID());
     
      idlethr->SetJob(job,jobdata);
     
      }
     
    }
    
    

     
    在CThreadPool中存在兩個鏈表,一個是余暇鏈表,一個是勞碌鏈表。Idle鏈表中寄存一切的余暇過程,當線程履行義務時刻,其狀況變成勞碌狀況,同時從余暇鏈表中刪除,並移至勞碌鏈表中。在CThreadPool的結構函數中,我們將履行上面的代碼:
     

    for(int i=0;i<m_InitNum;i++)
     
      {
     
      CWorkerThread* thr = new CWorkerThread();
     
      AppendToIdleList(thr);
     
      thr->SetThreadPool(this);
     
      thr->Start();    //begin the thread,the thread wait for job
     
      }
    

     
    在該代碼中,我們將創立m_InitNum個線程,創立以後即挪用AppendToIdleList放入Idle鏈表中,因為今朝沒有義務分發給這些線程,是以線程履行Start後將本身掛起。
     
    現實上,線程池中包容的線程數量其實不是原封不動的,其會依據履行負載停止主動伸縮。為此在CThreadPool中設定四個變量:
     

    m_InitNum:處世創立時線程池中的線程的個數。

    m_MaxNum:以後線程池中所許可並發存在的線程的最年夜數量。

    m_AvailLow:以後線程池中所許可存在的余暇線程的最小數量,假如余暇數量低於該值,注解負載能夠太重,此時有需要增長余暇線程池的數量。完成中我們老是將線程調劑為m_InitNum個。

    m_AvailHigh:以後線程池中所許可的余暇的線程的最年夜數量,假如余暇數量高於該值,注解以後負載能夠較輕,此時將刪除過剩的余暇線程,刪除後調劑數也為m_InitNum個。

    m_AvailNum:今朝線程池中現實存在的線程的個數,其值介於m_AvailHigh和m_AvailLow之間。假如線程的個數一直保持在m_AvailLow和m_AvailHigh之間,則線程既不須要創立,也不須要刪除,堅持均衡狀況。是以若何設定m_AvailLow和m_AvailHigh的值,使得線程池最年夜能夠的堅持均衡態,是線程池設計必需斟酌的成績。
     
    線程池在接收到新的義務以後,線程池起首要檢討能否有足夠的余暇池可用。檢討分為三個步調:
     
    (1)檢討以後處於勞碌狀況的線程能否到達了設定的最年夜值m_MaxNum,假如到達了,注解今朝沒有余暇線程可用,並且也不克不及創立新的線程,是以必需期待直到有線程履行終了前往到余暇隊列中。
     
    (2)假如以後的余暇線程數量小於我們設定的最小的余暇數量m_AvailLow,則我們必需創立新的線程,默許情形下,創立後的線程數量應當為m_InitNum,是以創立的線程數量應當為( 以後余暇線程數與m_InitNum);然則有一種特別情形必需斟酌,就是現有的線程總數加上創立後的線程數能夠跨越m_MaxNum,是以我們必需對線程的創立差別看待。
     

      if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
     
        CreateIdleThread(m_InitNum-m_IdleList.size());
     
      else
     
        CreateIdleThread(m_MaxNum-GetAllNum());
    

     
    假如創立後總數不跨越m_MaxNum,則創立後的線程為m_InitNum;假如跨越了,則只創立( m_MaxNum-以後線程總數 )個。
     
    (3)挪用GetIdleThread辦法查找余暇線程。假如以後沒有余暇線程,則掛起;不然將義務指派給該線程,同時將其移入勞碌隊列。
     
    當線程履行終了後,其會挪用MoveToIdleList辦法移入余暇鏈表中,個中還挪用m_IdleCond.Signal()辦法,叫醒GetIdleThread()中能夠壅塞的線程。
     
    CJob
    CJob類絕對簡略,其封裝了義務的根本的屬性和辦法,個中最主要的是Run辦法,代碼以下:

    class CJob
    {
     
    private:
     
      int   m_JobNo;    //The num was assigned to the job
     
      char*  m_JobName;   //The job name
     
      CThread *m_pWorkThread;   //The thread associated with the job
     
    public:
     
      CJob( void );
     
      virtual ~CJob();  
     
      int   GetJobNo(void) const { return m_JobNo; }
     
      void   SetJobNo(int jobno){ m_JobNo = jobno;}
     
      char*  GetJobName(void) const { return m_JobName; }
     
      void   SetJobName(char* jobname);
     
      CThread *GetWorkThread(void){ return m_pWorkThread; }
     
      void   SetWorkThread ( CThread *pWorkThread ){
     
        m_pWorkThread = pWorkThread;
     
      }
     
      virtual void Run ( void *ptr ) = 0;
     
    }; 
    

     
    線程池應用示例
    至此我們給出了一個簡略的與詳細義務有關的線程池框架。應用該框架異常的簡略,我們所須要的做的就是派生CJob類,將須要完成的義務完成在Run辦法中。然後將該Job交由CThreadManage去履行。上面我們給出一個簡略的示例法式
     

    class CXJob:public CJob
     {
     
    public:
     
      CXJob(){i=0;}
     
      ~CXJob(){}
     
      void Run(void* jobdata)  {
     
        printf("The Job comes from CXJOB\n");
     
        sleep(2);
     
      }
     
    };
     
     
     
    class CYJob:public CJob
     
    {
     
    public:
     
      CYJob(){i=0;}
     
      ~CYJob(){}
     
      void Run(void* jobdata)  {
     
        printf("The Job comes from CYJob\n");
     
      }
     
    };
     
     
     
    main()
     
    {
     
      CThreadManage* manage = new CThreadManage(10);
     
      for(int i=0;i<40;i++)
     
      {
     
        CXJob*  job = new CXJob();
     
        manage->Run(job,NULL);
     
      }
     
      sleep(2);
     
      CYJob* job = new CYJob();
     
      manage->Run(job,NULL);
     
      manage->TerminateAll();
     
    } 
    

    CXJob和CYJob都是從Job類繼續而來,其都完成了Run接口。CXJob只是簡略的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然後均休眠2秒鐘。在主法式中我們初始創立10個任務線程。然後分離履行40次CXJob和一次CYJob。
     

    C++ 線程池的封裝完成
    為了充足應用多核的優勢,我們應用多線程來停止義務處置,但線程也異樣不克不及濫用,會帶來一下幾個成績:
    1)線程自己存在開支,體系必需為每一個線程分派如棧,TLS(線程部分存儲),存放器等。
    2)線程治理會給體系帶來開支,context切換異樣會給體系帶來本錢。
    3)線程自己是可以重用的資本,不須要每次都停止初始化。

    所以常常在應用中,我們無需把線程與task義務停止一對一對應,只須要事後初始化無限的線程個數來處置無窮的task義務便可,線程池應運而生,道理也就是如斯。

    重要含有三個隊列

    1. 任務隊列
    2. 任務線程隊列
    3. 勞碌線程隊列
    4. 任務隊列是一個壅塞隊列,義務(仿函數)義務不算被push出去(notify壅塞獲得的任務線程),任務線程隊列(一向不變)則從該隊列中獲得義務履行(wait獲得,當義務隊列為空時壅塞期待告訴),假如獲得就任務,則將線程會進入勞碌線程隊列中,履行義務的仿函數,當任務完成,從新移收工作線程隊列。


      界說線程池專屬異常:

      struct TC_ThreadPool_Exception : public TC_Exception
      {
        TC_ThreadPool_Exception(const string &buffer) : TC_Exception(buffer){};
        TC_ThreadPool_Exception(const string &buffer, int err) : TC_Exception(buffer, err){};
        ~TC_ThreadPool_Exception () throw (){};
      };
      
      
      /**
       * @brief 用通線程池類, 與tc_functor, tc_functorwrapper合營應用.
       * 
       * 應用方法解釋:
       * 1 采取tc_functorwrapper封裝一個挪用
       * 2 用tc_threadpool對換用停止履行
       * 詳細示例代碼請拜見:test/test_tc_thread_pool.cpp
       */
      
      /**線程池自己繼續自鎖,可以贊助鎖定**/
      class TC_ThreadPool : public TC_ThreadLock
      {
      public:
      
        /**
         * @brief 結構函數
         *
         */
        TC_ThreadPool ();
      
        /**
         * @brief 析構, 會停滯一切線程
         */
        ~TC_ThreadPool ();
      
        /**
         * @brief 初始化.
         * 
         * @param num 任務線程個數
         */
        void init(size_t num);
      
        /**
         * @brief 獲得線程個數.
         *
         * @return size_t 線程個數
         */
        size_t getThreadNum()  { Lock sync(* this); return _jobthread. size(); }
      
        /**
         * @brief 獲得線程池的義務數( exec添加出來的).
         *
         * @return size_t 線程池的義務數
         */
        size_t getJobNum()   { return _jobqueue. size(); }
      
        /**
         * @brief 停滯一切線程
         */
        void stop();
      
        /**
         * @brief 啟動一切線程
         */
        void start();
      
        /**
         * @brief 啟動一切線程並, 履行初始化對象.
         * 
         * @param ParentFunctor
         * @param tf
         */
        template<class ParentFunctor>
        void start(const TC_FunctorWrapper< ParentFunctor> &tf)
        {
          for(size_t i = 0; i < _jobthread .size(); i++)
          {
            _startqueue. push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
          }
      
          start();
        }
      
        /**
         * @brief 添加對象到線程池履行,該函數立時前往,
         *   線程池的線程履行對象
         */
        template<class ParentFunctor>
         void exec(const TC_FunctorWrapper< ParentFunctor> &tf)
        {
          _jobqueue.push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
        }
      
        /**
         * @brief 期待一切任務全體停止(隊列無義務, 無余暇線程).
         *
         * @param millsecond 期待的時光( ms), -1:永久期待
         * @return      true, 一切任務都處置終了
         *            false,超時加入
         */
        bool waitForAllDone(int millsecond = -1);
      
      public:
      
        /**
         * @brief 線程數據基類,一切線程的公有數據繼續於該類
         */
        class ThreadData
        {
        public:
          /**
           * @brief 結構
           */
          ThreadData(){};
          /**
           * @brief 析夠
           */
          virtual ~ThreadData(){};
      
          /**
            * @brief 生成數據.
            * 
            * @ param T
           * @return ThreadData*
           */
          template<typename T>
          static T* makeThreadData()
          {
            return new T;
          }
        };
      
        /**
         * @brief 設置線程數據.
         * 
         * @param p 線程數據
         */
        static void setThreadData(ThreadData *p);
      
        /**
         * @brief 獲得線程數據.
         *
         * @return ThreadData* 線程數據
         */
        static ThreadData* getThreadData();
      
        /**
         * @brief 設置線程數據, key須要本身保護.
         * 
         * @param pkey 線程公有數據key
         * @param p  線程指針
         */
        static void setThreadData(pthread_key_t pkey, ThreadData *p);
      
        /**
         * @brief 獲得線程數據, key須要本身保護.
         * 
         * @param pkey 線程公有數據key
         * @return   指向線程的ThreadData*指針
         */
        static ThreadData* getThreadData(pthread_key_t pkey);
      
      protected:
      
        /**
         * @brief 釋放資本.
         * 
         * @param p
         */
        static void destructor(void *p);
      
        /**
         * @brief 初始化key
         */
        class KeyInitialize
        {
        public:
          /**
           * @brief 初始化key
           */
          KeyInitialize()
          {
            int ret = pthread_key_create(&TC_ThreadPool::g_key, TC_ThreadPool::destructor);
            if(ret != 0)
            {
              throw TC_ThreadPool_Exception("[TC_ThreadPool::KeyInitialize] pthread_key_create error", ret);
            }
          }
      
          /**
           * @brief 釋放key
           */
          ~KeyInitialize()
          {
            pthread_key_delete(TC_ThreadPool::g_key);
          }
        };
      
        /**
         * @brief 初始化key的掌握
         */
        static KeyInitialize g_key_initialize;
      
        /**
         * @brief 數據key
         */
        static pthread_key_t g_key;
      
      protected:
        /**
         * @brief 線程池中的任務線程
         */
        class ThreadWorker : public TC_Thread
        {
        public:
          /**
            * @brief 任務線程結構函數.
            * 
           * @ param tpool
           */
          ThreadWorker(TC_ThreadPool *tpool);
      
          /**
           * @brief 告訴任務線程停止
           */
          void terminate();
      
        protected:
          /**
           * @brief 運轉
           */
          virtual void run();
      
        protected:
          /**
           * 線程池指針
           */
          TC_ThreadPool  * _tpool;
      
          /**
           * 能否停止線程
           */
          bool      _bTerminate;
        };
      
      protected:
      
        /**
         * @brief 消除
         */
        void clear();
      
        /**
         * @brief 獲得義務, 假如沒有義務, 則為NULL.
         *
         * @return TC_FunctorWrapperInterface*
         */
        TC_FunctorWrapperInterface * get(ThreadWorker *ptw);
      
        /**
         * @brief 獲得啟動義務.
         *
         * @return TC_FunctorWrapperInterface*
         */
        TC_FunctorWrapperInterface * get();
      
        /**
         * @brief 余暇了一個線程.
         * 
         * @param ptw
         */
        void idle(ThreadWorker *ptw);
      
        /**
         * @brief 告訴期待在義務隊列上的任務線程醒來
         */
        void notifyT();
      
        /**
         * @brief 能否處置停止.
         *
         * @return bool
         */
        bool finish();
      
        /**
         * @brief 線程加入時挪用
         */
        void exit();
      
        friend class ThreadWorker;
      protected:
      
        /**
         * 義務隊列
         */
        TC_ThreadQueue< TC_FunctorWrapperInterface*> _jobqueue;
      
        /**
         * 啟動義務
         */
        TC_ThreadQueue< TC_FunctorWrapperInterface*> _startqueue;
      
        /**
         * 任務線程
         */
        std::vector<ThreadWorker *>         _jobthread;
      
        /**
         * 忙碌線程
         */
        std::set<ThreadWorker *>           _busthread;
      
        /**
         * 義務隊列的鎖
         */
        TC_ThreadLock                _tmutex;
      
         /**
         * 能否一切義務都履行終了
         */
         bool                    _bAllDone;
      };
      

      任務線程設計以下:

      TC_ThreadPool ::ThreadWorker::ThreadWorker(TC_ThreadPool *tpool)
      : _tpool (tpool)
      , _bTerminate ( false)
      {
      }
      
      void TC_ThreadPool ::ThreadWorker::terminate()
      {
        _bTerminate = true;
        _tpool->notifyT();
      }
      
      void TC_ThreadPool ::ThreadWorker::run()
      {
        //挪用初始化部門
        TC_FunctorWrapperInterface *pst = _tpool->get();
        if(pst)
        {
          try
          {
            (*pst)();
          }
          catch ( ... )
          {
          }
          delete pst;
          pst = NULL;
        }
      
        //挪用處置部門
        while (! _bTerminate)
        {
          TC_FunctorWrapperInterface *pfw = _tpool->get( this);
          if(pfw != NULL)
          {
            auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);
      
            try
            {
              (*pfw)();
            }
            catch ( ... )
            {
            }
      
            _tpool->idle( this);
          }
        }
      
        //停止
        _tpool->exit();
      }
      
      每一個任務線程在剛開端時都邑履行一下初始化操作,並進入一個無窮輪回的部門//挪用處置部門
        while (! _bTerminate)
        {
          TC_FunctorWrapperInterface *pfw = _tpool->get( this);
          if(pfw != NULL)
          {
            auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);
      
            try
            {
              (*pfw)();
            }
            catch ( ... )
            {
            }
      
            _tpool->idle( this);
          }
        }
      
      

      該任務重要是無窮的從線程池的任務隊列中獲得義務並履行,假如勝利獲得義務,則會將線程移進勞碌隊列:

      TC_FunctorWrapperInterface *TC_ThreadPool:: get(ThreadWorker *ptw)
      {
      
        TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
        if(! _jobqueue. pop_front(pFunctorWrapper, 1000))
        {
          return NULL;
        }
      
         {
            Lock sync( _tmutex);
           _busthread. insert(ptw);
        }
        return pFunctorWrapper;
      }
      
      

      履行完,移回任務線程隊列:_tpool->idle( this);

      void TC_ThreadPool:: idle(ThreadWorker *ptw)
      {
        Lock sync( _tmutex);
        _busthread. erase(ptw);
      
        //無忙碌線程, 告訴期待在線程池停止的線程醒過去
        if( _busthread. empty())
        {
          _bAllDone = true;
          _tmutex.notifyAll();
        }
      }
      
      


      此處jobThread隊列初始化後不會轉變(由於沒有完成自增加功效),所以非線程平安的vector隊列便可,busthread的勞碌線程隊列會被移進移出,然則操作會自帶Lock sync( _tmutex),該互斥量是線程池自己繼續的,所所以共有的,也無需別的應用線程平安的TC_ThreadQueue,應用vector便可。

      TC_ThreadPool:: idle中的

        if( _busthread. empty())
        {
          _bAllDone = true;
          _tmutex.notifyAll();
        }
      
      

      重要用於當線程池任務起來後的waitForAllDone辦法:

      bool TC_ThreadPool:: waitForAllDone( int millsecond)
      {
        Lock sync( _tmutex);
      
      start1:
        //義務隊列和忙碌線程都是空的
        if (finish())
        {
          return true;
        }
      
        //永久期待
        if(millsecond < 0)
        {
          _tmutex.timedWait(1000);
          goto start1;
        }
      
        int64_t iNow = TC_Common:: now2ms();
        int m    = millsecond;
      start2:
      
        bool b = _tmutex.timedWait(millsecond);
        //完成處置了
        if(finish())
        {
          return true;
        }
      
        if(!b)
        {
          return false;
        }
      
        millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));
        goto start2;
      
        return false;
      }
      
      _tmutex.timedWait(millsecond)辦法叫醒。重復斷定能否一切的任務能否完成:
      
      bool TC_ThreadPool:: finish()
      {
        return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;
      }
      
      


      全體cpp完成以下:

      TC_ThreadPool ::KeyInitialize TC_ThreadPool::g_key_initialize;
      pthread_key_t TC_ThreadPool::g_key ;
      
      void TC_ThreadPool::destructor( void *p)
      {
        ThreadData *ttd = ( ThreadData*)p;
        if(ttd)
        {
          delete ttd;
        }
      }
      
      void TC_ThreadPool::exit()
      {
        TC_ThreadPool:: ThreadData *p = getThreadData();
        if(p)
        {
          delete p;
          int ret = pthread_setspecific( g_key, NULL );
          if(ret != 0)
          {
            throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
          }
        }
      
        _jobqueue. clear();
      }
      
      void TC_ThreadPool::setThreadData( TC_ThreadPool:: ThreadData *p)
      {
        TC_ThreadPool:: ThreadData *pOld = getThreadData();
        if(pOld != NULL && pOld != p)
        {
          delete pOld;
        }
      
        int ret = pthread_setspecific( g_key, ( void *)p);
        if(ret != 0)
        {
          throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
        }
      }
      
      TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData ()
      {
        return ( ThreadData *) pthread_getspecific( g_key);
      }
      
      void TC_ThreadPool::setThreadData( pthread_key_t pkey, ThreadData *p)
      {
        TC_ThreadPool:: ThreadData *pOld = getThreadData(pkey);
        if(pOld != NULL && pOld != p)
        {
          delete pOld;
        }
      
        int ret = pthread_setspecific(pkey, ( void *)p);
        if(ret != 0)
        {
          throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
        }
      }
      
      TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData( pthread_key_t pkey)
      {
        return ( ThreadData *) pthread_getspecific(pkey);
      }
      
      TC_ThreadPool::TC_ThreadPool()
      : _bAllDone ( true)
      {
      }
      
      TC_ThreadPool::~TC_ThreadPool()
      {
        stop();
        clear();
      }
      
      void TC_ThreadPool::clear()
      {
        std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
        while(it != _jobthread. end())
        {
          delete (*it);
          ++it;
        }
      
        _jobthread. clear();
        _busthread. clear();
      }
      
      void TC_ThreadPool::init( size_t num)
      {
        stop();
      
        Lock sync(* this);
      
        clear();
      
        for( size_t i = 0; i < num; i++)
        {
          _jobthread. push_back( new ThreadWorker( this));
        }
      }
      
      void TC_ThreadPool::stop()
      {
        Lock sync(* this);
      
        std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
        while(it != _jobthread. end())
        {
          if ((*it)-> isAlive())
          {
            (*it)-> terminate();
            (*it)-> getThreadControl().join ();
          }
          ++it;
        }
        _bAllDone = true;
      }
      
      void TC_ThreadPool::start()
      {
        Lock sync(* this);
      
        std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
        while(it != _jobthread. end())
        {
          (*it)-> start();
          ++it;
        }
        _bAllDone = false;
      }
      
      bool TC_ThreadPool:: finish()
      {
        return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;
      }
      
      bool TC_ThreadPool::waitForAllDone( int millsecond)
      {
        Lock sync( _tmutex);
      
      start1:
        //義務隊列和忙碌線程都是空的
        if (finish ())
        {
          return true;
        }
      
        //永久期待
        if(millsecond < 0)
        {
          _tmutex.timedWait(1000);
          goto start1;
        }
      
        int64_t iNow = TC_Common:: now2ms();
        int m    = millsecond;
      start2:
      
        bool b = _tmutex.timedWait(millsecond);
        //完成處置了
        if(finish ())
        {
          return true;
        }
      
        if(!b)
        {
          return false;
        }
      
        millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));
        goto start2;
      
        return false;
      }
      
      TC_FunctorWrapperInterface *TC_ThreadPool::get( ThreadWorker *ptw)
      {
      
        TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
        if(! _jobqueue. pop_front(pFunctorWrapper, 1000))
        {
          return NULL;
        }
      
         {
            Lock sync( _tmutex);
           _busthread. insert(ptw);
        }
        return pFunctorWrapper;
      }
      
      TC_FunctorWrapperInterface *TC_ThreadPool::get()
      {
        TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
        if(! _startqueue. pop_front(pFunctorWrapper))
        {
          return NULL;
        }
      
        return pFunctorWrapper;
      }
      
      void TC_ThreadPool::idle( ThreadWorker *ptw)
      {
        Lock sync( _tmutex);
        _busthread. erase(ptw);
      
        //無忙碌線程, 告訴期待在線程池停止的線程醒過去
        if( _busthread. empty())
        {
            _bAllDone = true;
          _tmutex.notifyAll();
        }
      }
      
      void TC_ThreadPool::notifyT()
      {
        _jobqueue. notifyT();
      }
      

      線程池應用跋文
      線程池合適場所
      事 實上,線程池其實不是全能的。它有其特定的應用場所。線程池努力於削減線程自己的開支對運用所發生的影響,這是有條件的,條件就是線程自己開支與線程履行任 務比擬弗成疏忽。假如線程自己的開支絕對於線程義務履行開支而言是可以疏忽不計的,那末此時線程池所帶來的利益是不顯著的,好比關於FTP辦事器和Telnet辦事器,平日傳送文件的時光較長,開支較年夜,那末此時,我們采取線程池未必是幻想的辦法,我們可以選擇“即時創立,即時燒毀”的戰略。
       總之線程池平日合適上面的幾個場所:
       
      (1)  單元時光內處置義務頻仍並且義務處置時光短
       
      (2)  對及時性請求較高。假如接收就任務後在創立線程,能夠知足不了及時請求,是以必需采取線程池停止預創立。
       
      (3)  必需常常面臨高突發性事宜,好比Web辦事器,假如有足球轉播,則辦事器將發生偉大的沖擊。此時假如采用傳統辦法,則必需一直的年夜量發生線程,燒毀線程。此時采取靜態線程池可以免這類情形的產生。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved